市场来说网站建设销售发展怎么样,杭州有哪些做网站的公司好,泰安房产信息网网签查询,专业团队介绍文案Redis 是cs架构(服务端-客户端)#xff0c;典型的一对多的服务器应用程序。多个客户通过网络与Redis服务器进行通信。那么在linux环境中是使用epoll(我们也 只讨论linux环境的#xff0c;便于学习)。 通过使用I/O多路复用技术#xff0c; redis 服务器使用单线程单进程的…Redis 是cs架构(服务端-客户端)典型的一对多的服务器应用程序。多个客户通过网络与Redis服务器进行通信。那么在linux环境中是使用epoll(我们也 只讨论linux环境的便于学习)。 通过使用I/O多路复用技术 redis 服务器使用单线程单进程的方式处理命令请求并与多个客户端连接进行网络通讯。
redis的网络框架和Muduo是很相似的若是弄懂了Muduo后在来看Redis的网络部分那是很轻松的。这里推荐下我写的cppServer程序有很详细的章节解说。0.仿造muduo,实现linux服务器开发思路 看懂这个后很有助于看懂Muduo和Redis的网络部分。
1.Redis的main()函数-----建立监听的开始
//server.c
int main(int argc, char **argv) {//省略很多.......initServer();//省略很多.......//server.el 类型是struct aeEventLoop*aeMain(server.el);aeDeleteEventLoop(server.el);return 0;
}
目前main()函数中关于网络的主要就这几个函数其他的先省略了。
而要介绍这几个函数需要先介绍一些数据结构才行。
2.网络相关的数据结构
这里就介绍aeEventLoop,aeFileEventaeTimeEventaeFiredEvent4个结构体(都在ae.h文件中)。
aeEventLoop是重中之重带出其他三个结构体。
1.struct aeEventLoop
这个就类似我写的程序里面的EventLoop。
aeEventLoop是Reactor模型的具体抽象把网络读写事件和时间事件(定时器任务)可以统一到一起处理。
events实际是个大小为setsize的数组管理着网络IO事件。events数组的下标表示的是fd,而对应的元素是该fd关注的IO事件(aeFileEvent结构体)。fired数组是已被触发读写的网络事件数组。其下标仅仅是索引index,该索引位置上的元素是记录了被触发的fd及其对应的事件类型。 在Redis中是使用链表存储定时器任务的。timeEventHead表示的是定时器链表的头结点。而由于每次定时器任务结点都是从链表头部插入的所以timeEventHead记录的是最晚插入的结点。因为是直接插入放在头结点所有对链表进行排序所以头结点不一定是最早超时的任务 beforesleep和aftersleep是每次事件循坏之前和之后需要执行的回调函数。(即是调用epoll_wait()前后 apidata封装来具体的IO多路复用的系统调用。linux主要有select、poll、epoll在Redis代码文件中分别对应ae.select.cc、ae_evport.cc、ae_epoll.cc。(这里我们只分析epoll)。
//ae.h
// 事件循环
typedef struct aeEventLoop {int maxfd; //目前已注册的最大文件描述符fdint setsize; //能注册的最大描述符数long long timeEventNextId; //下一个要注册的时间事件idtime_t lastTime; //最后一次执行时间事件的时间aeFileEvent *events; //是数组已注册的文件事件 (就是IO event)aeFiredEvent *fired; //数组已就绪的文件事件aeTimeEvent *timeEventHead; //定时器链表的头结点int stop; //eventLoop的开关void *apidata; //多路复用库的私有数据(epollfd相关的数据)aeBeforeSleepProc *beforesleep;//在处理事件前要执行的回调函数(即是在执行epoll_wait()之前)aeBeforeSleepProc *aftersleep;//在处理事件后要执行的回调函数(即是在执行epoll_wait()之后)int flags; //设置的标识位
} aeEventLoop;//文件事件结构
typedef struct aeFileEvent {//监听事件类型即是关注的事件int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */aeFileProc *rfileProc; //读事件的回调函数aeFileProc *wfileProc; //写事件的回调函数void *clientData;
} aeFileEvent;//已就绪的事件
typedef struct aeFiredEvent {int fd; //已就绪文件描述符int mask; //事件类型
} aeFiredEvent;2.aeFileEvent用来管理要注册的IO事件
mask是需要注册的事件类型。其就是这几个事件经过或运算后的掩码包括AE_READABLE可读事件、AE_WRITABLE可写事件和AE_BARRIER该事件可以实现读写事件处理顺序的反转)。(AE_BARRIER可以先不了解没有影响的)clientData是万能指针void*,是执行回调处理函数的参数数据。rfileProc和wfileProc是回调函数分别在AE_READABLE和AE_WRITABLE类型时进行回调使用的。 要说说回调函数的类型。
//ae.h
//回调函数类型
//用c11表示的话 using aeFileProcstd::functionvoid(aeEventLoop* ,int,void*,int);//IO读写事件回调函数
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
//定时器事件回调函数
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
//删除定时事件的回调函数
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
//进入循环等待之前的回调函数
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); 3.aeFiredEvent用来管理已经发生的IO事件
fd是已事件就绪的文件描述符mask是返回的已发生事件的事件类型通过epoll_wait()返回已发生的事件 4.时间事件是aeTimeEventtimeEventHead是注册的时间事件列表(链表)
每个时间事件都有一个事件idaeEventLoop中的timeEventNextId是下一个要注册的时间事件id。when_sec、when_ms是时间事件(定时器任务)的发生时间timeProc是时间事件的处理回调函数。说明aeTimeProc 需要返回一个 int 值代表下次该超时事件触发的时间间隔。如果返回 - 1则说明超时时间不需要再触发了标记为删除即可finalizerProc是时间事件要删除时的处理函数
//ae.h
//时间事件结构
typedef struct aeTimeEvent {long long id; //时间事件的唯一标识符自增//事件的到达时间(即是执行时间)long when_sec; /* seconds */long when_ms; /* milliseconds *///事件处理函数 (到期执行的回调函数)aeTimeProc *timeProc;//事件释放函数 (回调函数)aeEventFinalizerProc *finalizerProc;void *clientData; //多路复用库的私有数据//双向链表struct aeTimeEvent *prev; struct aeTimeEvent *next;int refcount; //以防止计时器事件在递归时间事件调用中释放
} aeTimeEvent; 那么我们接着看回main()函数中的关于网络的部分。
initServer()函数
void initServer(void) {//省略了很多不相关的代码.......//创建evEventLoop ,也可以说是创建epoll//server.el是aeEventLoop*类型server.el aeCreateEventLoop(server.maxclientsCONFIG_FDSET_INCR);//ipfd是服务器端fd,是数组因为有IPV4和IPV4,listenToPort(server.port,server.ipfd,server.ipfd_count);//函数内部调用sokcet(),bind(),listen()...for (j 0; j server.ipfd_count; j) {aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL);}
}
3.事件驱动初始化
1.aeCreateEventLoop()函数创建EventLoop。
其主要4步
1.分配内存给eventloop;
2.eventloop的成员进行初始化
3.创建epollfd(调用epoll_create());
4.初始化要关注的事件类型初始化阶段什么时间也没有关注。
aeEventLoop *aeCreateEventLoop(int setsize) {aeEventLoop *eventLoop;int i;//1.分配内存 zmalloc()函数是Redis封装的函数可以理解为是malloc()if ((eventLoop zmalloc(sizeof(*eventLoop))) NULL) goto err;//创建数组可见数组的长度就是setsizeeventLoop-events zmalloc(sizeof(aeFileEvent)*setsize);eventLoop-fired zmalloc(sizeof(aeFiredEvent)*setsize);if (eventLoop-events NULL || eventLoop-fired NULL) goto err;//2.初始化eventLoop-setsize setsize;eventLoop-lastTime time(NULL);eventLoop-timeEventHead NULL;eventLoop-timeEventNextId 0;eventLoop-stop 0;eventLoop-maxfd -1;eventLoop-beforesleep NULL;eventLoop-aftersleep NULL;eventLoop-flags 0;//3.创建epollfdif (aeApiCreate(eventLoop) -1) goto err;/* Events with mask AE_NONE are not set. So lets initialize the* vector with it. *//** 可以看到数组长度就是setsize同时创建之后将每一个event的mask属性置为AE_NONE(即是0)*对于eventLoop-events数组来说fd就是这个数组的下标。*例如当程序刚刚启动时候创建监听套接字按照标准规定该fd的值为3。此时就直接在 eventLoop-events下标为3的元素中存放相应event数据。*不过也基于文件描述符的这些特点意味着events数组的前三位一定不会有相应的fd赋值。*///4.初始化什么事件也没有关注maskAE_NONEfor (i 0; i setsize; i)eventLoop-events[i].mask AE_NONE;return eventLoop;err:if (eventLoop) {zfree(eventLoop-events);zfree(eventLoop-fired);zfree(eventLoop);}return NULL;
}
2.IO多路复用epoll的封装 epoll的封装都在ae_epoll.c文件中。这个是封装了I/O多路复用。
主要是封装一些epoll的函数(epoll_create,epoll_ctl)。
aeEventLoop中的apidata在epoll中表示为aeApiState。结构体aeApiState中的epfd为epoll的fdevents表示接受事件循环epoll_wait返回的触发读写的网络事件。 int aeApiCreate(aeEventLoop *eventLoop): 调用epoll_create()得到epfd并将aeApiState数据赋值给eventLoop-apidata。
int aeApiResize(aeEventLoop *eventLoop, int setsize): 重置eventLoop-apidata的events的大小。
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask): 调用epeoll_ctl()根据mask对fd进行添加或修改。
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) 调用epoll_ctl(),对fd进行删除。 int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) 调用epoll_wait(),得到已就绪的events并将已就绪的events赋值给eventLoop-fired。
//ae_epoll.c
typedef struct aeApiState {int epfd; //这个是epoll_create()返回来的fdstruct epoll_event *events; //用在epoll_wait(int epfd,epoll_event* events)函数内的参数
} aeApiState;//创建epollfd调用epoll_create()
static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state zmalloc(sizeof(aeApiState));if (!state) return -1;state-events zmalloc(sizeof(struct epoll_event)*eventLoop-setsize);if (!state-events) {zfree(state);return -1;}state-epfd epoll_create(1024); /* 1024 is just a hint for the kernel */if (state-epfd -1) {zfree(state-events);zfree(state);return -1;}eventLoop-apidata state;//epoll的数据aeApiStatereturn 0;
}//重置events的大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {aeApiState *state eventLoop-apidata;state-events zrealloc(state-events, sizeof(struct epoll_event)*setsize);return 0;
}//添加fd到epoll上调用epoll_ctl()
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {aeApiState *state eventLoop-apidata;struct epoll_event ee {0}; /* avoid valgrind warning *//* If the fd was already monitored for some event, we need a MOD* operation. Otherwise we need an ADD operation. */int op eventLoop-events[fd].mask AE_NONE ?EPOLL_CTL_ADD : EPOLL_CTL_MOD;ee.events 0;mask | eventLoop-events[fd].mask; /* Merge old events */if (mask AE_READABLE) ee.events | EPOLLIN;if (mask AE_WRITABLE) ee.events | EPOLLOUT;ee.data.fd fd;if (epoll_ctl(state-epfd,op,fd,ee) -1) return -1;return 0;
}static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {aeApiState *state eventLoop-apidata;struct epoll_event ee {0}; /* avoid valgrind warning */int mask eventLoop-events[fd].mask (~delmask);ee.events 0;if (mask AE_READABLE) ee.events | EPOLLIN;if (mask AE_WRITABLE) ee.events | EPOLLOUT;ee.data.fd fd;if (mask ! AE_NONE) {epoll_ctl(state-epfd,EPOLL_CTL_MOD,fd,ee);} else {/* Note, Kernel 2.6.9 requires a non null event pointer even for* EPOLL_CTL_DEL. */epoll_ctl(state-epfd,EPOLL_CTL_DEL,fd,ee);}
}static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state eventLoop-apidata;int retval, numevents 0;retval epoll_wait(state-epfd,state-events,eventLoop-setsize,tvp ? (tvp-tv_sec*1000 tvp-tv_usec/1000) : -1);if (retval 0) {int j;numevents retval;for (j 0; j numevents; j) {int mask 0;struct epoll_event *e state-eventsj;if (e-events EPOLLIN) mask | AE_READABLE;if (e-events EPOLLOUT) mask | AE_WRITABLE;if (e-events EPOLLERR) mask | AE_WRITABLE|AE_READABLE;if (e-events EPOLLHUP) mask | AE_WRITABLE|AE_READABLE;//赋给已就绪的fired事件eventLoop-fired[j].fd e-data.fd;eventLoop-fired[j].mask mask;}}return numevents;
}
创建好EventLoop后需要进行服务器的初始化调用socket(),bind()等等操作。
4.listenToPort函数,进行服务器的常规初始化操作(socket,bind,listen)
bindaddr是个元素是char*的数组元素是服务器需要绑定的ip。
为了便于分析假定server.bindaddr_count是0,那么IPv4和IPv6都需要绑定。
//in server.c
//调用例子 listenToPort(server.port,server.ipfd,server.ipfd_count)
int listenToPort(int port, int *fds, int *count) {int j;if (server.bindaddr_count 0) server.bindaddr[0] NULL;for (j 0; j server.bindaddr_count || j 0; j) {if (server.bindaddr[j] NULL) {/* Bind * for both IPv6 and IPv4, we enter here only if* server.bindaddr_count 0. */fds[*count] anetTcp6Server(server.neterr,port,NULL,server.tcp_backlog);if (fds[*count] ! ANET_ERR) {anetNonBlock(NULL,fds[*count]); //设置非阻塞(*count);} if (*count 1) {/* Bind the IPv4 address as well. */fds[*count] anetTcpServer(server.neterr,port,NULL,server.tcp_backlog);if (fds[*count] ! ANET_ERR) {anetNonBlock(NULL,fds[*count]);(*count);} }} if (fds[*count] ANET_ERR) {//省略一些错误处理和打印日志return C_ERR;}anetNonBlock(NULL,fds[*count]);(*count);}return C_OK;
} 这里需要调用anetTcpServer()函数和anetTcp6Server()函数。
//in anet.c
int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
}int anetTcp6Server(char *err, int port, char *bindaddr, int backlog)
{return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog);
}static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{int s -1, rv;char _port[6]; /* strlen(65535) */struct addrinfo hints, *servinfo, *p;snprintf(_port,6,%d,port);memset(hints,0,sizeof(hints));hints.ai_family af;hints.ai_socktype SOCK_STREAM;hints.ai_flags AI_PASSIVE; /* No effect if bindaddr ! NULL */if ((rv getaddrinfo(bindaddr,_port,hints,servinfo)) ! 0) {anetSetError(err, %s, gai_strerror(rv));return ANET_ERR;}/* getaddrinfo() returns a list of address structures.Try each address until we successfully bind(2).If socket(2) (or bind(2)) fails, we (close the socketand) try the next address. *///其中有绑定一个成功就会跳出for循环的for (p servinfo; p ! NULL; p p-ai_next) {if ((s socket(p-ai_family,p-ai_socktype,p-ai_protocol)) -1) //创建sockdfdcontinue;if (af AF_INET6 anetV6Only(err,s) ANET_ERR) goto error;if (anetSetReuseAddr(err,s) ANET_ERR) goto error; //设置端口服用if (anetListen(err,s,p-ai_addr,p-ai_addrlen,backlog) ANET_ERR) s ANET_ERR; //绑定并开始监听goto end;}if (p NULL) {anetSetError(err, unable to bind socket, errno: %d, errno);goto error;}error:if (s ! -1) close(s);s ANET_ERR;
end:freeaddrinfo(servinfo);return s;
}5.创建服务器的FileEvent并注册到eventLoop中
绑定并进行listenn()监听后就到了aeCreateFileEvent(),进行事件注册添加FileEvent到eventLoop中。
事件一共有两类
IO事件注册与删除aeCreateFileEventaeDeleteFileEvent时间事件注册与删除aeCreateTimeEventaeDeleteTimeEvent IO事件的注册通过aeApiAddEvent函数将套接字及其事件处理函数注册到epoll中。
//in ae.c
//根据mask参数的值 创建文件事件
// 监听fd 文件的状态
//当fd可用时执行proc函数(即是回调函数)
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
{if (fd eventLoop-setsize) {errno ERANGE;return AE_ERR;}aeFileEvent *fe eventLoop-events[fd];if (aeApiAddEvent(eventLoop, fd, mask) -1)//使用epoll的话即是调用epoll_ctlreturn AE_ERR;//设置文件事件类型以及事件的处理器(即是设置回调函数)fe-mask | mask;if (mask AE_READABLE) fe-rfileProc proc;if (mask AE_WRITABLE) fe-wfileProc proc;fe-clientData clientData;if (fd eventLoop-maxfd) //若符合则更新maxfdeventLoop-maxfd fd;return AE_OK;
}void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{if (fd eventLoop-setsize) return;aeFileEvent *fe eventLoop-events[fd];if (fe-mask AE_NONE) return;/* We want to always remove AE_BARRIER if set when AE_WRITABLE* is removed. */if (mask AE_WRITABLE) mask | AE_BARRIER;aeApiDelEvent(eventLoop, fd, mask); //调用epoll_ctlr(del)fe-mask fe-mask (~mask);if (fd eventLoop-maxfd fe-mask AE_NONE) {/* Update the max fd */int j;for (j eventLoop-maxfd-1; j 0; j--) //判断最大的fd的maskif (eventLoop-events[j].mask ! AE_NONE) break;eventLoop-maxfd j;}
}//创建时间事件
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,aeTimeProc *proc, void *clientData,aeEventFinalizerProc *finalizerProc)
{long long id eventLoop-timeEventNextId; //更新时间计数器这个id是时间事件的唯一标识符aeTimeEvent *te;te zmalloc(sizeof(*te));if (te NULL) return AE_ERR;te-id id;aeAddMillisecondsToNow(milliseconds,te-when_sec,te-when_ms); //设置到期时间//设置事件处理器即是设置回调函数te-timeProc proc;te-finalizerProc finalizerProc;//设置私有数据te-clientData clientData;te-prev NULL;te-next eventLoop-timeEventHead; //将新事件放入表头te-refcount 0;if (te-next)te-next-prev te;eventLoop-timeEventHead te;return id;
}int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{aeTimeEvent *te eventLoop-timeEventHead; //这是个链表while(te) { //需要从头找到尾if (te-id id) {te-id AE_DELETED_EVENT_ID;return AE_OK;}te te-next;}return AE_ERR; /* NO event with the specified ID found */
}
那么现在通过 aeCreateFileEvent(server.el,server.ipfd[j],AE_READABLE,acceptTcpHandler,NULL);将服务器的监听fd和其事件回调函数acceptTcpHandler注册到epoll中并监听读事件。(acceptTcpHandler函数先不细讲这个函数肯定是有调用accept()的)
6.进行事件循环
启动aeMain函数阻塞等待事件发生并处理。
//文件事件 1
#define AE_FILE_EVENTS (10)
//时间事件 2
#define AE_TIME_EVENTS (11)
//文件事件和时间事件 4
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
//不阻塞等待标识 8
#define AE_DONT_WAIT (12)#define AE_CALL_BEFORE_SLEEP (13)
#define AE_CALL_AFTER_SLEEP (14)void aeMain(aeEventLoop *eventLoop) {eventLoop-stop 0;while (!eventLoop-stop) {aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);}
}
该函数就是一个while()循环循环内部是aeProcessEvents函数。
下面注重来看下aeProcessEvents第一个参数是要处理的事件驱动框架第二个参数是要处理的事件类型在aeMain中表示处理包括IO事件和时间事件在内的所有事件以及回调前置函数beforesleep和回调后置函数aftersleep。
aeProcessEvents的主要步骤
回调函数beforsleep和aftersleep可以先省略不关注的。因为目前其还没有用途先知道有这两个回调函数就行。
1.计算epoll_wait()需要的阻塞时间。 如果是设置了AF_DONT_WAIT,那就是不阻塞epoll_wait()的超时时间就设置为0。如有定时器任务那么其阻塞时间即是定时器的最早超时时间这样可以防止定时器任务等待过久。
若是没有定时器任务那就永远等待下去直到有事件被触发。
2.执行beforsleep 在epoll_wait()阻塞之前执行一些任务防止因为阻塞时间过长而无法执行或者执行一些准备工作。
3.epoll_wait等待事件发生。
4.执行aftersleep。
5.处理发生的IO事件。
根据发生的事件类型来调用对应的回调函数。若是AE_READABLE类型调用rfileProc若是AE_WRITABLE类型调用wfileProc。一般来说先处理AE_READABLE类型事件该类事件一般为客户端连接或者命令然后处理AE_WRITABLE类型事件向客户端发送响应对于客户端的回复一般在beforesleep中就会执行完成
6.处理时间事件。若该时间事件时周期性的执行完后会再添加到时间事件链表的。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{int processed 0, numevents;//没有时间事件和文件事件退出if (!(flags AE_TIME_EVENTS) !(flags AE_FILE_EVENTS)) return 0;//1.计算epoll_wait()需要的阻塞时间if (eventLoop-maxfd ! -1 ||((flags AE_TIME_EVENTS) !(flags AE_DONT_WAIT))) {int j;aeTimeEvent *shortest NULL;struct timeval tv, *tvp;if (flags AE_TIME_EVENTS !(flags AE_DONT_WAIT)) //表示有时间事件且需要阻塞epoll_wait()设置的时间就不是0shortest aeSearchNearestTimer(eventLoop); //找到最近的定时器超时时间if (shortest) {long now_sec, now_ms;aeGetTime(now_sec, now_ms);tvp tv;//定时器的到期时间,毫秒数long long ms (shortest-when_sec - now_sec)*1000 shortest-when_ms - now_ms;if (ms 0) {tvp-tv_sec ms/1000;tvp-tv_usec (ms % 1000)*1000;} else {tvp-tv_sec 0;tvp-tv_usec 0;}} else {//不需要阻塞if (flags AE_DONT_WAIT) {tv.tv_sec tv.tv_usec 0;tvp tv;} else {/* Otherwise we can block */tvp NULL; /* wait forever */}}//再次确认if (eventLoop-flags AE_DONT_WAIT) {tv.tv_sec tv.tv_usec 0;tvp tv;}//2.执行beforesleepif (eventLoop-beforesleep ! NULL flags AE_CALL_BEFORE_SLEEP)eventLoop-beforesleep(eventLoop); //在休眠前执行即是在epoll_wait()前)/* Call the multiplexing API, will return only on timeout or when* some event fires. *///3.执行epoll_waitnumevents aeApiPoll(eventLoop, tvp);//4.执行aftersleep/* After sleep callback. */if (eventLoop-aftersleep ! NULL flags AE_CALL_AFTER_SLEEP)eventLoop-aftersleep(eventLoop);//在休眠后执行即是在epoll_wait()之后)//5.逐个处理触发的事件for (j 0; j numevents; j) {aeFileEvent *fe eventLoop-events[eventLoop-fired[j].fd];int mask eventLoop-fired[j].mask; //这个是epoll_wait()返回的触发事件类型int fd eventLoop-fired[j].fd;int fired 0; /* Number of events fired for current fd. */int invert fe-mask AE_BARRIER;//触发可读事件if (!invert fe-mask mask AE_READABLE) {fe-rfileProc(eventLoop,fd,fe-clientData,mask);fired;fe eventLoop-events[fd]; /* Refresh in case of resize. */}//可写事件if (fe-mask mask AE_WRITABLE) {if (!fired || fe-wfileProc ! fe-rfileProc) {fe-wfileProc(eventLoop,fd,fe-clientData,mask);fired;}}/* If we have to invert the call, fire the readable event now* after the writable one. */if (invert) {fe eventLoop-events[fd]; /* Refresh in case of resize. */if ((fe-mask mask AE_READABLE) (!fired || fe-wfileProc ! fe-rfileProc)){fe-rfileProc(eventLoop,fd,fe-clientData,mask);fired;}}processed;}}//6.处理时间事件if (flags AE_TIME_EVENTS)processed processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */
}