前置学习:Redis server启动源码-CSDN博客
Redis采用单线程Reactor模型
三个关键角色,即 reactor、acceptor、handler
三类处理事件,即连接事件、写事件、读事件。
建立连接(Acceptor)、监听accept、read、write事件(Reactor)、处理事件(Handler)
概念性的东西,实际简单一些,看源码简单些,定义事件、事件监听、事件分发处理。
注册事件
aeCreateEventLoop
初始化EventLoop,添加文件事件源、已就绪文件事件源、epoll事件源。
注册文件事件源、已就绪文件事件源
1、定义文件事件结构和已就绪事件结构
/* File event structure** 文件事件结构*/
typedef struct aeFileEvent {// 监听事件类型掩码,// 值可以是 AE_READABLE 或 AE_WRITABLE ,// 或者 AE_READABLE | AE_WRITABLEint mask; /* one of AE_(READABLE|WRITABLE) */// 读事件处理器aeFileProc *rfileProc;// 写事件处理器aeFileProc *wfileProc;// 多路复用库的私有数据void *clientData;} aeFileEvent;
/* A fired event** 已就绪事件*/
typedef struct aeFiredEvent {// 已就绪文件描述符int fd;// 事件类型掩码,// 值可以是 AE_READABLE 或 AE_WRITABLE// 或者是两者的或int mask;} aeFiredEvent;
2、初始化事件源(EventLoop),并将文件事件结构和已就绪事件结构加入事件源中。
/* State of an event based program ** 事件处理器的状态*/
typedef struct aeEventLoop {// 目前已注册的最大描述符int maxfd; /* highest file descriptor currently registered */// 目前已追踪的最大描述符int setsize; /* max number of file descriptors tracked */// 用于生成时间事件 idlong long timeEventNextId;// 最后一次执行时间事件的时间time_t lastTime; /* Used to detect system clock skew */// 已注册的文件事件aeFileEvent *events; /* Registered events */// 已就绪的文件事件aeFiredEvent *fired; /* Fired events */// 时间事件aeTimeEvent *timeEventHead;// 事件处理器的开关int stop;// 多路复用库的私有数据void *apidata; /* This is used for polling API specific data */// 在处理事件前要执行的函数aeBeforeSleepProc *beforesleep;} aeEventLoop;/** 初始化事件处理器状态*/
aeEventLoop *aeCreateEventLoop(int setsize) {aeEventLoop *eventLoop;int i;aeMain// 创建事件状态结构if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;// 初始化文件事件结构和已就绪文件事件结构数组eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;// 设置数组大小eventLoop->setsize = setsize;// 初始化执行最近一次执行时间eventLoop->lastTime = time(NULL);// 初始化时间事件结构eventLoop->timeEventHead = NULL;eventLoop->timeEventNextId = 0;eventLoop->stop = 0;eventLoop->maxfd = -1;eventLoop->beforesleep = NULL;if (aeApiCreate(eventLoop) == -1) goto err;/* Events with mask == AE_NONE are not set. So let's initialize the* vector with it. */// 初始化监听事件for (i = 0; i < setsize; i++)eventLoop->events[i].mask = AE_NONE;// 返回事件循环return eventLoop;err:if (eventLoop) {zfree(eventLoop->events);zfree(eventLoop->fired);zfree(eventLoop);}return NULL;
}
注册Epoll事件
1、定义Epoll事件结构体
/** 事件状态*/
typedef struct aeApiState {// epoll_event 实例描述符int epfd;// 事件槽struct epoll_event *events;} aeApiState;
2、添加epoll事件源到EventLoop中
/** 创建一个新的 epoll 实例,并将它赋值给 eventLoop*/
static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;// 初始化事件槽空间state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}// 创建 epoll 实例state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}// 赋值给 eventLoopeventLoop->apidata = state;return 0;
}
aeCreateFileEvent
初始化事件回调,往Eventloop里面添加读事件的处理回调,写事件的处理回调,Epoll事件的处理回调。
/** 根据 mask 参数的值,监听 fd 文件的状态,* 当 fd 可用时,执行 proc 函数*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
{if (fd >= eventLoop->setsize) {errno = ERANGE;return AE_ERR;}if (fd >= eventLoop->setsize) return AE_ERR;// 取出文件事件结构aeFileEvent *fe = &eventLoop->events[fd];// 监听指定 fd 的指定事件if (aeApiAddEvent(eventLoop, fd, mask) == -1)return AE_ERR;// 设置文件事件类型,以及事件的处理器fe->mask |= mask;if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;// 私有数据fe->clientData = clientData;// 如果有需要,更新事件处理器的最大 fdif (fd > eventLoop->maxfd)eventLoop->maxfd = fd;return AE_OK;
}static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee;/* If the fd was already monitored for some event, we need a MOD* operation. Otherwise we need an ADD operation. ** 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。** 如果已经关联了某个/某些事件,那么这是一个 MOD 操作。*/int op = eventLoop->events[fd].mask == AE_NONE ?EPOLL_CTL_ADD : EPOLL_CTL_MOD;// 注册事件到 epollee.events = 0;mask |= eventLoop->events[fd].mask; /* Merge old events */if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.u64 = 0; /* avoid valgrind warning */ee.data.fd = fd;if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;return 0;
}
aeCreateTimeEvent
1、定义时间事件结构
/* Time event structure** 时间事件结构*/
typedef struct aeTimeEvent {// 时间事件的唯一标识符long long id; /* time event identifier. */// 事件的到达时间long when_sec; /* seconds */long when_ms; /* milliseconds */// 事件处理函数aeTimeProc *timeProc;// 事件释放函数aeEventFinalizerProc *finalizerProc;// 多路复用库的私有数据void *clientData;// 指向下个时间事件结构,形成链表struct aeTimeEvent *next;} aeTimeEvent;
2、往EventLoop添加事件源回调处理方法。
/** 创建时间事件*/
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,aeTimeProc *proc, void *clientData,aeEventFinalizerProc *finalizerProc)
{// 更新时间计数器long long id = eventLoop->timeEventNextId++;// 创建时间事件结构aeTimeEvent *te;te = zmalloc(sizeof(*te));if (te == NULL) return AE_ERR;// 设置 IDte->id = id;// 设定处理事件的时间aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);// 设置事件处理器te->timeProc = proc;te->finalizerProc = finalizerProc;// 设置私有数据te->clientData = clientData;// 将新事件放入表头te->next = eventLoop->timeEventHead;eventLoop->timeEventHead = te;return id;
}
事件分发
执行EventLoop事件源里面的所有事件回调
/* Process every pending time event, then every pending file event* (that may be registered by time event callbacks just processed).** 处理所有已到达的时间事件,以及所有已就绪的文件事件。** Without special flags the function sleeps until some file event* fires, or when the next time event occurs (if any).** 如果不传入特殊 flags 的话,那么函数睡眠直到文件事件就绪,* 或者下个时间事件到达(如果有的话)。** If flags is 0, the function does nothing and returns.* 如果 flags 为 0 ,那么函数不作动作,直接返回。** if flags has AE_ALL_EVENTS set, all the kind of events are processed.* 如果 flags 包含 AE_ALL_EVENTS ,所有类型的事件都会被处理。** if flags has AE_FILE_EVENTS set, file events are processed.* 如果 flags 包含 AE_FILE_EVENTS ,那么处理文件事件。** if flags has AE_TIME_EVENTS set, time events are processed.* 如果 flags 包含 AE_TIME_EVENTS ,那么处理时间事件。** if flags has AE_DONT_WAIT set the function returns ASAP until all* the events that's possible to process without to wait are processed.* 如果 flags 包含 AE_DONT_WAIT ,* 那么函数在处理完所有不许阻塞的事件之后,即刻返回。** The function returns the number of events processed. * 函数的返回值为已处理事件的数量*/
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{int processed = 0, numevents;/* Nothing to do? return ASAP */if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;/* Note that we want call select() even if there are no* file events to process as long as we want to process time* events, in order to sleep until the next time event is ready* to fire. */if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int j;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;// 获取最近的时间事件if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))shortest = aeSearchNearestTimer(eventLoop);if (shortest) {// 如果时间事件存在的话// 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间long now_sec, now_ms;/* Calculate the time missing for the nearest* timer to fire. */// 计算距今最近的时间事件还要多久才能达到// 并将该时间距保存在 tv 结构中aeGetTime(&now_sec, &now_ms);tvp = &tv;tvp->tv_sec = shortest->when_sec - now_sec;if (shortest->when_ms < now_ms) {tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;tvp->tv_sec --;} else {tvp->tv_usec = (shortest->when_ms - now_ms)*1000;}// 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)if (tvp->tv_sec < 0) tvp->tv_sec = 0;if (tvp->tv_usec < 0) tvp->tv_usec = 0;} else {// 执行到这一步,说明没有时间事件// 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度/* If we have to check for events but need to return* ASAP because of AE_DONT_WAIT we need to set the timeout* to zero */if (flags & AE_DONT_WAIT) {// 设置文件事件不阻塞tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {/* Otherwise we can block */// 文件事件可以阻塞直到有事件到达为止tvp = NULL; /* wait forever */}}// 处理文件事件,阻塞时间由 tvp 决定numevents = aeApiPoll(eventLoop, tvp);for (j = 0; j < numevents; j++) {// 从已就绪数组中获取事件aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int rfired = 0;/* note the fe->mask & mask & ... code: maybe an already processed* event removed an element that fired and we still didn't* processed, so we check if the event is still valid. */// 读事件if (fe->mask & mask & AE_READABLE) {// rfired 确保读/写事件只能执行其中一个rfired = 1;fe->rfileProc(eventLoop,fd,fe->clientData,mask);}// 写事件if (fe->mask & mask & AE_WRITABLE) {if (!rfired || fe->wfileProc != fe->rfileProc)fe->wfileProc(eventLoop,fd,fe->clientData,mask);}processed++;}}/* Check time events */// 执行时间事件if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */
}