Memcache源码阅读(3)---处理用户输入

桃扇骨 2022-07-13 00:45 251阅读 0赞

我看的源码版本是1.2.4

从main函数开始,看看作者怎么初始化,然后再看作者怎么监听连接,怎么接受用户的输入,怎么处理用户的输入。

主程序

main函数里的一些操作

  1. 自定义中断处理函数
  2. 从命令行中获取用户打开memcached的指令值
  3. 设置core文件
  4. 初始化资源,统计资源,分配内存
  5. 屏蔽SIGPIPE信号,因为SIGPIPE的默认处理是中断程序,客户端断开连接后,memcached再往客户端写数据就会触发该信号,此时memcached应该要继续运行的,所以要屏蔽该信号。
  6. 监听用户的连接,作者用到libevent的事件驱动模型,一个主线程负责监听连接,将接受到的连接交给工作线程去处理。

主程序监听套接字的设置

  1. //创建一个套接字
  2. l_socket = server_socket(settings.port, 0);
  3. //对监听套接字设置事件处理,这里主要是为套接字l_socket设置事件处理函数
  4. //将l_socket设置为conn_listening状态,一直在这个状态中运行。
  5. //后面就会看到关于状态机的处理代码。
  6. listen_conn = conn_new(l_socket, conn_listening,
  7. EV_READ | EV_PERSIST, 1, false, main_base);
  8. //程序会一直在这里等待,有时间到来就会调用注册的时间处理函数
  9. event_base_loop(main_base, 0);

主线程创建一个套接字,然后对这个套接字设置一个状态,为conn_listening,这个套接字一直都处理conn_listening状态,它一直负责接收用户的连接。主线程收到客户的连接,就会将这个连接交给工作线程处理,然后主线程继续回到监听状态,等待下一个连接。

细节描述

  1. //为sfd添加事件处理函数。memcached使用libevent这个库来进行事件处理。
  2. conn *conn_new(const int sfd, const int init_state, const int event_flags,
  3. const int read_buffer_size, const bool is_udp, struct event_base *base) {
  4. conn *c = conn_from_freelist();
  5. ...
  6. 一些对conn的初始化
  7. ...
  8. //设置事件处理,时间处理函数为event_handler
  9. event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
  10. event_base_set(base, &c->event);
  11. c->ev_flags = event_flags;
  12. event_add(&c->event, 0)
  13. return c;
  14. }
  15. //事件处理就是交给drive_machine,他的参数是一个连接,每个连接都有一个状态,与它接收到的数据,每个工作线程可以根据它现在的状态和收据做对应的操作。
  16. void event_handler(const int fd, const short which, void *arg) {
  17. conn *c;
  18. c = (conn *)arg;
  19. drive_machine(c);
  20. }
  21. //所有的连接的事件处理函数都是它,它会根据连接的状态和数据来作对应的处理
  22. static void drive_machine(conn *c) {
  23. while (!stop) {
  24. switch(c->state) {
  25. case conn_listening:
  26. //监听进程一直都在执行这里的代码
  27. //接收连接,然后分发给工作线程,注意,交给工作线程时设置初始状态为conn_read
  28. sfd = accept(c->sfd, &addr, &addrlen))
  29. dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
  30. DATA_BUFFER_SIZE, false);
  31. break;
  32. case conn_read:
  33. //接收命令行传过来的参数,就是第二篇文章讲的第一行命令,
  34. //如set a 0 0 4
  35. //try_read_command判断接收的消息有没有一个可以操作的命令,
  36. //如果可以操作,就会跳到对应的状态,否则继续在这个状态,继续接收数据。
  37. //这里接收数据是通过try_read_network这个函数来接收数据的。
  38. if (try_read_command(c) != 0) {
  39. continue;
  40. }
  41. //这里是用来接收用户传来的信息
  42. if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) {
  43. continue;
  44. }
  45. stop = true;
  46. break;
  47. case conn_nread:
  48. //接收用户的数据块
  49. //如set a 0 0 4后的 1234
  50. if (c->rlbytes == 0) {
  51. complete_nread(c);
  52. break;
  53. }
  54. res = read(c->sfd, c->ritem, c->rlbytes);
  55. if (res > 0) {
  56. break;
  57. }
  58. if (res == 0) { /* end of stream */
  59. conn_set_state(c, conn_closing);
  60. break;
  61. }
  62. if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
  63. stop = true;
  64. break;
  65. }
  66. /* otherwise we have a real error, on which we close the connection */
  67. conn_set_state(c, conn_closing);
  68. break;
  69. //这些后面再讲
  70. case conn_swallow:
  71. case conn_write:
  72. case conn_mwrite:
  73. case conn_closing:
  74. }
  75. }
  76. return;
  77. }
  78. //因为一个客户端与memcached是一个tcp长连接,这个长连接一般会保持到用户主动退出或者网络断开,
  79. //还有这个套接字是异步套接字,我觉得理解这个概念很重要,用户输入完一个命令之后可以从这个while循环中返回。
  80. //因为是异步套接字,所以这个函数在用户不需要断开连接也是可以返回的,
  81. //然后一会再检查能不能构成一个可以执行的命令,
  82. //如果可以就执行,不行就返回0,等待数据。
  83. static int try_read_network(conn *c) {
  84. int gotdata = 0;
  85. int res;
  86. while (1) {
  87. if (c->rbytes >= c->rsize) {
  88. char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
  89. c->rcurr = c->rbuf = new_rbuf;
  90. c->rsize *= 2;
  91. }
  92. res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
  93. if (res > 0) {
  94. continue;
  95. }
  96. if (res == 0) {
  97. /* connection closed */
  98. conn_set_state(c, conn_closing);
  99. return 1;
  100. }
  101. if (res == -1) {
  102. if (errno == EAGAIN || errno == EWOULDBLOCK) break;
  103. else return 0;
  104. }
  105. }
  106. return gotdata;
  107. }
  108. //这个函数就是获取用户输入的第一行命令的一个预处理,将一个命令的\r\n结束改为以\0作为结束。
  109. //比如说用户输入 “get a\r\n“那么经过这里处理就变成 “get a\0\n“,为的是给后面做分割
  110. static int try_read_command(conn *c) {
  111. char *el, *cont;
  112. if (c->rbytes == 0)
  113. return 0;
  114. el = memchr(c->rcurr, '\n', c->rbytes);
  115. if (!el)
  116. return 0;
  117. cont = el + 1;
  118. if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
  119. el--;
  120. }
  121. *el = '\0';
  122. process_command(c, c->rcurr);
  123. return 1;
  124. }
  125. //处理用户输入,这里只会处理用户输入的第一行。
  126. //即使用户输入的是 set a 0 0 4\r\n
  127. // 1234\r\n
  128. //第一个\r\n后的1234不是这里处理的,它会交由drive_machine的conn_nread这里来读取和处理。
  129. static void process_command(conn *c, char *command) {
  130. token_t tokens[MAX_TOKENS];
  131. size_t ntokens;
  132. //用户输入的字符串分割开,tokens数组放着这些值
  133. ntokens = tokenize_command(command, tokens, MAX_TOKENS);
  134. if (ntokens >= 3 &&
  135. ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
  136. (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
  137. process_get_command(c, tokens, ntokens, false);
  138. }else if (ntokens == 6 &&
  139. ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
  140. (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
  141. (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
  142. (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
  143. (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
  144. process_update_command(c, tokens, ntokens, comm, false);
  145. }
  146. ...
  147. //处理其他的命令
  148. return;
  149. }
  150. //分割字符串
  151. //如用户输入get a\r\n它这里会返回3,分别为get, a, a. 这是我测试的结果,原因我也不清楚。
  152. //用户输入set a 0 0 4\r\n它会返回6,分别为set, a, 0, 0, 4, 4.
  153. static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
  154. char *s, *e;
  155. size_t ntokens = 0;
  156. assert(command != NULL && tokens != NULL && max_tokens > 1);
  157. for (s = e = command; ntokens < max_tokens - 1; ++e) {
  158. if (*e == ' ') {
  159. if (s != e) {
  160. tokens[ntokens].value = s;
  161. tokens[ntokens].length = e - s;
  162. ntokens++;
  163. *e = '\0';
  164. }
  165. s = e + 1;
  166. }
  167. else if (*e == '\0') {
  168. if (s != e) {
  169. tokens[ntokens].value = s;
  170. tokens[ntokens].length = e - s;
  171. ntokens++;
  172. }
  173. break; /* string end */
  174. }
  175. }
  176. /*
  177. * If we scanned the whole string, the terminal value pointer is null,
  178. * otherwise it is the first unprocessed character.
  179. */
  180. tokens[ntokens].value = *e == '\0' ? NULL : e;
  181. tokens[ntokens].length = 0;
  182. ntokens++;
  183. return ntokens;
  184. }
  185. //对tokens进行保存,改变状态,交由drive_machine去处理
  186. static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
  187. char *key;
  188. size_t nkey;
  189. int flags;
  190. time_t exptime;
  191. int vlen, old_vlen;
  192. uint64_t req_cas_id;
  193. item *it, *old_it;
  194. //将命令行中的各个值提取出来
  195. key = tokens[KEY_TOKEN].value;
  196. nkey = tokens[KEY_TOKEN].length;
  197. flags = strtoul(tokens[2].value, NULL, 10);
  198. exptime = strtol(tokens[3].value, NULL, 10);
  199. vlen = strtol(tokens[4].value, NULL, 10);
  200. //分配一个item的空间来寸数据,第二行输入的数据是存在item里面的
  201. it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
  202. c->item = it;
  203. c->ritem = ITEM_data(it);
  204. c->rlbytes = it->nbytes;
  205. c->item_comm = comm;
  206. //将这个连接的状态设置为conn_nread,交由drive_machine去处理,现在可以返回到drive_machine那里看看你conn_nread要做什么了,conn_nread主要是要读取接下来的数据,接下来的数据是读到item里面去的哦。接下来如果要保存到hash桶里,也是直接用这个item的指针而已。读完数据之后会执行complete_nread.
  207. conn_set_state(c, conn_nread);
  208. }
  209. //将数据保存到memcached
  210. static void complete_nread(conn *c) {
  211. assert(c != NULL);
  212. item *it = c->item;
  213. int comm = c->item_comm;
  214. int ret;
  215. if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
  216. out_string(c, "CLIENT_ERROR bad data chunk");
  217. } else {
  218. ret = store_item(it, comm);
  219. if (ret == 1)
  220. out_string(c, "STORED");
  221. else if(ret == 2)
  222. out_string(c, "EXISTS");
  223. else if(ret == 3)
  224. out_string(c, "NOT_FOUND");
  225. else
  226. out_string(c, "NOT_STORED");
  227. }
  228. item_remove(c->item); /* release the c->item reference */
  229. c->item = 0;
  230. }

这个处理输入的分析我就写到这里了。我这里只写了处理update(set, add, …)这个命令的操作作为例子,展示了从建立连接,接受请求,接受数据,到处理请求这一个流程是怎么走的。截取的源代码省略了很多安全处理的操作。

发表评论

表情:
评论列表 (有 0 条评论,251人围观)

还没有评论,来说两句吧...

相关阅读