1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 
3 #include "src/config.h"
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <stdbool.h>
7 #include <string.h>
8 #include <unistd.h>
9 #include <fcntl.h>
10 #include <assert.h>
11 #ifndef WIN32
12 #include <poll.h>
13 #include <limits.h>
14 #endif
15 #include "memcached.h"
16 #include "cproxy.h"
17 #include "mcs.h"
18 #include "log.h"
19 
20 /* TODO: This timeout is inherited from zstored, but use it where? */
21 
22 #define DOWNSTREAM_DEFAULT_LINGER 1000
23 #ifndef INFTIM
24 #define INFTIM -1
25 #endif
26 
27 #ifdef WIN32
is_blocking(DWORD dw)28 static int is_blocking(DWORD dw) {
29     return (dw == WSAEWOULDBLOCK);
30 }
31 
is_in_progress(DWORD dw)32 static int is_in_progress(DWORD dw) {
33     return (dw == WSAEINPROGRESS);
34 }
35 #else
is_blocking(int dw)36 static int is_blocking(int dw) {
37     return (dw == EAGAIN || dw == EWOULDBLOCK);
38 }
39 
is_in_progress(int dw)40 static int is_in_progress(int dw) {
41     return (dw == EINPROGRESS);
42 }
43 #endif
44 
45 
46 /* The lvb stands for libvbucket. */
47 
48 mcs_st  *lvb_create(mcs_st *ptr, const char *config,
49                     const char *default_usr,
50                     const char *default_pwd,
51                     const char *opts);
52 void     lvb_free_data(mcs_st *ptr);
53 bool     lvb_stable_update(mcs_st *curr_version, mcs_st *next_version);
54 uint32_t lvb_key_hash(mcs_st *ptr, const char *key, size_t key_length,
55                       int *vbucket);
56 void     lvb_server_invalid_vbucket(mcs_st *ptr, int server_index,
57                                     int vbucket);
58 
59 /* The lmc stands for libmemcached. */
60 
61 mcs_st  *lmc_create(mcs_st *ptr, const char *config,
62                     const char *default_usr,
63                     const char *default_pwd,
64                     const char *opts);
65 void     lmc_free_data(mcs_st *ptr);
66 uint32_t lmc_key_hash(mcs_st *ptr, const char *key, size_t key_length,
67                       int *vbucket);
68 
69 /* ---------------------------------------------------------------------- */
70 
mcs_create(mcs_st *ptr, const char *config, const char *default_usr, const char *default_pwd, const char *opts)71 mcs_st *mcs_create(mcs_st *ptr, const char *config,
72                    const char *default_usr,
73                    const char *default_pwd,
74                    const char *opts) {
75     if (config[0] == '{') {
76         if (settings.verbose > 2) {
77             moxi_log_write("mcs_create using libvbucket\n");
78         }
79         return lvb_create(ptr, config, default_usr, default_pwd, opts);
80     }
81     if (config[0] != '{') {
82         if (settings.verbose > 2) {
83             moxi_log_write("mcs_create using libmemcached\n");
84         }
85         return lmc_create(ptr, config, default_usr, default_pwd, opts);
86     }
87     moxi_log_write("ERROR: unconfigured hash library\n");
88     exit(1);
89 
90     return NULL;
91 }
92 
mcs_free(mcs_st *ptr)93 void mcs_free(mcs_st *ptr) {
94     if (ptr->kind == MCS_KIND_LIBVBUCKET) {
95         lvb_free_data(ptr);
96     }
97     if (ptr->kind == MCS_KIND_LIBMEMCACHED) {
98         lmc_free_data(ptr);
99     }
100     ptr->kind = MCS_KIND_UNKNOWN;
101 
102     if (ptr->servers) {
103         int i;
104         for (i = 0; i < ptr->nservers; i++) {
105             if (ptr->servers[i].usr != NULL) {
106                 free(ptr->servers[i].usr);
107             }
108             if (ptr->servers[i].pwd != NULL) {
109                 free(ptr->servers[i].pwd);
110             }
111         }
112         free(ptr->servers);
113     }
114 
115     memset(ptr, 0, sizeof(*ptr));
116 }
117 
mcs_stable_update(mcs_st *curr_version, mcs_st *next_version)118 bool mcs_stable_update(mcs_st *curr_version, mcs_st *next_version) {
119     if (curr_version->kind == MCS_KIND_LIBVBUCKET) {
120         return lvb_stable_update(curr_version, next_version);
121     }
122 
123     /* TODO: MCS_KIND_LIBMEMCACHED impl for stable update. */
124 
125     return false;
126 }
127 
mcs_server_count(mcs_st *ptr)128 uint32_t mcs_server_count(mcs_st *ptr) {
129     return (uint32_t) ptr->nservers;
130 }
131 
mcs_server_index(mcs_st *ptr, int i)132 mcs_server_st *mcs_server_index(mcs_st *ptr, int i) {
133     return &ptr->servers[i];
134 }
135 
mcs_key_hash(mcs_st *ptr, const char *key, size_t key_length, int *vbucket)136 uint32_t mcs_key_hash(mcs_st *ptr, const char *key, size_t key_length,
137                       int *vbucket) {
138     if (ptr->kind == MCS_KIND_LIBVBUCKET) {
139         return lvb_key_hash(ptr, key, key_length, vbucket);
140     }
141     if (ptr->kind == MCS_KIND_LIBMEMCACHED) {
142         return lmc_key_hash(ptr, key, key_length, vbucket);
143     }
144     return 0;
145 }
146 
mcs_server_invalid_vbucket(mcs_st *ptr, int server_index, int vbucket)147 void mcs_server_invalid_vbucket(mcs_st *ptr, int server_index,
148                                 int vbucket) {
149     if (ptr->kind == MCS_KIND_LIBVBUCKET) {
150         lvb_server_invalid_vbucket(ptr, server_index, vbucket);
151     }
152 }
153 
154 /* ---------------------------------------------------------------------- */
155 
lvb_create(mcs_st *ptr, const char *config, const char *default_usr, const char *default_pwd, const char *opts)156 mcs_st *lvb_create(mcs_st *ptr, const char *config,
157                    const char *default_usr,
158                    const char *default_pwd,
159                    const char *opts) {
160     VBUCKET_CONFIG_HANDLE vch;
161     (void) opts;
162 
163     assert(ptr);
164     memset(ptr, 0, sizeof(*ptr));
165     ptr->kind = MCS_KIND_LIBVBUCKET;
166 
167     vch = vbucket_config_parse_string(config);
168     if (vch != NULL) {
169         ptr->data = vch;
170         ptr->nservers = vbucket_config_get_num_servers(vch);
171         if (ptr->nservers > 0) {
172             ptr->servers = calloc(sizeof(mcs_server_st), ptr->nservers);
173             if (ptr->servers != NULL) {
174                 int i, j;
175                 for (i = 0; i < ptr->nservers; i++) {
176                     ptr->servers[i].fd = -1;
177                 }
178 
179                 for (j = 0; j < ptr->nservers; j++) {
180                     const char *user;
181                     const char *password;
182                     const char *hostport = vbucket_config_get_server(vch, j);
183                     if (hostport != NULL &&
184                         strlen(hostport) > 0 &&
185                         strlen(hostport) < sizeof(ptr->servers[j].hostname) - 1) {
186                         char *colon;
187                         strncpy(ptr->servers[j].hostname,
188                                 hostport,
189                                 sizeof(ptr->servers[j].hostname) - 1);
190                         colon = strchr(ptr->servers[j].hostname, ':');
191                         if (colon != NULL) {
192                             *colon = '\0';
193                             ptr->servers[j].port = atoi(colon + 1);
194                             if (ptr->servers[j].port <= 0) {
195                                 moxi_log_write("mcs_create failed, could not parse port: %s\n",
196                                         config);
197                                 break;
198                             }
199                         } else {
200                             moxi_log_write("mcs_create failed, missing port: %s\n",
201                                     config);
202                             break;
203                         }
204                     } else {
205                         moxi_log_write("mcs_create failed, unknown server: %s\n",
206                                 config);
207                         break;
208                     }
209 
210                     user = vbucket_config_get_user(vch);
211                     if (user != NULL) {
212                         ptr->servers[j].usr = strdup(user);
213                     } else if (default_usr != NULL) {
214                         ptr->servers[j].usr = strdup(default_usr);
215                     }
216 
217                     password = vbucket_config_get_password(vch);
218                     if (password != NULL) {
219                         ptr->servers[j].pwd = strdup(password);
220                     } else if (default_pwd != NULL) {
221                         ptr->servers[j].pwd = strdup(default_pwd);
222                     }
223                 }
224 
225                 if (j >= ptr->nservers) {
226                     return ptr;
227                 }
228             }
229         }
230     } else {
231         moxi_log_write("mcs_create failed, vbucket_config_parse_string: %s\n",
232                        config);
233     }
234 
235     mcs_free(ptr);
236 
237     return NULL;
238 }
239 
lvb_free_data(mcs_st *ptr)240 void lvb_free_data(mcs_st *ptr) {
241     assert(ptr->kind == MCS_KIND_LIBVBUCKET);
242 
243     if (ptr->data != NULL) {
244         vbucket_config_destroy((VBUCKET_CONFIG_HANDLE) ptr->data);
245     }
246 
247     ptr->data = NULL;
248 }
249 
250 /* Returns true if curr_version could be updated with next_version in
251  * a low-impact stable manner (server-list is the same), allowing the
252  * same connections to be reused.  Or returns false if the delta was
253  * too large for an in-place updating of curr_version with information
254  * from next_version.
255  *
256  * The next_version may be destroyed in this call, and the caller
257  * should afterwards only call mcs_free() on the next_version.
258  */
lvb_stable_update(mcs_st *curr_version, mcs_st *next_version)259 bool lvb_stable_update(mcs_st *curr_version, mcs_st *next_version) {
260     bool rv = false;
261     VBUCKET_CONFIG_DIFF *diff;
262 
263     assert(curr_version->kind == MCS_KIND_LIBVBUCKET);
264     assert(curr_version->data != NULL);
265     assert(next_version->kind == MCS_KIND_LIBVBUCKET);
266     assert(next_version->data != NULL);
267 
268     diff = vbucket_compare((VBUCKET_CONFIG_HANDLE) curr_version->data,
269                            (VBUCKET_CONFIG_HANDLE) next_version->data);
270     if (diff != NULL) {
271         if (!diff->sequence_changed) {
272             vbucket_config_destroy((VBUCKET_CONFIG_HANDLE) curr_version->data);
273             curr_version->data = next_version->data;
274             next_version->data = 0;
275 
276             rv = true;
277         }
278 
279         vbucket_free_diff(diff);
280     }
281 
282     return rv;
283 }
284 
lvb_key_hash(mcs_st *ptr, const char *key, size_t key_length, int *vbucket)285 uint32_t lvb_key_hash(mcs_st *ptr, const char *key, size_t key_length,
286                       int *vbucket) {
287     VBUCKET_CONFIG_HANDLE vch;
288     int v;
289 
290     assert(ptr->kind == MCS_KIND_LIBVBUCKET);
291     assert(ptr->data != NULL);
292 
293     vch = (VBUCKET_CONFIG_HANDLE) ptr->data;
294 
295     v = vbucket_get_vbucket_by_key(vch, key, key_length);
296     if (vbucket != NULL) {
297         *vbucket = v;
298     }
299 
300     return (uint32_t) vbucket_get_master(vch, v);
301 }
302 
lvb_server_invalid_vbucket(mcs_st *ptr, int server_index, int vbucket)303 void lvb_server_invalid_vbucket(mcs_st *ptr, int server_index,
304                                 int vbucket) {
305     VBUCKET_CONFIG_HANDLE vch;
306 
307     assert(ptr->kind == MCS_KIND_LIBVBUCKET);
308     assert(ptr->data != NULL);
309 
310     vch = (VBUCKET_CONFIG_HANDLE) ptr->data;
311 
312     vbucket_found_incorrect_master(vch, vbucket, server_index);
313 }
314 
315 
316 /* ---------------------------------------------------------------------- */
317 
lmc_create(mcs_st *ptr, const char *config, const char *default_usr, const char *default_pwd, const char *opts)318 mcs_st *lmc_create(mcs_st *ptr, const char *config,
319                    const char *default_usr,
320                    const char *default_pwd,
321                    const char *opts) {
322     memcached_st *mst;
323 
324     assert(ptr);
325     memset(ptr, 0, sizeof(*ptr));
326     ptr->kind = MCS_KIND_LIBMEMCACHED;
327 
328     mst = memcached_create(NULL);
329     if (mst != NULL) {
330         memcached_server_st *mservers;
331         memcached_behavior_t b = MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED;
332         uint64_t             v = 1;
333 
334         if (opts != NULL) {
335             if (strstr(opts, "distribution:ketama-weighted") != NULL) {
336                 b = MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED;
337                 v = 1;
338             } else if (strstr(opts, "distribution:ketama") != NULL) {
339                 b = MEMCACHED_BEHAVIOR_KETAMA;
340                 v = 1;
341             } else if (strstr(opts, "distribution:modula") != NULL) {
342                 b = MEMCACHED_BEHAVIOR_KETAMA;
343                 v = 0;
344             }
345         }
346 
347         memcached_behavior_set(mst, b, v);
348         memcached_behavior_set(mst, MEMCACHED_BEHAVIOR_NO_BLOCK, 1);
349         memcached_behavior_set(mst, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1);
350 
351         mservers = memcached_servers_parse(config);
352         if (mservers != NULL) {
353             memcached_server_push(mst, mservers);
354 
355             ptr->data     = mst;
356             ptr->nservers = (int) memcached_server_list_count(mservers);
357             if (ptr->nservers > 0) {
358                 ptr->servers = calloc(sizeof(mcs_server_st), ptr->nservers);
359                 if (ptr->servers != NULL) {
360                     int i;
361                     int j;
362                     for (i = 0; i < ptr->nservers; i++) {
363                         ptr->servers[i].fd = -1;
364                     }
365 
366                     for (j = 0; j < ptr->nservers; j++) {
367                         strncpy(ptr->servers[j].hostname,
368                                 memcached_server_name(mservers + j),
369                                 sizeof(ptr->servers[j].hostname) - 1);
370                         ptr->servers[j].port =
371                             (int) memcached_server_port(mservers + j);
372                         if (ptr->servers[j].port <= 0) {
373                             moxi_log_write("lmc_create failed, could not parse port: %s\n",
374                                            config);
375                             break;
376                         }
377 
378                         if (default_usr != NULL) {
379                             ptr->servers[j].usr = strdup(default_usr);
380                         }
381 
382                         if (default_pwd != NULL) {
383                             ptr->servers[j].pwd = strdup(default_pwd);
384                         }
385                     }
386 
387                     if (j >= ptr->nservers) {
388                         memcached_server_list_free(mservers);
389 
390                         return ptr;
391                     }
392                 }
393             }
394 
395             memcached_server_list_free(mservers);
396         }
397     }
398 
399     mcs_free(ptr);
400 
401     return NULL;
402 }
403 
lmc_free_data(mcs_st *ptr)404 void lmc_free_data(mcs_st *ptr) {
405     assert(ptr->kind == MCS_KIND_LIBMEMCACHED);
406 
407     if (ptr->data != NULL) {
408         memcached_free((memcached_st *) ptr->data);
409     }
410 
411     ptr->data = NULL;
412 }
413 
lmc_key_hash(mcs_st *ptr, const char *key, size_t key_length, int *vbucket)414 uint32_t lmc_key_hash(mcs_st *ptr, const char *key, size_t key_length, int *vbucket) {
415     assert(ptr->kind == MCS_KIND_LIBMEMCACHED);
416     assert(ptr->data != NULL);
417 
418     if (vbucket != NULL) {
419         *vbucket = -1;
420     }
421 
422     return memcached_generate_hash((memcached_st *) ptr->data, key, key_length);
423 }
424 
425 /* ---------------------------------------------------------------------- */
426 
mcs_server_st_quit(mcs_server_st *ptr, uint8_t io_death)427 void mcs_server_st_quit(mcs_server_st *ptr, uint8_t io_death) {
428     (void) io_death;
429 
430     /* TODO: Should send QUIT cmd. */
431 
432     if (ptr->fd != INVALID_SOCKET) {
433         closesocket(ptr->fd);
434     }
435     ptr->fd = INVALID_SOCKET;
436 }
437 
mcs_server_st_connect(mcs_server_st *ptr, int *errno_out, bool blocking)438 mcs_return mcs_server_st_connect(mcs_server_st *ptr, int *errno_out, bool blocking) {
439     if (ptr->fd != INVALID_SOCKET) {
440         if (errno_out != NULL) {
441             *errno_out = 0;
442         }
443 
444         return MCS_SUCCESS;
445     }
446 
447     if (errno_out != NULL) {
448         *errno_out = -1;
449     }
450 
451     ptr->fd = mcs_connect(ptr->hostname, ptr->port, errno_out, blocking);
452     if (ptr->fd != INVALID_SOCKET) {
453         return MCS_SUCCESS;
454     }
455 
456     return MCS_FAILURE;
457 }
458 
mcs_connect(const char *hostname, int portnum, int *errno_out, bool blocking)459 SOCKET mcs_connect(const char *hostname, int portnum,
460                 int *errno_out, bool blocking) {
461     SOCKET ret = INVALID_SOCKET;
462     struct addrinfo *ai   = NULL;
463     struct addrinfo *next = NULL;
464     struct addrinfo hints;
465     char port[50];
466     int error;
467 
468     if (errno_out != NULL) {
469         *errno_out = -1;
470     }
471 
472     memset(&hints, 0, sizeof(struct addrinfo));
473     hints.ai_flags = AI_PASSIVE;
474     hints.ai_socktype = SOCK_STREAM;
475     hints.ai_family = AF_UNSPEC;
476 
477     snprintf(port, sizeof(port), "%d", portnum);
478 
479     error = getaddrinfo(hostname, port, &hints, &ai);
480     if (error != 0) {
481 #if 0
482         if (error != EAI_SYSTEM) {
483             /* settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL, */
484             /*                                 "getaddrinfo(): %s\n", gai_strerror(error)); */
485         } else {
486             /* settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL, */
487             /*                                 "getaddrinfo(): %s\n", strerror(error)); */
488         }
489 #endif
490         return INVALID_SOCKET;
491     }
492 
493     for (next = ai; next; next = next->ai_next) {
494         SOCKET sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
495         if (sock == INVALID_SOCKET) {
496             /* settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL, */
497             /*                                 "Failed to create socket: %s\n", */
498             /*                                 strerror(errno)); */
499             continue;
500         }
501 
502         /* If the caller wants non-blocking, set the sock options */
503         /* now so even the connect() becomes non-blocking. */
504 
505         if (!blocking && (mcs_set_sock_opt(sock) != MCS_SUCCESS)) {
506             closesocket(sock);
507             continue;
508         }
509 
510         if (connect(sock, ai->ai_addr, (socklen_t)ai->ai_addrlen) == SOCKET_ERROR) {
511 #ifdef WIN32
512             DWORD errno_last = WSAGetLastError();
513 #else
514             int errno_last = errno;
515 #endif
516             if (errno_out != NULL) {
517                 *errno_out = errno_last;
518             }
519 
520             if (!blocking && (is_in_progress(errno_last) ||
521                               is_blocking(errno_last))) {
522                 ret = sock;
523                 break;
524             }
525 
526             /* settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL, */
527             /*                                 "Failed to connect socket: %s\n", */
528             /*                                 strerror(errno)); */
529             closesocket(sock);
530             continue;
531         }
532 
533         if (mcs_set_sock_opt(sock) == MCS_SUCCESS) {
534             ret = sock;
535             break;
536         }
537 
538         closesocket(sock);
539     }
540 
541     freeaddrinfo(ai);
542 
543     return ret;
544 }
545 
mcs_set_sock_opt(SOCKET sock)546 mcs_return mcs_set_sock_opt(SOCKET sock) {
547     /* jsh: todo
548        TODO: from zstored set_socket_options()...
549 
550     if (fd type == MEMCACHED_CONNECTION_UDP)
551        return true;
552 
553 #ifdef HAVE_SNDTIMEO
554     if (ptr->root->snd_timeout) {
555         int error;
556         struct timeval waittime;
557 
558         waittime.tv_sec = 0;
559         waittime.tv_usec = ptr->root->snd_timeout;
560 
561         error = setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO,
562                            &waittime, (socklen_t)sizeof(struct timeval));
563         WATCHPOINT_ASSERT(error == 0);
564     }
565 #endif
566 
567 #ifdef HAVE_RCVTIMEO
568     if (ptr->root->rcv_timeout) {
569         int error;
570         struct timeval waittime;
571 
572         waittime.tv_sec = 0;
573         waittime.tv_usec = ptr->root->rcv_timeout;
574 
575         error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO,
576                           &waittime, (socklen_t)sizeof(struct timeval));
577         WATCHPOINT_ASSERT(error == 0);
578     }
579 #endif
580 
581   {
582     int error;
583     struct linger linger;
584 
585     linger.l_onoff = 1;
586     linger.l_linger = DOWNSTREAM_DEFAULT_LINGER;
587     error = setsockopt(fd, SOL_SOCKET, SO_LINGER,
588                        &linger, (socklen_t)sizeof(struct linger));
589   }
590 
591   if (ptr->root->send_size) {
592     int error;
593 
594     error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
595                       &ptr->root->send_size, (socklen_t)sizeof(int));
596     WATCHPOINT_ASSERT(error == 0);
597   }
598 
599   if (ptr->root->recv_size) {
600     int error;
601 
602     error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVBUF,
603                       &ptr->root->recv_size, (socklen_t)sizeof(int));
604     WATCHPOINT_ASSERT(error == 0);
605   }
606   */
607     if (evutil_make_socket_nonblocking(sock) == -1) {
608         return MCS_FAILURE;
609     }
610 
611     int flags = 1;
612 
613     setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
614                (void*)&flags, (socklen_t)sizeof(flags));
615 
616     return MCS_SUCCESS;
617 }
618 
mcs_io_write(SOCKET fd, const void *buffer, size_t length)619 ssize_t mcs_io_write(SOCKET fd, const void *buffer, size_t length) {
620     assert(fd != -1);
621 
622     return send(fd, buffer, (int)length, 0);
623 }
624 
625 #ifdef WIN32
mcs_io_read(SOCKET fd, void *dta, size_t size, struct timeval *timeout_in)626 mcs_return mcs_io_read(SOCKET fd, void *dta, size_t size, struct timeval *timeout_in) {
627     struct timeval my_timeout; /* Linux select() modifies its timeout param. */
628     struct timeval *timeout = NULL;
629 
630     if (timeout_in != NULL &&
631         (timeout_in->tv_sec != 0 ||
632          timeout_in->tv_usec != 0)) {
633         my_timeout = *timeout_in;
634         timeout = &my_timeout;
635     }
636 
637     char *data = dta;
638     size_t done = 0;
639 
640     while (done < size) {
641         fd_set readfds[FD_SETSIZE];
642         FD_ZERO(readfds);
643         FD_SET(fd, readfds);
644 
645         fd_set errfds[FD_SETSIZE];
646         FD_ZERO(errfds);
647         FD_SET(fd, errfds);
648 
649         int s = select(FD_SETSIZE, readfds, NULL, errfds, timeout);
650         if (s == 0) {
651             return MCS_TIMEOUT;
652         }
653 
654         if (s != 1 || FD_ISSET(fd, errfds) || !FD_ISSET(fd, readfds)) {
655             return MCS_FAILURE;
656         }
657 
658         ssize_t n = recv(fd, data + done, 1, 0);
659         if (n == -1 || n == 0) {
660             return MCS_FAILURE;
661         }
662 
663         done += (size_t) n;
664     }
665 
666     return MCS_SUCCESS;
667 }
668 #else
669 
__get_time_ms(const struct timeval *tv)670 static uint64_t __get_time_ms(const struct timeval *tv) {
671     struct timeval now;
672 
673     if (tv == NULL) {
674         if (gettimeofday(&now, NULL) != 0) {
675             return 0;
676         }
677         tv = &now;
678     }
679     return (uint64_t)tv->tv_sec * 1000 + (uint64_t)tv->tv_usec / 1000;
680 }
681 
mcs_io_read(SOCKET fd, void *dta, size_t size, struct timeval *timeout_in)682 mcs_return mcs_io_read(SOCKET fd, void *dta, size_t size, struct timeval *timeout_in) {
683     uint64_t start_ms = 0;
684     uint64_t timeout_ms = 0;
685     uint64_t now_ms = 0;
686     char *data;
687     size_t done;
688     struct pollfd pfd[1];
689 
690     if (timeout_in != NULL) {
691         start_ms = __get_time_ms(NULL);
692         timeout_ms = __get_time_ms(timeout_in);
693         now_ms = start_ms;
694     }
695 
696     data = dta;
697     done = 0;
698 
699     while (done < size) {
700         int timeout = INFTIM;
701         int s;
702         ssize_t n;
703 
704         pfd[0].fd = fd;
705         pfd[0].events = POLLIN;
706         pfd[0].revents = 0;
707 
708         if (timeout_in != NULL) {
709             if (timeout_ms == 0) {
710                 /* ensure we poll at least once */
711                 timeout = 0;
712             } else {
713                 uint64_t taken_ms = now_ms - start_ms;
714                 if (taken_ms >= timeout_ms) {
715                     /* just check (boundary case) */
716                     timeout = 0;
717                 } else {
718                     uint64_t left_ms = timeout_ms - taken_ms;
719                     timeout = (left_ms > INT_MAX) ? INT_MAX : left_ms;
720                 }
721             }
722         }
723         s = poll(pfd, 1, timeout);
724         if (s == 0) {
725             return MCS_TIMEOUT;
726         }
727 
728         if (s != 1 || (pfd[0].revents & (POLLERR|POLLHUP|POLLNVAL)) || !(pfd[0].revents & POLLIN)) {
729             return MCS_FAILURE;
730         }
731 
732         n = read(fd, data + done, 1);
733         if (n == -1 || n == 0) {
734             return MCS_FAILURE;
735         }
736 
737         done += (size_t) n;
738         now_ms = __get_time_ms(NULL);
739     }
740 
741     return MCS_SUCCESS;
742 }
743 #endif
744 
mcs_io_reset(SOCKET fd)745 void mcs_io_reset(SOCKET fd) {
746     (void) fd;
747 
748     /* TODO: memcached_io_reset(ptr); */
749 }
750 
mcs_server_st_hostname(mcs_server_st *ptr)751 const char *mcs_server_st_hostname(mcs_server_st *ptr) {
752     return ptr->hostname;
753 }
754 
mcs_server_st_port(mcs_server_st *ptr)755 int mcs_server_st_port(mcs_server_st *ptr) {
756     return ptr->port;
757 }
758 
mcs_server_st_fd(mcs_server_st *ptr)759 SOCKET mcs_server_st_fd(mcs_server_st *ptr) {
760     return ptr->fd;
761 }
762 
mcs_server_st_usr(mcs_server_st *ptr)763 const char *mcs_server_st_usr(mcs_server_st *ptr) {
764     return ptr->usr;
765 }
766 
mcs_server_st_pwd(mcs_server_st *ptr)767 const char *mcs_server_st_pwd(mcs_server_st *ptr) {
768     return ptr->pwd;
769 }
770 
mcs_server_st_ident(mcs_server_st *msst, bool is_ascii)771 char *mcs_server_st_ident(mcs_server_st *msst, bool is_ascii) {
772     char *buf;
773     assert(msst != NULL);
774 
775     buf = is_ascii ? msst->ident_a : msst->ident_b;
776     if (buf[0] == '\0') {
777         const char *usr = mcs_server_st_usr(msst);
778         const char *pwd = mcs_server_st_pwd(msst);
779 
780         snprintf(buf, MCS_IDENT_SIZE,
781                  "%s:%d:%s:%s:%d",
782                  mcs_server_st_hostname(msst),
783                  mcs_server_st_port(msst),
784                  usr, pwd,
785                  is_ascii);
786     }
787 
788     return buf;
789 }
790