文章目录

1.前言2.应用层的体现3.两个重要结构(1)eventpoll(2)epitem

4.四个函数(1)epoll_create源码(2)epoll_ctl源码(3)epoll_wait的源码(4)epoll_event_callback()

5.水平触发和边缘触发1.状态变化2.LT模式3.ET模式

6.epoll中的锁7.epoll的致命缺点(1)多线程扩展性场景一:同一个 listen fd 在多个 CPU 上调用 accept 系统调用

(2)epoll 所注册的 fd(file descriptor) 和实际内核中控制的结构 file description 拥有不同的生命周期

1.前言

好久好久没有更新博客了,最近一直在实习,刷算法找工作,忙里偷闲简单研究了一下epoll的源码。也是由于面试的时候经常被问到,我只会说那一套,什么epoll_create创建红黑树,以O(1)的方式去读取数据,它和poll与select的区别等等。本篇将从epoll的源码层面重新学习epoll。

2.应用层的体现

多路转接(epoll)实现我在这篇文章中实现了一个简单的epoll网络server。感兴趣的同学可以简单阅读一下,我只挑其中关键的代码来讲一下应用层是如何使用epoll的:

#include"Sock.hpp"

using namespace ns_Sock;

#define NUM 128

#include

#include

void Usage(char* proc)

{

cout<<"Usage \n\t"<

}

int main(int argc,char* argv[])

{

if(argc!=2)

{

Usage(argv[0]);

exit(-1);

}

uint16_t port=(uint16_t)atoi(argv[1]);

int listen_sock=Sock::Socket();

Sock::Bind(listen_sock,port);

Sock::Listen(listen_sock);

//建立epoll模型,获得epfd

int epfd=epoll_create(128);

//先添加listen_sock和它所关心的事件到内核中

struct epoll_event ev;

ev.events=EPOLLIN;

ev.data.fd=listen_sock;//虽然epoll_ctl有文件描述符,但是revs数组中的元素是epoll_event没有fd,因此需要将fd添加都epoll_event的data字段中

epoll_ctl(epfd,EPOLL_CTL_ADD,listen_sock,&ev);

//事件循环

volatile bool quit=false;

struct epoll_event revs[NUM];//由于epoll_wait的数组是输出型参数,因此需要接收

while(!quit)

{

int timeout=1000;

int n=epoll_wait(epfd,revs,NUM,-1);//epoll_wait会将epfd中就绪事件的epoll_event结构体放在revs数组中,返回值表示数组大小

switch(n)

{

case 0:

cout<<"timeout....."<

break;

case -1:

cerr<<"epoll error"<

break;

default:

cout<<"有事件就绪了"<

//处理就绪事件

for(int i=0;i

{

int sock=revs[i].data.fd;//暂时方案

cout<<"文件描述符"<

if(revs[i].events&EPOLLIN)//读事件就绪

{

cout<<"文件描述符"<

if(sock==listen_sock)

{

int fd=Sock::Accept(listen_sock);

if(fd>=0)

{

cout<<"获取新链接成功了"<

struct epoll_event _ev;

_ev.events=EPOLLIN;

_ev.data.fd=fd;

epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&_ev);

cout<<"已经把"<

}

}

//正常的读处理

else

{

cout<<"文件描述符"<

char buffer[1024];

ssize_t s=read(sock,buffer,sizeof(buffer)-1);

if(s>0)

{

buffer[s]=0;

cout<<"client["<

}

else if(s==0)

{

cout<<"client quit "<

close(sock);

epoll_ctl(epfd,EPOLL_CTL_DEL,sock,nullptr);//将该套接字从epoll空间关注的位置删除

cout<<"Sock:"<

}

else

{

cout<<"recv error"<

close(sock);

epoll_ctl(epfd,EPOLL_CTL_DEL,sock,nullptr);

cout<<"delete sock"<

}

}

}

}

}

}

}

这是我那篇博客的服务器端的代码,使用telnet是可以直接访问的,通过这段代码我们可以发现调用epoll的过程以及一些细节。首先就是众所周知的:

epoll_create创建一个epoll空间。接着调用epoll_ctl将一个文件描述符以及对该文件描述符需要关心的事件放进epoll空间中。然后调用epoll_wait进行等待就好了。事件就绪会使用epoll_wait这个函数来通知我们。

但仔细看代码还是会发现一些细节,在epoll空间建立完成后,添加的第一个文件描述符就是listen_sock,并且关心它的读事件。 在epoll_wait成功的时候,会返回一个就绪事件的数组,通过遍历这个数组去对数据进行操作,但当该数组元素表示的是listen_sock事件就绪的时候,需要在listen_sock中将新监听到的链接accept上来。这是对监听套接字独有的一种处理方式,也说明epoll的回调函数不止一个触发条件(数据就绪orl链接就绪)。

其中这里还涉及到了一个重要的结构体epoll_event,这个结构体在内核源码中也经常使用到,下面给出它在linux系统中的结构:

typedef union epoll_data

{

void *ptr;

int fd;

uint32_t u32;

uint64_t u64;

} epoll_data_t;

struct epoll_event

{

uint32_t events; /* Epoll events */

epoll_data_t data; /* User data variable */

} __EPOLL_PACKED;

其中它的events表示的是事件,data又是一个结构体,它其中的一个重要的东西就是fd。它表示该epoll_event是哪个套接字的。

3.两个重要结构

(1)eventpoll

其中一个结构名为eventpoll,当调用epoll_create的时候内核会自动创建它,所以其实所谓的epoll空间仅仅是一个结构体而已。

struct eventpoll {

/*

struct ep_rb_tree {

struct epitem *rbh_root;

}

*/

ep_rb_tree rbr; //rbr指向红黑树的根节点

int rbcnt; //红黑树中节点的数量(也就是添加了多少个TCP连接事件)

LIST_HEAD( ,epitem) rdlist; //rdlist指向双向链表的头节点;

/* 这个LIST_HEAD等价于

struct {

struct epitem *lh_first;

}rdlist;

*/

int rdnum; //双向链表中节点的数量(也就是有多少个TCP连接来事件了)

// ...略...

};

它包含了几个内容,有我们熟知的红黑树根节点,还有所谓的就绪队列(是一个双链表),以及红黑树与就绪队列的节点数量。

(2)epitem

这个是比较关键的一个结构体,epoll_ctl将文件描述符以及所关心的事件插入到epoll空间中,插入的就是这么个玩意。

struct epitem {

RB_ENTRY(epitem) rbn;

/* RB_ENTRY(epitem) rbn等价于

struct {

struct type *rbe_left; //指向左子树

struct type *rbe_right; //指向右子树

struct type *rbe_parent; //指向父节点

int rbe_color; //该节点的颜色

} rbn

*/

LIST_ENTRY(epitem) rdlink;

/* LIST_ENTRY(epitem) rdlink等价于

struct {

struct type *le_next; //指向下个元素

struct type **le_prev; //前一个元素的地址

}*/

int rdy; //判断该节点是否同时存在与红黑树和双向链表中

int sockfd; //socket句柄

struct epoll_event event; //存放用户填充的事件

};

首先它包含了一个红黑树节点的结构,其次他又包含了双向链表的结构,到这里我们就可以发现,红黑树和双向链表中插入的是同一个结构体epitem。也很容易想到epoll_ctl的本质其实就是使用传入的参数来构造一个epitem。 因此它包含的参数显而易见:sockfd表示该epitem对应的套接字,epoll_event表示需要关心的该套接字的事件。

4.四个函数

(1)epoll_create源码

int epoll_create(int size) {

//size没有什么卵用

if (size <= 0) return -1;

//tcp服务,我也不懂,目前不是重点

nty_tcp_manager *tcp = nty_get_tcp_manager();

if (!tcp) return -1;

struct _nty_socket *epsocket = nty_socket_allocate(NTY_TCP_SOCK_EPOLL);

if (epsocket == NULL) {

nty_trace_epoll("malloc failed\n");

return -1;

}

//1.建立一个eventpoll

struct eventpoll *ep = (struct eventpoll*)calloc(1, sizeof(struct eventpoll));

if (!ep) {

nty_free_socket(epsocket->id, 0);

return -1;

}

//2.初始化红黑树指针为空,节点数为0

ep->rbcnt = 0;

RB_INIT(&ep->rbr);

//3.双向链表头指向空

LIST_INIT(&ep->rdlist);

//线程操作,暂时不用关心

if (pthread_mutex_init(&ep->mtx, NULL)) {

free(ep);

nty_free_socket(epsocket->id, 0);

return -2;

}

if (pthread_mutex_init(&ep->cdmtx, NULL)) {

pthread_mutex_destroy(&ep->mtx);

free(ep);

nty_free_socket(epsocket->id, 0);

return -2;

}

if (pthread_cond_init(&ep->cond, NULL)) {

pthread_mutex_destroy(&ep->cdmtx);

pthread_mutex_destroy(&ep->mtx);

free(ep);

nty_free_socket(epsocket->id, 0);

return -2;

}

if (pthread_spin_init(&ep->lock, PTHREAD_PROCESS_SHARED)) {

pthread_cond_destroy(&ep->cond);

pthread_mutex_destroy(&ep->cdmtx);

pthread_mutex_destroy(&ep->mtx);

free(ep);

nty_free_socket(epsocket->id, 0);

return -2;

}

//4.保存ep对象,通过epid可以在系统中找到eventpoll结构

tcp->ep = (void*)ep;

epsocket->ep = (void*)ep;

return epsocket->id;

}

我们发现在epoll_create的时候会传入一个参数,但是这个参数似乎没有意义,从这里就可以很简单的看出了,这个size除了判断一下是不是大于0之外,之后什么都没做。因此这个参数是没有意义的。 它的主要功能我在代码中标注了,大致如下:

1.建立一个eventpoll对象,并为其分配空间。2.将该对象中红黑树根节点指向空,节点个数设为0。3.将该对象中双向链表头节点指向空。4.将eventpoll对象保存起来。以后可以通过epid找到它,并返回这个epid。

(2)epoll_ctl源码

下面来看看epoll_ctl都干了些什么,在讲解这个函数之前,我们还是先回头看一下我们是怎么向它传参的:

epoll_ctl(epfd,EPOLL_CTL_ADD,listen_sock,&ev);

这是我们将listen_sock加入到epoll空间中的代码,可以看到epfd用于寻找eventpoll结构体,EPOLL_CTL_ADD表示我是要加入一个epoll节点,listen_sock表示我加入的这个节点的文件描述符是listen_sock,&ev表示我关心的事件(使用地址传参为了节省空间),这一切是不是很明确了呢?下面我们来阅读这部分的源码。

int epoll_ctl(int epid, int op, int sockid, struct epoll_event *event) {

//tcp部分,暂时不管

nty_tcp_manager *tcp = nty_get_tcp_manager();

if (!tcp) return -1;

nty_trace_epoll(" epoll_ctl --> 1111111:%d, sockid:%d\n", epid, sockid);

//1.通过传入进来的epid找到对应的eventpoll结构体

struct _nty_socket *epsocket = tcp->fdtable->sockfds[epid];

//struct _nty_socket *socket = tcp->fdtable->sockfds[sockid];

//nty_trace_epoll(" epoll_ctl --> 1111111:%d, sockid:%d\n", epsocket->id, sockid);

if (epsocket->socktype == NTY_TCP_SOCK_UNUSED) {

errno = -EBADF;

return -1;

}

if (epsocket->socktype != NTY_TCP_SOCK_EPOLL) {

errno = -EINVAL;

return -1;

}

nty_trace_epoll(" epoll_ctl --> eventpoll\n");

struct eventpoll *ep = (struct eventpoll*)epsocket->ep;

if (!ep || (!event && op != EPOLL_CTL_DEL)) {

errno = -EINVAL;

return -1;

}

//2.判断如果是增加节点

if (op == EPOLL_CTL_ADD) {

pthread_mutex_lock(&ep->mtx);

//在栈区先创建一个对象,接收sockid

struct epitem tmp;

tmp.sockfd = sockid;

//查找这个节点是否在红黑树中,其实也就是根据epollfd查找的,这查找传入tmp,可能表示的是红黑树节点类型,这是一个小细节

struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp);

if (epi) {

nty_trace_epoll("rbtree is exist\n");

pthread_mutex_unlock(&ep->mtx);

return -1;

}

//如果不存在才为结构体分配完整空间

epi = (struct epitem*)calloc(1, sizeof(struct epitem));

if (!epi) {

pthread_mutex_unlock(&ep->mtx);

errno = -ENOMEM;

return -1;

}

//使用参数构建这个epitem节点

epi->sockfd = sockid;

memcpy(&epi->event, event, sizeof(struct epoll_event));

epi = RB_INSERT(_epoll_rb_socket, &ep->rbr, epi);

assert(epi == NULL);

ep->rbcnt ++;

pthread_mutex_unlock(&ep->mtx);

} else if (op == EPOLL_CTL_DEL) {

pthread_mutex_lock(&ep->mtx);

struct epitem tmp;

//先把fd分配给sockfd

tmp.sockfd = sockid;

//查找该节点是否已经存在,这也说明是根据sockfd查找的

struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp);

if (!epi) {

nty_trace_epoll("rbtree no exist\n");

pthread_mutex_unlock(&ep->mtx);

return -1;

}

//存在则删除即可

epi = RB_REMOVE(_epoll_rb_socket, &ep->rbr, epi);

if (!epi) {

nty_trace_epoll("rbtree is no exist\n");

pthread_mutex_unlock(&ep->mtx);

return -1;

}

ep->rbcnt --;

free(epi);

pthread_mutex_unlock(&ep->mtx);

} else if (op == EPOLL_CTL_MOD) {

//修改该节点

struct epitem tmp;

tmp.sockfd = sockid;

struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp);

//找到了就将该节点的事件进行修改,这里还有一个细节,修改之后会默认加入EPOLLERR | EPOLLHUP这两个事件监听,一个表示套接字发生错误,一个表示套接字被挂断。

if (epi) {

epi->event.events = event->events;

epi->event.events |= EPOLLERR | EPOLLHUP;

} else {

errno = -ENOENT;

return -1;

}

} else {

nty_trace_epoll("op is no exist\n");

assert(0);

}

return 0;

}

每根据一个不同的选择就创建一个结构,并查找它是不是在红黑树中可能有点冗余,感觉直接在上面创建一个即可。 这段代码看起来不难,其实细节有很多的,首先先来说一下大体的步骤:

根据传入的epfd找到对应的eventpoll结构。根据不同的选择类型进行if-else判断 1.首先所有操作,都要先在栈区建立一个epitem的结构,然后将sock传入其中。并使用红黑树查找函数传入该结构查找。 2.根据有没有来进行构造删除,或者修改。 以插入为例,没有则在堆区构建结构体对象,然后向红黑树中插入,使用memcpy来加入事件。 其中的两个细节包括:

都是先建立一个栈区对象,然后红黑树中进行查找的,这里可以优化一下。 在修改的时候,会加入监听事件:文件描述符出错,套接字被挂断。

(3)epoll_wait的源码

这个函数就是用于等待事件就绪,然后将他插入就绪队列中的,其中这里的epoll_event是一个输出型参数,它通常表示一个数组的首地址。这里可以再回顾一下它是怎么进行传参的:

int n=epoll_wait(epfd,revs,NUM,-1);

其中epfd显然还是去找eventpoll的,revs是一个数组首元素地址(我们建立一个数组,传入数组名其实就可以了),NUM是一个整数,表示多少个套接字就绪了就可以返回了,-1表示的是只要没有文件描述符就绪,就永久阻塞。 值得注意的是,这里我们没有回调函数的实现,也就是说暂时没有套接字就绪了将它插入等待队列中的操作,它的实现在后面讲。

int epoll_wait(int epid, struct epoll_event *events, int maxevents, int timeout) {

//tcp相关,可恶我写完这篇文章一定看看这是个啥东西

nty_tcp_manager *tcp = nty_get_tcp_manager();

if (!tcp) return -1;

//nty_socket_map *epsocket = &tcp->smap[epid];

//找到eventpoll

struct _nty_socket *epsocket = tcp->fdtable->sockfds[epid];

if (epsocket == NULL) return -1;

if (epsocket->socktype == NTY_TCP_SOCK_UNUSED) {

errno = -EBADF;

return -1;

}

if (epsocket->socktype != NTY_TCP_SOCK_EPOLL) {

errno = -EINVAL;

return -1;

}

struct eventpoll *ep = (struct eventpoll*)epsocket->ep;

if (!ep || !events || maxevents <= 0) {

errno = -EINVAL;

return -1;

}

//线程相关,先不研究

if (pthread_mutex_lock(&ep->cdmtx)) {

if (errno == EDEADLK) {

nty_trace_epoll("epoll lock blocked\n");

}

assert(0);

}

//如果rdnum为空,并且等待时间不为0的时候会等待一段时间

while (ep->rdnum == 0 && timeout != 0) {

ep->waiting = 1;

if (timeout > 0) {

struct timespec deadline;

clock_gettime(CLOCK_REALTIME, &deadline);

if (timeout >= 1000) {

int sec;

sec = timeout / 1000;

deadline.tv_sec += sec;

timeout -= sec * 1000;

}

deadline.tv_nsec += timeout * 1000000;

if (deadline.tv_nsec >= 1000000000) {

deadline.tv_sec++;

deadline.tv_nsec -= 1000000000;

}

int ret = pthread_cond_timedwait(&ep->cond, &ep->cdmtx, &deadline);

if (ret && ret != ETIMEDOUT) {

nty_trace_epoll("pthread_cond_timewait\n");

pthread_mutex_unlock(&ep->cdmtx);

return -1;

}

timeout = 0;

} else if (timeout < 0) {

int ret = pthread_cond_wait(&ep->cond, &ep->cdmtx);

if (ret) {

nty_trace_epoll("pthread_cond_wait\n");

pthread_mutex_unlock(&ep->cdmtx);

return -1;

}

}

ep->waiting = 0;

}

pthread_mutex_unlock(&ep->cdmtx);

pthread_spin_lock(&ep->lock);

int cnt = 0;

//哪个少将哪个作为事件的数量

int num = (ep->rdnum > maxevents ? maxevents : ep->rdnum);

int i = 0;

while (num != 0 && !LIST_EMPTY(&ep->rdlist)) { //EPOLLET

//每次从双向链表的头节点取得一个一个的节点

struct epitem *epi = LIST_FIRST(&ep->rdlist);

//把这个节点从双向链表中删除

LIST_REMOVE(epi, rdlink);

//标记这个节点不在双向链表中,但是在红黑树中

epi->rdy = 0;//只有当该节点再次被放入双向链表中的时候,才会置为1

//把标记的信息拷贝出来,拷贝到提供的events参数中

memcpy(&events[i++], &epi->event, sizeof(struct epoll_event));

//--,++的操作

num --;

cnt ++;

ep->rdnum --;

}

pthread_spin_unlock(&ep->lock);

return cnt;

}

(4)epoll_event_callback()

1.客户端connect()连入,服务器处于SYN_RCVD状态时 2.三路握手完成,服务器处于ESTABLISHED状态时 3.客户端close()断开连接,服务器处于FIN_WAIT_1和FIN_WAIT_2状态时 4.客户端send/write()数据,服务器可读时 5.服务器可以发送数据时

它的作用是向双向链表中添加一个红黑树的节点。它的参数有一个eventpoll类型,注意这里没有传epid,有一个sockid,有一个event。

int epoll_event_callback(struct eventpoll *ep, int sockid, uint32_t event) {

struct epitem tmp;

tmp.sockfd = sockid;

//首先根据sockid找到红黑树中的节点

struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp);

if (!epi) {

nty_trace_epoll("rbtree not exist\n");

assert(0);

}

//根据epi->rdy判断该节点是否在双向链表里

if (epi->rdy) {

//如果在就将事件填入events中

epi->event.events |= event;

return 1;

}

//如果不在,要做的就是将这个节点添加到双向链表中的表头位置

nty_trace_epoll("epoll_event_callback --> %d\n", epi->sockfd);

pthread_spin_lock(&ep->lock);

epi->rdy = 1;

LIST_INSERT_HEAD(&ep->rdlist, epi, rdlink);

ep->rdnum ++;

pthread_spin_unlock(&ep->lock);

pthread_mutex_lock(&ep->cdmtx);

pthread_cond_signal(&ep->cond);

pthread_mutex_unlock(&ep->cdmtx);

return 0;

}

5.水平触发和边缘触发

1.状态变化

要讲明白水平触发和边缘触发就需要知道都有哪些状态会触发,无非也就这四种:

可读:socket上有数据不可读:socket上没有数据了可写:socket上有空间可写不可写:socket上无空间可写

对于水平触发模式,一个事件只要有,就会一直被触发。对于边缘触发模式,一个事件只要从无到有就会被触发。

2.LT模式

读:只要接收缓冲区上有未读完的数据,就会一直被触发。 写:只要发送缓冲区上还有空间,就会一直被触发。如果程序依赖于可写事件触发去发送数据,要移除可写事件,避免无意义的触发。

在LT模式下,读事件触发后,可以按需收取想要的字节数,不用把本次接收到的数据收取干净。

3.ET模式

读:只有接收缓冲区上的数据从无到有,就会被触发一次。 写:只有发送缓冲区上由不可写到可写,就会触发,(由满到不满)

在ET模式下,当读事件触发后,需要将数据一次性读干净。

6.epoll中的锁

通过阅读上面的代码,我们发现访问红黑树和访问双向链表的时候会加锁。

红黑树:互斥锁,因为红黑树是一个自平衡的二叉搜索树,多线程访问的时候可能改变树的结构,因此加上互斥锁。 双向链表:自旋锁,是一个更轻量级的锁,因为双向链表的结构是不会改变的,通过自旋等待的方式获取锁,避免了切换上下文的开销。

7.epoll的致命缺点

(1)多线程扩展性

场景一:同一个 listen fd 在多个 CPU 上调用 accept 系统调用

水平触发:

1.内核收到了一个链接请求。同时唤醒了A和B两个在epoll_wait上等待的线程。

2.线程A epoll_wait成功,而线程B epoll_wait失败。

3.线程B其实不需要唤醒,造成惊群效应,消耗资源。

边缘触发:

1.内核收到了一个链接请求,由于是边缘触发所以只会唤醒一个线程,假设线程A被唤醒。

2.A正在accept的时候,突然又来了一个链接,此时由于由于监听套接字处于就绪状态,没有产生新事件,所以就不会发起通知。

3.由于不会发起通知,所以必须由A再去处理该链接,这样就造成了线程饥饿的问题。

(2)epoll 所注册的 fd(file descriptor) 和实际内核中控制的结构 file description 拥有不同的生命周期

epoll混淆了上图中的进程的文件描述符和系统中的文件描述符表。当进行EPOLLADD后,epoll其实监听的是系统级的文件描述符,所以当close(fd)的时候,如果对应的description只有它一个descriptor的时候,正常关闭。 但如果它有多个descriptor与之对应的话,就会发生即使将该文件描述符关闭了,但是还是可以接收到事件的情况。更糟糕的是,一旦你 close() 了这个 fd,再也没有机会把这个死掉的 fd 从 epoll 上摘除了。所以在删除之前一定要进行EPOLLDELETE。