您好,登录后才能下订单哦!
本篇内容介绍了“REdis命令处理流程处理过程是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
分析版本:REdis-5.0.4。
REdis命令处理流程可分解成三个独立的流程(不包括复制和持久化):
接受连接请求流程;
接收请求数据和处理请求流程,在这个过程并不会发送处理结果给Client,而只是将结果数据写入响应缓冲,将由响应请求流程来发送;
响应请求流程。
上述三个流程均是异步化的,并且没有直接的联系。它们的共同点均是通过REdis的简单事件驱动(AE,A simple event-driven)触发,对于Linux实际是epoll的包装,对于macOS为evport的包装,对于FreeBSD对kqueue的包装,对于其它则是select的包装。
可以把ae.h/ae.c看成是抽象基类,而ae_epoll.c、ae_select.c、ae_evport.c、ae_kqueue.c看成是ea的具体实现,以面向对象来看,大致如下图所示:
从上图可以看出,当没有任何数据时,进程将阻塞在函数aeApiPoll(对于epoll实际为epoll_wait)处直接超时。
如果有连接请求进来,或者有连接发送数据过来,或者有响应数据还未发送完成(连接变成可写),aeApiPoll均会立即从阻塞状态返回。
注意,只有fd被塞进了epoll,并没有将client或aeFileEvent塞入epoll。因此当一个连接被激活(比如有数据需要接收)时,需要通过fd来查找到aeFileEvent,而client因为在创建aeFileEvent时就被赋值给了aeFileEvent的clientData,因此只需要找到aeFileEvent即可。
全局对象server(类型为redisServer,定义在server.h中)维护了一个全局的aeEventLoop在,则aeEventLoop维护了一个aeFileEvent数组,并且aeFileEvent的数组下标为fd,因此很容易通过fd找到对应的aeFileEvent。
之所以没有将aeFileEvent直接注入到epoll,是为了统一事件驱动,比如select就不支持。在进程启动执行initServer时,会调用aeCreateEventLoop初始化该数组,数组大小大于配置项maxclients指定的值(额外加128),这利用了fd作为操作系统内核资源是循环利用的特性。
1. 接受连接请求流程
接受一个连接后,为该连接创建一个client对象,并将该client注册到epoll中,注册事件为EPLLIN(对应于ea的AE_READABLE)。
对应的伪代码:
int main() { // “ae”为“A simple event-driven”的缩写 void aeMain() { while (!eventLoop->stop) { // 响应从beforesleep开始, // 未完成部分才会走到aeApiPoll。 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); // aeProcessEvents处理各种事件,包括: // 1) 接受连接请求,为每个连接创建一个client // 2) 接收请求数据,和处理请求 // 3) 发送响应数据 // 4) 处理各类定时事件(调用processTimeEvents) int aeProcessEvents() { // aeApiPoll实为epoll或select等 aeApiPoll(); acceptTcpHandler(int fd) { // fd为listen套接字 // anetTcpAccept底层调用的是accept int cfd = anetTcpAccept(fd); acceptCommonHandler(cfd) { // createClient会将c添加server.clients中, // server.clients是一个链接。 client *c = createClient(cfd) { aeCreateFileEvent( server.el,fd,AE_READABLE, readQueryFromClient, c) { // mask值为AE_READABLE(对应于epoll的EPOLLIN), // 对于epoll实际调用的是epoll_ctl。 aeApiAddEvent(eventLoop,fd,mask); } } } } } } } }
2. 接收请求数据和处理请求流程
这一块会调用相应命令的处理函数,比如SET命令的处理函数setCommand,GET命令的处理函数getCommand。命令处理函数会修改内存数据。
并将处理的结果写入响应缓冲区,但并不立即发送给client。同时也会将处理结果写入AOF缓冲区,如果开启了AOF。以及将命令写入到复制积压缓冲区,如果有开启或有需要。还会将命令写入到slaves的缓冲区,如果需要。
响应请求在另一独立的流程中进行,本流程并不直接发送响应给client。
对应的伪代码
// 不包括响应命令,// 响应和接收处理是分开的两个过程。 int main() // server.c:4003{ // “ae”为“A simple event-driven”的缩写 void aeMain() // ae.c:496 {while (!eventLoop->stop) { // 发送响应先在beforesleep中进行, // 如果在beforesleep中没有发送完(比如响应的数据量过大), // 则后续的发送会由aeApiPoll触发。 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); int aeProcessEvents() // ae.c:358 {// aeApiPoll实为epoll或select等 aeApiPoll(); // readQueryFromClient是个回调函数,// 在创建client时注册:// client *createClient(int fd) {// aeCreateFileEvent(// server.el, fd,// AE_READABLE,// readQueryFromClient, c);// } void readQueryFromClient() // networking.c:1494{ // 这里调用read收数据 // client传过来的数据大小不能超过配置项client-query-buffer-limit指定的值。 // 默认大小为1G,足够覆盖大部场景。 // 如果超过大小,则可看到WARNING日志: // Closing client that reached max query buffer length // 实际中,一般远小于1G,所以可能将这个值调小一点,以增加对REdis的保护。 int nread = read(fd, c->querybuf, readlen); int processCommand(client*) // networking.c:2543 { redisCommand* lookupCommand(name) { // REdis所有命令存储 // 在struct redisServer的command表中: // struct redisServer { // dict *commands; // Command table // }; // 可将redisCommand看作一个C++抽象基本, // 该抽象基本定义了纯虚函数proc: // typedef void redisCommandProc(client *c); // struct redisCommand { // redisCommandProc *proc; // }; // 而command表中的每一个成员则为redisCommand的实现。 return dictFetchValue(commands,name); } void call(client*,flags) // server.c:2414{ // 回调具体的命令处理: // 如果是SET命令, // 实际调用的是t_string.c中的函数setCommand; // 如果是DEL命令, // 实际调用的是db.c中的函数delCommand。 redisCommand::proc(client*); void propagate(redisCommand*) // server.c:2315 {// 数据写入到AOF文件 feedAppendOnlyFile(); // aof.c:555 // 数据复制给所有Slaves void replicationFeedSlaves(slaves) // replication.c:173{ // 数据写入到复制积压(Backlog)缓冲区, // 注意积压缓冲区是一个循环缓冲区, // 如果满了,则从头覆盖写, // 循环缓冲区的大小, // 则配置项repl-backlog-size决定 feedReplicationBacklog(); // replication.c:126} } } } } } } } } // 以GET命令为列:// 这里的list实际为server.clients_pending_write// 所以需响应的client都添加到server.clients_pending_write链表中(可视为队列)// struct redisServer server; // Server global state#0 listAddNodeHead (list=0x7fe88bc0f210, value=0x7fe88bc64ec0) at adlist.c:92// 并不是所有的命令都需要WriteHandler,// 因此有些并不会调用clientInstallWriteHandler。#1 in clientInstallWriteHandler (c=0x7fe88bc64ec0) at networking.c:185#2 in prepareClientToWrite (c=0x7fe88bc64ec0) at networking.c:228#3 in addReplyString (c=0x7fe88bc64ec0, s=0x7ffdfc2e70c0 "$855\r\n", len=6) at networking.c:338#4 in addReplyLongLongWithPrefix (c=0x7fe88bc64ec0, ll=855, prefix=36 '$') at networking.c:515#5 in addReplyBulkLen (c=0x7fe88bc64ec0, obj=0x7fe889312840) at networking.c:557#6 in addReplyBulk (c=0x7fe88bc64ec0, obj=0x7fe889312840) at networking.c:562#7 in getGenericCommand (c=0x7fe88bc64ec0) at t_string.c:167#8 in getCommand (c=0x7fe88bc64ec0) at t_string.c:173#9 in call (c=0x7fe88bc64ec0, flags=15) at server.c:2437#10 in processCommand (c=0x7fe88bc64ec0) at server.c:2729#11 in processInputBuffer (c=0x7fe88bc64ec0) at networking.c:1451#12 in processInputBufferAndReplicate (c=0x7fe88bc64ec0) at networking.c:1486#13 in readQueryFromClient (el=0x7fe88bc30050, fd=8, privdata=0x7fe88bc64ec0, mask=1) at networking.c:1568#14 in aeProcessEvents (eventLoop=0x7fe88bc30050, flags=11) at ae.c:443#15 in aeMain (eventLoop=0x7fe88bc30050) at ae.c:501#16 in main (argc=2, argv=0x7ffdfc2e75b8) at server.c:4197
3. 响应请求流程
对于每一个有响应的命令,它的响应总是首先在beforesleep中进行,但如果一次没能发送完成,则会交给sendReplyToClient后续异步处理(以epoll为例,通过注册epoll的EPOLLOUT事件)。
对应的伪代码:
// 响应和接收处理是分开的两个过程。 int main() { // “ae”为“A simple event-driven”的缩写 void aeMain() { while (!eventLoop->stop) { // 调用eventLoop->beforesleep(eventLoop); // 但实际调用的是server.c中的beforeSleep: void beforeSleep(struct aeEventLoop*) { int handleClientsWithPendingWrites() { // REdis接收和处理 // 命令流程会设置clients_pending_write, // clients_pending_write实为一个队列链接。 // 当处理完一个命令后,调用clientInstallWriteHandler // 将当前client添加到clients_pending_write中。 // 但是有些命令并不需要响应,因此没有这个动作。 listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { int writeToClient(int fd,client* c) { write(fd,c->buf); // 如果全部发送完了, // 则调用aeDeleteFileEvent // 将fd从epoll中移除。 if (!clientHasPendingReplies(c)) { aeDeleteFileEvent( server.el, c->fd, AE_WRITABLE); // 从epoll中删除EPOLLOUT } } // 如果一次writeToClient调用没有发完, // 则将fd注册到epoll if (clientHasPendingReplies(c)) { // 下列动作是设置epoll的EPOLLOUT int ae_flags = AE_WRITABLE; // 将EPOLLOUT添加到epoll中 aeCreateFileEvent( server.el, c->fd, ae_flags, sendReplyToClient, c); } } } } // REdis接收和处理一个命令流程 aeProcessEvents(); } } }
“REdis命令处理流程处理过程是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。