# redis-07-Event Handling

What happens on the Redis server after you type `set key hello` in redis-cli and press Enter? Let's take a look. The Redis source version used here is 3.0.0.

# Two questions and two concepts

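# 1 Is Redis single-threaded?

Redis handles events (reading commands, executing them, writing replies) on a single thread, but the process as a whole is not strictly single-threaded: in 3.0 there are also background threads (bio.c) for slow operations such as AOF fsync and closing files, and persistence jobs such as BGSAVE and AOF rewrite run in a forked child process.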

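# 2 Why is Redis so fast?

(1). All data processing happens in memory.

(2). Processing is single-threaded, which avoids the cost of context switches and of contention between threads for shared resources.

(3). It uses I/O multiplexing, wrapping the select, epoll, evport and kqueue APIs; the corresponding source files are ae_select.c, ae_epoll.c, ae_evport.c and ae_kqueue.c. ae.c uses preprocessor macros to pick the best-performing backend available on the platform by default, with select, which every system supports, as the fallback: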

```c
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif
```
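Every backend file implements the same small set of static functions that ae.c calls (aeApiAddEvent, aeApiPoll and a few others), which is what lets the choice be made with a plain `#include`. The sketch below shows what a select()-based backend of that shape can look like. It assumes POSIX select(); the names mini_poll_state, mini_add_event and mini_poll are hypothetical and only loosely follow ae_select.c.

```c
#include <assert.h>
#include <string.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

#define MINI_READABLE 1
#define MINI_WRITABLE 2

/* Hypothetical select() backend state, loosely modeled on ae_select.c. */
typedef struct {
    fd_set rfds, wfds;   /* registered interest sets */
    int maxfd;           /* highest registered fd, -1 if none */
} mini_poll_state;

typedef struct {
    int fd;
    int mask;            /* MINI_READABLE and/or MINI_WRITABLE */
} mini_fired;

void mini_init(mini_poll_state *st) {
    FD_ZERO(&st->rfds);
    FD_ZERO(&st->wfds);
    st->maxfd = -1;
}

void mini_add_event(mini_poll_state *st, int fd, int mask) {
    if (mask & MINI_READABLE) FD_SET(fd, &st->rfds);
    if (mask & MINI_WRITABLE) FD_SET(fd, &st->wfds);
    if (fd > st->maxfd) st->maxfd = fd;
}

/* Wait up to *tvp (NULL = forever) and copy the ready fds into fired[].
 * Returns the number of ready fds, 0 on timeout, or -1 on error. */
int mini_poll(mini_poll_state *st, struct timeval *tvp,
              mini_fired *fired, int max_fired) {
    /* select() overwrites the sets it is given, so work on copies. */
    fd_set r, w;
    memcpy(&r, &st->rfds, sizeof(fd_set));
    memcpy(&w, &st->wfds, sizeof(fd_set));

    int ret = select(st->maxfd + 1, &r, &w, NULL, tvp);
    if (ret <= 0) return ret;

    int n = 0;
    for (int fd = 0; fd <= st->maxfd && n < max_fired; fd++) {
        int mask = 0;
        if (FD_ISSET(fd, &r)) mask |= MINI_READABLE;
        if (FD_ISSET(fd, &w)) mask |= MINI_WRITABLE;
        if (mask) {
            fired[n].fd = fd;
            fired[n].mask = mask;
            n++;
        }
    }
    return n;
}
```

Copying the fd_sets before each call matters because select() replaces its arguments with the ready sets; ae_select.c makes the same copy. Usage: register a pipe's read end with mini_add_event, write a byte to the write end, and mini_poll reports that fd as readable.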

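# 3 Event types

The Redis server is an event-driven program. The events it handles fall into two categories: time events and file events.

File events are either AE_READABLE events (read events) or AE_WRITABLE events (write events).

Time events are either timed events or periodic events: a timed event fires only once, at the specified time, while a periodic event fires repeatedly at a fixed interval.

File events and time events cooperate: the server handles the two kinds in turn, and handling is non-preemptive, so an event handler always runs to completion before the next event is handled.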
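In ae.c, whether a time event is one-shot or periodic is decided by the return value of its handler: AE_NOMORE (-1) means the event is deleted after running, and any other value is the delay in milliseconds before it runs again (this is visible in processTimeEvents further down). A minimal sketch of that convention, assuming a plain millisecond clock; mini_timer, mini_fire and MINI_NOMORE are hypothetical names:

```c
#include <assert.h>

#define MINI_NOMORE -1   /* same meaning as AE_NOMORE in ae.h */

typedef struct mini_timer {
    long long when_ms;                   /* absolute due time */
    int (*proc)(struct mini_timer *t);   /* returns a delay or MINI_NOMORE */
    int fired_count;
    int active;
} mini_timer;

/* Run the timer if it is due. Returns 1 while it stays scheduled. */
int mini_fire(mini_timer *t, long long now_ms) {
    if (!t->active || now_ms < t->when_ms) return t->active;
    int ret = t->proc(t);          /* runs to completion: no preemption */
    t->fired_count++;
    if (ret == MINI_NOMORE) {
        t->active = 0;             /* timed (one-shot) event: drop it */
    } else {
        t->when_ms = now_ms + ret; /* periodic event: reschedule */
    }
    return t->active;
}

int once_proc(mini_timer *t) { (void)t; return MINI_NOMORE; }
int every_100ms_proc(mini_timer *t) { (void)t; return 100; }
```

Because mini_fire waits for the handler to return before doing anything else, a slow handler delays everything behind it, which is the practical meaning of "non-preemptive" here.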

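# 4 The file event handler

The file event handler consists of four parts: sockets, the I/O multiplexing program, the file event dispatcher, and the event handlers.

[Figure: rdis1.png]

The process is as follows:

  1. When sockets produce events, the I/O multiplexing program puts those sockets into a queue.
  2. Through this queue, the sockets are passed to the file event dispatcher sequentially and synchronously, one socket at a time.

[Figure: redis2.png]

  3. The file event dispatcher hands each socket to a different event handler depending on the type of event it produced.
  4. The event handlers do the actual work. The most commonly used are the connection accept handler, the command request handler and the command reply handler.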
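The dispatch step can be pictured as a table indexed by file descriptor, where each slot records which events are registered and which handler to call, plus a loop that hands the fired events over one at a time. A minimal sketch with a fixed-size table; handler_slot, fired_event, dispatch_fired and the demo handlers are hypothetical names, and real Redis keeps this table inside aeEventLoop:

```c
#include <assert.h>

#define SLOT_READABLE 1
#define SLOT_WRITABLE 2
#define MAX_FDS 64

typedef void handler_fn(int fd, int mask);

typedef struct {
    int mask;              /* events currently registered on this fd */
    handler_fn *on_read;
    handler_fn *on_write;
} handler_slot;

typedef struct { int fd; int mask; } fired_event;

static handler_slot slots[MAX_FDS];   /* indexed directly by fd */

/* Hand each fired event to its handler, strictly one after another. */
int dispatch_fired(const fired_event *queue, int n) {
    int handled = 0;
    for (int i = 0; i < n; i++) {
        handler_slot *s = &slots[queue[i].fd];
        /* Re-check the registered mask: a handler that ran earlier in
         * this batch may have unregistered the event. */
        if (s->mask & queue[i].mask & SLOT_READABLE)
            s->on_read(queue[i].fd, queue[i].mask);
        if (s->mask & queue[i].mask & SLOT_WRITABLE)
            s->on_write(queue[i].fd, queue[i].mask);
        handled++;
    }
    return handled;
}

/* Demo handlers that just record which fd they saw. */
static int last_read_fd = -1, last_write_fd = -1;
static void demo_read(int fd, int mask) { (void)mask; last_read_fd = fd; }
static void demo_write(int fd, int mask) { (void)mask; last_write_fd = fd; }
```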

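# Main flow

Back to the opening question. The whole path from a client running a command to it getting the result looks like this:

  1. The client initiates a connection to the server.
  2. The server's listening socket produces an AE_READABLE event, which triggers the connection accept handler. The handler answers the client's connection request, creates the client socket and the client state, and associates the client socket's AE_READABLE event with the command request handler.
  3. The client sends a command, producing an AE_READABLE event on the client socket, which runs the command request handler.
  4. Executing the command produces a reply, and the server associates the client socket's AE_WRITABLE event with the command reply handler. When the client socket becomes writable, it produces an AE_WRITABLE event that runs the command reply handler; once the handler has written the whole reply to the socket, the server removes the association between the client socket's AE_WRITABLE event and the command reply handler.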
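Steps 2 to 4 are essentially a chain of handler rebindings. The sketch below models only that bookkeeping, without real sockets: a table records which handler is bound to each fd's readable and writable events. The names on_accept, on_query and on_reply are hypothetical stand-ins for the connection accept, command request and command reply handlers, and the command itself is not actually executed:

```c
#include <assert.h>
#include <string.h>

#define MAX_CLIENTS 16

/* Which handler is bound to an fd's event (hypothetical ids). */
typedef enum { H_NONE = 0, H_ACCEPT, H_QUERY, H_REPLY } handler_id;

typedef struct {
    handler_id on_read[MAX_CLIENTS];    /* handler bound to the readable event */
    handler_id on_write[MAX_CLIENTS];   /* handler bound to the writable event */
    char reply[MAX_CLIENTS][32];        /* pending reply per client fd */
} bindings;

/* Step 2: a new client socket; bind its readable event to the query handler. */
void on_accept(bindings *b, int client_fd) {
    b->on_read[client_fd] = H_QUERY;
    b->on_write[client_fd] = H_NONE;
}

/* Steps 3-4: "execute" the command, queue a reply, bind the writable event. */
void on_query(bindings *b, int fd, const char *cmd) {
    (void)cmd;                          /* command execution is not modeled */
    strcpy(b->reply[fd], "+OK\r\n");
    b->on_write[fd] = H_REPLY;
}

/* Step 4: write the whole reply out, then unbind the writable event. */
void on_reply(bindings *b, int fd, char *out, size_t outlen) {
    strncpy(out, b->reply[fd], outlen - 1);
    out[outlen - 1] = '\0';
    b->reply[fd][0] = '\0';
    b->on_write[fd] = H_NONE;
}
```

Note that the readable event stays bound to the query handler after the reply is sent, so the same client can send its next command.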

# Source code analysis

Now let's look at the Redis source and see how it handles events.

# aeEventLoop

aeEventLoop is the structure that manages both file events and time events; it is the core of Redis event handling. It is defined in ae.h, lines 87 to 98:

```c
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;
```

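It holds three event collections:

events (aeFileEvent): the registered file events, an array indexed by file descriptor

fired (aeFiredEvent): the file events that are ready to be processed, filled in by the most recent poll

timeEventHead (aeTimeEvent): the time events, kept as a linked list rather than an array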
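To make the layout concrete, here is a minimal sketch of creating such a loop: both arrays are sized by setsize, every slot starts with an empty mask, maxfd starts at -1, and the time event list starts empty. It assumes simplified field types (handler pointers and the backend state are left out), and mini_event_loop, mini_create_loop and mini_delete_loop are hypothetical names rather than the Redis API:

```c
#include <assert.h>
#include <stdlib.h>
#include <time.h>

#define MINI_NONE 0

typedef struct { int mask; void *clientData; } mini_file_event;  /* handlers omitted */
typedef struct { int fd; int mask; } mini_fired_event;
typedef struct mini_time_event {
    long long id;
    struct mini_time_event *next;
} mini_time_event;

typedef struct {
    int maxfd;
    int setsize;
    long long timeEventNextId;
    time_t lastTime;
    mini_file_event *events;          /* indexed by fd, setsize slots */
    mini_fired_event *fired;          /* filled by the poll call, setsize slots */
    mini_time_event *timeEventHead;   /* linked list, not an array */
    int stop;
} mini_event_loop;

mini_event_loop *mini_create_loop(int setsize) {
    mini_event_loop *el = malloc(sizeof(*el));
    if (!el) return NULL;
    el->events = malloc(sizeof(mini_file_event) * setsize);
    el->fired = malloc(sizeof(mini_fired_event) * setsize);
    if (!el->events || !el->fired) {
        free(el->events);
        free(el->fired);
        free(el);
        return NULL;
    }
    el->setsize = setsize;
    el->maxfd = -1;                   /* no fd registered yet */
    el->timeEventNextId = 0;
    el->lastTime = time(NULL);        /* baseline for clock-skew detection */
    el->timeEventHead = NULL;
    el->stop = 0;
    for (int i = 0; i < setsize; i++) {
        el->events[i].mask = MINI_NONE;
        el->events[i].clientData = NULL;
    }
    return el;
}

void mini_delete_loop(mini_event_loop *el) {
    free(el->events);
    free(el->fired);
    free(el);
}
```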

# Event processing

After startup, Redis first initializes its configuration and then calls aeMain, entering the aeEventLoop loop to wait for external events. In redis.c:

```c
int main(int argc, char **argv) {
    struct timeval tv;
    ...
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeMain(server.el); /* enter the event loop */
    aeDeleteEventLoop(server.el);
    return 0;
}
```

aeMain, in ae.c, contains a while loop that keeps calling aeProcessEvents to process events:

```c
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
```
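The loop shape is simple enough to model directly: a stop flag checked on every iteration, an optional hook that runs before each wait, and a call that processes events. A minimal sketch under those assumptions, where mini_loop, mini_main and mini_stop are hypothetical names, the process callback stands in for aeProcessEvents, and mini_stop plays the role of aeStop():

```c
#include <assert.h>
#include <stddef.h>

typedef struct mini_loop mini_loop;
typedef void before_sleep_fn(mini_loop *l);
typedef int process_fn(mini_loop *l);

struct mini_loop {
    int stop;
    before_sleep_fn *beforesleep;   /* optional hook, may be NULL */
    process_fn *process;            /* stands in for aeProcessEvents */
    int iterations;
    int hook_calls;
};

void mini_main(mini_loop *l) {
    l->stop = 0;
    while (!l->stop) {
        if (l->beforesleep != NULL)
            l->beforesleep(l);      /* runs once before every wait */
        l->process(l);
        l->iterations++;
    }
}

void mini_stop(mini_loop *l) { l->stop = 1; }

/* Demo callbacks: count hook calls and stop during the third iteration. */
static void count_hook(mini_loop *l) { l->hook_calls++; }
static int stop_after_three(mini_loop *l) {
    if (l->iterations == 2) mini_stop(l);
    return 0;
}
```

Because the flag is only checked at the top of the loop, a stop request takes effect after the current iteration finishes, never in the middle of event processing.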

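After starting Redis and attaching strace to it, you can see it looping continuously. With no clients connected, each epoll_wait call times out after about 100 ms, which matches the interval of the nearest time event, serverCron, run hz = 10 times per second by default. The third argument, 10128, matches the loop's setsize (the default maxclients of 10000 plus 128 reserved descriptors), and the open/read/close of /proc/3466/stat between the waits is consistent with serverCron sampling the process's RSS via zmalloc_get_rss():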

```
[root@10 sys]# strace -p 3466
strace: Process 3466 attached
epoll_wait(3, [], 10128, 99)            = 0
open("/proc/3466/stat", O_RDONLY)       = 5
read(5, "3466 (redis-server) R 1 3466 346"..., 4096) = 360
close(5)                                = 0
epoll_wait(3, [], 10128, 100)           = 0
open("/proc/3466/stat", O_RDONLY)       = 5
read(5, "3466 (redis-server) R 1 3466 346"..., 4096) = 360
close(5)                                = 0
epoll_wait(3, [], 10128, 100)           = 0
open("/proc/3466/stat", O_RDONLY)       = 5
read(5, "3466 (redis-server) R 1 3466 346"..., 4096) = 360
close(5)                                = 0
epoll_wait(3, [], 10128, 100)           = 0
open("/proc/3466/stat", O_RDONLY)       = 5
read(5, "3466 (redis-server) R 1 3466 346"..., 4096) = 360
close(5)                                = 0
epoll_wait(3, [], 10128, 100)           = 0
open("/proc/3466/stat", O_RDONLY)       = 5
read(5, "3466 (redis-server) R 1 3466 346"..., 4096) = 360
close(5)                                = 0
epoll_wait(3, [], 10128, 100)           = 0
open("/proc/3466/stat", O_RDONLY)       = 5
read(5, "3466 (redis-server) R 1 3466 346"..., 4096) = 360
```

aeProcessEvents is in ae.c, lines 352 to 426:

```c
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

            /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
```

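Lines 371 (`if (shortest) {`) to 398 (the matching `}` of the else branch) compute the timeout for the I/O multiplexing call: if there is a pending time event, wait at most until the nearest one is due; if AE_DONT_WAIT is set, use a zero timeout; otherwise pass NULL and block until a file event arrives.

From line 400 (`numevents = aeApiPoll(...)`) onwards, the function handles the fired file events and then the time events.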
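The timeout arithmetic in that branch can be pulled out into a standalone function. This sketch reproduces the same computation (the nearest timer's seconds and milliseconds against the current time, borrowing one second when needed, then clamping each field at zero); timeout_until is a hypothetical name:

```c
#include <assert.h>
#include <sys/time.h>

/* Same arithmetic as the "if (shortest)" branch of aeProcessEvents:
 * turn an absolute deadline (when_sec, when_ms) into a relative timeout. */
void timeout_until(long when_sec, long when_ms,
                   long now_sec, long now_ms, struct timeval *tv) {
    tv->tv_sec = when_sec - now_sec;
    if (when_ms < now_ms) {
        /* borrow one second so the microsecond part stays positive */
        tv->tv_usec = ((when_ms + 1000) - now_ms) * 1000;
        tv->tv_sec--;
    } else {
        tv->tv_usec = (when_ms - now_ms) * 1000;
    }
    /* clamp each field separately so neither is negative */
    if (tv->tv_sec < 0) tv->tv_sec = 0;
    if (tv->tv_usec < 0) tv->tv_usec = 0;
}
```

Because the two fields are clamped separately, a timer that is already overdue can still yield a small positive tv_usec (a deadline of 5 s 0 ms seen at 9 s 900 ms gives 0 s 100000 us), so the poll may wait a little longer than strictly necessary.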

# File events

```c
if (fe->mask & mask & AE_READABLE) {
    rfired = 1;
    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
    if (!rfired || fe->wfileProc != fe->rfileProc)
        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
```

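As you can see, the read event is handled before the write event, and the write handler is skipped when it is the same function as the read handler that has just run, so one handler is not called twice for the same readiness report. rfileProc and wfileProc are the function pointers passed in when the file event is created with aeCreateFileEvent: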

```c
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
```
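Two details of this code are easy to miss: a second aeCreateFileEvent call on the same fd adds to the mask with `|=` instead of replacing it, and the dispatch loop calls a handler registered for both directions only once. A minimal sketch of both, without the backend call; create_file_event, dispatch, count_calls and the EV_* constants are hypothetical names:

```c
#include <assert.h>

#define EV_NONE 0
#define EV_READABLE 1
#define EV_WRITABLE 2
#define SETSIZE 32

typedef void file_proc(int fd, void *clientData, int mask);

typedef struct {
    int mask;
    file_proc *rfileProc;
    file_proc *wfileProc;
    void *clientData;
} file_event;

static file_event events[SETSIZE];   /* zero-initialized: mask == EV_NONE */
static int maxfd = -1;

/* Like aeCreateFileEvent minus the backend call: masks accumulate with |=. */
int create_file_event(int fd, int mask, file_proc *proc, void *clientData) {
    if (fd < 0 || fd >= SETSIZE) return -1;   /* ae.c sets errno = ERANGE */
    file_event *fe = &events[fd];
    fe->mask |= mask;
    if (mask & EV_READABLE) fe->rfileProc = proc;
    if (mask & EV_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;              /* shared by both directions */
    if (fd > maxfd) maxfd = fd;
    return 0;
}

/* The dispatch rule from aeProcessEvents, for one fired fd. */
void dispatch(int fd, int fired_mask) {
    file_event *fe = &events[fd];
    int rfired = 0;
    if (fe->mask & fired_mask & EV_READABLE) {
        rfired = 1;
        fe->rfileProc(fd, fe->clientData, fired_mask);
    }
    if (fe->mask & fired_mask & EV_WRITABLE) {
        /* skip if the same handler has already run for this report */
        if (!rfired || fe->wfileProc != fe->rfileProc)
            fe->wfileProc(fd, fe->clientData, fired_mask);
    }
}

/* Demo handlers: both increment the int that clientData points to. */
static void count_calls(int fd, void *clientData, int mask) {
    (void)fd; (void)mask;
    (*(int *)clientData)++;
}
static void count_calls_too(int fd, void *clientData, int mask) {
    count_calls(fd, clientData, mask);
}
```

As in the original, clientData is shared by both directions of a descriptor, so the last call to create_file_event wins.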

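# Time events

Time events are processed in processTimeEvents. It first checks whether the system clock has moved backwards and, if so, makes every time event due immediately. It then walks the time event list and runs each event that is due: a handler that returns AE_NOMORE is deleted, while any other return value is the number of milliseconds until the event should run again. Events created during the current pass (id greater than maxId) are skipped: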

```c
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);

    /* If the system clock is moved to the future, and then set back to the
     * right value, time events may be delayed in a random way. Often this
     * means that scheduled operations will not be performed soon enough.
     *
     * Here we try to detect system clock skews, and force all the time
     * events to be processed ASAP when this happens: the idea is that
     * processing events earlier is less dangerous than delaying them
     * indefinitely, and practice suggests it is. */
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    eventLoop->lastTime = now;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
        long now_sec, now_ms;
        long long id;

        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            /* After an event is processed our time event list may
             * no longer be the same, so we restart from head.
             * Still we make sure to don't process events registered
             * by event handlers itself in order to don't loop forever.
             * To do so we saved the max ID we want to handle.
             *
             * FUTURE OPTIMIZATIONS:
             * Note that this is NOT great algorithmically. Redis uses
             * a single time event so it's not a problem but the right
             * way to do this is to add the new elements on head, and
             * to flag deleted elements in a special way for later
             * deletion (putting references to the nodes to delete into
             * another linked list). */
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                aeDeleteTimeEvent(eventLoop, id);
            }
            te = eventLoop->timeEventHead;
        } else {
            te = te->next;
        }
    }
    return processed;
}
```
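The maxId guard and the restart-from-head logic are the subtle part. The sketch below reproduces them on a simulated millisecond clock: a handler that creates a new, already-due timer does not see it run in the same pass. tloop, ttimer, add_timer, process_timers and T_NOMORE are hypothetical names; like aeCreateTimeEvent, add_timer pushes new timers onto the head of the list:

```c
#include <assert.h>
#include <stdlib.h>

#define T_NOMORE -1   /* same meaning as AE_NOMORE */

typedef struct ttimer ttimer;
typedef struct tloop tloop;
typedef int timer_proc(tloop *l, long long id);

struct ttimer {
    long long id;
    long long when_ms;
    timer_proc *proc;
    ttimer *next;
};

struct tloop {
    ttimer *head;
    long long next_id;
    long long now_ms;   /* simulated clock, an assumption of this sketch */
};

long long add_timer(tloop *l, long long delay_ms, timer_proc *proc) {
    ttimer *t = malloc(sizeof(*t));
    if (!t) return -1;
    t->id = l->next_id++;
    t->when_ms = l->now_ms + delay_ms;
    t->proc = proc;
    t->next = l->head;          /* new timers go on the head of the list */
    l->head = t;
    return t->id;
}

static void delete_timer(tloop *l, long long id) {
    for (ttimer **pp = &l->head; *pp; pp = &(*pp)->next) {
        if ((*pp)->id == id) {
            ttimer *t = *pp;
            *pp = t->next;
            free(t);
            return;
        }
    }
}

int process_timers(tloop *l) {
    int processed = 0;
    long long max_id = l->next_id - 1;  /* ignore timers created during this pass */
    ttimer *t = l->head;
    while (t) {
        if (t->id > max_id) { t = t->next; continue; }
        if (l->now_ms >= t->when_ms) {
            long long id = t->id;
            int ret = t->proc(l, id);
            processed++;
            if (ret != T_NOMORE) t->when_ms = l->now_ms + ret;
            else delete_timer(l, id);
            t = l->head;        /* the list may have changed: restart */
        } else {
            t = t->next;
        }
    }
    return processed;
}

/* Demo handlers: the spawner creates an already-due child timer. */
static int ran_child = 0;
static int child_proc(tloop *l, long long id) { (void)l; (void)id; ran_child++; return T_NOMORE; }
static int spawner_proc(tloop *l, long long id) {
    (void)id;
    add_timer(l, 0, child_proc);
    return T_NOMORE;
}
```

Without max_id, a handler that keeps scheduling already-due timers could keep this loop running forever within a single call, which is exactly what the comment in the original source warns about.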
