1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 
3 #include "src/config.h"
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <errno.h>
8 #include <platform/cbassert.h>
9 #include <fcntl.h>
10 #include <math.h>
11 #include "memcached.h"
12 #include "cproxy.h"
13 #include "work.h"
14 #include "log.h"
15 
16 #ifndef MOXI_BLOCKING_CONNECT
17 #define MOXI_BLOCKING_CONNECT false
18 #endif
19 
20 /* Internal forward declarations. */
21 
22 downstream *downstream_list_remove(downstream *head, downstream *d);
23 downstream *downstream_list_waiting_remove(downstream *head,
24                                            downstream **tail,
25                                            downstream *d);
26 
27 static void downstream_timeout(evutil_socket_t fd,
28                         const short which,
29                         void *arg);
30 static void wait_queue_timeout(evutil_socket_t fd,
31                         const short which,
32                         void *arg);
33 
34 conn *conn_list_remove(conn *head, conn **tail,
35                        conn *c, bool *found);
36 
37 bool is_compatible_request(conn *existing, conn *candidate);
38 
39 void propagate_error_msg(downstream *d, char *ascii_msg,
40                          protocol_binary_response_status binary_status);
41 
42 void downstream_reserved_time_sample(proxy_stats_td *ptds, uint64_t duration);
43 void downstream_connect_time_sample(proxy_stats_td *ptds, uint64_t duration);
44 
45 bool downstream_connect_init(downstream *d, mcs_server_st *msst,
46                              proxy_behavior *behavior, conn *c);
47 
48 int init_mcs_st(mcs_st *mst, char *config,
49                 const char *default_usr,
50                 const char *default_pwd,
51                 const char *opts);
52 
53 bool cproxy_on_connect_downstream_conn(conn *c);
54 
55 conn *zstored_acquire_downstream_conn(downstream *d,
56                                       LIBEVENT_THREAD *thread,
57                                       mcs_server_st *msst,
58                                       proxy_behavior *behavior,
59                                       bool *downstream_conn_max_reached);
60 
61 void zstored_release_downstream_conn(conn *dc, bool closing);
62 
63 void zstored_error_count(LIBEVENT_THREAD *thread,
64                          const char *host_ident,
65                          bool has_error);
66 
67 bool zstored_downstream_waiting_add(downstream *d, LIBEVENT_THREAD *thread,
68                                     mcs_server_st *msst,
69                                     proxy_behavior *behavior);
70 
71 bool zstored_downstream_waiting_remove(downstream *d);
72 
/* Per-thread registry entry for the pool of downstream conns that
 * target a single downstream host (identified by host_ident).
 * Looked up via zstored_get_downstream_conns(). */
typedef struct {
    conn      *dc;          /* Linked-list of available downstream conns. */
    uint32_t   dc_acquired; /* Count of acquired (in-use) downstream conns. */
    char      *host_ident;  /* Owned host identity string for this pool. */
    uint32_t   error_count; /* Errors recorded via zstored_error_count(). */
    uint64_t   error_time;  /* NOTE(review): presumably timestamp of last
                             * error — confirm against zstored_error_count(). */

    /* Head & tail of singly linked-list/queue, using */
    /* downstream->next_waiting pointers, where we've reached */
    /* downstream_conn_max, so there are waiting downstreams. */

    downstream *downstream_waiting_head;
    downstream *downstream_waiting_tail;
} zstored_downstream_conns;
87 
88 zstored_downstream_conns *zstored_get_downstream_conns(LIBEVENT_THREAD *thread,
89                                                        const char *host_ident);
90 
91 bool cproxy_forward_or_error(downstream *d);
92 
93 int delink_from_downstream_conns(conn *c);
94 
95 int cproxy_num_active_proxies(proxy_main *m);
96 
97 /* Function tables. */
98 
/* Dispatch table for listening conns: the only hook needed is
 * conn_init, which adopts each newly accepted upstream (client)
 * conn into a proxy worker thread. */
conn_funcs cproxy_listen_funcs = {
    .conn_init                   = cproxy_init_upstream_conn,
    .conn_close                  = NULL,
    .conn_connect                = NULL,
    .conn_process_ascii_command  = NULL,
    .conn_process_binary_command = NULL,
    .conn_complete_nread_ascii   = NULL,
    .conn_complete_nread_binary  = NULL,
    .conn_pause                  = NULL,
    .conn_realtime               = NULL,
    .conn_state_change           = NULL,
    .conn_binary_command_magic   = 0
};
112 
/* Dispatch table for upstream (client-facing) conns.  Upstream
 * traffic carries request magic (PROTOCOL_BINARY_REQ) in the
 * binary protocol. */
conn_funcs cproxy_upstream_funcs = {
    .conn_init                   = NULL,
    .conn_close                  = cproxy_on_close_upstream_conn,
    .conn_connect                = NULL,
    .conn_process_ascii_command  = cproxy_process_upstream_ascii,
    .conn_process_binary_command = cproxy_process_upstream_binary,
    .conn_complete_nread_ascii   = cproxy_process_upstream_ascii_nread,
    .conn_complete_nread_binary  = cproxy_process_upstream_binary_nread,
    .conn_pause                  = NULL,
    .conn_realtime               = cproxy_realtime,
    .conn_state_change           = cproxy_upstream_state_change,
    .conn_binary_command_magic   = PROTOCOL_BINARY_REQ
};
126 
/* Dispatch table for downstream (server-facing) conns.  Downstream
 * traffic carries response magic (PROTOCOL_BINARY_RES) in the
 * binary protocol. */
conn_funcs cproxy_downstream_funcs = {
    .conn_init                   = cproxy_init_downstream_conn,
    .conn_close                  = cproxy_on_close_downstream_conn,
    .conn_connect                = cproxy_on_connect_downstream_conn,
    .conn_process_ascii_command  = cproxy_process_downstream_ascii,
    .conn_process_binary_command = cproxy_process_downstream_binary,
    .conn_complete_nread_ascii   = cproxy_process_downstream_ascii_nread,
    .conn_complete_nread_binary  = cproxy_process_downstream_binary_nread,
    .conn_pause                  = cproxy_on_pause_downstream_conn,
    .conn_realtime               = cproxy_realtime,
    .conn_state_change           = NULL,
    .conn_binary_command_magic   = PROTOCOL_BINARY_RES
};
140 
/* Main function to create a proxy struct.
 *
 * Allocates a proxy plus one proxy_td slot per thread (slot 0, the
 * main listen/accept thread, is intentionally left zeroed), copying
 * the given name/config/behaviors so the caller retains ownership of
 * its arguments.  Returns NULL when any required allocation fails;
 * on success the caller owns the returned proxy.
 */
proxy *cproxy_create(proxy_main *main,
                     char    *name,
                     int      port,
                     char    *config,
                     uint32_t config_ver,
                     proxy_behavior_pool *behavior_pool,
                     int nthreads) {
    if (settings.verbose > 1) {
        moxi_log_write("cproxy_create on port %d, name %s, config %s\n",
                       port, name, config);
    }

    cb_assert(name != NULL);
    cb_assert(port > 0 || settings.socketpath != NULL);
    cb_assert(config != NULL);
    cb_assert(behavior_pool);
    cb_assert(nthreads > 1); /* Main thread + at least one worker. */
    cb_assert(nthreads == settings.num_threads);

    proxy *p = (proxy *) calloc(1, sizeof(proxy));
    if (p != NULL) {
        p->main       = main;
        p->name       = trimstrdup(name);   /* Owned copy; freed on failure. */
        p->port       = port;
        p->config     = trimstrdup(config); /* Owned copy; freed on failure. */
        p->config_ver = config_ver;

        p->behavior_pool.base = behavior_pool->base;
        p->behavior_pool.num  = behavior_pool->num;
        p->behavior_pool.arr  = cproxy_copy_behaviors(behavior_pool->num,
                                                      behavior_pool->arr);

        p->listening        = 0;
        p->listening_failed = 0;

        p->next = NULL;

        cb_mutex_initialize(&p->proxy_lock);

        mcache_init(&p->front_cache, true, &mcache_item_funcs, true);
        matcher_init(&p->front_cache_matcher, true);
        matcher_init(&p->front_cache_unmatcher, true);

        matcher_init(&p->optimize_set_matcher, true);

        /* Only start the front cache (and its key spec matchers) when
         * the behaviors actually enable it. */
        if (behavior_pool->base.front_cache_max > 0 &&
            behavior_pool->base.front_cache_lifespan > 0) {
            mcache_start(&p->front_cache,
                         behavior_pool->base.front_cache_max);

            if (strlen(behavior_pool->base.front_cache_spec) > 0) {
                matcher_start(&p->front_cache_matcher,
                              behavior_pool->base.front_cache_spec);
            }

            if (strlen(behavior_pool->base.front_cache_unspec) > 0) {
                matcher_start(&p->front_cache_unmatcher,
                              behavior_pool->base.front_cache_unspec);
            }
        }

        if (strlen(behavior_pool->base.optimize_set) > 0) {
            matcher_start(&p->optimize_set_matcher,
                          behavior_pool->base.optimize_set);
        }

        p->thread_data_num = nthreads;
        p->thread_data = (proxy_td *) calloc(p->thread_data_num,
                                             sizeof(proxy_td));
        /* Verify all the allocations above before wiring up the
         * per-thread data; otherwise fall through to cleanup. */
        if (p->thread_data != NULL &&
            p->name != NULL &&
            p->config != NULL &&
            p->behavior_pool.arr != NULL) {
            /* We start at 1, because thread[0] is the main listen/accept */
            /* thread, and not a true worker thread.  Too lazy to save */
            /* the wasted thread[0] slot memory. */
            int i;
            for (i = 1; i < p->thread_data_num; i++) {
                proxy_td *ptd = &p->thread_data[i];
                ptd->proxy = p;

                ptd->config     = strdup(p->config);
                ptd->config_ver = p->config_ver;

                ptd->behavior_pool.base = behavior_pool->base;
                ptd->behavior_pool.num  = behavior_pool->num;
                ptd->behavior_pool.arr =
                    cproxy_copy_behaviors(behavior_pool->num,
                                          behavior_pool->arr);

                ptd->waiting_any_downstream_head = NULL;
                ptd->waiting_any_downstream_tail = NULL;
                ptd->downstream_reserved = NULL;
                ptd->downstream_released = NULL;
                ptd->downstream_tot = 0;
                ptd->downstream_num = 0;
                ptd->downstream_max = behavior_pool->base.downstream_max;
                ptd->downstream_assigns = 0;
                ptd->timeout_tv.tv_sec = 0;
                ptd->timeout_tv.tv_usec = 0;
                ptd->stats.stats.num_upstream = 0;
                ptd->stats.stats.num_downstream_conn = 0;

                cproxy_reset_stats_td(&ptd->stats);

                mcache_init(&ptd->key_stats, true,
                            &mcache_key_stats_funcs, false);
                matcher_init(&ptd->key_stats_matcher, false);
                matcher_init(&ptd->key_stats_unmatcher, false);

                /* Per-thread key stats mirror the front-cache setup:
                 * only start when enabled by the behaviors. */
                if (behavior_pool->base.key_stats_max > 0 &&
                    behavior_pool->base.key_stats_lifespan > 0) {
                    mcache_start(&ptd->key_stats,
                                 behavior_pool->base.key_stats_max);

                    if (strlen(behavior_pool->base.key_stats_spec) > 0) {
                        matcher_start(&ptd->key_stats_matcher,
                                      behavior_pool->base.key_stats_spec);
                    }

                    if (strlen(behavior_pool->base.key_stats_unspec) > 0) {
                        matcher_start(&ptd->key_stats_unmatcher,
                                      behavior_pool->base.key_stats_unspec);
                    }
                }
            }

            return p;
        }

        /* Partial failure: the per-thread loop above only runs on the
         * full-success path, so only the top-level allocations need
         * unwinding here (free(NULL) is a no-op). */
        free(p->name);
        free(p->config);
        free(p->behavior_pool.arr);
        free(p->thread_data);
        free(p);
    }

    return NULL;
}
282 
283 /* Must be called on the main listener thread.
284  */
cproxy_listen(proxy *p)285 int cproxy_listen(proxy *p) {
286     cb_assert(p != NULL);
287     cb_assert(is_listen_thread());
288 
289     if (settings.verbose > 1) {
290         moxi_log_write("cproxy_listen on port %d, downstream %s\n",
291                 p->port, p->config);
292     }
293 
294     /* Idempotent, remembers if it already created listening socket(s). */
295 
296     if (p->listening == 0) {
297         int listening;
298         enum protocol listen_protocol = negotiating_proxy_prot;
299 
300         if (IS_ASCII(settings.binding_protocol)) {
301             listen_protocol = proxy_upstream_ascii_prot;
302         }
303         if (IS_BINARY(settings.binding_protocol)) {
304             listen_protocol = proxy_upstream_binary_prot;
305         }
306 
307         listening = cproxy_listen_port(p->port, listen_protocol,
308                                        tcp_transport,
309                                        p,
310                                        &cproxy_listen_funcs);
311         if (listening > 0) {
312             p->listening += listening;
313         } else {
314             p->listening_failed++;
315 
316             if (settings.enable_mcmux_mode && settings.socketpath) {
317 #ifdef HAVE_SYS_UN_H
318                 moxi_log_write("error: could not access UNIX socket: %s\n",
319                                settings.socketpath);
320                 if (ml->log_mode != ERRORLOG_STDERR) {
321                     fprintf(stderr, "error: could not access UNIX socket: %s\n",
322                             settings.socketpath);
323                 }
324                 exit(EXIT_FAILURE);
325 #endif
326             }
327 
328             moxi_log_write("ERROR: could not listen on port %d. "
329                            "Please use -Z port_listen=PORT_NUM "
330                            "to specify a different port number.\n", p->port);
331             if (ml->log_mode != ERRORLOG_STDERR) {
332                 fprintf(stderr, "ERROR: could not listen on port %d. "
333                         "Please use -Z port_listen=PORT_NUM "
334                         "to specify a different port number.\n", p->port);
335             }
336             exit(EXIT_FAILURE);
337         }
338     }
339 
340     return (int)p->listening;
341 }
342 
/* Creates (or reuses) listening conns for the given port, wiring
 * each one to conn_extra/funcs/protocol.  Returns the number of
 * listening conns that now serve the port (0 on failure).
 */
int cproxy_listen_port(int port,
                       enum protocol protocol,
                       enum network_transport transport,
                       void       *conn_extra,
                       conn_funcs *funcs) {
    int   count;
    conn *orig_head;
    conn *lc;

    cb_assert(port > 0 || settings.socketpath != NULL);
    cb_assert(conn_extra);
    cb_assert(funcs);
    cb_assert(is_listen_thread());

    count = 0;
    orig_head = listen_conn;

    /* First pass: look for existing listeners with the same */
    /* dispatch table already bound to this port. */

    for (lc = orig_head; lc != NULL; lc = lc->next) {
        if (lc->extra == NULL || lc->funcs != funcs) {
            continue;
        }

        struct sockaddr_in addr;
        socklen_t addr_len = sizeof(addr);
        memset(&addr, 0, sizeof(addr));

        if (getsockname(lc->sfd, (struct sockaddr *) &addr, &addr_len) != 0) {
            continue;
        }

        if (ntohs(addr.sin_port) == port) {
            if (settings.verbose > 1) {
                moxi_log_write(
                        "<%d cproxy listening reusing listener on port %d\n",
                        lc->sfd, port);
            }

            count++;
        }
    }

    if (count > 0) {
        /* If we're already listening on the required port, then */
        /* we don't need to start a new server_socket().  This happens */
        /* in the multi-bucket case with binary protocol buckets. */
        /* There will be multiple proxy struct's (one per bucket), but */
        /* only one proxy struct will actually be pointed at by a */
        /* listening conn->extra (usually 11211). */

        /* TODO: Add a refcount to handle shutdown properly? */

        return count;
    }
#ifdef HAVE_SYS_UN_H
    if (settings.socketpath ?
        (server_socket_unix(settings.socketpath, settings.access) == 0) :
        (server_socket(port, transport, NULL) == 0)) {
#else
    if (server_socket(port, transport, NULL) == 0) {
#endif
        conn *nc;
        cb_assert(listen_conn != NULL);

        /* server_socket() prepends one new listening conn to the */
        /* global listen_conn list per bindable host address -- */
        /* e.g. one for localhost and another for 127.0.0.1 -- so */
        /* walk from the new head until we reach the previous head. */

        for (nc = listen_conn; nc != NULL && nc != orig_head; nc = nc->next) {
            if (settings.verbose > 1) {
                moxi_log_write(
                        "<%d cproxy listening on port %d\n",
                        nc->sfd, port);
            }

            count++;

            /* TODO: Listening conn's never seem to close, */
            /*       but need to handle cleanup if they do, */
            /*       such as if we handle graceful shutdown one day. */

            nc->extra = conn_extra;
            nc->funcs = funcs;
            nc->protocol = protocol;
        }
    }

    return count;
}
439 
440 /* Finds the proxy_td associated with a worker thread.
441  */
442 proxy_td *cproxy_find_thread_data(proxy *p, cb_thread_t thread_id) {
443     if (p != NULL) {
444         int i = thread_index(thread_id);
445 
446         /* 0 is the main listen thread, not a worker thread. */
447         cb_assert(i > 0);
448         cb_assert(i < p->thread_data_num);
449 
450         if (i > 0 && i < p->thread_data_num) {
451             return &p->thread_data[i];
452         }
453     }
454 
455     return NULL;
456 }
457 
458 bool cproxy_init_upstream_conn(conn *c) {
459     char *default_name;
460     proxy *p;
461     int n;
462     proxy_td *ptd;
463 
464     cb_assert(c != NULL);
465 
466     /* We're called once per client/upstream conn early in its */
467     /* lifecycle, on the worker thread, so it's a good place */
468     /* to record the proxy_td into the conn->extra. */
469 
470     cb_assert(!is_listen_thread());
471 
472     p = c->extra;
473     cb_assert(p != NULL);
474     cb_assert(p->main != NULL);
475 
476     n = cproxy_num_active_proxies(p->main);
477     if (n <= 0) {
478         if (settings.verbose > 2) {
479             moxi_log_write("<%d disallowing upstream conn due to no buckets\n",
480                            c->sfd);
481         }
482 
483         return false;
484     }
485 
486     ptd = cproxy_find_thread_data(p, cb_thread_self());
487     cb_assert(ptd != NULL);
488 
489     /* Reassign the client/upstream conn to a different bucket */
490     /* if the default_bucket_name isn't the special FIRST_BUCKET */
491     /* value. */
492 
493     default_name = ptd->behavior_pool.base.default_bucket_name;
494     if (strcmp(default_name, FIRST_BUCKET) != 0) {
495         proxy *default_proxy;
496         if (settings.verbose > 2) {
497             moxi_log_write("<%d assigning to default bucket: %s\n",
498                            c->sfd, default_name);
499         }
500 
501         default_proxy =
502             cproxy_find_proxy_by_auth(p->main, default_name, "");
503 
504         /* If the ostensible default bucket is missing (possibly deleted), */
505         /* assign the client/upstream conn to the NULL BUCKET. */
506 
507         if (default_proxy == NULL) {
508             default_proxy =
509                 cproxy_find_proxy_by_auth(p->main, NULL_BUCKET, "");
510 
511             if (settings.verbose > 2) {
512                 moxi_log_write("<%d assigning to null bucket, "
513                                "default bucket missing: %s\n",
514                                c->sfd, default_name);
515             }
516         }
517 
518         if (default_proxy != NULL) {
519             proxy_td *default_ptd =
520                 cproxy_find_thread_data(default_proxy, cb_thread_self());
521             if (default_ptd != NULL) {
522                 ptd = default_ptd;
523             }
524         } else {
525             if (settings.verbose > 2) {
526                 moxi_log_write("<%d assigning to first bucket, "
527                                "missing default/null bucket: %s\n",
528                                c->sfd, default_name);
529             }
530         }
531     } else {
532         if (settings.verbose > 2) {
533             moxi_log_write("<%d assigning to first bucket\n", c->sfd);
534         }
535     }
536 
537     ptd->stats.stats.num_upstream++;
538     ptd->stats.stats.tot_upstream++;
539 
540     c->extra = ptd;
541     c->funcs = &cproxy_upstream_funcs;
542 
543     return true;
544 }
545 
546 bool cproxy_init_downstream_conn(conn *c) {
547     downstream *d = c->extra;
548     cb_assert(d != NULL);
549 
550     d->ptd->stats.stats.num_downstream_conn++;
551     d->ptd->stats.stats.tot_downstream_conn++;
552 
553     return true;
554 }
555 
/* conn_close hook for upstream (client) conns.
 *
 * Detaches the closing conn from its proxy_td: decrements the
 * upstream counter, delinks it from every reserved downstream's
 * upstream list (closing half-written downstream conns if needed),
 * and removes it from the waiting-for-downstream queue.  Idempotent:
 * a conn with a NULL extra is treated as already closed.
 */
void cproxy_on_close_upstream_conn(conn *c) {
    downstream *d;
    proxy_td *ptd;

    cb_assert(c != NULL);

    if (settings.verbose > 2) {
        moxi_log_write("<%d cproxy_on_close_upstream_conn\n", c->sfd);
    }

    ptd = c->extra;
    if (c->extra == NULL) {
        /* Already detached by an earlier close; nothing to undo. */
        moxi_log_write("<%d cproxy_on_close_upstream_conn already closed\n",
                       c->sfd);
        return;
    }
    c->extra = NULL;

    if (ptd->stats.stats.num_upstream > 0) {
        ptd->stats.stats.num_upstream--;
    }

    /* Delink from any reserved downstream. */

    for (d = ptd->downstream_reserved; d != NULL; d = d->next) {
        bool found = false;

        d->upstream_conn = conn_list_remove(d->upstream_conn, NULL,
                                            c, &found);
        if (d->upstream_conn == NULL) {
            /* No upstreams remain: clear any reply-forwarding state
             * so stale suffixes/status aren't sent later. */
            d->upstream_suffix = NULL;
            d->upstream_suffix_len = 0;
            d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
            d->upstream_retry = 0;
            d->target_host_ident = NULL;

            /* Don't need to do anything else, as we'll now just */
            /* read and drop any remaining inflight downstream replies. */
            /* Eventually, the downstream will be released. */
        }

        /* If the downstream was reserved for this upstream conn, */
        /* also clear the upstream from any multiget de-duplication */
        /* tracking structures. */

        if (found) {
            int n, i;
            if (d->multiget != NULL) {
                genhash_iter(d->multiget, multiget_remove_upstream, c);
            }

            /* The downstream conn's might have iov's that */
            /* point to the upstream conn's buffers.  Also, the */
            /* downstream conn might be in all sorts of states */
            /* (conn_read, write, mwrite, pause), and we want */
            /* to be careful about the downstream channel being */
            /* half written. */

            /* The safest, but inefficient, thing to do then is */
            /* to close any conn_mwrite downstream conns. */

            ptd->stats.stats.tot_downstream_close_on_upstream_close++;

            n = mcs_server_count(&d->mst);

            for (i = 0; i < n; i++) {
                conn *downstream_conn = d->downstream_conns[i];
                if (downstream_conn != NULL &&
                    downstream_conn != NULL_CONN &&
                    downstream_conn->state == conn_mwrite) {
                    /* Reset pending write bookkeeping before close so
                     * no iov's referencing the dead upstream remain. */
                    downstream_conn->msgcurr = 0;
                    downstream_conn->msgused = 0;
                    downstream_conn->iovused = 0;

                    cproxy_close_conn(downstream_conn);
                }
            }
        }
    }

    /* Delink from wait queue. */

    ptd->waiting_any_downstream_head =
        conn_list_remove(ptd->waiting_any_downstream_head,
                         &ptd->waiting_any_downstream_tail,
                         c, NULL);
}
643 
644 int delink_from_downstream_conns(conn *c) {
645     int n, k, i;
646     downstream *d = c->extra;
647     if (d == NULL) {
648         if (settings.verbose > 2) {
649             moxi_log_write("%d: delink_from_downstream_conn no-op\n",
650                            c->sfd);
651         }
652 
653         return -1;
654     }
655 
656     n = mcs_server_count(&d->mst);
657     k = -1; /* Index of conn. */
658 
659     for (i = 0; i < n; i++) {
660         if (d->downstream_conns[i] == c) {
661             d->downstream_conns[i] = NULL;
662 
663             if (settings.verbose > 2) {
664                 moxi_log_write(
665                         "<%d cproxy_on_close_downstream_conn quit_server\n",
666                         c->sfd);
667             }
668 
669             d->ptd->stats.stats.tot_downstream_quit_server++;
670 
671             mcs_server_st_quit(mcs_server_index(&d->mst, i), 1);
672             cb_assert(mcs_server_st_fd(mcs_server_index(&d->mst, i)) == -1);
673 
674             k = i;
675         }
676     }
677 
678     return k;
679 }
680 
/* conn_close hook for downstream (server-facing) conns.
 *
 * Delinks the conn from its downstream, records the host error,
 * decrements counters, and -- when this was the last in-use
 * downstream conn -- either propagates an error to the upstream
 * (ascii suffix or binary EINTERNAL) or schedules a transparent
 * retry of the upstream request via the worker's work_queue.
 */
void cproxy_on_close_downstream_conn(conn *c) {
    conn *uc_retry = NULL;
    downstream *d;
    int k;
    proxy_td *ptd;

    cb_assert(c != NULL);
    cb_assert(c->sfd >= 0);
    cb_assert(c->state == conn_closing);

    if (settings.verbose > 2) {
        moxi_log_write("<%d cproxy_on_close_downstream_conn\n", c->sfd);
    }

    d = c->extra;

    /* Might have been set to NULL during cproxy_free_downstream(). */
    /* Or, when a downstream conn is in the thread-based free pool, it */
    /* is not associated with any particular downstream. */

    if (d == NULL) {
        /* TODO: See if we need to remove the downstream conn from the */
        /* thread-based free pool.  This shouldn't happen, but we */
        /* should then figure out how to put an cb_assert() here. */

        return;
    }

    /* k is the slot index this conn occupied, or -1 if already delinked. */
    k = delink_from_downstream_conns(c);

    c->extra = NULL;

    if (c->thread != NULL &&
        c->host_ident != NULL) {
        /* Record the failure against the host so the zstored pool */
        /* can back off from flapping servers. */
        zstored_error_count(c->thread, c->host_ident, true);
    }

    ptd = d->ptd;
    cb_assert(ptd);

    if (ptd->stats.stats.num_downstream_conn > 0) {
        ptd->stats.stats.num_downstream_conn--;
    }

    if (k < 0) {
        /* If this downstream conn wasn't linked into the */
        /* downstream, it was delinked already during connect error */
        /* handling (where its slot was set to NULL_CONN already), */
        /* or during downstream_timeout/conn_queue_timeout. */

        if (settings.verbose > 2) {
            moxi_log_write("%d: skipping release dc in on_close_dc\n",
                           c->sfd);
        }

        return;
    }

    if (d->upstream_conn != NULL &&
        d->downstream_used == 1) {
        /* TODO: Revisit downstream close error handling. */
        /*       Should we propagate error when... */
        /*       - any downstream conn closes? */
        /*       - all downstream conns closes? */
        /*       - last downstream conn closes?  Current behavior. */

        if (d->upstream_suffix == NULL) {
            if (settings.verbose > 2) {
                moxi_log_write("<%d proxy downstream closed, upstream %d (%x)\n",
                               c->sfd,
                               d->upstream_conn->sfd,
                               d->upstream_conn->protocol);
            }

            if (IS_ASCII(d->upstream_conn->protocol)) {
                /* Default static error line; replaced below with a */
                /* per-host message when the host identity is known. */
                d->upstream_suffix = "SERVER_ERROR proxy downstream closed\r\n";

                if (c->host_ident != NULL) {
                    char *s = add_conn_suffix(d->upstream_conn);
                    if (s != NULL) {
                        snprintf(s, SUFFIX_SIZE - 1,
                                 "SERVER_ERROR proxy downstream closed %s\r\n",
                                 c->host_ident);
                        s[SUFFIX_SIZE - 1] = '\0';
                        d->upstream_suffix = s;

                        s = strchr(s, ':'); /* Clip to avoid sending user/pswd. */
                        if (s != NULL) {
                            *s++ = '\r';
                            *s++ = '\n';
                            *s = '\0';
                        }
                    }
                }

                /* 0 length means "treat suffix as a C string". */
                d->upstream_suffix_len = 0;
            } else {
                d->upstream_status = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
            }

            d->upstream_retry = 0;
            d->target_host_ident = NULL;
        }

        /* We sometimes see that drive_machine/transmit will not see */
        /* a closed connection error during conn_mwrite, possibly */
        /* due to non-blocking sockets.  Because of this, drive_machine */
        /* thinks it has a successful downstream request send and */
        /* moves the state forward trying to read a response from */
        /* the downstream conn (conn_new_cmd, conn_read, etc), and */
        /* only then do we finally see the conn close situation, */
        /* ending up here.  That is, drive_machine only */
        /* seems to move to conn_closing from conn_read. */

        /* If we haven't received any reply yet, we retry based */
        /* on our cmd_retries counter. */

        /* TODO: Reconsider retry behavior, is it right in all situations? */

        if (c->rcurr != NULL &&
            c->rbytes == 0 &&
            d->downstream_used_start == d->downstream_used &&
            d->downstream_used_start == 1 &&
            d->upstream_conn->next == NULL &&
            d->behaviors_arr != NULL) {
            if (k >= 0 && k < d->behaviors_num) {
                int retry_max = d->behaviors_arr[k].downstream_retry;
                if (d->upstream_conn->cmd_retries < retry_max) {
                    d->upstream_conn->cmd_retries++;
                    uc_retry = d->upstream_conn;
                    /* Clear the error state set above; the retry */
                    /* supersedes error propagation. */
                    d->upstream_suffix = NULL;
                    d->upstream_suffix_len = 0;
                    d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
                    d->upstream_retry = 0;
                    d->target_host_ident = NULL;
                }
            }
        }

        if (uc_retry == NULL &&
            d->upstream_suffix == NULL &&
            IS_BINARY(d->upstream_conn->protocol)) {
            /* Binary upstream with no retry scheduled: queue an */
            /* EINTERNAL response header as the reply suffix. */
            protocol_binary_response_header *rh =
                cproxy_make_bin_error(d->upstream_conn,
                                      PROTOCOL_BINARY_RESPONSE_EINTERNAL);
            if (rh != NULL) {
                d->upstream_suffix = (char *) rh;
                d->upstream_suffix_len = sizeof(protocol_binary_response_header);
            } else {
                d->ptd->stats.stats.err_oom++;
                cproxy_close_conn(d->upstream_conn);
            }
        }
    }

    /* Are we over-decrementing here, and in handling conn_pause? */

    /* Case 1: we're in conn_pause, and socket is closed concurrently. */
    /* We unpause due to reserve, we move to conn_write/conn_mwrite, */
    /* fail and move to conn_closing.  So, no over-decrement. */

    /* Case 2: we're waiting for a downstream response in conn_new_cmd, */
    /* and socket is closed concurrently.  State goes to conn_closing, */
    /* so, no over-decrement. */

    /* Case 3: we've finished processing downstream response (in */
    /* conn_parse_cmd or conn_nread), and the downstream socket */
    /* is closed concurrently.  We then move to conn_pause, */
    /* and same as Case 1. */

    cproxy_release_downstream_conn(d, c);

    /* Setup a retry after unwinding the call stack. */
    /* We use the work_queue, because our caller, conn_close(), */
    /* is likely to blow away our fd if we try to reconnect */
    /* right now. */

    if (uc_retry != NULL) {
        if (settings.verbose > 2) {
            moxi_log_write("%d cproxy retrying\n", uc_retry->sfd);
        }

        ptd->stats.stats.tot_retry++;

        cb_assert(uc_retry->thread);
        cb_assert(uc_retry->thread->work_queue);

        work_send(uc_retry->thread->work_queue,
                  upstream_retry, ptd, uc_retry);
    }
}
872 
873 void upstream_retry(void *data0, void *data1) {
874     proxy_td *ptd = data0;
875     conn *uc = data1;
876 
877     cb_assert(ptd);
878     cb_assert(uc);
879 
880     cproxy_pause_upstream_for_downstream(ptd, uc);
881 }
882 
883 void cproxy_add_downstream(proxy_td *ptd) {
884     cb_assert(ptd != NULL);
885     cb_assert(ptd->proxy != NULL);
886 
887     if (ptd->downstream_max == 0 ||
888         ptd->downstream_num < ptd->downstream_max) {
889         if (settings.verbose > 2) {
890             moxi_log_write("cproxy_add_downstream %d %d\n",
891                     ptd->downstream_num,
892                     ptd->downstream_max);
893         }
894 
895         /* The config/behaviors will be NULL if the */
896         /* proxy is shutting down. */
897 
898         if (ptd->config != NULL &&
899             ptd->behavior_pool.arr != NULL) {
900             downstream *d =
901                 cproxy_create_downstream(ptd->config,
902                                          ptd->config_ver,
903                                          &ptd->behavior_pool);
904             if (d != NULL) {
905                 d->ptd = ptd;
906                 ptd->downstream_tot++;
907                 ptd->downstream_num++;
908                 cproxy_release_downstream(d, true);
909             } else {
910                 ptd->stats.stats.tot_downstream_create_failed++;
911             }
912         }
913     } else {
914         ptd->stats.stats.tot_downstream_max_reached++;
915     }
916 }
917 
918 downstream *cproxy_reserve_downstream(proxy_td *ptd) {
919     cb_assert(ptd != NULL);
920 
921     /* Loop in case we need to clear out downstreams */
922     /* that have outdated configs. */
923 
924     while (true) {
925         downstream *d;
926 
927         d = ptd->downstream_released;
928         if (d == NULL) {
929             cproxy_add_downstream(ptd);
930         }
931 
932         d = ptd->downstream_released;
933         if (d == NULL) {
934             return NULL;
935         }
936 
937         ptd->downstream_released = d->next;
938 
939         cb_assert(d->upstream_conn == NULL);
940         cb_assert(d->upstream_suffix == NULL);
941         cb_assert(d->upstream_suffix_len == 0);
942         cb_assert(d->upstream_status == PROTOCOL_BINARY_RESPONSE_SUCCESS);
943         cb_assert(d->upstream_retry == 0);
944         cb_assert(d->target_host_ident == NULL);
945         cb_assert(d->downstream_used == 0);
946         cb_assert(d->downstream_used_start == 0);
947         cb_assert(d->merger == NULL);
948         cb_assert(d->timeout_tv.tv_sec == 0);
949         cb_assert(d->timeout_tv.tv_usec == 0);
950         cb_assert(d->next_waiting == NULL);
951 
952         d->upstream_conn = NULL;
953         d->upstream_suffix = NULL;
954         d->upstream_suffix_len = 0;
955         d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
956         d->upstream_retry = 0;
957         d->upstream_retries = 0;
958         d->target_host_ident = NULL;
959         d->usec_start = 0;
960         d->downstream_used = 0;
961         d->downstream_used_start = 0;
962         d->merger = NULL;
963         d->timeout_tv.tv_sec = 0;
964         d->timeout_tv.tv_usec = 0;
965         d->next_waiting = NULL;
966 
967         if (cproxy_check_downstream_config(d)) {
968             bool found;
969             ptd->downstream_reserved =
970                 downstream_list_remove(ptd->downstream_reserved, d);
971             ptd->downstream_released =
972                 downstream_list_remove(ptd->downstream_released, d);
973 
974             found = zstored_downstream_waiting_remove(d);
975             cb_assert(!found);
976 
977             d->next = ptd->downstream_reserved;
978             ptd->downstream_reserved = d;
979 
980             ptd->stats.stats.tot_downstream_reserved++;
981 
982             return d;
983         }
984 
985         cproxy_free_downstream(d);
986     }
987 }
988 
989 bool cproxy_clear_timeout(downstream *d) {
990     bool rv = false;
991 
992     if (d->timeout_tv.tv_sec != 0 ||
993         d->timeout_tv.tv_usec != 0) {
994         evtimer_del(&d->timeout_event);
995         rv = true;
996     }
997 
998     d->timeout_tv.tv_sec = 0;
999     d->timeout_tv.tv_usec = 0;
1000 
1001     return rv;
1002 }
1003 
/* Release a downstream after a request completes (or aborts),
 * either returning it to the ptd's released list, retrying the
 * upstream command on the same downstream, or freeing it when
 * its config has gone stale.
 *
 * When force is true, the retry path is skipped and the
 * downstream is always kept (pushed to the released list) even
 * if its config is stale.
 *
 * Returns true when the downstream was kept (released or
 * retrying), false when it was freed.
 */
bool cproxy_release_downstream(downstream *d, bool force) {
    int i;
    int n;
    bool found;

    cb_assert(d != NULL);
    cb_assert(d->ptd != NULL);

    if (settings.verbose > 2) {
        moxi_log_write("%d: release_downstream\n",
                       d->upstream_conn != NULL ?
                       d->upstream_conn->sfd : -1);
    }

    /* Always release the timeout_event, even if we're going to retry, */
    /* to avoid pegging CPU with leaked timeout_events. */

    cproxy_clear_timeout(d);

    /* If we need to retry the command, we do so here, */
    /* keeping the same downstream that would otherwise */
    /* be released. */

    if (!force && d->upstream_conn != NULL && d->upstream_retry > 0) {
        int max_retries;
        d->upstream_retry = 0;
        d->upstream_retries++;

        /* But, we can stop retrying if we've tried each server twice. */
        max_retries = cproxy_max_retries(d);

        if (d->upstream_retries <= max_retries) {
            if (settings.verbose > 2) {
                moxi_log_write("%d: release_downstream,"
                               " instead retrying %d, %d <= %d, %d\n",
                               d->upstream_conn->sfd,
                               d->upstream_retry,
                               d->upstream_retries, max_retries,
                               d->ptd->stats.stats.tot_retry);
            }

            d->ptd->stats.stats.tot_retry++;

            /* Successful re-forward: keep the downstream reserved
             * and return early -- no release happens this time.
             */
            if (cproxy_forward(d) == true) {
                return true;
            } else {
                d->ptd->stats.stats.tot_downstream_propagate_failed++;

                propagate_error_msg(d, NULL, d->upstream_status);
            }
        } else {
            if (settings.verbose > 2) {
                moxi_log_write("%d: release_downstream,"
                               " skipping retry %d, %d > %d\n",
                               d->upstream_conn->sfd,
                               d->upstream_retry,
                               d->upstream_retries, max_retries);
            }
        }
    }

    /* Record reserved_time histogram timings. */

    if (d->usec_start > 0) {
        uint64_t ux = usec_now() - d->usec_start;

        d->ptd->stats.stats.tot_downstream_reserved_time += ux;

        if (d->ptd->stats.stats.max_downstream_reserved_time < ux) {
            d->ptd->stats.stats.max_downstream_reserved_time = ux;
        }

        /* Retried requests also feed the dedicated retry timing stats. */
        if (d->upstream_retries > 0) {
            d->ptd->stats.stats.tot_retry_time += ux;

            if (d->ptd->stats.stats.max_retry_time < ux) {
                d->ptd->stats.stats.max_retry_time = ux;
            }
        }

        downstream_reserved_time_sample(&d->ptd->stats, ux);
    }

    d->ptd->stats.stats.tot_downstream_released++;

    /* Delink upstream conns. */

    while (d->upstream_conn != NULL) {
        conn *curr;
        if (d->merger != NULL) {
            /* TODO: Allow merger callback to be func pointer. */

            /* Flush merged protocol stats (e.g. scatter-gather STATS
             * responses) out to this upstream before delinking it.
             */
            genhash_iter(d->merger,
                        protocol_stats_foreach_write,
                        d->upstream_conn);

            if (update_event(d->upstream_conn, EV_WRITE | EV_PERSIST)) {
                conn_set_state(d->upstream_conn, conn_mwrite);
            } else {
                d->ptd->stats.stats.err_oom++;
                cproxy_close_conn(d->upstream_conn);
            }
        }

        if (settings.verbose > 2) {
            /* NOTE(review): upstream_suffix may be NULL here while
             * being passed to %s, and suffix_len == 0 selects the
             * suffix (ascii) rather than the "(binary)" label --
             * confirm the intended ternary polarity.
             */
            moxi_log_write("%d: release_downstream upstream_suffix %s status %x\n",
                           d->upstream_conn->sfd,
                           d->upstream_suffix_len == 0 ?
                           d->upstream_suffix : "(binary)",
                           d->upstream_status);
        }

        if (d->upstream_suffix != NULL) {
            /* Do a last write on the upstream.  For example, */
            /* the upstream_suffix might be "END\r\n" or other */
            /* way to mark the end of a scatter-gather or */
            /* multiline response. */
            int suffix_len;

            if (settings.verbose > 2) {
                if (d->upstream_suffix_len > 0) {
                    moxi_log_write("%d: release_downstream"
                                   " writing suffix binary: %d\n",
                                   d->upstream_conn->sfd,
                                   d->upstream_suffix_len);

                    cproxy_dump_header(d->upstream_conn->sfd,
                                       d->upstream_suffix);
                } else {
                    moxi_log_write("%d: release_downstream"
                                   " writing suffix ascii: %s\n",
                                   d->upstream_conn->sfd,
                                   d->upstream_suffix);
                }
            }

            /* suffix_len == 0 marks an ascii (NUL-terminated) suffix;
             * a nonzero length marks a binary suffix blob.
             */
            suffix_len = d->upstream_suffix_len;
            if (suffix_len == 0) {
                suffix_len = (int)strlen(d->upstream_suffix);
            }

            if (add_iov(d->upstream_conn,
                        d->upstream_suffix,
                        suffix_len) == 0 &&
                update_event(d->upstream_conn, EV_WRITE | EV_PERSIST)) {
                conn_set_state(d->upstream_conn, conn_mwrite);
            } else {
                d->ptd->stats.stats.err_oom++;
                cproxy_close_conn(d->upstream_conn);
            }
        }

        /* Pop this upstream off the chain and sever its next link. */
        curr = d->upstream_conn;
        d->upstream_conn = d->upstream_conn->next;
        curr->next = NULL;
    }

    /* Free extra hash tables. */

    if (d->multiget != NULL) {
        genhash_iter(d->multiget, multiget_foreach_free, d);
        genhash_free(d->multiget);
        d->multiget = NULL;
    }

    if (d->merger != NULL) {
        genhash_iter(d->merger, protocol_stats_foreach_free, NULL);
        genhash_free(d->merger);
        d->merger = NULL;
    }

    /* Reset all per-request state for the next reservation. */
    d->upstream_conn = NULL;
    d->upstream_suffix = NULL; /* No free(), expecting a static string. */
    d->upstream_suffix_len = 0;
    d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
    d->upstream_retry = 0;
    d->upstream_retries = 0;
    d->target_host_ident = NULL;
    d->usec_start = 0;
    d->downstream_used = 0;
    d->downstream_used_start = 0;
    d->multiget = NULL;
    d->merger = NULL;

    /* TODO: Consider adding a downstream->prev backpointer */
    /*       or doubly-linked list to save on this scan. */

    d->ptd->downstream_reserved =
        downstream_list_remove(d->ptd->downstream_reserved, d);
    d->ptd->downstream_released =
        downstream_list_remove(d->ptd->downstream_released, d);

    found = zstored_downstream_waiting_remove(d);
    cb_assert(!found);

    n = mcs_server_count(&d->mst);

    /* Hand each downstream conn back to the shared (zstored) pool. */
    for (i = 0; i < n; i++) {
        conn *dc = d->downstream_conns[i];
        d->downstream_conns[i] = NULL;
        if (dc != NULL) {
            zstored_release_downstream_conn(dc, false);
        }
    }

    cproxy_clear_timeout(d); /* For MB-4334. */

    cb_assert(d->timeout_tv.tv_sec == 0);
    cb_assert(d->timeout_tv.tv_usec == 0);

    /* If this downstream still has the same configuration as our top-level */
    /* proxy config, go back onto the available, released downstream list. */

    if (cproxy_check_downstream_config(d) || force) {
        d->next = d->ptd->downstream_released;
        d->ptd->downstream_released = d;

        return true;
    }

    cproxy_free_downstream(d);

    return false;
}
1228 
1229 void cproxy_free_downstream(downstream *d) {
1230     bool found;
1231     int n;
1232 
1233     cb_assert(d != NULL);
1234     cb_assert(d->ptd != NULL);
1235     cb_assert(d->upstream_conn == NULL);
1236     cb_assert(d->multiget == NULL);
1237     cb_assert(d->merger == NULL);
1238     cb_assert(d->timeout_tv.tv_sec == 0);
1239     cb_assert(d->timeout_tv.tv_usec == 0);
1240 
1241     if (settings.verbose > 2) {
1242         moxi_log_write("cproxy_free_downstream\n");
1243     }
1244 
1245     d->ptd->stats.stats.tot_downstream_freed++;
1246 
1247     d->ptd->downstream_reserved =
1248         downstream_list_remove(d->ptd->downstream_reserved, d);
1249     d->ptd->downstream_released =
1250         downstream_list_remove(d->ptd->downstream_released, d);
1251 
1252     found = zstored_downstream_waiting_remove(d);
1253     cb_assert(!found);
1254 
1255     d->ptd->downstream_num--;
1256     cb_assert(d->ptd->downstream_num >= 0);
1257 
1258     n = mcs_server_count(&d->mst);
1259 
1260     if (d->downstream_conns != NULL) {
1261         int i;
1262         for (i = 0; i < n; i++) {
1263             if (d->downstream_conns[i] != NULL &&
1264                 d->downstream_conns[i] != NULL_CONN) {
1265                 d->downstream_conns[i]->extra = NULL;
1266             }
1267         }
1268     }
1269 
1270     /* This will close sockets, which will force associated conn's */
1271     /* to go to conn_closing state.  Since we've already cleared */
1272     /* the conn->extra pointers, there's no extra release/free. */
1273 
1274     mcs_free(&d->mst);
1275 
1276     cproxy_clear_timeout(d);
1277 
1278     if (d->downstream_conns != NULL) {
1279         free(d->downstream_conns);
1280     }
1281 
1282     if (d->config != NULL) {
1283         free(d->config);
1284     }
1285 
1286     free(d->behaviors_arr);
1287     free(d);
1288 }
1289 
1290 /* The config input is something libmemcached can parse.
1291  * See mcs_server_st_parse().
1292  */
1293 downstream *cproxy_create_downstream(char *config,
1294                                      uint32_t config_ver,
1295                                      proxy_behavior_pool *behavior_pool) {
1296     downstream *d = calloc(1, sizeof(downstream));
1297     cb_assert(config != NULL);
1298     cb_assert(behavior_pool != NULL);
1299 
1300     if (d != NULL &&
1301         config != NULL &&
1302         config[0] != '\0') {
1303         d->config        = strdup(config);
1304         d->config_ver    = config_ver;
1305         d->behaviors_num = behavior_pool->num;
1306         d->behaviors_arr = cproxy_copy_behaviors(behavior_pool->num,
1307                                                  behavior_pool->arr);
1308 
1309         /* TODO: Handle non-uniform downstream protocols. */
1310 
1311         cb_assert(IS_PROXY(behavior_pool->base.downstream_protocol));
1312 
1313         if (settings.verbose > 2) {
1314             moxi_log_write(
1315                     "cproxy_create_downstream: %s, %u, %u\n",
1316                     config, config_ver,
1317                     behavior_pool->base.downstream_protocol);
1318         }
1319 
1320         if (d->config != NULL &&
1321             d->behaviors_arr != NULL) {
1322             char *usr = behavior_pool->base.usr[0] != '\0' ?
1323                 behavior_pool->base.usr :
1324                 NULL;
1325             char *pwd = behavior_pool->base.pwd[0] != '\0' ?
1326                 behavior_pool->base.pwd :
1327                 NULL;
1328 
1329             int nconns = init_mcs_st(&d->mst, d->config, usr, pwd,
1330                                      behavior_pool->base.mcs_opts);
1331             if (nconns > 0) {
1332                 d->downstream_conns = (conn **)
1333                     calloc(nconns, sizeof(conn *));
1334                 if (d->downstream_conns != NULL) {
1335                     return d;
1336                 }
1337 
1338                 mcs_free(&d->mst);
1339             }
1340         }
1341 
1342         free(d->config);
1343         free(d->behaviors_arr);
1344         free(d);
1345     }
1346 
1347     return NULL;
1348 }
1349 
1350 int init_mcs_st(mcs_st *mst, char *config,
1351                 const char *default_usr,
1352                 const char *default_pwd,
1353                 const char *opts) {
1354     cb_assert(mst);
1355     cb_assert(config);
1356 
1357     if (mcs_create(mst, config,
1358                    default_usr, default_pwd, opts) != NULL) {
1359         return mcs_server_count(mst);
1360     } else {
1361         if (settings.verbose > 1) {
1362             moxi_log_write("mcs_create failed: %s\n",
1363                     config);
1364         }
1365     }
1366 
1367     return 0;
1368 }
1369 
1370 /* See if the downstream config matches the top-level proxy config.
1371  */
1372 bool cproxy_check_downstream_config(downstream *d) {
1373     int rv = false;
1374 
1375     cb_assert(d != NULL);
1376     cb_assert(d->ptd != NULL);
1377     cb_assert(d->ptd->proxy != NULL);
1378 
1379     if (d->config_ver == d->ptd->config_ver) {
1380         rv = true;
1381     } else if (d->config != NULL &&
1382                d->ptd->config != NULL &&
1383                cproxy_equal_behaviors(d->behaviors_num,
1384                                       d->behaviors_arr,
1385                                       d->ptd->behavior_pool.num,
1386                                       d->ptd->behavior_pool.arr)) {
1387         /* Parse the proxy/parent's config to see if we can */
1388         /* reuse our existing downstream connections. */
1389 
1390         char *usr = d->ptd->behavior_pool.base.usr[0] != '\0' ?
1391             d->ptd->behavior_pool.base.usr :
1392             NULL;
1393         char *pwd = d->ptd->behavior_pool.base.pwd[0] != '\0' ?
1394             d->ptd->behavior_pool.base.pwd :
1395             NULL;
1396 
1397         mcs_st next;
1398 
1399         int n = init_mcs_st(&next, d->ptd->config, usr, pwd,
1400                             d->ptd->behavior_pool.base.mcs_opts);
1401         if (n > 0) {
1402             if (mcs_stable_update(&d->mst, &next)) {
1403                 if (settings.verbose > 2) {
1404                     moxi_log_write("check_downstream_config stable update\n");
1405                 }
1406 
1407                 free(d->config);
1408                 d->config     = strdup(d->ptd->config);
1409                 d->config_ver = d->ptd->config_ver;
1410                 rv = true;
1411             }
1412 
1413             mcs_free(&next);
1414         }
1415     }
1416 
1417     if (settings.verbose > 2) {
1418         moxi_log_write("check_downstream_config %u\n", rv);
1419     }
1420 
1421     return rv;
1422 }
1423 
1424 /* Returns -1 if the connections aren't fully assigned and ready. */
1425 /* In that case, the downstream has to wait for a downstream connection */
1426 /* to get out of the conn_connecting state. */
1427 
1428 /* The downstream connection might leave the conn_connecting state */
1429 /* with an error (unable to connect).  That case is handled by */
1430 /* tracking a NULL_CONN sentinel value. */
1431 
1432 /* Also, in the -1 result case, the d->upstream_conn should remain in */
1433 /* conn_pause state. */
1434 
1435 /* A server_index of -1 means to connect all downstreams, as the */
1436 /* caller probably needs to proxy a broadcast command. */
1437 
/* Acquire/connect the downstream conns for this downstream.
 *
 * server_index >= 0 connects only that one server; -1 connects
 * all of them (broadcast commands).
 *
 * Returns the number of usable conns, or -1 when the caller must
 * wait (a conn is still in conn_connecting, or we queued on the
 * downstream conn-max wait list); in the -1 case d->upstream_conn
 * should remain in conn_pause state.  Servers that failed to
 * connect are marked with the NULL_CONN sentinel.
 */
int cproxy_connect_downstream(downstream *d, LIBEVENT_THREAD *thread,
                              int server_index) {
    int i = 0;
    int s = 0; /* Number connected. */
    int n;
    mcs_server_st *msst_actual;

    cb_assert(d != NULL);
    cb_assert(d->ptd != NULL);
    cb_assert(d->ptd->downstream_released != d); /* Should not be in free list. */
    cb_assert(d->downstream_conns != NULL);
    cb_assert(mcs_server_count(&d->mst) > 0);
    cb_assert(thread != NULL);
    cb_assert(thread->base != NULL);

    n = mcs_server_count(&d->mst);

    cb_assert(d->behaviors_num >= n);
    cb_assert(d->behaviors_arr != NULL);

    if (settings.verbose > 2) {
        moxi_log_write("%d: cproxy_connect_downstream server_index %d in %d\n",
                       d->upstream_conn->sfd, server_index, n);
    }


    /* Narrow the loop to the single requested server. */
    if (server_index >= 0) {
        cb_assert(server_index < n);
        i = server_index;
        n = server_index + 1;
    }

    for (; i < n; i++) {
        cb_assert(IS_PROXY(d->behaviors_arr[i].downstream_protocol));

        msst_actual = mcs_server_index(&d->mst, i);

        /* Connect to downstream servers, if not already. */

        /* A NULL_CONN means that we tried to connect, but failed, */
        /* which is different than NULL (which means that we haven't */
        /* tried to connect yet). */

        if (d->downstream_conns[i] == NULL) {
            bool downstream_conn_max_reached = false;
            conn *c = d->upstream_conn;
            /*
             * mcmux compatiblity mode, one downstream struct will be associated
             * with downstream connection. So overwrite the default msst with the
             * value.
             */
            if (c && c->peer_host && c->peer_port) {
                cb_assert(i == 0);
                /* NOTE(review): strncpy does not guarantee NUL
                 * termination when peer_host is >= MCS_HOSTNAME_SIZE
                 * chars -- confirm hostname is consumed as a bounded
                 * buffer or that peer_host is always shorter.
                 */
                strncpy(msst_actual->hostname, c->peer_host, MCS_HOSTNAME_SIZE);
                msst_actual->port = c->peer_port;
                msst_actual->fd = -1;
                msst_actual->ident_a[0] = msst_actual->ident_b[0] = 0;
            }

            d->downstream_conns[i] =
                zstored_acquire_downstream_conn(d, thread,
                                                msst_actual,
                                                &d->behaviors_arr[i],
                                                &downstream_conn_max_reached);
            /* Record the target host name as a suffix on the upstream
             * conn, for single-server (non-broadcast) requests.
             */
            if (c != NULL &&
                i == server_index &&
                d->downstream_conns[i] != NULL &&
                d->downstream_conns[i] != NULL_CONN &&
                d->target_host_ident == NULL) {
                d->target_host_ident = add_conn_suffix(c);
                if (d->target_host_ident != NULL) {
                    strncpy(d->target_host_ident,
                            msst_actual->hostname,
                            SUFFIX_SIZE);
                    d->target_host_ident[SUFFIX_SIZE - 1] = '\0';
                }
            }

            /* Still connecting: caller must wait for the conn to
             * leave conn_connecting state.
             */
            if (d->downstream_conns[i] != NULL &&
                d->downstream_conns[i] != NULL_CONN &&
                d->downstream_conns[i]->state == conn_connecting) {
                return -1;
            }

            /* Conn pool is at its max: queue on the wait list and
             * arm a queue timeout instead of failing outright.
             */
            if (d->downstream_conns[i] == NULL &&
                downstream_conn_max_reached == true) {
                if (settings.verbose > 2) {
                    moxi_log_write("%d: downstream_conn_max reached\n",
                                   d->upstream_conn->sfd);
                }

                if (zstored_downstream_waiting_add(d, thread,
                                                   msst_actual,
                                                   &d->behaviors_arr[i]) == true) {
                    /* Since we're waiting on the downstream conn queue, */
                    /* start a downstream timer per configuration. */

                    cproxy_start_downstream_timeout_ex(d, c,
                        d->behaviors_arr[i].downstream_conn_queue_timeout);

                    return -1;
                }
            }
        }

        if (d->downstream_conns[i] != NULL &&
            d->downstream_conns[i] != NULL_CONN) {
            s++;
        } else {
            /* Connect failed: quit the server and mark the slot with
             * the NULL_CONN sentinel so we don't retry this pass.
             */
            mcs_server_st_quit(msst_actual, 1);
            d->downstream_conns[i] = NULL_CONN;
        }
    }

    return s;
}
1554 
/* Open a (possibly non-blocking) TCP connection to one downstream
 * server and wrap it in a conn.
 *
 * Returns the new conn -- either fully initialized (auth/bucket
 * done via downstream_connect_init()) or parked in
 * conn_connecting awaiting writability -- or NULL on any failure.
 */
conn *cproxy_connect_downstream_conn(downstream *d,
                                     LIBEVENT_THREAD *thread,
                                     mcs_server_st *msst,
                                     proxy_behavior *behavior) {
    uint64_t start = 0;
    int err = -1;
    SOCKET fd;

    cb_assert(d);
    cb_assert(d->ptd);
    cb_assert(d->ptd->downstream_released != d); /* Should not be in free list. */
    cb_assert(thread);
    cb_assert(thread->base);
    cb_assert(msst);
    cb_assert(behavior);
    cb_assert(mcs_server_st_hostname(msst) != NULL);
    cb_assert(mcs_server_st_port(msst) > 0);
    cb_assert(mcs_server_st_fd(msst) == -1);

    /* Only sample timings when time_stats is configured on. */
    if (d->ptd->behavior_pool.base.time_stats) {
        start = usec_now();
    }

    d->ptd->stats.stats.tot_downstream_connect_started++;

    fd = mcs_connect(mcs_server_st_hostname(msst),
                     mcs_server_st_port(msst), &err,
                     MOXI_BLOCKING_CONNECT);

    if (settings.verbose > 2) {
        moxi_log_write("%d: cproxy_connect_downstream_conn %s:%d %d %d\n", fd,
                       mcs_server_st_hostname(msst),
                       mcs_server_st_port(msst),
                       MOXI_BLOCKING_CONNECT, err);
    }

    if (fd != -1) {
        conn *c = conn_new(fd, conn_pause, 0,
                           DATA_BUFFER_SIZE,
                           tcp_transport,
                           thread->base,
                           &cproxy_downstream_funcs, d);
        if (c != NULL ) {
            /* Prefer the upstream's negotiated peer protocol over the
             * configured downstream protocol.
             */
            c->protocol = (d->upstream_conn->peer_protocol ?
                           d->upstream_conn->peer_protocol :
                           behavior->downstream_protocol);
            c->thread = thread;
            c->cmd_start_time = start;

#ifdef WIN32
            /* Normalize winsock error codes to their POSIX values. */
            if (err == WSAEINPROGRESS) {
                err = EINPROGRESS;
            } else if (err == WSAEWOULDBLOCK) {
                err = EWOULDBLOCK;
            }
#endif

            if (err == EINPROGRESS ||
                err == EWOULDBLOCK) {
                /* Non-blocking connect still pending: wait for EV_WRITE
                 * with the configured connect timeout.
                 */
                if (update_event_timed(c, EV_WRITE | EV_PERSIST,
                                       &behavior->connect_timeout)) {
                    conn_set_state(c, conn_connecting);

                    d->ptd->stats.stats.tot_downstream_connect_wait++;

                    return c;
                } else {
                    d->ptd->stats.stats.err_oom++;
                }
            } else {
                /* Connected immediately: run auth/bucket selection now. */
                if (downstream_connect_init(d, msst, behavior, c)) {
                    return c;
                }
            }

            /* Any failure after conn_new: close the conn (which also
             * closes fd) before reporting the failure below.
             */
            cproxy_close_conn(c);
        } else {
            d->ptd->stats.stats.err_oom++;
        }
    }

    d->ptd->stats.stats.tot_downstream_connect_failed++;

    return NULL;
}
1640 
1641 bool downstream_connect_init(downstream *d, mcs_server_st *msst,
1642                              proxy_behavior *behavior, conn *c) {
1643     char *host_ident;
1644     int rv;
1645 
1646     cb_assert(c->thread != NULL);
1647 
1648     host_ident = c->host_ident;
1649     if (host_ident == NULL) {
1650         host_ident = mcs_server_st_ident(msst, IS_ASCII(c->protocol));
1651     }
1652 
1653     if (c->cmd_start_time != 0 &&
1654         d->ptd->behavior_pool.base.time_stats) {
1655         downstream_connect_time_sample(&d->ptd->stats,
1656                                        usec_now() - c->cmd_start_time);
1657     }
1658 
1659     rv = cproxy_auth_downstream(msst, behavior, c->sfd);
1660     if (rv == 0) {
1661         d->ptd->stats.stats.tot_downstream_auth++;
1662 
1663         rv = cproxy_bucket_downstream(msst, behavior, c->sfd);
1664         if (rv == 0) {
1665             d->ptd->stats.stats.tot_downstream_bucket++;
1666 
1667             zstored_error_count(c->thread, host_ident, false);
1668 
1669             d->ptd->stats.stats.tot_downstream_connect++;
1670 
1671             return true;
1672         } else {
1673             d->ptd->stats.stats.tot_downstream_bucket_failed++;
1674         }
1675     } else {
1676         d->ptd->stats.stats.tot_downstream_auth_failed++;
1677         if (rv == 1) {
1678             d->ptd->stats.stats.tot_auth_timeout++;
1679         }
1680     }
1681 
1682     /* Treat a auth/bucket error as a blacklistable error. */
1683 
1684     zstored_error_count(c->thread, host_ident, true);
1685 
1686     return false;
1687 }
1688 
1689 conn *cproxy_find_downstream_conn(downstream *d,
1690                                   char *key, int key_length,
1691                                   bool *local) {
1692     return cproxy_find_downstream_conn_ex(d, key, key_length, local, NULL);
1693 }
1694 
1695 conn *cproxy_find_downstream_conn_ex(downstream *d,
1696                                      char *key, int key_length,
1697                                      bool *local,
1698                                      int *vbucket) {
1699     int v;
1700     int s;
1701 
1702     cb_assert(d != NULL);
1703     cb_assert(d->downstream_conns != NULL);
1704     cb_assert(key != NULL);
1705     cb_assert(key_length > 0);
1706 
1707     if (local != NULL) {
1708         *local = false;
1709     }
1710 
1711     v = -1;
1712     s = cproxy_server_index(d, key, key_length, &v);
1713 
1714     if (settings.verbose > 2 && s >= 0) {
1715         moxi_log_write("%d: server_index %d, vbucket %d, conn %d\n", s, v,
1716                        (d->upstream_conn != NULL ?
1717                         d->upstream_conn->sfd : 0),
1718                        (d->downstream_conns[s] == NULL ?
1719                         0 :
1720                         (d->downstream_conns[s] == NULL_CONN ?
1721                          -1 :
1722                          d->downstream_conns[s]->sfd)));
1723     }
1724 
1725     if (s >= 0 &&
1726         s < (int) mcs_server_count(&d->mst) &&
1727         d->downstream_conns[s] != NULL &&
1728         d->downstream_conns[s] != NULL_CONN) {
1729 
1730         if (local != NULL && s == 0) {
1731             *local = true;
1732         }
1733 
1734         if (vbucket != NULL) {
1735             *vbucket = v;
1736         }
1737 
1738         return d->downstream_conns[s];
1739     }
1740 
1741     return NULL;
1742 }
1743 
1744 bool cproxy_prep_conn_for_write(conn *c) {
1745     if (c != NULL) {
1746         cb_assert(c->item == NULL);
1747         cb_assert(IS_PROXY(c->protocol));
1748         cb_assert(c->ilist != NULL);
1749         cb_assert(c->isize > 0);
1750         cb_assert(c->suffixlist != NULL);
1751         cb_assert(c->suffixsize > 0);
1752 
1753         c->icurr      = c->ilist;
1754         c->ileft      = 0;
1755         c->suffixcurr = c->suffixlist;
1756         c->suffixleft = 0;
1757 
1758         c->msgcurr = 0; /* TODO: Mem leak just by blowing these to 0? */
1759         c->msgused = 0;
1760         c->iovused = 0;
1761 
1762         if (add_msghdr(c) == 0) {
1763             return true;
1764         }
1765 
1766         if (settings.verbose > 1) {
1767             moxi_log_write("%d: cproxy_prep_conn_for_write failed\n",
1768                            c->sfd);
1769         }
1770     }
1771 
1772     return false;
1773 }
1774 
1775 bool cproxy_update_event_write(downstream *d, conn *c) {
1776     cb_assert(d != NULL);
1777     cb_assert(d->ptd != NULL);
1778     cb_assert(c != NULL);
1779 
1780     if (!update_event(c, EV_WRITE | EV_PERSIST)) {
1781         d->ptd->stats.stats.err_oom++;
1782         cproxy_close_conn(c);
1783 
1784         return false;
1785     }
1786 
1787     return true;
1788 }
1789 
1790 /**
1791  * Do a hash through libmemcached to see which server (by index)
1792  * should hold a given key.
1793  */
1794 int cproxy_server_index(downstream *d, char *key, size_t key_length,
1795                         int *vbucket) {
1796     cb_assert(d != NULL);
1797     cb_assert(key != NULL);
1798     cb_assert(key_length > 0);
1799 
1800     if (mcs_server_count(&d->mst) <= 0) {
1801         return -1;
1802     }
1803 
1804     return (int) mcs_key_hash(&d->mst, key, key_length, vbucket);
1805 }
1806 
/* Drain ptd's wait queue: repeatedly reserve an available
 * downstream, attach the head waiting upstream conn (plus any
 * compatible followers), and kick off forwarding.  Guards against
 * both infinite re-queueing and recursion through
 * cproxy_forward() error paths.
 */
void cproxy_assign_downstream(proxy_td *ptd) {
    uint64_t da;
    conn *tail;
    bool stop = false;

    cb_assert(ptd != NULL);

    if (settings.verbose > 2) {
        moxi_log_write("assign_downstream\n");
    }

    /* Snapshot the assign counter; if it changes under us, we
       recursed through cproxy_forward() (see below). */
    ptd->downstream_assigns++;

    da = ptd->downstream_assigns;
    /* Key loop that tries to reserve any available, released */
    /* downstream resources to waiting upstream conns. */

    /* Remember the wait list tail when we start, in case more */
    /* upstream conns are tacked onto the wait list while we're */
    /* processing.  This helps avoid infinite loop where upstream */
    /* conns just keep on moving to the tail. */

    tail = ptd->waiting_any_downstream_tail;
    while (ptd->waiting_any_downstream_head != NULL && !stop) {
        conn *uc_last;
        downstream *d;

        if (ptd->waiting_any_downstream_head == tail) {
            stop = true;
        }

        d = cproxy_reserve_downstream(ptd);
        if (d == NULL) {
            if (ptd->downstream_num <= 0) {
                /* Absolutely no downstreams connected, so */
                /* might as well error out. */

                while (ptd->waiting_any_downstream_head != NULL) {
                    conn *uc;
                    ptd->stats.stats.tot_downstream_propagate_failed++;

                    /* Pop the head of the wait list. */
                    uc = ptd->waiting_any_downstream_head;
                    ptd->waiting_any_downstream_head =
                        ptd->waiting_any_downstream_head->next;
                    if (ptd->waiting_any_downstream_head == NULL) {
                        ptd->waiting_any_downstream_tail = NULL;
                    }
                    uc->next = NULL;

                    upstream_error_msg(uc,
                                       "SERVER_ERROR proxy out of downstreams\r\n",
                                       PROTOCOL_BINARY_RESPONSE_EINTERNAL);
                }
            }

            break; /* If no downstreams are available, stop loop. */
        }

        /* A freshly reserved downstream must be clean. */
        cb_assert(d->upstream_conn == NULL);
        cb_assert(d->downstream_used == 0);
        cb_assert(d->downstream_used_start == 0);
        cb_assert(d->multiget == NULL);
        cb_assert(d->merger == NULL);
        cb_assert(d->timeout_tv.tv_sec == 0);
        cb_assert(d->timeout_tv.tv_usec == 0);

        /* We have a downstream reserved, so assign the first */
        /* waiting upstream conn to it. */

        d->upstream_conn = ptd->waiting_any_downstream_head;
        ptd->waiting_any_downstream_head =
            ptd->waiting_any_downstream_head->next;
        if (ptd->waiting_any_downstream_head == NULL) {
            ptd->waiting_any_downstream_tail = NULL;
        }
        d->upstream_conn->next = NULL;

        ptd->stats.stats.tot_assign_downstream++;
        ptd->stats.stats.tot_assign_upstream++;

        /* Add any compatible upstream conns to the downstream. */
        /* By compatible, for example, we mean multi-gets from */
        /* different upstreams so we can de-deplicate get keys. */
        uc_last = d->upstream_conn;

        while (is_compatible_request(uc_last,
                                     ptd->waiting_any_downstream_head)) {
            uc_last->next = ptd->waiting_any_downstream_head;

            ptd->waiting_any_downstream_head =
                ptd->waiting_any_downstream_head->next;
            if (ptd->waiting_any_downstream_head == NULL) {
                ptd->waiting_any_downstream_tail = NULL;
            }

            uc_last = uc_last->next;
            uc_last->next = NULL;

            /* Note: tot_assign_upstream - tot_assign_downstream */
            /* should get us how many requests we've piggybacked together. */

            ptd->stats.stats.tot_assign_upstream++;
        }

        if (settings.verbose > 2) {
            moxi_log_write("%d: assign_downstream, matched to upstream\n",
                    d->upstream_conn->sfd);
        }

        if (cproxy_forward(d) == false) {
            /* TODO: This stat is incorrect, as we might reach here */
            /* when we have entire front cache hit or talk-to-self */
            /* optimization hit on multiget. */

            ptd->stats.stats.tot_downstream_propagate_failed++;

            /* During cproxy_forward(), we might have recursed, */
            /* especially in error situation if a downstream */
            /* conn got closed and released.  Check for recursion */
            /* before we touch d anymore. */

            if (da != ptd->downstream_assigns) {
                ptd->stats.stats.tot_assign_recursion++;
                break;
            }

            propagate_error_msg(d, NULL, d->upstream_status);

            cproxy_release_downstream(d, false);
        }
    }

    if (settings.verbose > 2) {
        moxi_log_write("assign_downstream, done\n");
    }
}
1943 
1944 void propagate_error_msg(downstream *d, char *ascii_msg,
1945                          protocol_binary_response_status binary_status) {
1946     cb_assert(d != NULL);
1947 
1948     if (ascii_msg == NULL &&
1949         d->upstream_conn != NULL &&
1950         d->target_host_ident != NULL) {
1951         char *s = add_conn_suffix(d->upstream_conn);
1952         if (s != NULL) {
1953             snprintf(s, SUFFIX_SIZE - 1,
1954                      "SERVER_ERROR proxy write to downstream %s\r\n",
1955                      d->target_host_ident);
1956             s[SUFFIX_SIZE - 1] = '\0';
1957             ascii_msg = s;
1958         }
1959     }
1960 
1961     while (d->upstream_conn != NULL) {
1962         conn *uc = d->upstream_conn;
1963         conn *curr;
1964 
1965         if (settings.verbose > 1) {
1966             moxi_log_write("%d: could not forward upstream to downstream\n",
1967                            uc->sfd);
1968         }
1969 
1970         upstream_error_msg(uc, ascii_msg, binary_status);
1971 
1972         curr = d->upstream_conn;
1973         d->upstream_conn = d->upstream_conn->next;
1974         curr->next = NULL;
1975     }
1976 }
1977 
1978 bool cproxy_forward(downstream *d) {
1979     cb_assert(d != NULL);
1980     cb_assert(d->ptd != NULL);
1981     cb_assert(d->upstream_conn != NULL);
1982 
1983     if (settings.verbose > 2) {
1984         moxi_log_write(
1985                 "%d: cproxy_forward prot %d to prot %d\n",
1986                 d->upstream_conn->sfd,
1987                 d->upstream_conn->protocol,
1988                 d->ptd->behavior_pool.base.downstream_protocol);
1989     }
1990 
1991     if (IS_ASCII(d->upstream_conn->protocol)) {
1992         /* ASCII upstream. */
1993 
1994         unsigned int peer_protocol =
1995             d->upstream_conn->peer_protocol ?
1996             d->upstream_conn->peer_protocol :
1997             d->ptd->behavior_pool.base.downstream_protocol;
1998 
1999         if (IS_ASCII(peer_protocol)) {
2000             return cproxy_forward_a2a_downstream(d);
2001         } else {
2002             return cproxy_forward_a2b_downstream(d);
2003         }
2004     } else {
2005         /* BINARY upstream. */
2006 
2007         if (IS_BINARY(d->ptd->behavior_pool.base.downstream_protocol)) {
2008             return cproxy_forward_b2b_downstream(d);
2009         } else {
2010             /* TODO: No binary upstream to ascii downstream support. */
2011 
2012             cb_assert(0);
2013             return false;
2014         }
2015     }
2016 }
2017 
2018 bool cproxy_forward_or_error(downstream *d) {
2019     if (cproxy_forward(d) == false) {
2020         d->ptd->stats.stats.tot_downstream_propagate_failed++;
2021         propagate_error_msg(d, NULL, d->upstream_status);
2022         cproxy_release_downstream(d, false);
2023 
2024         return false;
2025     }
2026 
2027     return true;
2028 }
2029 
/* Write an error response to the paused upstream conn uc.  For
 * ascii upstreams, ascii_msg (or a default SERVER_ERROR line) is
 * sent; for binary upstreams, a binary error with binary_status.
 * Conns of a NULL_BUCKET proxy get an unauthorized error instead,
 * and ascii get/gets requests get a bare "END\r\n" (unless in
 * mcmux mode or with a vbucket node locator).
 */
void upstream_error_msg(conn *uc, char *ascii_msg,
                        protocol_binary_response_status binary_status) {
    proxy_td *ptd;
    cb_assert(uc);
    cb_assert(uc->state == conn_pause);

    ptd = uc->extra;
    cb_assert(ptd != NULL);

    if (IS_ASCII(uc->protocol)) {
        char *msg = ascii_msg;
        if (msg == NULL) {
            msg = "SERVER_ERROR proxy write to downstream\r\n";
        }

        /* proxy->name is shared state; read it under the proxy lock. */
        cb_mutex_enter(&ptd->proxy->proxy_lock);
        if (ptd->proxy->name != NULL &&
            strcmp(ptd->proxy->name, NULL_BUCKET) == 0) {
            msg = "SERVER_ERROR unauthorized, null bucket\r\n";
        }
        cb_mutex_exit(&ptd->proxy->proxy_lock);

        /* Send an END on get/gets instead of generic SERVER_ERROR. */

        if (uc->cmd == -1 &&
            uc->cmd_start != NULL &&
            strncmp(uc->cmd_start, "get", 3) == 0 &&
            (false == settings.enable_mcmux_mode) &&
            (0 != strncmp(ptd->behavior_pool.base.nodeLocator,
                          "vbucket",
                          sizeof(ptd->behavior_pool.base.nodeLocator) - 1))) {
            msg = "END\r\n";
        }

        if (settings.verbose > 2) {
            moxi_log_write("%d: upstream_error: %s\n", uc->sfd, msg);
        }

        if (add_iov(uc, msg, (int)strlen(msg)) == 0 &&
            update_event(uc, EV_WRITE | EV_PERSIST)) {
            conn_set_state(uc, conn_mwrite);
        } else {
            /* Could not queue the reply; drop the upstream conn. */
            ptd->stats.stats.err_oom++;
            cproxy_close_conn(uc);
        }
    } else {
        cb_assert(IS_BINARY(uc->protocol));

        if (binary_status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
            /* Default to our favorite catch-all binary protocol response. */

            binary_status = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
        }

        /* Same null-bucket override as the ascii branch, under lock. */
        cb_mutex_enter(&ptd->proxy->proxy_lock);
        if (ptd->proxy->name != NULL &&
            strcmp(ptd->proxy->name, NULL_BUCKET) == 0) {
            binary_status = PROTOCOL_BINARY_RESPONSE_AUTH_ERROR;
        }
        cb_mutex_exit(&ptd->proxy->proxy_lock);

        write_bin_error(uc, binary_status, 0);

        update_event(uc, EV_WRITE | EV_PERSIST);
    }
}
2096 
2097 void cproxy_reset_upstream(conn *uc) {
2098     proxy_td *ptd;
2099     cb_assert(uc != NULL);
2100 
2101     ptd = uc->extra;
2102     cb_assert(ptd != NULL);
2103 
2104     conn_set_state(uc, conn_new_cmd);
2105 
2106     if (uc->rbytes <= 0) {
2107         if (!update_event(uc, EV_READ | EV_PERSIST)) {
2108             ptd->stats.stats.err_oom++;
2109             cproxy_close_conn(uc);
2110         }
2111 
2112         return; /* Return either way. */
2113     }
2114 
2115     /* We may have already read incoming bytes into the uc's buffer, */
2116     /* so the issue is that libevent may never see (or expect) any */
2117     /* EV_READ events (and hence, won't fire event callbacks) for the */
2118     /* upstream connection.  This can leave the uc seemingly stuck, */
2119     /* never hitting drive_machine() loop. */
2120 
2121     if (settings.verbose > 2) {
2122         moxi_log_write("%d: cproxy_reset_upstream with bytes available: %d\n",
2123                        uc->sfd, uc->rbytes);
2124     }
2125 
2126     /* So, we copy the drive_machine()/conn_new_cmd handling to */
2127     /* schedule uc into drive_machine() execution, where the uc */
2128     /* conn is likely to be writable.  We need to do this */
2129     /* because we're currently on the drive_machine() execution */
2130     /* loop for the downstream connection, not for the uc. */
2131 
2132     if (!update_event(uc, EV_WRITE | EV_PERSIST)) {
2133         if (settings.verbose > 0) {
2134             moxi_log_write("Couldn't update event\n");
2135         }
2136 
2137         conn_set_state(uc, conn_closing);
2138     }
2139 
2140     ptd->stats.stats.tot_reset_upstream_avail++;
2141 }
2142 
2143 bool cproxy_dettach_if_noreply(downstream *d, conn *uc) {
2144     if (uc->noreply) {
2145         uc->noreply        = false;
2146         d->upstream_conn   = NULL;
2147         d->upstream_suffix = NULL;
2148         d->upstream_suffix_len = 0;
2149         d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
2150         d->upstream_retry  = 0;
2151         d->target_host_ident = NULL;
2152 
2153         cproxy_reset_upstream(uc);
2154 
2155         return true;
2156     }
2157 
2158     return false;
2159 }
2160 
2161 void cproxy_wait_any_downstream(proxy_td *ptd, conn *uc) {
2162     cb_assert(uc != NULL);
2163     cb_assert(uc->next == NULL);
2164     cb_assert(ptd != NULL);
2165     cb_assert(!ptd->waiting_any_downstream_tail ||
2166            !ptd->waiting_any_downstream_tail->next);
2167 
2168     /* Add the upstream conn to the wait list. */
2169 
2170     uc->next = NULL;
2171     if (ptd->waiting_any_downstream_tail != NULL) {
2172         ptd->waiting_any_downstream_tail->next = uc;
2173     }
2174     ptd->waiting_any_downstream_tail = uc;
2175     if (ptd->waiting_any_downstream_head == NULL) {
2176         ptd->waiting_any_downstream_head = uc;
2177     }
2178 }
2179 
2180 void cproxy_release_downstream_conn(downstream *d, conn *c) {
2181     proxy_td *ptd;
2182     cb_assert(c != NULL);
2183     cb_assert(d != NULL);
2184 
2185     ptd = d->ptd;
2186     cb_assert(ptd != NULL);
2187 
2188     if (settings.verbose > 2) {
2189         moxi_log_write("%d: release_downstream_conn, downstream_used %d %d,"
2190                        " upstream %d\n",
2191                        c->sfd, d->downstream_used, d->downstream_used_start,
2192                        (d->upstream_conn != NULL ?
2193                         d->upstream_conn->sfd : 0));
2194     }
2195 
2196     d->downstream_used--;
2197     if (d->downstream_used <= 0) {
2198         /* The downstream_used count might go < 0 when if there's */
2199         /* an early error and we decide to close the downstream */
2200         /* conn, before anything gets sent or before the */
2201         /* downstream_used was able to be incremented. */
2202 
2203         cproxy_release_downstream(d, false);
2204         cproxy_assign_downstream(ptd);
2205     }
2206 }
2207 
2208 void cproxy_on_pause_downstream_conn(conn *c) {
2209     downstream *d;
2210     cb_assert(c != NULL);
2211 
2212     if (settings.verbose > 2) {
2213         moxi_log_write("<%d cproxy_on_pause_downstream_conn\n",
2214                 c->sfd);
2215     }
2216 
2217     d = c->extra;
2218 
2219     if (!d || c->rbytes > 0) {
2220         zstored_downstream_conns *conns;
2221 
2222         moxi_log_write("%d: Closed the downstream since got"
2223                        "an event on downstream or extra data on downstream."
2224                        " (rbytes: %u)\n", c->sfd, c->rbytes);
2225 
2226         conns = zstored_get_downstream_conns(c->thread, c->host_ident);
2227         if (conns) {
2228             bool found = false;
2229             conns->dc = conn_list_remove(conns->dc, NULL, c, &found);
2230             if (!found) {
2231                 moxi_log_write("<%d: %s:%d Not able to find connection"
2232                                " in zstore conns\n",
2233                                c->sfd, __FILE__, __LINE__);
2234             }
2235         } else {
2236             moxi_log_write("<%d %s:%d Not able to find zstore conns\n",
2237                            c->sfd, __FILE__, __LINE__);
2238         }
2239         cproxy_close_conn(c);
2240         return;
2241     }
2242 
2243     cb_assert(d->ptd != NULL);
2244 
2245     /* Must update_event() before releasing the downstream conn, */
2246     /* because the release might call udpate_event(), too, */
2247     /* and we don't want to override its work. */
2248 
2249     if (update_event(c, EV_READ | EV_PERSIST)) {
2250         cproxy_release_downstream_conn(d, c);
2251     } else {
2252         d->ptd->stats.stats.err_oom++;
2253         cproxy_close_conn(c);
2254     }
2255 }
2256 
2257 void cproxy_pause_upstream_for_downstream(proxy_td *ptd, conn *upstream) {
2258     cb_assert(ptd != NULL);
2259     cb_assert(upstream != NULL);
2260 
2261     if (settings.verbose > 2) {
2262         moxi_log_write("%d: pause_upstream_for_downstream\n",
2263                 upstream->sfd);
2264     }
2265 
2266     conn_set_state(upstream, conn_pause);
2267 
2268     cproxy_wait_any_downstream(ptd, upstream);
2269 
2270     if (ptd->timeout_tv.tv_sec == 0 &&
2271         ptd->timeout_tv.tv_usec == 0) {
2272         cproxy_start_wait_queue_timeout(ptd, upstream);
2273     }
2274 
2275     cproxy_assign_downstream(ptd);
2276 }
2277 
2278 struct timeval cproxy_get_downstream_timeout(downstream *d, conn *c) {
2279 
2280     struct timeval rv;
2281     proxy_td *ptd;
2282     cb_assert(d);
2283 
2284     if (c != NULL) {
2285         int i;
2286         cb_assert(d->behaviors_num > 0);
2287         cb_assert(d->behaviors_arr != NULL);
2288         cb_assert(d->downstream_conns != NULL);
2289 
2290         i = downstream_conn_index(d, c);
2291         if (i >= 0 && i < d->behaviors_num) {
2292             rv = d->behaviors_arr[i].downstream_timeout;
2293             if (rv.tv_sec != 0 || rv.tv_usec != 0) {
2294                 return rv;
2295             }
2296         }
2297     }
2298 
2299     ptd = d->ptd;
2300     cb_assert(ptd);
2301 
2302     rv = ptd->behavior_pool.base.downstream_timeout;
2303 
2304     return rv;
2305 }
2306 
2307 bool cproxy_start_wait_queue_timeout(proxy_td *ptd, conn *uc) {
2308     cb_assert(ptd);
2309     cb_assert(uc);
2310     cb_assert(uc->thread);
2311     cb_assert(uc->thread->base);
2312 
2313     ptd->timeout_tv = ptd->behavior_pool.base.wait_queue_timeout;
2314     if (ptd->timeout_tv.tv_sec != 0 ||
2315         ptd->timeout_tv.tv_usec != 0) {
2316         if (settings.verbose > 2) {
2317             moxi_log_write("wait_queue_timeout started\n");
2318         }
2319 
2320         evtimer_set(&ptd->timeout_event, wait_queue_timeout, ptd);
2321 
2322         event_base_set(uc->thread->base, &ptd->timeout_event);
2323 
2324         return evtimer_add(&ptd->timeout_event, &ptd->timeout_tv) == 0;
2325     }
2326 
2327     return true;
2328 }
2329 
2330 static void wait_queue_timeout(evutil_socket_t fd,
2331                                const short which,
2332                                void *arg) {
2333     proxy_td *ptd = arg;
2334     cb_assert(ptd != NULL);
2335     (void)fd;
2336     (void)which;
2337 
2338     if (settings.verbose > 2) {
2339         moxi_log_write("wait_queue_timeout\n");
2340     }
2341 
2342     /* This timer callback is invoked when an upstream conn */
2343     /* has been in the wait queue for too long. */
2344 
2345     if (ptd->timeout_tv.tv_sec != 0 || ptd->timeout_tv.tv_usec != 0) {
2346         struct timeval wqt;
2347         uint64_t wqt_msec;
2348         uint64_t cut_msec;
2349         conn *uc_curr;
2350 
2351         evtimer_del(&ptd->timeout_event);
2352 
2353         ptd->timeout_tv.tv_sec = 0;
2354         ptd->timeout_tv.tv_usec = 0;
2355 
2356         if (settings.verbose > 2) {
2357             moxi_log_write("wait_queue_timeout cleared\n");
2358         }
2359 
2360         wqt = ptd->behavior_pool.base.wait_queue_timeout;
2361         wqt_msec = (wqt.tv_sec * 1000) + (wqt.tv_usec / 1000);
2362         cut_msec = msec_current_time - wqt_msec;
2363 
2364         /* Run through all the old upstream conn's in */
2365         /* the wait queue, remove them, and emit errors */
2366         /* on them.  And then start a new timer if needed. */
2367         uc_curr = ptd->waiting_any_downstream_head;
2368         while (uc_curr != NULL) {
2369             conn *uc = uc_curr;
2370 
2371             uc_curr = uc_curr->next;
2372 
2373             /* Check if upstream conn is old and should be removed. */
2374 
2375             if (settings.verbose > 2) {
2376                 moxi_log_write("wait_queue_timeout compare %u to %u cutoff\n",
2377                         uc->cmd_start_time, cut_msec);
2378             }
2379 
2380             if (uc->cmd_start_time <= cut_msec) {
2381                 if (settings.verbose > 1) {
2382                     moxi_log_write("proxy_td_timeout sending error %d\n",
2383                             uc->sfd);
2384                 }
2385 
2386                 ptd->stats.stats.tot_wait_queue_timeout++;
2387 
2388                 ptd->waiting_any_downstream_head =
2389                     conn_list_remove(ptd->waiting_any_downstream_head,
2390                                      &ptd->waiting_any_downstream_tail,
2391                                      uc, NULL); /* TODO: O(N^2). */
2392 
2393                 upstream_error_msg(uc,
2394                                    "SERVER_ERROR proxy wait queue timeout",
2395                                    PROTOCOL_BINARY_RESPONSE_EBUSY);
2396             }
2397         }
2398 
2399         if (ptd->waiting_any_downstream_head != NULL) {
2400             cproxy_start_wait_queue_timeout(ptd,
2401                                             ptd->waiting_any_downstream_head);
2402         }
2403     }
2404 }
2405 
2406 rel_time_t cproxy_realtime(const time_t exptime) {
2407     /* Input is a long... */
2408 
2409     /* 0       | (0...REALIME_MAXDELTA] | (REALTIME_MAXDELTA... */
2410     /* forever | delta                  | unix_time */
2411 
2412     /* Storage is an unsigned int. */
2413 
2414     /* TODO: Handle resolution loss. */
2415 
2416     /* The cproxy version of realtime doesn't do any */
2417     /* time math munging, just pass through. */
2418 
2419     return (rel_time_t) exptime;
2420 }
2421 
2422 void cproxy_close_conn(conn *c) {
2423     cb_assert(c != NULL);
2424 
2425     if (c == NULL_CONN) {
2426         return;
2427     }
2428 
2429     conn_set_state(c, conn_closing);
2430 
2431     update_event(c, 0);
2432 
2433     /* Run through drive_machine just once, */
2434     /* to go through close code paths. */
2435 
2436     drive_machine(c);
2437 }
2438 
2439 bool add_conn_item(conn *c, item *it) {
2440     cb_assert(it != NULL);
2441     cb_assert(c != NULL);
2442     cb_assert(c->ilist != NULL);
2443     cb_assert(c->icurr != NULL);
2444     cb_assert(c->isize > 0);
2445 
2446     if (c->ileft >= c->isize) {
2447         item **new_list =
2448             realloc(c->ilist, sizeof(item *) * c->isize * 2);
2449         if (new_list) {
2450             c->isize *= 2;
2451             c->ilist = new_list;
2452             c->icurr = new_list;
2453         }
2454     }
2455 
2456     if (c->ileft < c->isize) {
2457         c->ilist[c->ileft] = it;
2458         c->ileft++;
2459 
2460         return true;
2461     }
2462 
2463     return false;
2464 }
2465 
2466 char *add_conn_suffix(conn *c) {
2467     cb_assert(c != NULL);
2468     cb_assert(c->suffixlist != NULL);
2469     cb_assert(c->suffixcurr != NULL);
2470     cb_assert(c->suffixsize > 0);
2471 
2472     if (c->suffixleft >= c->suffixsize) {
2473         char **new_suffix_list =
2474             realloc(c->suffixlist,
2475                     sizeof(char *) * c->suffixsize * 2);
2476         if (new_suffix_list) {
2477             c->suffixsize *= 2;
2478             c->suffixlist = new_suffix_list;
2479             c->suffixcurr = new_suffix_list;
2480         }
2481     }
2482 
2483     if (c->suffixleft < c->suffixsize) {
2484         char *suffix = cache_alloc(c->thread->suffix_cache);
2485         if (suffix != NULL) {
2486             c->suffixlist[c->suffixleft] = suffix;
2487             c->suffixleft++;
2488 
2489             return suffix;
2490         }
2491     }
2492 
2493     return NULL;
2494 }
2495 
2496 char *nread_text(short x) {
2497     char *rv = NULL;
2498     switch(x) {
2499     case NREAD_SET:
2500         rv = "set ";
2501         break;
2502     case NREAD_ADD:
2503         rv = "add ";
2504         break;
2505     case NREAD_REPLACE:
2506         rv = "replace ";
2507         break;
2508     case NREAD_APPEND:
2509         rv = "append ";
2510         break;
2511     case NREAD_PREPEND:
2512         rv = "prepend ";
2513         break;
2514     case NREAD_CAS:
2515         rv = "cas ";
2516         break;
2517     }
2518     return rv;
2519 }
2520 
2521 /* Tokenize the command string by updating the token array
2522  * with pointers to start of each token and length.
2523  * Does not modify the input command string.
2524  *
2525  * Returns total number of tokens.  The last valid token is the terminal
2526  * token (value points to the first unprocessed character of the string and
2527  * length zero).
2528  *
2529  * Usage example:
2530  *
2531  *  while (scan_tokens(command, tokens, max_tokens, NULL) > 0) {
2532  *      for(int ix = 0; tokens[ix].length != 0; ix++) {
2533  *          ...
2534  *      }
2535  *      command = tokens[ix].value;
2536  *  }
2537  */
2538 size_t scan_tokens(char *command, token_t *tokens,
2539                    const size_t max_tokens,
2540                    int *command_len) {
2541     char *s, *e;
2542     size_t ntokens = 0;
2543 
2544     if (command_len != NULL) {
2545         *command_len = 0;
2546     }
2547 
2548     cb_assert(command != NULL && tokens != NULL && max_tokens > 1);
2549 
2550     for (s = e = command; ntokens < max_tokens - 1; ++e) {
2551         if (*e == '\0' || *e == ' ') {
2552             if (s != e) {
2553                 tokens[ntokens].value = s;
2554                 tokens[ntokens].length = e - s;
2555                 ntokens++;
2556             }
2557             if (*e == '\0') {
2558                 if (command_len != NULL) {
2559                     *command_len = (int)(e - command);
2560                 }
2561                 break; /* string end */
2562             }
2563             s = e + 1;
2564         }
2565     }
2566 
2567     /* If we scanned the whole string, the terminal value pointer is null,
2568      * otherwise it is the first unprocessed character.
2569      */
2570     tokens[ntokens].value = (*e == '\0' ? NULL : e);
2571     tokens[ntokens].length = 0;
2572     ntokens++;
2573 
2574     return ntokens;
2575 }
2576 
2577 /* Remove conn c from a conn list.
2578  * Returns the new head of the list.
2579  */
2580 conn *conn_list_remove(conn *head, conn **tail, conn *c, bool *found) {
2581     conn *prev = NULL;
2582     conn *curr = head;
2583 
2584     if (found != NULL) {
2585         *found = false;
2586     }
2587 
2588     while (curr != NULL) {
2589         if (curr == c) {
2590             conn *r;
2591             if (found != NULL) {
2592                 *found = true;
2593             }
2594 
2595             if (tail != NULL && *tail == curr) {
2596                 *tail = prev;
2597             }
2598 
2599             if (prev != NULL) {
2600                 cb_assert(curr != head);
2601                 prev->next = curr->next;
2602                 curr->next = NULL;
2603                 return head;
2604             }
2605 
2606             cb_assert(curr == head);
2607             r = curr->next;
2608             curr->next = NULL;
2609             return r;
2610         }
2611 
2612         prev = curr;
2613         curr = curr ->next;
2614     }
2615 
2616     return head;
2617 }
2618 
2619 /* Returns the new head of the list.
2620  */
2621 downstream *downstream_list_remove(downstream *head, downstream *d) {
2622     downstream *prev = NULL;
2623     downstream *curr = head;
2624 
2625     while (curr != NULL) {
2626         if (curr == d) {
2627             downstream *r;
2628             if (prev != NULL) {
2629                 cb_assert(curr != head);
2630                 prev->next = curr->next;
2631                 curr->next = NULL;
2632                 return head;
2633             }
2634 
2635             cb_assert(curr == head);
2636             r = curr->next;
2637             curr->next = NULL;
2638             return r;
2639         }
2640 
2641         prev = curr;
2642         curr = curr ->next;
2643     }
2644 
2645     return head;
2646 }
2647 
2648 /* Returns the new head of the list.
2649  */
2650 downstream *downstream_list_waiting_remove(downstream *head,
2651                                            downstream **tail,
2652                                            downstream *d) {
2653     downstream *prev = NULL;
2654     downstream *curr = head;
2655 
2656     while (curr != NULL) {
2657         if (curr == d) {
2658             downstream *r;
2659             if (tail != NULL && *tail == curr) {
2660                 *tail = prev;
2661             }
2662 
2663             if (prev != NULL) {
2664                 cb_assert(curr != head);
2665                 prev->next_waiting = curr->next_waiting;
2666                 curr->next_waiting = NULL;
2667                 return head;
2668             }
2669 
2670             cb_assert(curr == head);
2671             r = curr->next_waiting;
2672             curr->next_waiting = NULL;
2673             return r;
2674         }
2675 
2676         prev = curr;
2677         curr = curr ->next_waiting;
2678     }
2679 
2680     return head;
2681 }
2682 
2683 /* Returns true if a candidate request is squashable
2684  * or de-duplicatable with an existing request, to
2685  * save on network hops.
2686  */
2687 bool is_compatible_request(conn *existing, conn *candidate) {
2688     (void)existing;
2689     (void)candidate;
2690 
2691     /* The not-my-vbucket error handling requires us to not */
2692     /* squash ascii multi-GET requests, due to reusing the */
2693     /* multiget-deduplication machinery during retries and */
2694     /* to simplify the later codepaths. */
2695     /*
2696     cb_assert(existing);
2697     cb_assert(existing->state == conn_pause);
2698     cb_assert(IS_PROXY(existing->protocol));
2699 
2700     if (IS_BINARY(existing->protocol)) {
2701         // TODO: Revisit multi-get squashing for binary another day.
2702 
2703         return false;
2704     }
2705 
2706     cb_assert(IS_ASCII(existing->protocol));
2707 
2708     if (candidate != NULL) {
2709         cb_assert(IS_ASCII(candidate->protocol));
2710         cb_assert(IS_PROXY(candidate->protocol));
2711         cb_assert(candidate->state == conn_pause);
2712 
2713         // TODO: Allow gets (CAS) for de-duplication.
2714 
2715         if (existing->cmd == -1 &&
2716             candidate->cmd == -1 &&
2717             existing->cmd_retries <= 0 &&
2718             candidate->cmd_retries <= 0 &&
2719             !existing->noreply &&
2720             !candidate->noreply &&
2721             strncmp(existing->cmd_start, "get ", 4) == 0 &&
2722             strncmp(candidate->cmd_start, "get ", 4) == 0) {
2723             cb_assert(existing->item == NULL);
2724             cb_assert(candidate->item == NULL);
2725 
2726             return true;
2727         }
2728     }
2729     */
2730 
2731     return false;
2732 }
2733 
2734 void downstream_timeout(evutil_socket_t fd,
2735                         const short which,
2736                         void *arg) {
2737     downstream *d = arg;
2738     proxy_td *ptd;
2739 
2740     cb_assert(d != NULL);
2741 
2742     ptd = d->ptd;
2743     cb_assert(ptd != NULL);
2744 
2745     /* This timer callback is invoked when one or more of */
2746     /* the downstream conns must be really slow.  Handle by */
2747     /* closing downstream conns, which might help by */
2748     /* freeing up downstream resources. */
2749 
2750     if (cproxy_clear_timeout(d)) {
2751         char *m;
2752         int n;
2753         int i;
2754         /* The downstream_timeout() callback is invoked for */
2755         /* two cases (downstream_conn_queue_timeouts and */
2756         /* downstream_timeouts), so cleanup and track stats */
2757         /* accordingly. */
2758         bool was_conn_queue_waiting =
2759             zstored_downstream_waiting_remove(d);
2760 
2761         if (was_conn_queue_waiting == true) {
2762             if (settings.verbose > 2) {
2763                 moxi_log_write("conn_queue_timeout\n");
2764             }
2765 
2766             ptd->stats.stats.tot_downstream_conn_queue_timeout++;
2767         } else {
2768             if (settings.verbose > 2) {
2769                 moxi_log_write("downstream_timeout\n");
2770             }
2771 
2772             ptd->stats.stats.tot_downstream_timeout++;
2773         }
2774 
2775         m = "SERVER_ERROR proxy downstream timeout\r\n";
2776 
2777         if (d->target_host_ident != NULL) {
2778             m = add_conn_suffix(d->upstream_conn);
2779             if (m != NULL) {
2780                 char *s;
2781                 snprintf(m, SUFFIX_SIZE - 1,
2782                          "SERVER_ERROR proxy downstream timeout %s\r\n",
2783                          d->target_host_ident);
2784                 m[SUFFIX_SIZE - 1] = '\0';
2785 
2786                 s = strchr(m, ':'); /* Clip to avoid sending user/pswd. */
2787                 if (s != NULL) {
2788                     *s++ = '\r';
2789                     *s++ = '\n';
2790                     *s = '\0';
2791                 }
2792             }
2793         }
2794 
2795         propagate_error_msg(d, m, PROTOCOL_BINARY_RESPONSE_EBUSY);
2796         n = mcs_server_count(&d->mst);
2797 
2798         for (i = 0; i < n; i++) {
2799             conn *dc = d->downstream_conns[i];
2800             if (dc != NULL &&
2801                 dc != NULL_CONN) {
2802                 /* We have to de-link early, because we don't want */
2803                 /* to have cproxy_close_conn() release the downstream */
2804                 /* while we're in the middle of this loop. */
2805 
2806                 delink_from_downstream_conns(dc);
2807 
2808                 cproxy_close_conn(dc);
2809             }
2810         }
2811 
2812         cproxy_release_downstream(d, false);
2813         cproxy_assign_downstream(ptd);
2814     }
2815 
2816     (void)fd;
2817     (void)which;
2818 }
2819 
2820 bool cproxy_start_downstream_timeout(downstream *d, conn *c) {
2821     cb_assert(d != NULL);
2822     cb_assert(d->behaviors_num > 0);
2823     cb_assert(d->behaviors_arr != NULL);
2824 
2825     return cproxy_start_downstream_timeout_ex(d, c,
2826                 cproxy_get_downstream_timeout(d, c));
2827 }
2828 
2829 bool cproxy_start_downstream_timeout_ex(downstream *d, conn *c,
2830                                         struct timeval dt) {
2831     conn *uc;
2832 
2833     cb_assert(d != NULL);
2834     cb_assert(d->behaviors_num > 0);
2835     cb_assert(d->behaviors_arr != NULL);
2836 
2837     cproxy_clear_timeout(d);
2838 
2839     if (dt.tv_sec == 0 &&
2840         dt.tv_usec == 0) {
2841         return true;
2842     }
2843 
2844     uc = d->upstream_conn;
2845 
2846     cb_assert(uc != NULL);
2847     cb_assert(uc->state == conn_pause);
2848     cb_assert(uc->thread != NULL);
2849     cb_assert(uc->thread->base != NULL);
2850     cb_assert(IS_PROXY(uc->protocol));
2851 
2852     if (settings.verbose > 2) {
2853         moxi_log_write("%d: cproxy_start_downstream_timeout\n",
2854                        (c != NULL ? c->sfd : -1));
2855     }
2856 
2857     evtimer_set(&d->timeout_event, downstream_timeout, d);
2858 
2859     event_base_set(uc->thread->base, &d->timeout_event);
2860 
2861     d->timeout_tv.tv_sec  = dt.tv_sec;
2862     d->timeout_tv.tv_usec = dt.tv_usec;
2863 
2864     return (evtimer_add(&d->timeout_event, &d->timeout_tv) == 0);
2865 }
2866 
2867 /* Return 0 on success, -1 on general failure, 1 on timeout failure. */
2868 
2869 int cproxy_auth_downstream(mcs_server_st *server,
2870                            proxy_behavior *behavior,
2871                            SOCKET fd) {
2872     protocol_binary_request_header req;
2873     protocol_binary_response_header res;
2874     struct timeval *timeout = NULL;
2875     char buf[3000];
2876     const char *usr;
2877     const char *pwd;
2878     int usr_len;
2879     int pwd_len;
2880     int buf_len;
2881     mcs_return mr;
2882 
2883     cb_assert(server);
2884     cb_assert(behavior);
2885     cb_assert(fd != INVALID_SOCKET);
2886 
2887 
2888     if (!IS_BINARY(behavior->downstream_protocol)) {
2889         return 0;
2890     }
2891 
2892     usr = mcs_server_st_usr(server) != NULL ?
2893         mcs_server_st_usr(server) : behavior->usr;
2894     pwd = mcs_server_st_pwd(server) != NULL ?
2895         mcs_server_st_pwd(server) : behavior->pwd;
2896 
2897     usr_len = (int)strlen(usr);
2898     pwd_len = (int)strlen(pwd);
2899 
2900     if (usr_len <= 0) {
2901         return 0;
2902     }
2903 
2904     if (settings.verbose > 2) {
2905         moxi_log_write("cproxy_auth_downstream usr: %s pwd: (%d)\n",
2906                        usr, pwd_len);
2907     }
2908 
2909     if (usr_len <= 0 ||
2910         !IS_PROXY(behavior->downstream_protocol) ||
2911         (usr_len + pwd_len + 50 > (int) sizeof(buf))) {
2912         if (settings.verbose > 1) {
2913             moxi_log_write("auth failure args\n");
2914         }
2915 
2916         return -1; /* Probably misconfigured. */
2917     }
2918 
2919     /* The key should look like "PLAIN", or the sasl mech string. */
2920     /* The data should look like "\0usr\0pwd".  So, the body buf */
2921     /* should look like "PLAIN\0usr\0pwd". */
2922 
2923     /* TODO: Allow binary passwords. */
2924 
2925     buf_len = snprintf(buf, sizeof(buf), "PLAIN%c%s%c%s",
2926                        0, usr,
2927                        0, pwd);
2928     cb_assert(buf_len == 7 + usr_len + pwd_len);
2929 
2930     memset(req.bytes, 0, sizeof(req.bytes));
2931     memset(res.bytes, 0, sizeof(res.bytes));
2932     req.request.magic    = PROTOCOL_BINARY_REQ;
2933     req.request.opcode   = PROTOCOL_BINARY_CMD_SASL_AUTH;
2934     req.request.keylen   = htons((uint16_t) 5); /* 5 == strlen("PLAIN"). */
2935     req.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
2936     req.request.bodylen  = htonl(buf_len);
2937 
2938     if (mcs_io_write(fd, (const char *) req.bytes,
2939                      sizeof(req.bytes)) != sizeof(req.bytes) ||
2940         mcs_io_write(fd, buf, buf_len) == -1) {
2941         mcs_io_reset(fd);
2942 
2943         if (settings.verbose > 1) {
2944             moxi_log_write("auth failure during write for %s (%d)\n",
2945                            usr, buf_len);
2946         }
2947 
2948         return -1;
2949     }
2950 
2951     if (behavior->auth_timeout.tv_sec != 0 ||
2952         behavior->auth_timeout.tv_usec != 0) {
2953         timeout = &behavior->auth_timeout;
2954     }
2955 
2956     mr = mcs_io_read(fd, &res.bytes, sizeof(res.bytes), timeout);
2957     if (mr == MCS_SUCCESS && res.response.magic == PROTOCOL_BINARY_RES) {
2958         int len;
2959         res.response.status  = ntohs(res.response.status);
2960         res.response.keylen  = ntohs(res.response.keylen);
2961         res.response.bodylen = ntohl(res.response.bodylen);
2962 
2963         /* Swallow whatever body comes. */
2964         len = res.response.bodylen;
2965         while (len > 0) {
2966             int amt = (len > (int) sizeof(buf) ? (int) sizeof(buf) : len);
2967 
2968             mr = mcs_io_read(fd, buf, amt, timeout);
2969             if (mr != MCS_SUCCESS) {
2970                 if (settings.verbose > 1) {
2971                     moxi_log_write("auth could not read response body (%d) %d\n",
2972                                    usr, amt, mr);
2973                 }
2974 
2975                 if (mr == MCS_TIMEOUT) {
2976                     return 1;
2977                 }
2978 
2979                 return -1;
2980             }
2981 
2982             len -= amt;
2983         }
2984 
2985         /* The res status should be either... */
2986         /* - SUCCESS         - sasl aware server and good credentials. */
2987         /* - AUTH_ERROR      - wrong credentials. */
2988         /* - UNKNOWN_COMMAND - sasl-unaware server. */
2989 
2990         if (res.response.status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
2991             if (settings.verbose > 2) {
2992                 moxi_log_write("auth_downstream success for %s\n", usr);
2993             }
2994 
2995             return 0;
2996         }
2997 
2998         if (settings.verbose > 1) {
2999             moxi_log_write("auth_downstream failure for %s (%x)\n",
3000                            usr, res.response.status);
3001         }
3002     } else {
3003         if (settings.verbose > 1) {
3004             moxi_log_write("auth_downstream response error for %s, %d\n",
3005                            usr, mr);
3006         }
3007     }
3008 
3009     if (mr == MCS_TIMEOUT) {
3010         return 1;
3011     }
3012 
3013     return -1;
3014 }
3015 
/* Select the bucket named by behavior->bucket on a binary-protocol
 * downstream server, using a blocking, synchronous request/response
 * exchange on the raw socket fd.  A no-op for ascii downstreams or
 * when no bucket is configured.
 *
 * Return 0 on success, -1 on general failure, 1 on timeout failure. */

int cproxy_bucket_downstream(mcs_server_st *server,
                             proxy_behavior *behavior,
                             SOCKET fd) {
    protocol_binary_request_header req;
    protocol_binary_response_header res;
    struct timeval *timeout = NULL;
    int bucket_len;
    mcs_return mr;

    cb_assert(server);
    cb_assert(behavior);
    cb_assert(IS_PROXY(behavior->downstream_protocol));
    cb_assert(fd != INVALID_SOCKET);

    if (!IS_BINARY(behavior->downstream_protocol)) {
        return 0;
    }

    bucket_len = (int)strlen(behavior->bucket);
    if (bucket_len <= 0) {
        return 0; /* When no bucket. */
    }

    /* The bucket name is sent as both the key and the entire body. */
    memset(req.bytes, 0, sizeof(req.bytes));
    memset(res.bytes, 0, sizeof(res.bytes));

    req.request.magic    = PROTOCOL_BINARY_REQ;
    req.request.opcode   = PROTOCOL_BINARY_CMD_BUCKET;
    req.request.keylen   = htons((uint16_t) bucket_len);
    req.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
    req.request.bodylen  = htonl(bucket_len);

    if (mcs_io_write(fd, (const char *) req.bytes,
                     sizeof(req.bytes)) != sizeof(req.bytes) ||
        mcs_io_write(fd, behavior->bucket, bucket_len) == -1) {
        mcs_io_reset(fd);

        if (settings.verbose > 1) {
            moxi_log_write("bucket failure during write (%d)\n",
                    bucket_len);
        }

        return -1;
    }

    /* The auth timeout doubles as the bucket-selection timeout. */
    if (behavior->auth_timeout.tv_sec != 0 ||
        behavior->auth_timeout.tv_usec != 0) {
        timeout = &behavior->auth_timeout;
    }

    mr = mcs_io_read(fd, &res.bytes, sizeof(res.bytes), timeout);
    if (mr == MCS_SUCCESS && res.response.magic == PROTOCOL_BINARY_RES) {
        char buf[300];
        int len;

        res.response.status  = ntohs(res.response.status);
        res.response.keylen  = ntohs(res.response.keylen);
        res.response.bodylen = ntohl(res.response.bodylen);

        /* Swallow whatever body comes. */
        len = res.response.bodylen;
        while (len > 0) {
            int amt = (len > (int) sizeof(buf) ? (int) sizeof(buf) : len);

            mr = mcs_io_read(fd, buf, amt, timeout);
            if (mr != MCS_SUCCESS) {
                if (mr == MCS_TIMEOUT) {
                    return 1;
                }

                return -1;
            }

            len -= amt;
        }

        /* The res status should be either... */
        /* - SUCCESS         - we got the bucket. */
        /* - AUTH_ERROR      - not allowed to use that bucket. */
        /* - UNKNOWN_COMMAND - bucket-unaware server. */

        if (res.response.status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
            if (settings.verbose > 2) {
                moxi_log_write("bucket_downstream success, %s\n",
                        behavior->bucket);
            }

            return 0;
        }

        if (settings.verbose > 1) {
            moxi_log_write("bucket_downstream failure, %s (%x)\n",
                    behavior->bucket,
                    res.response.status);
        }
    }

    if (mr == MCS_TIMEOUT) {
        return 1;
    }

    return -1;
}
3122 
3123 int cproxy_max_retries(downstream *d) {
3124     return mcs_server_count(&d->mst) * 2;
3125 }
3126 
3127 int downstream_conn_index(downstream *d, conn *c) {
3128     int nconns;
3129     int i;
3130 
3131     cb_assert(d);
3132 
3133     nconns = mcs_server_count(&d->mst);
3134     for (i = 0; i < nconns; i++) {
3135         if (d->downstream_conns[i] == c) {
3136             return i;
3137         }
3138     }
3139 
3140     return -1;
3141 }
3142 
/* Hook invoked on upstream conn state transitions to maintain
 * per-proxy-thread statistics: pause/unpause counters, and
 * per-command latency measured from command arrival (conn_parse_cmd)
 * until the conn finishes the command (conn_new_cmd or conn_closing).
 * Note: c->state is the state being left; next_state is the state
 * about to be entered.
 */
void cproxy_upstream_state_change(conn *c, enum conn_states next_state) {
    proxy_td *ptd;

    cb_assert(c != NULL);
    ptd = c->extra;
    if (ptd != NULL) {
        if (c->state == conn_pause) {
            ptd->stats.stats.tot_upstream_unpaused++;
            c->cmd_unpaused = true;
        }
        if (next_state == conn_pause) {
            ptd->stats.stats.tot_upstream_paused++;
        }

        /* Command arrival: start the latency clock.  The zero check on
         * cmd_arrive_time avoids restarting the clock mid-command. */
        if (next_state == conn_parse_cmd && c->cmd_arrive_time == 0) {
            c->cmd_unpaused = false;
            c->hit_local = false;
            c->cmd_arrive_time = usec_now();
        }

        /* Command completion: record latency only for commands that
         * were unpaused at least once and have a valid arrival time. */
        if (next_state == conn_closing || next_state == conn_new_cmd) {
            uint64_t arrive_time = c->cmd_arrive_time;
            if (c->cmd_unpaused && arrive_time != 0) {
                uint64_t latency = usec_now() - c->cmd_arrive_time;

                /* Commands served from the local cache are also
                 * tracked separately. */
                if (c->hit_local) {
                    ptd->stats.stats.tot_local_cmd_time += latency;
                    ptd->stats.stats.tot_local_cmd_count++;
                }

                ptd->stats.stats.tot_cmd_time += latency;
                ptd->stats.stats.tot_cmd_count++;
                c->cmd_arrive_time = 0;
            }
        }
    }
}
3180 
3181 /* ------------------------------------------------- */
3182 
3183 bool cproxy_on_connect_downstream_conn(conn *c) {
3184     int error;
3185     socklen_t errsz = sizeof(error);
3186     int k;
3187     downstream *d;
3188 
3189     cb_assert(c != NULL);
3190     cb_assert(c->host_ident);
3191 
3192     d = c->extra;
3193     cb_assert(d != NULL);
3194 
3195     if (settings.verbose > 2) {
3196         moxi_log_write("%d: cproxy_on_connect_downstream_conn for %s\n",
3197                        c->sfd, c->host_ident);
3198     }
3199 
3200     if (c->which == EV_TIMEOUT) {
3201         d->ptd->stats.stats.tot_downstream_connect_timeout++;
3202 
3203         if (settings.verbose) {
3204             moxi_log_write("%d: connection timed out: %s",
3205                            c->sfd, c->host_ident);
3206         }
3207         goto cleanup;
3208     }
3209 
3210     /* Check if the connection completed */
3211     if (getsockopt(c->sfd, SOL_SOCKET, SO_ERROR, (void *) &error,
3212                    &errsz) == -1) {
3213         if (settings.verbose) {
3214             moxi_log_write("%d: connect error: %s, %s",
3215                            c->sfd, c->host_ident, strerror(error));
3216         }
3217         goto cleanup;
3218     }
3219 
3220     if (error) {
3221         if (settings.verbose) {
3222             moxi_log_write("%d: connect failed: %s, %s",
3223                            c->sfd, c->host_ident, strerror(error));
3224         }
3225         goto cleanup;
3226     }
3227 
3228     k = downstream_conn_index(d, c);
3229     if (k >= 0) {
3230         if (downstream_connect_init(d, mcs_server_index(&d->mst, k),
3231                                     &d->behaviors_arr[k], c)) {
3232             /* We are connected to the server now */
3233             if (settings.verbose > 2) {
3234                 moxi_log_write("%d: connected to: %s\n",
3235                                c->sfd, c->host_ident);
3236             }
3237 
3238             conn_set_state(c, conn_pause);
3239             update_event(c, 0);
3240             cproxy_forward_or_error(d);
3241 
3242             return true;
3243         }
3244     }
3245 
3246 cleanup:
3247     d->ptd->stats.stats.tot_downstream_connect_failed++;
3248 
3249     k = delink_from_downstream_conns(c);
3250     if (k >= 0) {
3251         cb_assert(d->downstream_conns[k] == NULL);
3252 
3253         d->downstream_conns[k] = NULL_CONN;
3254     }
3255 
3256     conn_set_state(c, conn_closing);
3257     update_event(c, 0);
3258     cproxy_forward_or_error(d);
3259 
3260     return false;
3261 }
3262 
3263 void downstream_reserved_time_sample(proxy_stats_td *pstd, uint64_t duration) {
3264     if (pstd->downstream_reserved_time_htgram == NULL) {
3265         pstd->downstream_reserved_time_htgram =
3266             cproxy_create_timing_histogram();
3267     }
3268 
3269     if (pstd->downstream_reserved_time_htgram != NULL) {
3270         htgram_incr(pstd->downstream_reserved_time_htgram, duration, 1);
3271     }
3272 }
3273 
3274 void downstream_connect_time_sample(proxy_stats_td *pstd, uint64_t duration) {
3275     if (pstd->downstream_connect_time_htgram == NULL) {
3276         pstd->downstream_connect_time_htgram =
3277             cproxy_create_timing_histogram();
3278     }
3279 
3280     if (pstd->downstream_connect_time_htgram != NULL) {
3281         htgram_incr(pstd->downstream_connect_time_htgram, duration, 1);
3282     }
3283 }
3284 
3285 /* A histogram for tracking timings, such as for usec request timings. */
3286 
3287 HTGRAM_HANDLE cproxy_create_timing_histogram(void) {
3288     /* TODO: Make histogram bins more configurable one day. */
3289 
3290     HTGRAM_HANDLE h1 = htgram_mk(2000, 100, 2.0, 20, NULL);
3291     HTGRAM_HANDLE h0 = htgram_mk(0, 100, 1.0, 20, h1);
3292 
3293     return h0;
3294 }
3295 
3296 zstored_downstream_conns *zstored_get_downstream_conns(LIBEVENT_THREAD *thread,
3297                                                        const char *host_ident) {
3298 
3299     genhash_t *conn_hash;
3300     zstored_downstream_conns *conns;
3301 
3302     cb_assert(thread);
3303     cb_assert(thread->base);
3304 
3305     conn_hash = thread->conn_hash;
3306     cb_assert(conn_hash != NULL);
3307 
3308     conns = genhash_find(conn_hash, host_ident);
3309     if (conns == NULL) {
3310         conns = calloc(1, sizeof(zstored_downstream_conns));
3311         if (conns != NULL) {
3312             conns->host_ident = strdup(host_ident);
3313             if (conns->host_ident != NULL) {
3314                 genhash_store(conn_hash, conns->host_ident, conns);
3315             } else {
3316                 free(conns);
3317                 conns = NULL;
3318             }
3319         }
3320     }
3321 
3322     return conns;
3323 }
3324 
/* Update the per-thread, per-host error tracking for host_ident.
 * With has_error true, bumps the error count and timestamp; with
 * has_error false, resets both.  On error, also decrements the
 * acquired-conn count (the erroring conn will be closed, not
 * released back to the cache), and when no conns remain for the
 * host, wakes every downstream waiting on it so each can proceed
 * (possibly by just returning ERRORs to upstream clients).
 */
void zstored_error_count(LIBEVENT_THREAD *thread,
                         const char *host_ident,
                         bool has_error) {
    zstored_downstream_conns *conns;

    cb_assert(thread != NULL);
    cb_assert(host_ident != NULL);

    conns = zstored_get_downstream_conns(thread, host_ident);
    if (conns != NULL) {
        if (has_error) {
            conns->error_count++;
            conns->error_time = msec_current_time;
        } else {
            conns->error_count = 0;
            conns->error_time = 0;
        }

        if (settings.verbose > 2) {
            moxi_log_write("z_error, %s, %d, %d, %d, %d\n",
                           host_ident,
                           has_error,
                           conns->dc_acquired,
                           conns->error_count,
                           conns->error_time);
        }

        if (has_error) {
            /* We reach here when a non-blocking connect() has failed */
            /* or when an acquired downstream conn had an error. */
            /* The downstream conn is just going to be closed */
            /* rather than be released back to the thread->conn_hash, */
            /* so update the dc_acquired here. */

            if (conns->dc_acquired > 0) {
                conns->dc_acquired--;
            }

            /* When zero downstream conns are available, wake up all */
            /* waiting downstreams so they can proceed (possibly by */
            /* just returning ERROR's to upstream clients). */

            if (conns->dc_acquired <= 0 && conns->dc == NULL) {
                downstream *head = conns->downstream_waiting_head;

                /* Detach the whole waiting list before walking it,
                 * since cproxy_forward_or_error() may re-enqueue. */
                conns->downstream_waiting_head = NULL;
                conns->downstream_waiting_tail = NULL;

                while (head != NULL) {
                    downstream *prev;

                    head->ptd->stats.stats.tot_downstream_waiting_errors++;
                    head->ptd->stats.stats.tot_downstream_conn_queue_remove++;

                    prev = head;
                    head = head->next_waiting;
                    prev->next_waiting = NULL;

                    cproxy_forward_or_error(prev);
                }
            }
        }
    }
}
3389 
/* Acquire a downstream conn to the server msst for downstream d,
 * preferring an idle conn from the per-thread conn cache and
 * otherwise starting a new (possibly non-blocking) connect.
 *
 * Returns NULL when no conn could be acquired; in that case
 * *downstream_conn_max_reached is set true if the failure was due to
 * hitting behavior->downstream_conn_max.  A NULL return can also mean
 * the host is inside its connect-retry backoff interval, or that the
 * connect attempt itself failed.
 */
conn *zstored_acquire_downstream_conn(downstream *d,
                                      LIBEVENT_THREAD *thread,
                                      mcs_server_st *msst,
                                      proxy_behavior *behavior,
                                      bool *downstream_conn_max_reached) {
    enum protocol downstream_protocol;
    char *host_ident;
    conn *dc;
    zstored_downstream_conns *conns;

    cb_assert(d);
    cb_assert(d->ptd);
    cb_assert(d->ptd->downstream_released != d); /* Should not be in free list. */
    cb_assert(thread);
    cb_assert(msst);
    cb_assert(behavior);
    cb_assert(mcs_server_st_hostname(msst) != NULL);
    cb_assert(mcs_server_st_port(msst) > 0);
    cb_assert(mcs_server_st_fd(msst) == -1);

    *downstream_conn_max_reached = false;

    d->ptd->stats.stats.tot_downstream_conn_acquired++;

    /* The upstream's negotiated peer protocol, when set, overrides
     * the behavior's configured downstream protocol. */
    downstream_protocol =
        d->upstream_conn->peer_protocol ?
        d->upstream_conn->peer_protocol :
        behavior->downstream_protocol;

    host_ident = mcs_server_st_ident(msst, IS_ASCII(downstream_protocol));
    conns = zstored_get_downstream_conns(thread, host_ident);
    if (conns != NULL) {
        /* Fast path: pop an idle conn off the cache's free list. */
        dc = conns->dc;
        if (dc != NULL) {
            cb_assert(dc->thread == thread);
            cb_assert(strcmp(host_ident, dc->host_ident) == 0);

            conns->dc_acquired++;
            conns->dc = dc->next;
            dc->next = NULL;

            cb_assert(dc->extra == NULL);
            dc->extra = d;

            return dc;
        }

        /* Too many recent errors for this host: possibly back off
         * before attempting another connect. */
        if (behavior->connect_max_errors > 0 &&
            behavior->connect_max_errors < conns->error_count) {
            rel_time_t msecs_since_error =
                (rel_time_t)(msec_current_time - conns->error_time);

            if (settings.verbose > 2) {
                moxi_log_write("zacquire_dc, %s, %d, %"PRIu64", (%d)\n",
                               host_ident,
                               conns->error_count,
                               (uint64_t)conns->error_time,
                               msecs_since_error);
            }

            if ((behavior->cycle > 0) &&
                (behavior->connect_retry_interval > msecs_since_error)) {
                d->ptd->stats.stats.tot_downstream_connect_interval++;

                return NULL;
            }
        }

        /* Enforce the per-host cap on concurrently acquired conns. */
        if (behavior->downstream_conn_max > 0 &&
            behavior->downstream_conn_max <= conns->dc_acquired) {
            d->ptd->stats.stats.tot_downstream_connect_max_reached++;

            *downstream_conn_max_reached = true;

            return NULL;
        }
    }

    /* Slow path: start a fresh connect to the server. */
    dc = cproxy_connect_downstream_conn(d, thread, msst, behavior);
    if (dc != NULL) {
        cb_assert(dc->host_ident == NULL);
        dc->host_ident = strdup(host_ident);
        if (conns != NULL) {
            conns->dc_acquired++;

            /* A fully connected conn clears the error tracking; a
             * still-connecting conn might yet fail, so leave it. */
            if (dc->state != conn_connecting) {
                conns->error_count = 0;
                conns->error_time = 0;
            }
        }
    } else {
        if (conns != NULL) {
            conns->error_count++;
            conns->error_time = msec_current_time;
        }
    }

    return dc;
}
3489 
/* Release a downstream conn: return it to the per-thread idle conn
 * cache when reusable, else close it.  (Added by jsh.) */
/* Release downstream conn dc back to its thread's conn cache when it
 * is still healthy (paused), else close it.  When a conn is returned
 * to the cache, one waiting downstream (if any) is woken so it can
 * retry acquiring a conn.  NULL_CONN placeholders are ignored.
 */
void zstored_release_downstream_conn(conn *dc, bool closing) {
    bool keep;
    zstored_downstream_conns *conns;
    downstream *d;

    cb_assert(dc != NULL);
    if (dc == NULL_CONN) {
        return;
    }

    d = dc->extra;
    cb_assert(d != NULL);

    d->ptd->stats.stats.tot_downstream_conn_released++;

    if (settings.verbose > 2) {
        moxi_log_write("%d: release_downstream_conn, %s, (%d)"
                       " upstream %d\n",
                       dc->sfd, state_text(dc->state), closing,
                       (d->upstream_conn != NULL ?
                        d->upstream_conn->sfd : -1));
    }

    cb_assert(dc->next == NULL);
    cb_assert(dc->thread != NULL);
    cb_assert(dc->host_ident != NULL);

    /* Only a paused conn is safe to reuse; anything else gets closed. */
    keep = dc->state == conn_pause;
    dc->extra = NULL;
    conns = zstored_get_downstream_conns(dc->thread, dc->host_ident);
    if (conns != NULL) {
        if (conns->dc_acquired > 0) {
            conns->dc_acquired--;
        }

        if (keep) {
            downstream *d_head;
            cb_assert(dc->next == NULL);
            /* Push the conn onto the cache's idle free list. */
            dc->next = conns->dc;
            conns->dc = dc;

            /* Since one downstream conn was released, process a single */
            /* waiting downstream, if any. */

            d_head = conns->downstream_waiting_head;
            if (d_head != NULL) {
                cb_assert(conns->downstream_waiting_tail != NULL);

                /* Dequeue the head of the waiting list, fixing up the
                 * tail pointer when the list becomes empty. */
                conns->downstream_waiting_head =
                    conns->downstream_waiting_head->next_waiting;
                if (conns->downstream_waiting_head == NULL) {
                    conns->downstream_waiting_tail = NULL;
                }
                d_head->next_waiting = NULL;

                d_head->ptd->stats.stats.tot_downstream_conn_queue_remove++;

                /* The downstream is no longer waiting, so its conn
                 * queue timeout no longer applies. */
                cproxy_clear_timeout(d_head);

                cproxy_forward_or_error(d_head);
            }

            return;
        }
    }

    cproxy_close_conn(dc);
}
3559 
3560 /* Returns true if the downstream was found on any */
3561 /* conns->downstream_waiting_head/tail queues and was removed. */
3562 
3563 bool zstored_downstream_waiting_remove(downstream *d) {
3564     bool found = false;
3565     int i;
3566     int n;
3567     LIBEVENT_THREAD *thread = thread_by_index(thread_index(cb_thread_self()));
3568     cb_assert(thread != NULL);
3569 
3570     n = mcs_server_count(&d->mst);
3571     for (i = 0; i < n; i++) {
3572         char *host_ident;
3573         zstored_downstream_conns *conns;
3574         mcs_server_st *msst = mcs_server_index(&d->mst, i);
3575 
3576         enum protocol downstream_protocol =
3577             d->upstream_conn && d->upstream_conn->peer_protocol ?
3578             d->upstream_conn->peer_protocol :
3579             d->behaviors_arr[i].downstream_protocol;
3580 
3581         cb_assert(IS_PROXY(downstream_protocol));
3582 
3583         host_ident = mcs_server_st_ident(msst, IS_ASCII(downstream_protocol));
3584         conns = zstored_get_downstream_conns(thread, host_ident);
3585 
3586         if (conns != NULL) {
3587             /* Linked-list removal, on the next_waiting pointer, */
3588             /* and keep head and tail pointers updated. */
3589 
3590             downstream *prev = NULL;
3591             downstream *curr = conns->downstream_waiting_head;
3592 
3593             while (curr != NULL) {
3594                 if (curr == d) {
3595                     found = true;
3596 
3597                     if (conns->downstream_waiting_head == curr) {
3598                         cb_assert(conns->downstream_waiting_tail != NULL);
3599                         conns->downstream_waiting_head = curr->next_waiting;
3600                     }
3601 
3602                     if (conns->downstream_waiting_tail == curr) {
3603                         conns->downstream_waiting_tail = prev;
3604                     }
3605 
3606                     if (prev != NULL) {
3607                         prev->next_waiting = curr->next_waiting;
3608                     }
3609 
3610                     curr->next_waiting = NULL;
3611 
3612                     d->ptd->stats.stats.tot_downstream_conn_queue_remove++;
3613 
3614                     break;
3615                 }
3616 
3617                 prev = curr;
3618                 curr = curr->next_waiting;
3619             }
3620         }
3621     }
3622 
3623     return found;
3624 }
3625 
3626 bool zstored_downstream_waiting_add(downstream *d, LIBEVENT_THREAD *thread,
3627                                     mcs_server_st *msst,
3628                                     proxy_behavior *behavior) {
3629     enum protocol downstream_protocol;
3630     char *host_ident;
3631     zstored_downstream_conns *conns;
3632 
3633     cb_assert(thread != NULL);
3634     cb_assert(d != NULL);
3635     cb_assert(d->upstream_conn != NULL);
3636     cb_assert(d->next_waiting == NULL);
3637 
3638     downstream_protocol = d->upstream_conn->peer_protocol ?
3639         d->upstream_conn->peer_protocol :
3640         behavior->downstream_protocol;
3641 
3642     host_ident = mcs_server_st_ident(msst, IS_ASCII(downstream_protocol));
3643     conns = zstored_get_downstream_conns(thread, host_ident);
3644     if (conns != NULL) {
3645         cb_assert(conns->dc == NULL);
3646 
3647         if (conns->downstream_waiting_head == NULL) {
3648             cb_assert(conns->downstream_waiting_tail == NULL);
3649             conns->downstream_waiting_head = d;
3650         }
3651         if (conns->downstream_waiting_tail != NULL) {
3652             cb_assert(conns->downstream_waiting_tail->next_waiting == NULL);
3653             conns->downstream_waiting_tail->next_waiting = d;
3654         }
3655         conns->downstream_waiting_tail = d;
3656 
3657         d->ptd->stats.stats.tot_downstream_conn_queue_add++;
3658 
3659         return true;
3660     }
3661 
3662     return false;
3663 }
3664 
3665 /* Find an appropriate proxy struct or NULL. */
3666 
3667 proxy *cproxy_find_proxy_by_auth(proxy_main *m,
3668                                  const char *usr,
3669                                  const char *pwd) {
3670     proxy *found = NULL;
3671     proxy *p;
3672 
3673     cb_mutex_enter(&m->proxy_main_lock);
3674 
3675     for (p = m->proxy_head; p != NULL && found == NULL; p = p->next) {
3676         cb_mutex_enter(&p->proxy_lock);
3677         if (strcmp(p->behavior_pool.base.usr, usr) == 0 &&
3678             strcmp(p->