Low level IO multiplex
Cross-platform and static polymorphism
1、Redis ae是能够支持多个Unix-like OS的,这就是本节标题"cross-platform"的含义
2、由于各个Unix-like OS的IO multiplex/polling system call并不统一,因此Redis ae在各种implementation上建立了一层抽象层(interface)
3、通过static polymorphism来实现interface到implementation的dispatch
Static polymorphism
Static polymorphism的实现依赖于如下:
1、platform detection macro
2、void pointer type erasure(generic programming)
/* State of an event based program.
 *
 * Abridged excerpt: only the member used for backend dispatch is shown.
 * The functions quoted in this document also read other members of this
 * structure (setsize, maxfd, events, fired), which are omitted here. */
typedef struct aeEventLoop {
    /* Type-erased pointer to the polling backend's private state.
     * Each backend's aeApiCreate() stores its own struct aeApiState here
     * and the other aeApi* functions read it back from this member. */
    void *apidata; /* This is used for polling API specific data */
    /* ... remaining members omitted ... */
} aeEventLoop; /* the typedef name is required: every aeApi* signature uses `aeEventLoop *` */
3、建立abstraction(interface)、上层依赖于abstraction而不是implementation
Interface
struct aeApiState
保存IO multiplex/polling object的resource handle
int aeApiCreate(aeEventLoop *eventLoop)
主要是创建struct aeApiState object,并将该object存放到eventLoop->apidata。
它会调用OS IO multiplex/polling system call。
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
添加一个event。
int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
顾名思义,进行轮询(polling)。
Implementation
redis的ae的IO multiplex实现:
在ae.c中有如下代码:
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performance, descending.
*
* Exactly one backend source file is textually included: the first
* HAVE_* macro that is defined selects its file, and ae_select.c is the
* fallback when none of them is defined. The HAVE_* macros are defined
* elsewhere (not shown in this excerpt). */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
以上四个文件都实现了同一组API(aeApiCreate、aeApiAddEvent、aeApiPoll等)以及同名的data structure(struct aeApiState);编译时只会include其中一个文件,这是C中实现类似于静态多态性(static polymorphism)的一种方式。
epoll
查看了一下source code,Redis在使用epoll的时候,并没有指定EPOLLET(edge trigger);根据下面资料的内容可知,epoll默认是level trigger,也就是说,Redis使用的是level trigger。
1、chinaunix epoll默认模式(LT),几乎总是触发EPOLL-OUT事件,怎么解决?
2、epoll_ctl(2) man page中对EPOLLET flag的说明:Requests edge-triggered notification for the associated file descriptor. The default behavior for epoll is level-triggered. See epoll(7) for more detailed information about edge-triggered and level-triggered notification.
struct aeApiState
#include <sys/epoll.h>
/* Private state of the epoll backend; aeApiCreate stores a pointer to it
* in eventLoop->apidata. */
typedef struct aeApiState {
int epfd; // file descriptor of the epoll instance (returned by epoll_create)
struct epoll_event *events; // dynamically allocated array of eventLoop->setsize entries; epoll_wait fills it with the events that have triggered
} aeApiState;
aeApiCreate
/* Create the epoll backend state and attach it to the event loop.
 *
 * Allocates the aeApiState, an event buffer with eventLoop->setsize
 * entries, and the epoll instance itself. On any failure everything
 * acquired so far is released and eventLoop->apidata is left untouched.
 *
 * Returns 0 on success, -1 on failure. */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *api = zmalloc(sizeof(*api));

    if (api == NULL) {
        return -1;
    }

    api->events = zmalloc(sizeof(struct epoll_event) * eventLoop->setsize);
    if (api->events == NULL) {
        goto fail_free_state;
    }

    /* 1024 is just a hint for the kernel */
    api->epfd = epoll_create(1024);
    if (api->epfd == -1) {
        goto fail_free_events;
    }

    eventLoop->apidata = api;
    return 0;

fail_free_events:
    zfree(api->events);
fail_free_state:
    zfree(api);
    return -1;
}
aeApiPoll
/* Wait for I/O events with epoll_wait() and report them in eventLoop->fired.
 *
 * eventLoop: event loop whose apidata points at this backend's aeApiState.
 * tvp:       maximum time to block; NULL means block with no timeout.
 *
 * Returns the number of entries written to eventLoop->fired. The result is
 * 0 when epoll_wait() reports no events or fails (retval <= 0). */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int timeout_ms = -1; /* -1 makes epoll_wait() block indefinitely */
    int retval, numevents = 0;

    if (tvp != NULL) {
        /* epoll_wait() only accepts whole milliseconds. Round the
         * microsecond part UP: truncating would turn any timeout shorter
         * than 1 ms into 0, which makes epoll_wait() return immediately
         * and causes the caller to spin until the deadline passes. */
        timeout_ms = tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000;
    }

    retval = epoll_wait(state->epfd, state->events, eventLoop->setsize,
                        timeout_ms);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            /* Translate epoll flags into ae masks. Error and hang-up
             * conditions are reported as both readable and writable. */
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
select
下面以select为例来进行说明(因为APUE中所介绍的就是select)。
struct aeApiState
#include <sys/select.h>
#include <string.h>
/* Private state of the select backend; aeApiCreate stores a pointer to it
* in eventLoop->apidata. */
typedef struct aeApiState {
fd_set rfds, wfds; // rfds: file descriptors watched for reading; wfds: file descriptors watched for writing
/* We need to have a copy of the fd sets as it's not safe to reuse
* FD sets after select(). aeApiPoll copies rfds/wfds into these working
* sets before every select() call and tests the results in them. */
fd_set _rfds, _wfds;
} aeApiState;
aeApiCreate
/* Create the select backend state and attach it to the event loop.
 *
 * Both fd sets start out empty. Returns 0 on success, -1 when the state
 * cannot be allocated. */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *api = zmalloc(sizeof(*api));

    if (api == NULL) {
        return -1;
    }

    /* This backend tracks only read and write interest, one set each. */
    FD_ZERO(&api->wfds);
    FD_ZERO(&api->rfds);

    /* Publish the backend state through the type-erased apidata slot. */
    eventLoop->apidata = api;
    return 0;
}
aeApiPoll
/* Wait for I/O events with select() and report them in eventLoop->fired.
 *
 * eventLoop: event loop whose apidata points at this backend's aeApiState.
 * tvp:       timeout handed to select() unchanged; NULL blocks with no
 *            timeout.
 *
 * Returns the number of entries written to eventLoop->fired, i.e. the
 * number of registered descriptors that are actually ready. The result is
 * 0 when select() reports nothing ready or fails (retval <= 0). */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, j, numevents = 0;

    /* select() modifies the sets it is given, so operate on working
     * copies and keep rfds/wfds intact as the registered interest sets. */
    memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
    memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));

    retval = select(eventLoop->maxfd+1,
                    &state->_rfds,&state->_wfds,NULL,tvp);
    if (retval > 0) {
        /* File descriptors are small non-negative integers that index
         * eventLoop->events directly, so scanning 0..maxfd visits every
         * registered descriptor. */
        for (j = 0; j <= eventLoop->maxfd; j++) {
            int mask = 0;
            aeFileEvent *fe = &eventLoop->events[j];

            if (fe->mask == AE_NONE) continue;
            if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                mask |= AE_WRITABLE;

            /* Registered but not ready: do not report it as fired,
             * otherwise the return value counts registered descriptors
             * instead of ready ones. */
            if (mask == 0) continue;

            eventLoop->fired[numevents].fd = j;
            eventLoop->fired[numevents].mask = mask;
            numevents++;
        }
    }
    return numevents;
}