1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3 #include "src/config.h"
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <stdbool.h>
7 #include <string.h>
8 #include <unistd.h>
9 #include <fcntl.h>
10 #include <assert.h>
11 #ifndef WIN32
12 #include <poll.h>
13 #include <limits.h>
14 #endif
15 #include "memcached.h"
16 #include "cproxy.h"
17 #include "mcs.h"
18 #include "log.h"
19
20 /* TODO: This timeout is inherited from zstored, but use it where? */
21
22 #define DOWNSTREAM_DEFAULT_LINGER 1000
23 #ifndef INFTIM
24 #define INFTIM -1
25 #endif
26
27 #ifdef WIN32
/* Returns 1 when the Winsock error code dw is WSAEWOULDBLOCK,
 * otherwise 0.
 */
static int is_blocking(DWORD dw) {
    if (dw == WSAEWOULDBLOCK) {
        return 1;
    }
    return 0;
}
31
/* Returns 1 when the Winsock error code dw is WSAEINPROGRESS,
 * otherwise 0.
 */
static int is_in_progress(DWORD dw) {
    if (dw == WSAEINPROGRESS) {
        return 1;
    }
    return 0;
}
35 #else
/* Returns 1 when the errno value dw is EAGAIN or EWOULDBLOCK,
 * otherwise 0.
 */
static int is_blocking(int dw) {
    if (dw == EAGAIN) {
        return 1;
    }
    return (dw == EWOULDBLOCK) ? 1 : 0;
}
39
/* Returns 1 when the errno value dw is EINPROGRESS, otherwise 0. */
static int is_in_progress(int dw) {
    if (dw == EINPROGRESS) {
        return 1;
    }
    return 0;
}
43 #endif
44
45
46 /* The lvb stands for libvbucket. */
47
48 mcs_st *lvb_create(mcs_st *ptr, const char *config,
49 const char *default_usr,
50 const char *default_pwd,
51 const char *opts);
52 void lvb_free_data(mcs_st *ptr);
53 bool lvb_stable_update(mcs_st *curr_version, mcs_st *next_version);
54 uint32_t lvb_key_hash(mcs_st *ptr, const char *key, size_t key_length,
55 int *vbucket);
56 void lvb_server_invalid_vbucket(mcs_st *ptr, int server_index,
57 int vbucket);
58
59 /* The lmc stands for libmemcached. */
60
61 mcs_st *lmc_create(mcs_st *ptr, const char *config,
62 const char *default_usr,
63 const char *default_pwd,
64 const char *opts);
65 void lmc_free_data(mcs_st *ptr);
66 uint32_t lmc_key_hash(mcs_st *ptr, const char *key, size_t key_length,
67 int *vbucket);
68
69 /* ---------------------------------------------------------------------- */
70
/* Initializes ptr from a config string by dispatching to one of the
 * two backends.
 *
 * A config whose first character is '{' is handed to lvb_create()
 * (libvbucket); every other config is handed to lmc_create()
 * (libmemcached).
 *
 * Returns the backend's result (ptr on success, NULL on failure), or
 * NULL when config itself is NULL.
 */
mcs_st *mcs_create(mcs_st *ptr, const char *config,
                   const char *default_usr,
                   const char *default_pwd,
                   const char *opts) {
    if (config == NULL) {
        moxi_log_write("ERROR: mcs_create missing config\n");
        return NULL;
    }

    if (config[0] == '{') {
        if (settings.verbose > 2) {
            moxi_log_write("mcs_create using libvbucket\n");
        }
        return lvb_create(ptr, config, default_usr, default_pwd, opts);
    }

    /* Everything that is not a '{'-style config goes to libmemcached;
     * there is no third case, so no "unconfigured" fallback is needed.
     */
    if (settings.verbose > 2) {
        moxi_log_write("mcs_create using libmemcached\n");
    }
    return lmc_create(ptr, config, default_usr, default_pwd, opts);
}
92
/* Releases everything owned by ptr and zeroes the structure.
 *
 * The backend-specific data is released first (according to
 * ptr->kind), then each server's usr/pwd strings and the servers
 * array itself. ptr is not freed; it is left zero-filled.
 */
void mcs_free(mcs_st *ptr) {
    int idx;

    if (ptr->kind == MCS_KIND_LIBVBUCKET) {
        lvb_free_data(ptr);
    } else if (ptr->kind == MCS_KIND_LIBMEMCACHED) {
        lmc_free_data(ptr);
    }
    ptr->kind = MCS_KIND_UNKNOWN;

    if (ptr->servers != NULL) {
        for (idx = 0; idx < ptr->nservers; idx++) {
            /* free(NULL) is a no-op, so unset credentials are fine. */
            free(ptr->servers[idx].usr);
            free(ptr->servers[idx].pwd);
        }
        free(ptr->servers);
    }

    memset(ptr, 0, sizeof(*ptr));
}
117
/* Tries to update curr_version in place from next_version.
 *
 * Only the libvbucket kind is supported, via lvb_stable_update();
 * any other kind returns false.
 */
bool mcs_stable_update(mcs_st *curr_version, mcs_st *next_version) {
    if (curr_version->kind != MCS_KIND_LIBVBUCKET) {
        /* TODO: MCS_KIND_LIBMEMCACHED impl for stable update. */
        return false;
    }

    return lvb_stable_update(curr_version, next_version);
}
127
/* Returns ptr->nservers converted to uint32_t. */
uint32_t mcs_server_count(mcs_st *ptr) {
    const uint32_t count = (uint32_t) ptr->nservers;

    return count;
}
131
mcs_server_index(mcs_st *ptr, int i)132 mcs_server_st *mcs_server_index(mcs_st *ptr, int i) {
133 return &ptr->servers[i];
134 }
135
/* Hashes key with the backend selected by ptr->kind.
 *
 * Returns the value produced by lvb_key_hash() or lmc_key_hash();
 * for any other kind returns 0 and leaves *vbucket untouched.
 */
uint32_t mcs_key_hash(mcs_st *ptr, const char *key, size_t key_length,
                      int *vbucket) {
    uint32_t result = 0;

    if (ptr->kind == MCS_KIND_LIBVBUCKET) {
        result = lvb_key_hash(ptr, key, key_length, vbucket);
    } else if (ptr->kind == MCS_KIND_LIBMEMCACHED) {
        result = lmc_key_hash(ptr, key, key_length, vbucket);
    }

    return result;
}
146
/* Forwards an invalid-vbucket report to the libvbucket backend.
 * Does nothing when ptr is not of the libvbucket kind.
 */
void mcs_server_invalid_vbucket(mcs_st *ptr, int server_index,
                                int vbucket) {
    if (ptr->kind != MCS_KIND_LIBVBUCKET) {
        return;
    }

    lvb_server_invalid_vbucket(ptr, server_index, vbucket);
}
153
154 /* ---------------------------------------------------------------------- */
155
lvb_create(mcs_st *ptr, const char *config, const char *default_usr, const char *default_pwd, const char *opts)156 mcs_st *lvb_create(mcs_st *ptr, const char *config,
157 const char *default_usr,
158 const char *default_pwd,
159 const char *opts) {
160 VBUCKET_CONFIG_HANDLE vch;
161 (void) opts;
162
163 assert(ptr);
164 memset(ptr, 0, sizeof(*ptr));
165 ptr->kind = MCS_KIND_LIBVBUCKET;
166
167 vch = vbucket_config_parse_string(config);
168 if (vch != NULL) {
169 ptr->data = vch;
170 ptr->nservers = vbucket_config_get_num_servers(vch);
171 if (ptr->nservers > 0) {
172 ptr->servers = calloc(sizeof(mcs_server_st), ptr->nservers);
173 if (ptr->servers != NULL) {
174 int i, j;
175 for (i = 0; i < ptr->nservers; i++) {
176 ptr->servers[i].fd = -1;
177 }
178
179 for (j = 0; j < ptr->nservers; j++) {
180 const char *user;
181 const char *password;
182 const char *hostport = vbucket_config_get_server(vch, j);
183 if (hostport != NULL &&
184 strlen(hostport) > 0 &&
185 strlen(hostport) < sizeof(ptr->servers[j].hostname) - 1) {
186 char *colon;
187 strncpy(ptr->servers[j].hostname,
188 hostport,
189 sizeof(ptr->servers[j].hostname) - 1);
190 colon = strchr(ptr->servers[j].hostname, ':');
191 if (colon != NULL) {
192 *colon = '\0';
193 ptr->servers[j].port = atoi(colon + 1);
194 if (ptr->servers[j].port <= 0) {
195 moxi_log_write("mcs_create failed, could not parse port: %s\n",
196 config);
197 break;
198 }
199 } else {
200 moxi_log_write("mcs_create failed, missing port: %s\n",
201 config);
202 break;
203 }
204 } else {
205 moxi_log_write("mcs_create failed, unknown server: %s\n",
206 config);
207 break;
208 }
209
210 user = vbucket_config_get_user(vch);
211 if (user != NULL) {
212 ptr->servers[j].usr = strdup(user);
213 } else if (default_usr != NULL) {
214 ptr->servers[j].usr = strdup(default_usr);
215 }
216
217 password = vbucket_config_get_password(vch);
218 if (password != NULL) {
219 ptr->servers[j].pwd = strdup(password);
220 } else if (default_pwd != NULL) {
221 ptr->servers[j].pwd = strdup(default_pwd);
222 }
223 }
224
225 if (j >= ptr->nservers) {
226 return ptr;
227 }
228 }
229 }
230 } else {
231 moxi_log_write("mcs_create failed, vbucket_config_parse_string: %s\n",
232 config);
233 }
234
235 mcs_free(ptr);
236
237 return NULL;
238 }
239
/* Destroys the vbucket config handle held in ptr->data (if any) and
 * clears ptr->data. ptr must be of the libvbucket kind.
 */
void lvb_free_data(mcs_st *ptr) {
    VBUCKET_CONFIG_HANDLE handle;

    assert(ptr->kind == MCS_KIND_LIBVBUCKET);

    handle = (VBUCKET_CONFIG_HANDLE) ptr->data;
    ptr->data = NULL;

    if (handle != NULL) {
        vbucket_config_destroy(handle);
    }
}
249
250 /* Returns true if curr_version could be updated with next_version in
251 * a low-impact stable manner (server-list is the same), allowing the
252 * same connections to be reused. Or returns false if the delta was
253 * too large for an in-place updating of curr_version with information
254 * from next_version.
255 *
256 * The next_version may be destroyed in this call, and the caller
257 * should afterwards only call mcs_free() on the next_version.
258 */
lvb_stable_update(mcs_st *curr_version, mcs_st *next_version)259 bool lvb_stable_update(mcs_st *curr_version, mcs_st *next_version) {
260 bool rv = false;
261 VBUCKET_CONFIG_DIFF *diff;
262
263 assert(curr_version->kind == MCS_KIND_LIBVBUCKET);
264 assert(curr_version->data != NULL);
265 assert(next_version->kind == MCS_KIND_LIBVBUCKET);
266 assert(next_version->data != NULL);
267
268 diff = vbucket_compare((VBUCKET_CONFIG_HANDLE) curr_version->data,
269 (VBUCKET_CONFIG_HANDLE) next_version->data);
270 if (diff != NULL) {
271 if (!diff->sequence_changed) {
272 vbucket_config_destroy((VBUCKET_CONFIG_HANDLE) curr_version->data);
273 curr_version->data = next_version->data;
274 next_version->data = 0;
275
276 rv = true;
277 }
278
279 vbucket_free_diff(diff);
280 }
281
282 return rv;
283 }
284
/* Maps key to its vbucket and returns that vbucket's master, as
 * reported by vbucket_get_master(), cast to uint32_t.
 *
 * When vbucket is non-NULL the vbucket id is stored through it.
 */
uint32_t lvb_key_hash(mcs_st *ptr, const char *key, size_t key_length,
                      int *vbucket) {
    VBUCKET_CONFIG_HANDLE handle;
    int vb_id;

    assert(ptr->kind == MCS_KIND_LIBVBUCKET);
    assert(ptr->data != NULL);

    handle = (VBUCKET_CONFIG_HANDLE) ptr->data;
    vb_id = vbucket_get_vbucket_by_key(handle, key, key_length);

    if (vbucket != NULL) {
        *vbucket = vb_id;
    }

    return (uint32_t) vbucket_get_master(handle, vb_id);
}
302
/* Passes the (vbucket, server_index) pair to
 * vbucket_found_incorrect_master() on ptr's config handle.
 */
void lvb_server_invalid_vbucket(mcs_st *ptr, int server_index,
                                int vbucket) {
    assert(ptr->kind == MCS_KIND_LIBVBUCKET);
    assert(ptr->data != NULL);

    vbucket_found_incorrect_master((VBUCKET_CONFIG_HANDLE) ptr->data,
                                   vbucket, server_index);
}
314
315
316 /* ---------------------------------------------------------------------- */
317
/* Initializes ptr as a libmemcached-kind mcs_st from a server-list
 * config string.
 *
 * opts may select the key distribution by containing one of
 * "distribution:ketama-weighted" (the default), "distribution:ketama"
 * or "distribution:modula". Each server gets fd = -1, the hostname
 * and port reported by libmemcached, and heap copies of default_usr /
 * default_pwd when those are non-NULL.
 *
 * Returns ptr on success. On any failure everything acquired so far,
 * including the memcached_st, is released and NULL is returned.
 */
mcs_st *lmc_create(mcs_st *ptr, const char *config,
                   const char *default_usr,
                   const char *default_pwd,
                   const char *opts) {
    memcached_st *mst;
    memcached_server_st *mservers = NULL;
    memcached_behavior_t b = MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED;
    uint64_t v = 1;
    int j;

    assert(ptr);
    memset(ptr, 0, sizeof(*ptr));
    ptr->kind = MCS_KIND_LIBMEMCACHED;

    mst = memcached_create(NULL);
    if (mst == NULL) {
        goto fail;
    }

    /* Hand ownership to ptr right away so that every failure path
     * below releases mst through mcs_free() -> lmc_free_data().
     */
    ptr->data = mst;

    if (opts != NULL) {
        if (strstr(opts, "distribution:ketama-weighted") != NULL) {
            b = MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED;
            v = 1;
        } else if (strstr(opts, "distribution:ketama") != NULL) {
            b = MEMCACHED_BEHAVIOR_KETAMA;
            v = 1;
        } else if (strstr(opts, "distribution:modula") != NULL) {
            /* modula is selected by switching ketama off. */
            b = MEMCACHED_BEHAVIOR_KETAMA;
            v = 0;
        }
    }

    memcached_behavior_set(mst, b, v);
    memcached_behavior_set(mst, MEMCACHED_BEHAVIOR_NO_BLOCK, 1);
    memcached_behavior_set(mst, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1);

    mservers = memcached_servers_parse(config);
    if (mservers == NULL) {
        moxi_log_write("lmc_create failed, memcached_servers_parse: %s\n",
                       config);
        goto fail;
    }

    memcached_server_push(mst, mservers);

    ptr->nservers = (int) memcached_server_list_count(mservers);
    if (ptr->nservers <= 0) {
        goto fail;
    }

    ptr->servers = calloc((size_t) ptr->nservers, sizeof(mcs_server_st));
    if (ptr->servers == NULL) {
        goto fail;
    }

    for (j = 0; j < ptr->nservers; j++) {
        mcs_server_st *srv = &ptr->servers[j];

        srv->fd = -1;

        /* Bounded, always NUL-terminated copy; over-long names are
         * truncated to fit.
         */
        snprintf(srv->hostname, sizeof(srv->hostname), "%s",
                 memcached_server_name(mservers + j));

        srv->port = (int) memcached_server_port(mservers + j);
        if (srv->port <= 0) {
            moxi_log_write("lmc_create failed, could not parse port: %s\n",
                           config);
            goto fail;
        }

        if (default_usr != NULL) {
            srv->usr = strdup(default_usr);
            if (srv->usr == NULL) {
                moxi_log_write("lmc_create failed, out of memory\n");
                goto fail;
            }
        }

        if (default_pwd != NULL) {
            srv->pwd = strdup(default_pwd);
            if (srv->pwd == NULL) {
                moxi_log_write("lmc_create failed, out of memory\n");
                goto fail;
            }
        }
    }

    memcached_server_list_free(mservers);

    return ptr;

fail:
    if (mservers != NULL) {
        memcached_server_list_free(mservers);
    }

    mcs_free(ptr);

    return NULL;
}
403
/* Frees the memcached_st held in ptr->data (if any) and clears
 * ptr->data. ptr must be of the libmemcached kind.
 */
void lmc_free_data(mcs_st *ptr) {
    memcached_st *mst;

    assert(ptr->kind == MCS_KIND_LIBMEMCACHED);

    mst = (memcached_st *) ptr->data;
    ptr->data = NULL;

    if (mst != NULL) {
        memcached_free(mst);
    }
}
413
/* Returns memcached_generate_hash() of key for ptr's memcached_st.
 *
 * The libmemcached kind has no vbuckets, so when vbucket is non-NULL
 * it is set to -1.
 */
uint32_t lmc_key_hash(mcs_st *ptr, const char *key, size_t key_length, int *vbucket) {
    memcached_st *mst;

    assert(ptr->kind == MCS_KIND_LIBMEMCACHED);
    assert(ptr->data != NULL);

    if (vbucket != NULL) {
        *vbucket = -1;
    }

    mst = (memcached_st *) ptr->data;

    return memcached_generate_hash(mst, key, key_length);
}
424
425 /* ---------------------------------------------------------------------- */
426
/* Closes the server's socket, if it has one, and marks the server
 * as disconnected (fd = INVALID_SOCKET). io_death is ignored.
 */
void mcs_server_st_quit(mcs_server_st *ptr, uint8_t io_death) {
    (void) io_death;

    /* TODO: Should send QUIT cmd. */

    if (ptr->fd == INVALID_SOCKET) {
        return;
    }

    closesocket(ptr->fd);
    ptr->fd = INVALID_SOCKET;
}
437
mcs_server_st_connect(mcs_server_st *ptr, int *errno_out, bool blocking)438 mcs_return mcs_server_st_connect(mcs_server_st *ptr, int *errno_out, bool blocking) {
439 if (ptr->fd != INVALID_SOCKET) {
440 if (errno_out != NULL) {
441 *errno_out = 0;
442 }
443
444 return MCS_SUCCESS;
445 }
446
447 if (errno_out != NULL) {
448 *errno_out = -1;
449 }
450
451 ptr->fd = mcs_connect(ptr->hostname, ptr->port, errno_out, blocking);
452 if (ptr->fd != INVALID_SOCKET) {
453 return MCS_SUCCESS;
454 }
455
456 return MCS_FAILURE;
457 }
458
/* Resolves hostname:portnum and connects a TCP socket to the first
 * resolved address that works.
 *
 * blocking == false: the socket is switched to non-blocking before
 * connect(), and a connect() that fails with an "in progress" or
 * "would block" error still counts as success; the caller sees that
 * error code in *errno_out.
 * blocking == true: connect() is performed on a blocking socket.
 * In both modes a socket that connected immediately is passed through
 * mcs_set_sock_opt() before being returned.
 *
 * errno_out, when non-NULL, is set to -1 on entry and afterwards to
 * the error of the most recent failed connect(); it is not reset when
 * a later attempt succeeds.
 *
 * Returns the socket, or INVALID_SOCKET when resolution fails or no
 * address could be used.
 */
SOCKET mcs_connect(const char *hostname, int portnum,
                   int *errno_out, bool blocking) {
    SOCKET ret = INVALID_SOCKET;
    struct addrinfo *ai = NULL;
    struct addrinfo *next = NULL;
    struct addrinfo hints;
    char port[50];

    if (errno_out != NULL) {
        *errno_out = -1;
    }

    memset(&hints, 0, sizeof(struct addrinfo));
    /* NOTE(review): AI_PASSIVE is meant for bind(); getaddrinfo()
     * ignores it for a non-NULL hostname. Kept as-is so behavior for
     * existing callers does not change.
     */
    hints.ai_flags = AI_PASSIVE;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_family = AF_UNSPEC;

    snprintf(port, sizeof(port), "%d", portnum);

    if (getaddrinfo(hostname, port, &hints, &ai) != 0) {
        return INVALID_SOCKET;
    }

    /* Try every resolved address in turn. Each attempt must use the
     * current entry (next), not the head of the list (ai).
     */
    for (next = ai; next != NULL; next = next->ai_next) {
        SOCKET sock = socket(next->ai_family,
                             next->ai_socktype,
                             next->ai_protocol);
        if (sock == INVALID_SOCKET) {
            continue;
        }

        /* If the caller wants non-blocking, set the sock options */
        /* now so even the connect() becomes non-blocking. */

        if (!blocking && (mcs_set_sock_opt(sock) != MCS_SUCCESS)) {
            closesocket(sock);
            continue;
        }

        if (connect(sock, next->ai_addr,
                    (socklen_t) next->ai_addrlen) == SOCKET_ERROR) {
#ifdef WIN32
            DWORD errno_last = WSAGetLastError();
#else
            int errno_last = errno;
#endif
            if (errno_out != NULL) {
                *errno_out = errno_last;
            }

            /* A non-blocking connect that is still under way is a
             * success from this function's point of view.
             */
            if (!blocking && (is_in_progress(errno_last) ||
                              is_blocking(errno_last))) {
                ret = sock;
                break;
            }

            closesocket(sock);
            continue;
        }

        if (mcs_set_sock_opt(sock) == MCS_SUCCESS) {
            ret = sock;
            break;
        }

        closesocket(sock);
    }

    freeaddrinfo(ai);

    return ret;
}
545
mcs_set_sock_opt(SOCKET sock)546 mcs_return mcs_set_sock_opt(SOCKET sock) {
547 /* jsh: todo
548 TODO: from zstored set_socket_options()...
549
550 if (fd type == MEMCACHED_CONNECTION_UDP)
551 return true;
552
553 #ifdef HAVE_SNDTIMEO
554 if (ptr->root->snd_timeout) {
555 int error;
556 struct timeval waittime;
557
558 waittime.tv_sec = 0;
559 waittime.tv_usec = ptr->root->snd_timeout;
560
561 error = setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO,
562 &waittime, (socklen_t)sizeof(struct timeval));
563 WATCHPOINT_ASSERT(error == 0);
564 }
565 #endif
566
567 #ifdef HAVE_RCVTIMEO
568 if (ptr->root->rcv_timeout) {
569 int error;
570 struct timeval waittime;
571
572 waittime.tv_sec = 0;
573 waittime.tv_usec = ptr->root->rcv_timeout;
574
575 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO,
576 &waittime, (socklen_t)sizeof(struct timeval));
577 WATCHPOINT_ASSERT(error == 0);
578 }
579 #endif
580
581 {
582 int error;
583 struct linger linger;
584
585 linger.l_onoff = 1;
586 linger.l_linger = DOWNSTREAM_DEFAULT_LINGER;
587 error = setsockopt(fd, SOL_SOCKET, SO_LINGER,
588 &linger, (socklen_t)sizeof(struct linger));
589 }
590
591 if (ptr->root->send_size) {
592 int error;
593
594 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
595 &ptr->root->send_size, (socklen_t)sizeof(int));
596 WATCHPOINT_ASSERT(error == 0);
597 }
598
599 if (ptr->root->recv_size) {
600 int error;
601
602 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVBUF,
603 &ptr->root->recv_size, (socklen_t)sizeof(int));
604 WATCHPOINT_ASSERT(error == 0);
605 }
606 */
607 if (evutil_make_socket_nonblocking(sock) == -1) {
608 return MCS_FAILURE;
609 }
610
611 int flags = 1;
612
613 setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
614 (void*)&flags, (socklen_t)sizeof(flags));
615
616 return MCS_SUCCESS;
617 }
618
/* Sends up to length bytes from buffer on fd with a single send()
 * call (flags 0) and returns send()'s result unchanged.
 */
ssize_t mcs_io_write(SOCKET fd, const void *buffer, size_t length) {
    ssize_t sent;

    assert(fd != -1);

    sent = send(fd, buffer, (int)length, 0);

    return sent;
}
624
625 #ifdef WIN32
mcs_io_read(SOCKET fd, void *dta, size_t size, struct timeval *timeout_in)626 mcs_return mcs_io_read(SOCKET fd, void *dta, size_t size, struct timeval *timeout_in) {
627 struct timeval my_timeout; /* Linux select() modifies its timeout param. */
628 struct timeval *timeout = NULL;
629
630 if (timeout_in != NULL &&
631 (timeout_in->tv_sec != 0 ||
632 timeout_in->tv_usec != 0)) {
633 my_timeout = *timeout_in;
634 timeout = &my_timeout;
635 }
636
637 char *data = dta;
638 size_t done = 0;
639
640 while (done < size) {
641 fd_set readfds[FD_SETSIZE];
642 FD_ZERO(readfds);
643 FD_SET(fd, readfds);
644
645 fd_set errfds[FD_SETSIZE];
646 FD_ZERO(errfds);
647 FD_SET(fd, errfds);
648
649 int s = select(FD_SETSIZE, readfds, NULL, errfds, timeout);
650 if (s == 0) {
651 return MCS_TIMEOUT;
652 }
653
654 if (s != 1 || FD_ISSET(fd, errfds) || !FD_ISSET(fd, readfds)) {
655 return MCS_FAILURE;
656 }
657
658 ssize_t n = recv(fd, data + done, 1, 0);
659 if (n == -1 || n == 0) {
660 return MCS_FAILURE;
661 }
662
663 done += (size_t) n;
664 }
665
666 return MCS_SUCCESS;
667 }
668 #else
669
/* Converts a timeval to whole milliseconds.
 *
 * With tv == NULL the current time from gettimeofday() is converted
 * instead; if gettimeofday() fails, 0 is returned. Sub-millisecond
 * microseconds are truncated.
 */
static uint64_t __get_time_ms(const struct timeval *tv) {
    struct timeval current;
    uint64_t millis;

    if (tv == NULL) {
        if (gettimeofday(&current, NULL) != 0) {
            return 0;
        }
        tv = &current;
    }

    millis = (uint64_t)tv->tv_sec * 1000;
    millis += (uint64_t)tv->tv_usec / 1000;

    return millis;
}
681
mcs_io_read(SOCKET fd, void *dta, size_t size, struct timeval *timeout_in)682 mcs_return mcs_io_read(SOCKET fd, void *dta, size_t size, struct timeval *timeout_in) {
683 uint64_t start_ms = 0;
684 uint64_t timeout_ms = 0;
685 uint64_t now_ms = 0;
686 char *data;
687 size_t done;
688 struct pollfd pfd[1];
689
690 if (timeout_in != NULL) {
691 start_ms = __get_time_ms(NULL);
692 timeout_ms = __get_time_ms(timeout_in);
693 now_ms = start_ms;
694 }
695
696 data = dta;
697 done = 0;
698
699 while (done < size) {
700 int timeout = INFTIM;
701 int s;
702 ssize_t n;
703
704 pfd[0].fd = fd;
705 pfd[0].events = POLLIN;
706 pfd[0].revents = 0;
707
708 if (timeout_in != NULL) {
709 if (timeout_ms == 0) {
710 /* ensure we poll at least once */
711 timeout = 0;
712 } else {
713 uint64_t taken_ms = now_ms - start_ms;
714 if (taken_ms >= timeout_ms) {
715 /* just check (boundary case) */
716 timeout = 0;
717 } else {
718 uint64_t left_ms = timeout_ms - taken_ms;
719 timeout = (left_ms > INT_MAX) ? INT_MAX : left_ms;
720 }
721 }
722 }
723 s = poll(pfd, 1, timeout);
724 if (s == 0) {
725 return MCS_TIMEOUT;
726 }
727
728 if (s != 1 || (pfd[0].revents & (POLLERR|POLLHUP|POLLNVAL)) || !(pfd[0].revents & POLLIN)) {
729 return MCS_FAILURE;
730 }
731
732 n = read(fd, data + done, 1);
733 if (n == -1 || n == 0) {
734 return MCS_FAILURE;
735 }
736
737 done += (size_t) n;
738 now_ms = __get_time_ms(NULL);
739 }
740
741 return MCS_SUCCESS;
742 }
743 #endif
744
/* Placeholder: currently does nothing with fd.
 * TODO: memcached_io_reset(ptr);
 */
void mcs_io_reset(SOCKET fd) {
    (void) fd;
}
750
/* Accessor: returns the server's hostname field. */
const char *mcs_server_st_hostname(mcs_server_st *ptr) {
    const char *hostname = ptr->hostname;

    return hostname;
}
754
/* Accessor: returns the server's port field. */
int mcs_server_st_port(mcs_server_st *ptr) {
    const int port = ptr->port;

    return port;
}
758
mcs_server_st_fd(mcs_server_st *ptr)759 SOCKET mcs_server_st_fd(mcs_server_st *ptr) {
760 return ptr->fd;
761 }
762
/* Accessor: returns the server's usr field, which may be NULL. */
const char *mcs_server_st_usr(mcs_server_st *ptr) {
    const char *usr = ptr->usr;

    return usr;
}
766
/* Accessor: returns the server's pwd field, which may be NULL. */
const char *mcs_server_st_pwd(mcs_server_st *ptr) {
    const char *pwd = ptr->pwd;

    return pwd;
}
770
/* Returns the server's identity string,
 * "<hostname>:<port>:<usr>:<pwd>:<is_ascii>".
 *
 * The string is built on first use and kept in msst->ident_a (for
 * is_ascii) or msst->ident_b (otherwise); later calls return the
 * stored buffer unchanged. At most MCS_IDENT_SIZE bytes are written,
 * so a longer identity is truncated.
 *
 * A NULL usr or pwd is rendered as the literal text "(null)". That is
 * what glibc prints for a NULL "%s" argument, so existing identity
 * strings stay the same while the call no longer relies on undefined
 * behavior.
 */
char *mcs_server_st_ident(mcs_server_st *msst, bool is_ascii) {
    char *buf;
    assert(msst != NULL);

    buf = is_ascii ? msst->ident_a : msst->ident_b;
    if (buf[0] == '\0') {
        const char *usr = mcs_server_st_usr(msst);
        const char *pwd = mcs_server_st_pwd(msst);

        snprintf(buf, MCS_IDENT_SIZE,
                 "%s:%d:%s:%s:%d",
                 mcs_server_st_hostname(msst),
                 mcs_server_st_port(msst),
                 (usr != NULL) ? usr : "(null)",
                 (pwd != NULL) ? pwd : "(null)",
                 (int) is_ascii);
    }

    return buf;
}
790