xref: /6.0.3/moxi/src/cproxy.c (revision f6b2f64b)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3#include "src/config.h"
4#include <stdio.h>
5#include <stdlib.h>
6#include <string.h>
7#include <errno.h>
8#include <platform/cbassert.h>
9#include <fcntl.h>
10#include <math.h>
11#include "memcached.h"
12#include "cproxy.h"
13#include "work.h"
14#include "log.h"
15
/* Build-time knob: defaults to false unless the build already defines it. */
#if !defined(MOXI_BLOCKING_CONNECT)
#define MOXI_BLOCKING_CONNECT false
#endif
19
20/* Internal forward declarations. */
21
22downstream *downstream_list_remove(downstream *head, downstream *d);
23downstream *downstream_list_waiting_remove(downstream *head,
24                                           downstream **tail,
25                                           downstream *d);
26
27static void downstream_timeout(evutil_socket_t fd,
28                        const short which,
29                        void *arg);
30static void wait_queue_timeout(evutil_socket_t fd,
31                        const short which,
32                        void *arg);
33
34conn *conn_list_remove(conn *head, conn **tail,
35                       conn *c, bool *found);
36
37bool is_compatible_request(conn *existing, conn *candidate);
38
39void propagate_error_msg(downstream *d, char *ascii_msg,
40                         protocol_binary_response_status binary_status);
41
42void downstream_reserved_time_sample(proxy_stats_td *ptds, uint64_t duration);
43void downstream_connect_time_sample(proxy_stats_td *ptds, uint64_t duration);
44
45bool downstream_connect_init(downstream *d, mcs_server_st *msst,
46                             proxy_behavior *behavior, conn *c);
47
48int init_mcs_st(mcs_st *mst, char *config,
49                const char *default_usr,
50                const char *default_pwd,
51                const char *opts);
52
53bool cproxy_on_connect_downstream_conn(conn *c);
54
55conn *zstored_acquire_downstream_conn(downstream *d,
56                                      LIBEVENT_THREAD *thread,
57                                      mcs_server_st *msst,
58                                      proxy_behavior *behavior,
59                                      bool *downstream_conn_max_reached);
60
61void zstored_release_downstream_conn(conn *dc, bool closing);
62
63void zstored_error_count(LIBEVENT_THREAD *thread,
64                         const char *host_ident,
65                         bool has_error);
66
67bool zstored_downstream_waiting_add(downstream *d, LIBEVENT_THREAD *thread,
68                                    mcs_server_st *msst,
69                                    proxy_behavior *behavior);
70
71bool zstored_downstream_waiting_remove(downstream *d);
72
73typedef struct {
74    conn      *dc;          /* Linked-list of available downstream conns. */
75    uint32_t   dc_acquired; /* Count of acquired (in-use) downstream conns. */
76    char      *host_ident;
77    uint32_t   error_count;
78    uint64_t   error_time;
79
80    /* Head & tail of singly linked-list/queue, using */
81    /* downstream->next_waiting pointers, where we've reached */
82    /* downstream_conn_max, so there are waiting downstreams. */
83
84    downstream *downstream_waiting_head;
85    downstream *downstream_waiting_tail;
86} zstored_downstream_conns;
87
88zstored_downstream_conns *zstored_get_downstream_conns(LIBEVENT_THREAD *thread,
89                                                       const char *host_ident);
90
91bool cproxy_forward_or_error(downstream *d);
92
93int delink_from_downstream_conns(conn *c);
94
95int cproxy_num_active_proxies(proxy_main *m);
96
97/* Function tables. */
98
99conn_funcs cproxy_listen_funcs = {
100    .conn_init                   = cproxy_init_upstream_conn,
101    .conn_close                  = NULL,
102    .conn_connect                = NULL,
103    .conn_process_ascii_command  = NULL,
104    .conn_process_binary_command = NULL,
105    .conn_complete_nread_ascii   = NULL,
106    .conn_complete_nread_binary  = NULL,
107    .conn_pause                  = NULL,
108    .conn_realtime               = NULL,
109    .conn_state_change           = NULL,
110    .conn_binary_command_magic   = 0
111};
112
113conn_funcs cproxy_upstream_funcs = {
114    .conn_init                   = NULL,
115    .conn_close                  = cproxy_on_close_upstream_conn,
116    .conn_connect                = NULL,
117    .conn_process_ascii_command  = cproxy_process_upstream_ascii,
118    .conn_process_binary_command = cproxy_process_upstream_binary,
119    .conn_complete_nread_ascii   = cproxy_process_upstream_ascii_nread,
120    .conn_complete_nread_binary  = cproxy_process_upstream_binary_nread,
121    .conn_pause                  = NULL,
122    .conn_realtime               = cproxy_realtime,
123    .conn_state_change           = cproxy_upstream_state_change,
124    .conn_binary_command_magic   = PROTOCOL_BINARY_REQ
125};
126
127conn_funcs cproxy_downstream_funcs = {
128    .conn_init                   = cproxy_init_downstream_conn,
129    .conn_close                  = cproxy_on_close_downstream_conn,
130    .conn_connect                = cproxy_on_connect_downstream_conn,
131    .conn_process_ascii_command  = cproxy_process_downstream_ascii,
132    .conn_process_binary_command = cproxy_process_downstream_binary,
133    .conn_complete_nread_ascii   = cproxy_process_downstream_ascii_nread,
134    .conn_complete_nread_binary  = cproxy_process_downstream_binary_nread,
135    .conn_pause                  = cproxy_on_pause_downstream_conn,
136    .conn_realtime               = cproxy_realtime,
137    .conn_state_change           = NULL,
138    .conn_binary_command_magic   = PROTOCOL_BINARY_RES
139};
140
141/* Main function to create a proxy struct.
142 */
143proxy *cproxy_create(proxy_main *main,
144                     char    *name,
145                     int      port,
146                     char    *config,
147                     uint32_t config_ver,
148                     proxy_behavior_pool *behavior_pool,
149                     int nthreads) {
150    if (settings.verbose > 1) {
151        moxi_log_write("cproxy_create on port %d, name %s, config %s\n",
152                       port, name, config);
153    }
154
155    cb_assert(name != NULL);
156    cb_assert(port > 0 || settings.socketpath != NULL);
157    cb_assert(config != NULL);
158    cb_assert(behavior_pool);
159    cb_assert(nthreads > 1); /* Main thread + at least one worker. */
160    cb_assert(nthreads == settings.num_threads);
161
162    proxy *p = (proxy *) calloc(1, sizeof(proxy));
163    if (p != NULL) {
164        p->main       = main;
165        p->name       = trimstrdup(name);
166        p->port       = port;
167        p->config     = trimstrdup(config);
168        p->config_ver = config_ver;
169
170        p->behavior_pool.base = behavior_pool->base;
171        p->behavior_pool.num  = behavior_pool->num;
172        p->behavior_pool.arr  = cproxy_copy_behaviors(behavior_pool->num,
173                                                      behavior_pool->arr);
174
175        p->listening        = 0;
176        p->listening_failed = 0;
177
178        p->next = NULL;
179
180        cb_mutex_initialize(&p->proxy_lock);
181
182        mcache_init(&p->front_cache, true, &mcache_item_funcs, true);
183        matcher_init(&p->front_cache_matcher, true);
184        matcher_init(&p->front_cache_unmatcher, true);
185
186        matcher_init(&p->optimize_set_matcher, true);
187
188        if (behavior_pool->base.front_cache_max > 0 &&
189            behavior_pool->base.front_cache_lifespan > 0) {
190            mcache_start(&p->front_cache,
191                         behavior_pool->base.front_cache_max);
192
193            if (strlen(behavior_pool->base.front_cache_spec) > 0) {
194                matcher_start(&p->front_cache_matcher,
195                              behavior_pool->base.front_cache_spec);
196            }
197
198            if (strlen(behavior_pool->base.front_cache_unspec) > 0) {
199                matcher_start(&p->front_cache_unmatcher,
200                              behavior_pool->base.front_cache_unspec);
201            }
202        }
203
204        if (strlen(behavior_pool->base.optimize_set) > 0) {
205            matcher_start(&p->optimize_set_matcher,
206                          behavior_pool->base.optimize_set);
207        }
208
209        p->thread_data_num = nthreads;
210        p->thread_data = (proxy_td *) calloc(p->thread_data_num,
211                                             sizeof(proxy_td));
212        if (p->thread_data != NULL &&
213            p->name != NULL &&
214            p->config != NULL &&
215            p->behavior_pool.arr != NULL) {
216            /* We start at 1, because thread[0] is the main listen/accept */
217            /* thread, and not a true worker thread.  Too lazy to save */
218            /* the wasted thread[0] slot memory. */
219            int i;
220            for (i = 1; i < p->thread_data_num; i++) {
221                proxy_td *ptd = &p->thread_data[i];
222                ptd->proxy = p;
223
224                ptd->config     = strdup(p->config);
225                ptd->config_ver = p->config_ver;
226
227                ptd->behavior_pool.base = behavior_pool->base;
228                ptd->behavior_pool.num  = behavior_pool->num;
229                ptd->behavior_pool.arr =
230                    cproxy_copy_behaviors(behavior_pool->num,
231                                          behavior_pool->arr);
232
233                ptd->waiting_any_downstream_head = NULL;
234                ptd->waiting_any_downstream_tail = NULL;
235                ptd->downstream_reserved = NULL;
236                ptd->downstream_released = NULL;
237                ptd->downstream_tot = 0;
238                ptd->downstream_num = 0;
239                ptd->downstream_max = behavior_pool->base.downstream_max;
240                ptd->downstream_assigns = 0;
241                ptd->timeout_tv.tv_sec = 0;
242                ptd->timeout_tv.tv_usec = 0;
243                ptd->stats.stats.num_upstream = 0;
244                ptd->stats.stats.num_downstream_conn = 0;
245
246                cproxy_reset_stats_td(&ptd->stats);
247
248                mcache_init(&ptd->key_stats, true,
249                            &mcache_key_stats_funcs, false);
250                matcher_init(&ptd->key_stats_matcher, false);
251                matcher_init(&ptd->key_stats_unmatcher, false);
252
253                if (behavior_pool->base.key_stats_max > 0 &&
254                    behavior_pool->base.key_stats_lifespan > 0) {
255                    mcache_start(&ptd->key_stats,
256                                 behavior_pool->base.key_stats_max);
257
258                    if (strlen(behavior_pool->base.key_stats_spec) > 0) {
259                        matcher_start(&ptd->key_stats_matcher,
260                                      behavior_pool->base.key_stats_spec);
261                    }
262
263                    if (strlen(behavior_pool->base.key_stats_unspec) > 0) {
264                        matcher_start(&ptd->key_stats_unmatcher,
265                                      behavior_pool->base.key_stats_unspec);
266                    }
267                }
268            }
269
270            return p;
271        }
272
273        free(p->name);
274        free(p->config);
275        free(p->behavior_pool.arr);
276        free(p->thread_data);
277        free(p);
278    }
279
280    return NULL;
281}
282
283/* Must be called on the main listener thread.
284 */
285int cproxy_listen(proxy *p) {
286    cb_assert(p != NULL);
287    cb_assert(is_listen_thread());
288
289    if (settings.verbose > 1) {
290        moxi_log_write("cproxy_listen on port %d, downstream %s\n",
291                p->port, p->config);
292    }
293
294    /* Idempotent, remembers if it already created listening socket(s). */
295
296    if (p->listening == 0) {
297        int listening;
298        enum protocol listen_protocol = negotiating_proxy_prot;
299
300        if (IS_ASCII(settings.binding_protocol)) {
301            listen_protocol = proxy_upstream_ascii_prot;
302        }
303        if (IS_BINARY(settings.binding_protocol)) {
304            listen_protocol = proxy_upstream_binary_prot;
305        }
306
307        listening = cproxy_listen_port(p->port, listen_protocol,
308                                       tcp_transport,
309                                       p,
310                                       &cproxy_listen_funcs);
311        if (listening > 0) {
312            p->listening += listening;
313        } else {
314            p->listening_failed++;
315
316            if (settings.enable_mcmux_mode && settings.socketpath) {
317#ifdef HAVE_SYS_UN_H
318                moxi_log_write("error: could not access UNIX socket: %s\n",
319                               settings.socketpath);
320                if (ml->log_mode != ERRORLOG_STDERR) {
321                    fprintf(stderr, "error: could not access UNIX socket: %s\n",
322                            settings.socketpath);
323                }
324                exit(EXIT_FAILURE);
325#endif
326            }
327
328            moxi_log_write("ERROR: could not listen on port %d. "
329                           "Please use -Z port_listen=PORT_NUM "
330                           "to specify a different port number.\n", p->port);
331            if (ml->log_mode != ERRORLOG_STDERR) {
332                fprintf(stderr, "ERROR: could not listen on port %d. "
333                        "Please use -Z port_listen=PORT_NUM "
334                        "to specify a different port number.\n", p->port);
335            }
336            exit(EXIT_FAILURE);
337        }
338    }
339
340    return (int)p->listening;
341}
342
343int cproxy_listen_port(int port,
344                       enum protocol protocol,
345                       enum network_transport transport,
346                       void       *conn_extra,
347                       conn_funcs *funcs) {
348
349    int   listening;
350    conn *listen_conn_orig;
351    conn *x;
352
353    cb_assert(port > 0 || settings.socketpath != NULL);
354    cb_assert(conn_extra);
355    cb_assert(funcs);
356    cb_assert(is_listen_thread());
357
358    listening = 0;
359    listen_conn_orig = listen_conn;
360
361    x = listen_conn_orig;
362    while (x != NULL) {
363        if (x->extra != NULL && x->funcs == funcs) {
364            struct sockaddr_in s_in;
365            socklen_t sin_len = sizeof(s_in);
366            memset(&s_in, 0, sizeof(s_in));
367
368            if (getsockname(x->sfd, (struct sockaddr *) &s_in, &sin_len) == 0) {
369                int x_port = ntohs(s_in.sin_port);
370                if (x_port == port) {
371                    if (settings.verbose > 1) {
372                        moxi_log_write(
373                                "<%d cproxy listening reusing listener on port %d\n",
374                                x->sfd, port);
375                    }
376
377                    listening++;
378                }
379            }
380        }
381
382        x = x->next;
383    }
384
385    if (listening > 0) {
386        /* If we're already listening on the required port, then */
387        /* we don't need to start a new server_socket().  This happens */
388        /* in the multi-bucket case with binary protocol buckets. */
389        /* There will be multiple proxy struct's (one per bucket), but */
390        /* only one proxy struct will actually be pointed at by a */
391        /* listening conn->extra (usually 11211). */
392
393        /* TODO: Add a refcount to handle shutdown properly? */
394
395        return listening;
396    }
397#ifdef HAVE_SYS_UN_H
398    if (settings.socketpath ?
399        (server_socket_unix(settings.socketpath, settings.access) == 0) :
400        (server_socket(port, transport, NULL) == 0)) {
401#else
402    if (server_socket(port, transport, NULL) == 0) {
403#endif
404        conn *c;
405        cb_assert(listen_conn != NULL);
406
407        /* The listen_conn global list is changed by server_socket(), */
408        /* which adds a new listening conn on port for each bindable */
409        /* host address. */
410
411        /* For example, after the call to server_socket(), there */
412        /* might be two new listening conn's -- one for localhost, */
413        /* another for 127.0.0.1. */
414
415        c = listen_conn;
416        while (c != NULL &&
417               c != listen_conn_orig) {
418            if (settings.verbose > 1) {
419                moxi_log_write(
420                        "<%d cproxy listening on port %d\n",
421                        c->sfd, port);
422            }
423
424            listening++;
425
426            /* TODO: Listening conn's never seem to close, */
427            /*       but need to handle cleanup if they do, */
428            /*       such as if we handle graceful shutdown one day. */
429
430            c->extra = conn_extra;
431            c->funcs = funcs;
432            c->protocol = protocol;
433            c = c->next;
434        }
435    }
436
437    return listening;
438}
439
440/* Finds the proxy_td associated with a worker thread.
441 */
442proxy_td *cproxy_find_thread_data(proxy *p, cb_thread_t thread_id) {
443    if (p != NULL) {
444        int i = thread_index(thread_id);
445
446        /* 0 is the main listen thread, not a worker thread. */
447        cb_assert(i > 0);
448        cb_assert(i < p->thread_data_num);
449
450        if (i > 0 && i < p->thread_data_num) {
451            return &p->thread_data[i];
452        }
453    }
454
455    return NULL;
456}
457
458bool cproxy_init_upstream_conn(conn *c) {
459    char *default_name;
460    proxy *p;
461    int n;
462    proxy_td *ptd;
463
464    cb_assert(c != NULL);
465
466    /* We're called once per client/upstream conn early in its */
467    /* lifecycle, on the worker thread, so it's a good place */
468    /* to record the proxy_td into the conn->extra. */
469
470    cb_assert(!is_listen_thread());
471
472    p = c->extra;
473    cb_assert(p != NULL);
474    cb_assert(p->main != NULL);
475
476    n = cproxy_num_active_proxies(p->main);
477    if (n <= 0) {
478        if (settings.verbose > 2) {
479            moxi_log_write("<%d disallowing upstream conn due to no buckets\n",
480                           c->sfd);
481        }
482
483        return false;
484    }
485
486    ptd = cproxy_find_thread_data(p, cb_thread_self());
487    cb_assert(ptd != NULL);
488
489    /* Reassign the client/upstream conn to a different bucket */
490    /* if the default_bucket_name isn't the special FIRST_BUCKET */
491    /* value. */
492
493    default_name = ptd->behavior_pool.base.default_bucket_name;
494    if (strcmp(default_name, FIRST_BUCKET) != 0) {
495        proxy *default_proxy;
496        if (settings.verbose > 2) {
497            moxi_log_write("<%d assigning to default bucket: %s\n",
498                           c->sfd, default_name);
499        }
500
501        default_proxy =
502            cproxy_find_proxy_by_auth(p->main, default_name, "");
503
504        /* If the ostensible default bucket is missing (possibly deleted), */
505        /* assign the client/upstream conn to the NULL BUCKET. */
506
507        if (default_proxy == NULL) {
508            default_proxy =
509                cproxy_find_proxy_by_auth(p->main, NULL_BUCKET, "");
510
511            if (settings.verbose > 2) {
512                moxi_log_write("<%d assigning to null bucket, "
513                               "default bucket missing: %s\n",
514                               c->sfd, default_name);
515            }
516        }
517
518        if (default_proxy != NULL) {
519            proxy_td *default_ptd =
520                cproxy_find_thread_data(default_proxy, cb_thread_self());
521            if (default_ptd != NULL) {
522                ptd = default_ptd;
523            }
524        } else {
525            if (settings.verbose > 2) {
526                moxi_log_write("<%d assigning to first bucket, "
527                               "missing default/null bucket: %s\n",
528                               c->sfd, default_name);
529            }
530        }
531    } else {
532        if (settings.verbose > 2) {
533            moxi_log_write("<%d assigning to first bucket\n", c->sfd);
534        }
535    }
536
537    ptd->stats.stats.num_upstream++;
538    ptd->stats.stats.tot_upstream++;
539
540    c->extra = ptd;
541    c->funcs = &cproxy_upstream_funcs;
542
543    return true;
544}
545
546bool cproxy_init_downstream_conn(conn *c) {
547    downstream *d = c->extra;
548    cb_assert(d != NULL);
549
550    d->ptd->stats.stats.num_downstream_conn++;
551    d->ptd->stats.stats.tot_downstream_conn++;
552
553    return true;
554}
555
556void cproxy_on_close_upstream_conn(conn *c) {
557    downstream *d;
558    proxy_td *ptd;
559
560    cb_assert(c != NULL);
561
562    if (settings.verbose > 2) {
563        moxi_log_write("<%d cproxy_on_close_upstream_conn\n", c->sfd);
564    }
565
566    ptd = c->extra;
567    if (c->extra == NULL) {
568        moxi_log_write("<%d cproxy_on_close_upstream_conn already closed\n",
569                       c->sfd);
570        return;
571    }
572    c->extra = NULL;
573
574    if (ptd->stats.stats.num_upstream > 0) {
575        ptd->stats.stats.num_upstream--;
576    }
577
578    /* Delink from any reserved downstream. */
579
580    for (d = ptd->downstream_reserved; d != NULL; d = d->next) {
581        bool found = false;
582
583        d->upstream_conn = conn_list_remove(d->upstream_conn, NULL,
584                                            c, &found);
585        if (d->upstream_conn == NULL) {
586            d->upstream_suffix = NULL;
587            d->upstream_suffix_len = 0;
588            d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
589            d->upstream_retry = 0;
590            d->target_host_ident = NULL;
591
592            /* Don't need to do anything else, as we'll now just */
593            /* read and drop any remaining inflight downstream replies. */
594            /* Eventually, the downstream will be released. */
595        }
596
597        /* If the downstream was reserved for this upstream conn, */
598        /* also clear the upstream from any multiget de-duplication */
599        /* tracking structures. */
600
601        if (found) {
602            int n, i;
603            if (d->multiget != NULL) {
604                genhash_iter(d->multiget, multiget_remove_upstream, c);
605            }
606
607            /* The downstream conn's might have iov's that */
608            /* point to the upstream conn's buffers.  Also, the */
609            /* downstream conn might be in all sorts of states */
610            /* (conn_read, write, mwrite, pause), and we want */
611            /* to be careful about the downstream channel being */
612            /* half written. */
613
614            /* The safest, but inefficient, thing to do then is */
615            /* to close any conn_mwrite downstream conns. */
616
617            ptd->stats.stats.tot_downstream_close_on_upstream_close++;
618
619            n = mcs_server_count(&d->mst);
620
621            for (i = 0; i < n; i++) {
622                conn *downstream_conn = d->downstream_conns[i];
623                if (downstream_conn != NULL &&
624                    downstream_conn != NULL_CONN &&
625                    downstream_conn->state == conn_mwrite) {
626                    downstream_conn->msgcurr = 0;
627                    downstream_conn->msgused = 0;
628                    downstream_conn->iovused = 0;
629
630                    cproxy_close_conn(downstream_conn);
631                }
632            }
633        }
634    }
635
636    /* Delink from wait queue. */
637
638    ptd->waiting_any_downstream_head =
639        conn_list_remove(ptd->waiting_any_downstream_head,
640                         &ptd->waiting_any_downstream_tail,
641                         c, NULL);
642}
643
644int delink_from_downstream_conns(conn *c) {
645    int n, k, i;
646    downstream *d = c->extra;
647    if (d == NULL) {
648        if (settings.verbose > 2) {
649            moxi_log_write("%d: delink_from_downstream_conn no-op\n",
650                           c->sfd);
651        }
652
653        return -1;
654    }
655
656    n = mcs_server_count(&d->mst);
657    k = -1; /* Index of conn. */
658
659    for (i = 0; i < n; i++) {
660        if (d->downstream_conns[i] == c) {
661            d->downstream_conns[i] = NULL;
662
663            if (settings.verbose > 2) {
664                moxi_log_write(
665                        "<%d cproxy_on_close_downstream_conn quit_server\n",
666                        c->sfd);
667            }
668
669            d->ptd->stats.stats.tot_downstream_quit_server++;
670
671            mcs_server_st_quit(mcs_server_index(&d->mst, i), 1);
672            cb_assert(mcs_server_st_fd(mcs_server_index(&d->mst, i)) == -1);
673
674            k = i;
675        }
676    }
677
678    return k;
679}
680
681void cproxy_on_close_downstream_conn(conn *c) {
682    conn *uc_retry = NULL;
683    downstream *d;
684    int k;
685    proxy_td *ptd;
686
687    cb_assert(c != NULL);
688    cb_assert(c->sfd >= 0);
689    cb_assert(c->state == conn_closing);
690
691    if (settings.verbose > 2) {
692        moxi_log_write("<%d cproxy_on_close_downstream_conn\n", c->sfd);
693    }
694
695    d = c->extra;
696
697    /* Might have been set to NULL during cproxy_free_downstream(). */
698    /* Or, when a downstream conn is in the thread-based free pool, it */
699    /* is not associated with any particular downstream. */
700
701    if (d == NULL) {
702        /* TODO: See if we need to remove the downstream conn from the */
703        /* thread-based free pool.  This shouldn't happen, but we */
704        /* should then figure out how to put an cb_assert() here. */
705
706        return;
707    }
708
709    k = delink_from_downstream_conns(c);
710
711    c->extra = NULL;
712
713    if (c->thread != NULL &&
714        c->host_ident != NULL) {
715        zstored_error_count(c->thread, c->host_ident, true);
716    }
717
718    ptd = d->ptd;
719    cb_assert(ptd);
720
721    if (ptd->stats.stats.num_downstream_conn > 0) {
722        ptd->stats.stats.num_downstream_conn--;
723    }
724
725    if (k < 0) {
726        /* If this downstream conn wasn't linked into the */
727        /* downstream, it was delinked already during connect error */
728        /* handling (where its slot was set to NULL_CONN already), */
729        /* or during downstream_timeout/conn_queue_timeout. */
730
731        if (settings.verbose > 2) {
732            moxi_log_write("%d: skipping release dc in on_close_dc\n",
733                           c->sfd);
734        }
735
736        return;
737    }
738
739    if (d->upstream_conn != NULL &&
740        d->downstream_used == 1) {
741        /* TODO: Revisit downstream close error handling. */
742        /*       Should we propagate error when... */
743        /*       - any downstream conn closes? */
744        /*       - all downstream conns closes? */
745        /*       - last downstream conn closes?  Current behavior. */
746
747        if (d->upstream_suffix == NULL) {
748            if (settings.verbose > 2) {
749                moxi_log_write("<%d proxy downstream closed, upstream %d (%x)\n",
750                               c->sfd,
751                               d->upstream_conn->sfd,
752                               d->upstream_conn->protocol);
753            }
754
755            if (IS_ASCII(d->upstream_conn->protocol)) {
756                d->upstream_suffix = "SERVER_ERROR proxy downstream closed\r\n";
757
758                if (c->host_ident != NULL) {
759                    char *s = add_conn_suffix(d->upstream_conn);
760                    if (s != NULL) {
761                        snprintf(s, SUFFIX_SIZE - 1,
762                                 "SERVER_ERROR proxy downstream closed %s\r\n",
763                                 c->host_ident);
764                        s[SUFFIX_SIZE - 1] = '\0';
765                        d->upstream_suffix = s;
766
767                        s = strchr(s, ':'); /* Clip to avoid sending user/pswd. */
768                        if (s != NULL) {
769                            *s++ = '\r';
770                            *s++ = '\n';
771                            *s = '\0';
772                        }
773                    }
774                }
775
776                d->upstream_suffix_len = 0;
777            } else {
778                d->upstream_status = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
779            }
780
781            d->upstream_retry = 0;
782            d->target_host_ident = NULL;
783        }
784
785        /* We sometimes see that drive_machine/transmit will not see */
786        /* a closed connection error during conn_mwrite, possibly */
787        /* due to non-blocking sockets.  Because of this, drive_machine */
788        /* thinks it has a successful downstream request send and */
789        /* moves the state forward trying to read a response from */
790        /* the downstream conn (conn_new_cmd, conn_read, etc), and */
791        /* only then do we finally see the conn close situation, */
792        /* ending up here.  That is, drive_machine only */
793        /* seems to move to conn_closing from conn_read. */
794
795        /* If we haven't received any reply yet, we retry based */
796        /* on our cmd_retries counter. */
797
798        /* TODO: Reconsider retry behavior, is it right in all situations? */
799
800        if (c->rcurr != NULL &&
801            c->rbytes == 0 &&
802            d->downstream_used_start == d->downstream_used &&
803            d->downstream_used_start == 1 &&
804            d->upstream_conn->next == NULL &&
805            d->behaviors_arr != NULL) {
806            if (k >= 0 && k < d->behaviors_num) {
807                int retry_max = d->behaviors_arr[k].downstream_retry;
808                if (d->upstream_conn->cmd_retries < retry_max) {
809                    d->upstream_conn->cmd_retries++;
810                    uc_retry = d->upstream_conn;
811                    d->upstream_suffix = NULL;
812                    d->upstream_suffix_len = 0;
813                    d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
814                    d->upstream_retry = 0;
815                    d->target_host_ident = NULL;
816                }
817            }
818        }
819
820        if (uc_retry == NULL &&
821            d->upstream_suffix == NULL &&
822            IS_BINARY(d->upstream_conn->protocol)) {
823            protocol_binary_response_header *rh =
824                cproxy_make_bin_error(d->upstream_conn,
825                                      PROTOCOL_BINARY_RESPONSE_EINTERNAL);
826            if (rh != NULL) {
827                d->upstream_suffix = (char *) rh;
828                d->upstream_suffix_len = sizeof(protocol_binary_response_header);
829            } else {
830                d->ptd->stats.stats.err_oom++;
831                cproxy_close_conn(d->upstream_conn);
832            }
833        }
834    }
835
836    /* Are we over-decrementing here, and in handling conn_pause? */
837
838    /* Case 1: we're in conn_pause, and socket is closed concurrently. */
839    /* We unpause due to reserve, we move to conn_write/conn_mwrite, */
840    /* fail and move to conn_closing.  So, no over-decrement. */
841
842    /* Case 2: we're waiting for a downstream response in conn_new_cmd, */
843    /* and socket is closed concurrently.  State goes to conn_closing, */
844    /* so, no over-decrement. */
845
846    /* Case 3: we've finished processing downstream response (in */
847    /* conn_parse_cmd or conn_nread), and the downstream socket */
848    /* is closed concurrently.  We then move to conn_pause, */
849    /* and same as Case 1. */
850
851    cproxy_release_downstream_conn(d, c);
852
853    /* Setup a retry after unwinding the call stack. */
854    /* We use the work_queue, because our caller, conn_close(), */
855    /* is likely to blow away our fd if we try to reconnect */
856    /* right now. */
857
858    if (uc_retry != NULL) {
859        if (settings.verbose > 2) {
860            moxi_log_write("%d cproxy retrying\n", uc_retry->sfd);
861        }
862
863        ptd->stats.stats.tot_retry++;
864
865        cb_assert(uc_retry->thread);
866        cb_assert(uc_retry->thread->work_queue);
867
868        work_send(uc_retry->thread->work_queue,
869                  upstream_retry, ptd, uc_retry);
870    }
871}
872
873void upstream_retry(void *data0, void *data1) {
874    proxy_td *ptd = data0;
875    conn *uc = data1;
876
877    cb_assert(ptd);
878    cb_assert(uc);
879
880    cproxy_pause_upstream_for_downstream(ptd, uc);
881}
882
883void cproxy_add_downstream(proxy_td *ptd) {
884    cb_assert(ptd != NULL);
885    cb_assert(ptd->proxy != NULL);
886
887    if (ptd->downstream_max == 0 ||
888        ptd->downstream_num < ptd->downstream_max) {
889        if (settings.verbose > 2) {
890            moxi_log_write("cproxy_add_downstream %d %d\n",
891                    ptd->downstream_num,
892                    ptd->downstream_max);
893        }
894
895        /* The config/behaviors will be NULL if the */
896        /* proxy is shutting down. */
897
898        if (ptd->config != NULL &&
899            ptd->behavior_pool.arr != NULL) {
900            downstream *d =
901                cproxy_create_downstream(ptd->config,
902                                         ptd->config_ver,
903                                         &ptd->behavior_pool);
904            if (d != NULL) {
905                d->ptd = ptd;
906                ptd->downstream_tot++;
907                ptd->downstream_num++;
908                cproxy_release_downstream(d, true);
909            } else {
910                ptd->stats.stats.tot_downstream_create_failed++;
911            }
912        }
913    } else {
914        ptd->stats.stats.tot_downstream_max_reached++;
915    }
916}
917
918downstream *cproxy_reserve_downstream(proxy_td *ptd) {
919    cb_assert(ptd != NULL);
920
921    /* Loop in case we need to clear out downstreams */
922    /* that have outdated configs. */
923
924    while (true) {
925        downstream *d;
926
927        d = ptd->downstream_released;
928        if (d == NULL) {
929            cproxy_add_downstream(ptd);
930        }
931
932        d = ptd->downstream_released;
933        if (d == NULL) {
934            return NULL;
935        }
936
937        ptd->downstream_released = d->next;
938
939        cb_assert(d->upstream_conn == NULL);
940        cb_assert(d->upstream_suffix == NULL);
941        cb_assert(d->upstream_suffix_len == 0);
942        cb_assert(d->upstream_status == PROTOCOL_BINARY_RESPONSE_SUCCESS);
943        cb_assert(d->upstream_retry == 0);
944        cb_assert(d->target_host_ident == NULL);
945        cb_assert(d->downstream_used == 0);
946        cb_assert(d->downstream_used_start == 0);
947        cb_assert(d->merger == NULL);
948        cb_assert(d->timeout_tv.tv_sec == 0);
949        cb_assert(d->timeout_tv.tv_usec == 0);
950        cb_assert(d->next_waiting == NULL);
951
952        d->upstream_conn = NULL;
953        d->upstream_suffix = NULL;
954        d->upstream_suffix_len = 0;
955        d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
956        d->upstream_retry = 0;
957        d->upstream_retries = 0;
958        d->target_host_ident = NULL;
959        d->usec_start = 0;
960        d->downstream_used = 0;
961        d->downstream_used_start = 0;
962        d->merger = NULL;
963        d->timeout_tv.tv_sec = 0;
964        d->timeout_tv.tv_usec = 0;
965        d->next_waiting = NULL;
966
967        if (cproxy_check_downstream_config(d)) {
968            bool found;
969            ptd->downstream_reserved =
970                downstream_list_remove(ptd->downstream_reserved, d);
971            ptd->downstream_released =
972                downstream_list_remove(ptd->downstream_released, d);
973
974            found = zstored_downstream_waiting_remove(d);
975            cb_assert(!found);
976
977            d->next = ptd->downstream_reserved;
978            ptd->downstream_reserved = d;
979
980            ptd->stats.stats.tot_downstream_reserved++;
981
982            return d;
983        }
984
985        cproxy_free_downstream(d);
986    }
987}
988
989bool cproxy_clear_timeout(downstream *d) {
990    bool rv = false;
991
992    if (d->timeout_tv.tv_sec != 0 ||
993        d->timeout_tv.tv_usec != 0) {
994        evtimer_del(&d->timeout_event);
995        rv = true;
996    }
997
998    d->timeout_tv.tv_sec = 0;
999    d->timeout_tv.tv_usec = 0;
1000
1001    return rv;
1002}
1003
1004bool cproxy_release_downstream(downstream *d, bool force) {
1005    int i;
1006    int n;
1007    bool found;
1008
1009    cb_assert(d != NULL);
1010    cb_assert(d->ptd != NULL);
1011
1012    if (settings.verbose > 2) {
1013        moxi_log_write("%d: release_downstream\n",
1014                       d->upstream_conn != NULL ?
1015                       d->upstream_conn->sfd : -1);
1016    }
1017
1018    /* Always release the timeout_event, even if we're going to retry, */
1019    /* to avoid pegging CPU with leaked timeout_events. */
1020
1021    cproxy_clear_timeout(d);
1022
1023    /* If we need to retry the command, we do so here, */
1024    /* keeping the same downstream that would otherwise */
1025    /* be released. */
1026
1027    if (!force && d->upstream_conn != NULL && d->upstream_retry > 0) {
1028        int max_retries;
1029        d->upstream_retry = 0;
1030        d->upstream_retries++;
1031
1032        /* But, we can stop retrying if we've tried each server twice. */
1033        max_retries = cproxy_max_retries(d);
1034
1035        if (d->upstream_retries <= max_retries) {
1036            if (settings.verbose > 2) {
1037                moxi_log_write("%d: release_downstream,"
1038                               " instead retrying %d, %d <= %d, %d\n",
1039                               d->upstream_conn->sfd,
1040                               d->upstream_retry,
1041                               d->upstream_retries, max_retries,
1042                               d->ptd->stats.stats.tot_retry);
1043            }
1044
1045            d->ptd->stats.stats.tot_retry++;
1046
1047            if (cproxy_forward(d) == true) {
1048                return true;
1049            } else {
1050                d->ptd->stats.stats.tot_downstream_propagate_failed++;
1051
1052                propagate_error_msg(d, NULL, d->upstream_status);
1053            }
1054        } else {
1055            if (settings.verbose > 2) {
1056                moxi_log_write("%d: release_downstream,"
1057                               " skipping retry %d, %d > %d\n",
1058                               d->upstream_conn->sfd,
1059                               d->upstream_retry,
1060                               d->upstream_retries, max_retries);
1061            }
1062        }
1063    }
1064
1065    /* Record reserved_time histogram timings. */
1066
1067    if (d->usec_start > 0) {
1068        uint64_t ux = usec_now() - d->usec_start;
1069
1070        d->ptd->stats.stats.tot_downstream_reserved_time += ux;
1071
1072        if (d->ptd->stats.stats.max_downstream_reserved_time < ux) {
1073            d->ptd->stats.stats.max_downstream_reserved_time = ux;
1074        }
1075
1076        if (d->upstream_retries > 0) {
1077            d->ptd->stats.stats.tot_retry_time += ux;
1078
1079            if (d->ptd->stats.stats.max_retry_time < ux) {
1080                d->ptd->stats.stats.max_retry_time = ux;
1081            }
1082        }
1083
1084        downstream_reserved_time_sample(&d->ptd->stats, ux);
1085    }
1086
1087    d->ptd->stats.stats.tot_downstream_released++;
1088
1089    /* Delink upstream conns. */
1090
1091    while (d->upstream_conn != NULL) {
1092        conn *curr;
1093        if (d->merger != NULL) {
1094            /* TODO: Allow merger callback to be func pointer. */
1095
1096            genhash_iter(d->merger,
1097                        protocol_stats_foreach_write,
1098                        d->upstream_conn);
1099
1100            if (update_event(d->upstream_conn, EV_WRITE | EV_PERSIST)) {
1101                conn_set_state(d->upstream_conn, conn_mwrite);
1102            } else {
1103                d->ptd->stats.stats.err_oom++;
1104                cproxy_close_conn(d->upstream_conn);
1105            }
1106        }
1107
1108        if (settings.verbose > 2) {
1109            moxi_log_write("%d: release_downstream upstream_suffix %s status %x\n",
1110                           d->upstream_conn->sfd,
1111                           d->upstream_suffix_len == 0 ?
1112                           d->upstream_suffix : "(binary)",
1113                           d->upstream_status);
1114        }
1115
1116        if (d->upstream_suffix != NULL) {
1117            /* Do a last write on the upstream.  For example, */
1118            /* the upstream_suffix might be "END\r\n" or other */
1119            /* way to mark the end of a scatter-gather or */
1120            /* multiline response. */
1121            int suffix_len;
1122
1123            if (settings.verbose > 2) {
1124                if (d->upstream_suffix_len > 0) {
1125                    moxi_log_write("%d: release_downstream"
1126                                   " writing suffix binary: %d\n",
1127                                   d->upstream_conn->sfd,
1128                                   d->upstream_suffix_len);
1129
1130                    cproxy_dump_header(d->upstream_conn->sfd,
1131                                       d->upstream_suffix);
1132                } else {
1133                    moxi_log_write("%d: release_downstream"
1134                                   " writing suffix ascii: %s\n",
1135                                   d->upstream_conn->sfd,
1136                                   d->upstream_suffix);
1137                }
1138            }
1139
1140            suffix_len = d->upstream_suffix_len;
1141            if (suffix_len == 0) {
1142                suffix_len = (int)strlen(d->upstream_suffix);
1143            }
1144
1145            if (add_iov(d->upstream_conn,
1146                        d->upstream_suffix,
1147                        suffix_len) == 0 &&
1148                update_event(d->upstream_conn, EV_WRITE | EV_PERSIST)) {
1149                conn_set_state(d->upstream_conn, conn_mwrite);
1150            } else {
1151                d->ptd->stats.stats.err_oom++;
1152                cproxy_close_conn(d->upstream_conn);
1153            }
1154        }
1155
1156        curr = d->upstream_conn;
1157        d->upstream_conn = d->upstream_conn->next;
1158        curr->next = NULL;
1159    }
1160
1161    /* Free extra hash tables. */
1162
1163    if (d->multiget != NULL) {
1164        genhash_iter(d->multiget, multiget_foreach_free, d);
1165        genhash_free(d->multiget);
1166        d->multiget = NULL;
1167    }
1168
1169    if (d->merger != NULL) {
1170        genhash_iter(d->merger, protocol_stats_foreach_free, NULL);
1171        genhash_free(d->merger);
1172        d->merger = NULL;
1173    }
1174
1175    d->upstream_conn = NULL;
1176    d->upstream_suffix = NULL; /* No free(), expecting a static string. */
1177    d->upstream_suffix_len = 0;
1178    d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1179    d->upstream_retry = 0;
1180    d->upstream_retries = 0;
1181    d->target_host_ident = NULL;
1182    d->usec_start = 0;
1183    d->downstream_used = 0;
1184    d->downstream_used_start = 0;
1185    d->multiget = NULL;
1186    d->merger = NULL;
1187
1188    /* TODO: Consider adding a downstream->prev backpointer */
1189    /*       or doubly-linked list to save on this scan. */
1190
1191    d->ptd->downstream_reserved =
1192        downstream_list_remove(d->ptd->downstream_reserved, d);
1193    d->ptd->downstream_released =
1194        downstream_list_remove(d->ptd->downstream_released, d);
1195
1196    found = zstored_downstream_waiting_remove(d);
1197    cb_assert(!found);
1198
1199    n = mcs_server_count(&d->mst);
1200
1201    for (i = 0; i < n; i++) {
1202        conn *dc = d->downstream_conns[i];
1203        d->downstream_conns[i] = NULL;
1204        if (dc != NULL) {
1205            zstored_release_downstream_conn(dc, false);
1206        }
1207    }
1208
1209    cproxy_clear_timeout(d); /* For MB-4334. */
1210
1211    cb_assert(d->timeout_tv.tv_sec == 0);
1212    cb_assert(d->timeout_tv.tv_usec == 0);
1213
1214    /* If this downstream still has the same configuration as our top-level */
1215    /* proxy config, go back onto the available, released downstream list. */
1216
1217    if (cproxy_check_downstream_config(d) || force) {
1218        d->next = d->ptd->downstream_released;
1219        d->ptd->downstream_released = d;
1220
1221        return true;
1222    }
1223
1224    cproxy_free_downstream(d);
1225
1226    return false;
1227}
1228
1229void cproxy_free_downstream(downstream *d) {
1230    bool found;
1231    int n;
1232
1233    cb_assert(d != NULL);
1234    cb_assert(d->ptd != NULL);
1235    cb_assert(d->upstream_conn == NULL);
1236    cb_assert(d->multiget == NULL);
1237    cb_assert(d->merger == NULL);
1238    cb_assert(d->timeout_tv.tv_sec == 0);
1239    cb_assert(d->timeout_tv.tv_usec == 0);
1240
1241    if (settings.verbose > 2) {
1242        moxi_log_write("cproxy_free_downstream\n");
1243    }
1244
1245    d->ptd->stats.stats.tot_downstream_freed++;
1246
1247    d->ptd->downstream_reserved =
1248        downstream_list_remove(d->ptd->downstream_reserved, d);
1249    d->ptd->downstream_released =
1250        downstream_list_remove(d->ptd->downstream_released, d);
1251
1252    found = zstored_downstream_waiting_remove(d);
1253    cb_assert(!found);
1254
1255    d->ptd->downstream_num--;
1256    cb_assert(d->ptd->downstream_num >= 0);
1257
1258    n = mcs_server_count(&d->mst);
1259
1260    if (d->downstream_conns != NULL) {
1261        int i;
1262        for (i = 0; i < n; i++) {
1263            if (d->downstream_conns[i] != NULL &&
1264                d->downstream_conns[i] != NULL_CONN) {
1265                d->downstream_conns[i]->extra = NULL;
1266            }
1267        }
1268    }
1269
1270    /* This will close sockets, which will force associated conn's */
1271    /* to go to conn_closing state.  Since we've already cleared */
1272    /* the conn->extra pointers, there's no extra release/free. */
1273
1274    mcs_free(&d->mst);
1275
1276    cproxy_clear_timeout(d);
1277
1278    if (d->downstream_conns != NULL) {
1279        free(d->downstream_conns);
1280    }
1281
1282    if (d->config != NULL) {
1283        free(d->config);
1284    }
1285
1286    free(d->behaviors_arr);
1287    free(d);
1288}
1289
1290/* The config input is something libmemcached can parse.
1291 * See mcs_server_st_parse().
1292 */
1293downstream *cproxy_create_downstream(char *config,
1294                                     uint32_t config_ver,
1295                                     proxy_behavior_pool *behavior_pool) {
1296    downstream *d = calloc(1, sizeof(downstream));
1297    cb_assert(config != NULL);
1298    cb_assert(behavior_pool != NULL);
1299
1300    if (d != NULL &&
1301        config != NULL &&
1302        config[0] != '\0') {
1303        d->config        = strdup(config);
1304        d->config_ver    = config_ver;
1305        d->behaviors_num = behavior_pool->num;
1306        d->behaviors_arr = cproxy_copy_behaviors(behavior_pool->num,
1307                                                 behavior_pool->arr);
1308
1309        /* TODO: Handle non-uniform downstream protocols. */
1310
1311        cb_assert(IS_PROXY(behavior_pool->base.downstream_protocol));
1312
1313        if (settings.verbose > 2) {
1314            moxi_log_write(
1315                    "cproxy_create_downstream: %s, %u, %u\n",
1316                    config, config_ver,
1317                    behavior_pool->base.downstream_protocol);
1318        }
1319
1320        if (d->config != NULL &&
1321            d->behaviors_arr != NULL) {
1322            char *usr = behavior_pool->base.usr[0] != '\0' ?
1323                behavior_pool->base.usr :
1324                NULL;
1325            char *pwd = behavior_pool->base.pwd[0] != '\0' ?
1326                behavior_pool->base.pwd :
1327                NULL;
1328
1329            int nconns = init_mcs_st(&d->mst, d->config, usr, pwd,
1330                                     behavior_pool->base.mcs_opts);
1331            if (nconns > 0) {
1332                d->downstream_conns = (conn **)
1333                    calloc(nconns, sizeof(conn *));
1334                if (d->downstream_conns != NULL) {
1335                    return d;
1336                }
1337
1338                mcs_free(&d->mst);
1339            }
1340        }
1341
1342        free(d->config);
1343        free(d->behaviors_arr);
1344        free(d);
1345    }
1346
1347    return NULL;
1348}
1349
1350int init_mcs_st(mcs_st *mst, char *config,
1351                const char *default_usr,
1352                const char *default_pwd,
1353                const char *opts) {
1354    cb_assert(mst);
1355    cb_assert(config);
1356
1357    if (mcs_create(mst, config,
1358                   default_usr, default_pwd, opts) != NULL) {
1359        return mcs_server_count(mst);
1360    } else {
1361        if (settings.verbose > 1) {
1362            moxi_log_write("mcs_create failed: %s\n",
1363                    config);
1364        }
1365    }
1366
1367    return 0;
1368}
1369
1370/* See if the downstream config matches the top-level proxy config.
1371 */
1372bool cproxy_check_downstream_config(downstream *d) {
1373    int rv = false;
1374
1375    cb_assert(d != NULL);
1376    cb_assert(d->ptd != NULL);
1377    cb_assert(d->ptd->proxy != NULL);
1378
1379    if (d->config_ver == d->ptd->config_ver) {
1380        rv = true;
1381    } else if (d->config != NULL &&
1382               d->ptd->config != NULL &&
1383               cproxy_equal_behaviors(d->behaviors_num,
1384                                      d->behaviors_arr,
1385                                      d->ptd->behavior_pool.num,
1386                                      d->ptd->behavior_pool.arr)) {
1387        /* Parse the proxy/parent's config to see if we can */
1388        /* reuse our existing downstream connections. */
1389
1390        char *usr = d->ptd->behavior_pool.base.usr[0] != '\0' ?
1391            d->ptd->behavior_pool.base.usr :
1392            NULL;
1393        char *pwd = d->ptd->behavior_pool.base.pwd[0] != '\0' ?
1394            d->ptd->behavior_pool.base.pwd :
1395            NULL;
1396
1397        mcs_st next;
1398
1399        int n = init_mcs_st(&next, d->ptd->config, usr, pwd,
1400                            d->ptd->behavior_pool.base.mcs_opts);
1401        if (n > 0) {
1402            if (mcs_stable_update(&d->mst, &next)) {
1403                if (settings.verbose > 2) {
1404                    moxi_log_write("check_downstream_config stable update\n");
1405                }
1406
1407                free(d->config);
1408                d->config     = strdup(d->ptd->config);
1409                d->config_ver = d->ptd->config_ver;
1410                rv = true;
1411            }
1412
1413            mcs_free(&next);
1414        }
1415    }
1416
1417    if (settings.verbose > 2) {
1418        moxi_log_write("check_downstream_config %u\n", rv);
1419    }
1420
1421    return rv;
1422}
1423
1424/* Returns -1 if the connections aren't fully assigned and ready. */
1425/* In that case, the downstream has to wait for a downstream connection */
1426/* to get out of the conn_connecting state. */
1427
1428/* The downstream connection might leave the conn_connecting state */
1429/* with an error (unable to connect).  That case is handled by */
1430/* tracking a NULL_CONN sentinel value. */
1431
1432/* Also, in the -1 result case, the d->upstream_conn should remain in */
1433/* conn_pause state. */
1434
1435/* A server_index of -1 means to connect all downstreams, as the */
1436/* caller probably needs to proxy a broadcast command. */
1437
1438int cproxy_connect_downstream(downstream *d, LIBEVENT_THREAD *thread,
1439                              int server_index) {
1440    int i = 0;
1441    int s = 0; /* Number connected. */
1442    int n;
1443    mcs_server_st *msst_actual;
1444
1445    cb_assert(d != NULL);
1446    cb_assert(d->ptd != NULL);
1447    cb_assert(d->ptd->downstream_released != d); /* Should not be in free list. */
1448    cb_assert(d->downstream_conns != NULL);
1449    cb_assert(mcs_server_count(&d->mst) > 0);
1450    cb_assert(thread != NULL);
1451    cb_assert(thread->base != NULL);
1452
1453    n = mcs_server_count(&d->mst);
1454
1455    cb_assert(d->behaviors_num >= n);
1456    cb_assert(d->behaviors_arr != NULL);
1457
1458    if (settings.verbose > 2) {
1459        moxi_log_write("%d: cproxy_connect_downstream server_index %d in %d\n",
1460                       d->upstream_conn->sfd, server_index, n);
1461    }
1462
1463
1464    if (server_index >= 0) {
1465        cb_assert(server_index < n);
1466        i = server_index;
1467        n = server_index + 1;
1468    }
1469
1470    for (; i < n; i++) {
1471        cb_assert(IS_PROXY(d->behaviors_arr[i].downstream_protocol));
1472
1473        msst_actual = mcs_server_index(&d->mst, i);
1474
1475        /* Connect to downstream servers, if not already. */
1476
1477        /* A NULL_CONN means that we tried to connect, but failed, */
1478        /* which is different than NULL (which means that we haven't */
1479        /* tried to connect yet). */
1480
1481        if (d->downstream_conns[i] == NULL) {
1482            bool downstream_conn_max_reached = false;
1483            conn *c = d->upstream_conn;
1484            /*
1485             * mcmux compatiblity mode, one downstream struct will be associated
1486             * with downstream connection. So overwrite the default msst with the
1487             * value.
1488             */
1489            if (c && c->peer_host && c->peer_port) {
1490                cb_assert(i == 0);
1491                strncpy(msst_actual->hostname, c->peer_host, MCS_HOSTNAME_SIZE);
1492                msst_actual->port = c->peer_port;
1493                msst_actual->fd = -1;
1494                msst_actual->ident_a[0] = msst_actual->ident_b[0] = 0;
1495            }
1496
1497            d->downstream_conns[i] =
1498                zstored_acquire_downstream_conn(d, thread,
1499                                                msst_actual,
1500                                                &d->behaviors_arr[i],
1501                                                &downstream_conn_max_reached);
1502            if (c != NULL &&
1503                i == server_index &&
1504                d->downstream_conns[i] != NULL &&
1505                d->downstream_conns[i] != NULL_CONN &&
1506                d->target_host_ident == NULL) {
1507                d->target_host_ident = add_conn_suffix(c);
1508                if (d->target_host_ident != NULL) {
1509                    strncpy(d->target_host_ident,
1510                            msst_actual->hostname,
1511                            SUFFIX_SIZE);
1512                    d->target_host_ident[SUFFIX_SIZE - 1] = '\0';
1513                }
1514            }
1515
1516            if (d->downstream_conns[i] != NULL &&
1517                d->downstream_conns[i] != NULL_CONN &&
1518                d->downstream_conns[i]->state == conn_connecting) {
1519                return -1;
1520            }
1521
1522            if (d->downstream_conns[i] == NULL &&
1523                downstream_conn_max_reached == true) {
1524                if (settings.verbose > 2) {
1525                    moxi_log_write("%d: downstream_conn_max reached\n",
1526                                   d->upstream_conn->sfd);
1527                }
1528
1529                if (zstored_downstream_waiting_add(d, thread,
1530                                                   msst_actual,
1531                                                   &d->behaviors_arr[i]) == true) {
1532                    /* Since we're waiting on the downstream conn queue, */
1533                    /* start a downstream timer per configuration. */
1534
1535                    cproxy_start_downstream_timeout_ex(d, c,
1536                        d->behaviors_arr[i].downstream_conn_queue_timeout);
1537
1538                    return -1;
1539                }
1540            }
1541        }
1542
1543        if (d->downstream_conns[i] != NULL &&
1544            d->downstream_conns[i] != NULL_CONN) {
1545            s++;
1546        } else {
1547            mcs_server_st_quit(msst_actual, 1);
1548            d->downstream_conns[i] = NULL_CONN;
1549        }
1550    }
1551
1552    return s;
1553}
1554
1555conn *cproxy_connect_downstream_conn(downstream *d,
1556                                     LIBEVENT_THREAD *thread,
1557                                     mcs_server_st *msst,
1558                                     proxy_behavior *behavior) {
1559    uint64_t start = 0;
1560    int err = -1;
1561    SOCKET fd;
1562
1563    cb_assert(d);
1564    cb_assert(d->ptd);
1565    cb_assert(d->ptd->downstream_released != d); /* Should not be in free list. */
1566    cb_assert(thread);
1567    cb_assert(thread->base);
1568    cb_assert(msst);
1569    cb_assert(behavior);
1570    cb_assert(mcs_server_st_hostname(msst) != NULL);
1571    cb_assert(mcs_server_st_port(msst) > 0);
1572    cb_assert(mcs_server_st_fd(msst) == -1);
1573
1574    if (d->ptd->behavior_pool.base.time_stats) {
1575        start = usec_now();
1576    }
1577
1578    d->ptd->stats.stats.tot_downstream_connect_started++;
1579
1580    fd = mcs_connect(mcs_server_st_hostname(msst),
1581                     mcs_server_st_port(msst), &err,
1582                     MOXI_BLOCKING_CONNECT);
1583
1584    if (settings.verbose > 2) {
1585        moxi_log_write("%d: cproxy_connect_downstream_conn %s:%d %d %d\n", fd,
1586                       mcs_server_st_hostname(msst),
1587                       mcs_server_st_port(msst),
1588                       MOXI_BLOCKING_CONNECT, err);
1589    }
1590
1591    if (fd != -1) {
1592        conn *c = conn_new(fd, conn_pause, 0,
1593                           DATA_BUFFER_SIZE,
1594                           tcp_transport,
1595                           thread->base,
1596                           &cproxy_downstream_funcs, d);
1597        if (c != NULL ) {
1598            c->protocol = (d->upstream_conn->peer_protocol ?
1599                           d->upstream_conn->peer_protocol :
1600                           behavior->downstream_protocol);
1601            c->thread = thread;
1602            c->cmd_start_time = start;
1603
1604#ifdef WIN32
1605            if (err == WSAEINPROGRESS) {
1606                err = EINPROGRESS;
1607            } else if (err == WSAEWOULDBLOCK) {
1608                err = EWOULDBLOCK;
1609            }
1610#endif
1611
1612            if (err == EINPROGRESS ||
1613                err == EWOULDBLOCK) {
1614                if (update_event_timed(c, EV_WRITE | EV_PERSIST,
1615                                       &behavior->connect_timeout)) {
1616                    conn_set_state(c, conn_connecting);
1617
1618                    d->ptd->stats.stats.tot_downstream_connect_wait++;
1619
1620                    return c;
1621                } else {
1622                    d->ptd->stats.stats.err_oom++;
1623                }
1624            } else {
1625                if (downstream_connect_init(d, msst, behavior, c)) {
1626                    return c;
1627                }
1628            }
1629
1630            cproxy_close_conn(c);
1631        } else {
1632            d->ptd->stats.stats.err_oom++;
1633        }
1634    }
1635
1636    d->ptd->stats.stats.tot_downstream_connect_failed++;
1637
1638    return NULL;
1639}
1640
1641bool downstream_connect_init(downstream *d, mcs_server_st *msst,
1642                             proxy_behavior *behavior, conn *c) {
1643    char *host_ident;
1644    int rv;
1645
1646    cb_assert(c->thread != NULL);
1647
1648    host_ident = c->host_ident;
1649    if (host_ident == NULL) {
1650        host_ident = mcs_server_st_ident(msst, IS_ASCII(c->protocol));
1651    }
1652
1653    if (c->cmd_start_time != 0 &&
1654        d->ptd->behavior_pool.base.time_stats) {
1655        downstream_connect_time_sample(&d->ptd->stats,
1656                                       usec_now() - c->cmd_start_time);
1657    }
1658
1659    rv = cproxy_auth_downstream(msst, behavior, c->sfd);
1660    if (rv == 0) {
1661        d->ptd->stats.stats.tot_downstream_auth++;
1662
1663        rv = cproxy_bucket_downstream(msst, behavior, c->sfd);
1664        if (rv == 0) {
1665            d->ptd->stats.stats.tot_downstream_bucket++;
1666
1667            zstored_error_count(c->thread, host_ident, false);
1668
1669            d->ptd->stats.stats.tot_downstream_connect++;
1670
1671            return true;
1672        } else {
1673            d->ptd->stats.stats.tot_downstream_bucket_failed++;
1674        }
1675    } else {
1676        d->ptd->stats.stats.tot_downstream_auth_failed++;
1677        if (rv == 1) {
1678            d->ptd->stats.stats.tot_auth_timeout++;
1679        }
1680    }
1681
1682    /* Treat a auth/bucket error as a blacklistable error. */
1683
1684    zstored_error_count(c->thread, host_ident, true);
1685
1686    return false;
1687}
1688
1689conn *cproxy_find_downstream_conn(downstream *d,
1690                                  char *key, int key_length,
1691                                  bool *local) {
1692    return cproxy_find_downstream_conn_ex(d, key, key_length, local, NULL);
1693}
1694
1695conn *cproxy_find_downstream_conn_ex(downstream *d,
1696                                     char *key, int key_length,
1697                                     bool *local,
1698                                     int *vbucket) {
1699    int v;
1700    int s;
1701
1702    cb_assert(d != NULL);
1703    cb_assert(d->downstream_conns != NULL);
1704    cb_assert(key != NULL);
1705    cb_assert(key_length > 0);
1706
1707    if (local != NULL) {
1708        *local = false;
1709    }
1710
1711    v = -1;
1712    s = cproxy_server_index(d, key, key_length, &v);
1713
1714    if (settings.verbose > 2 && s >= 0) {
1715        moxi_log_write("%d: server_index %d, vbucket %d, conn %d\n", s, v,
1716                       (d->upstream_conn != NULL ?
1717                        d->upstream_conn->sfd : 0),
1718                       (d->downstream_conns[s] == NULL ?
1719                        0 :
1720                        (d->downstream_conns[s] == NULL_CONN ?
1721                         -1 :
1722                         d->downstream_conns[s]->sfd)));
1723    }
1724
1725    if (s >= 0 &&
1726        s < (int) mcs_server_count(&d->mst) &&
1727        d->downstream_conns[s] != NULL &&
1728        d->downstream_conns[s] != NULL_CONN) {
1729
1730        if (local != NULL && s == 0) {
1731            *local = true;
1732        }
1733
1734        if (vbucket != NULL) {
1735            *vbucket = v;
1736        }
1737
1738        return d->downstream_conns[s];
1739    }
1740
1741    return NULL;
1742}
1743
1744bool cproxy_prep_conn_for_write(conn *c) {
1745    if (c != NULL) {
1746        cb_assert(c->item == NULL);
1747        cb_assert(IS_PROXY(c->protocol));
1748        cb_assert(c->ilist != NULL);
1749        cb_assert(c->isize > 0);
1750        cb_assert(c->suffixlist != NULL);
1751        cb_assert(c->suffixsize > 0);
1752
1753        c->icurr      = c->ilist;
1754        c->ileft      = 0;
1755        c->suffixcurr = c->suffixlist;
1756        c->suffixleft = 0;
1757
1758        c->msgcurr = 0; /* TODO: Mem leak just by blowing these to 0? */
1759        c->msgused = 0;
1760        c->iovused = 0;
1761
1762        if (add_msghdr(c) == 0) {
1763            return true;
1764        }
1765
1766        if (settings.verbose > 1) {
1767            moxi_log_write("%d: cproxy_prep_conn_for_write failed\n",
1768                           c->sfd);
1769        }
1770    }
1771
1772    return false;
1773}
1774
1775bool cproxy_update_event_write(downstream *d, conn *c) {
1776    cb_assert(d != NULL);
1777    cb_assert(d->ptd != NULL);
1778    cb_assert(c != NULL);
1779
1780    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
1781        d->ptd->stats.stats.err_oom++;
1782        cproxy_close_conn(c);
1783
1784        return false;
1785    }
1786
1787    return true;
1788}
1789
1790/**
1791 * Do a hash through libmemcached to see which server (by index)
1792 * should hold a given key.
1793 */
1794int cproxy_server_index(downstream *d, char *key, size_t key_length,
1795                        int *vbucket) {
1796    cb_assert(d != NULL);
1797    cb_assert(key != NULL);
1798    cb_assert(key_length > 0);
1799
1800    if (mcs_server_count(&d->mst) <= 0) {
1801        return -1;
1802    }
1803
1804    return (int) mcs_key_hash(&d->mst, key, key_length, vbucket);
1805}
1806
/*
 * Match upstream conns in ptd's wait queue with free downstreams.
 *
 * Each pass reserves a downstream (cproxy_reserve_downstream()), pops the
 * head of the wait queue onto it, chains further waiting conns accepted
 * by is_compatible_request(), and calls cproxy_forward().  The loop ends
 * when the wait queue is empty, when no downstream can be reserved, or
 * after processing the conn that was the queue tail on entry.
 *
 * If no downstream can be reserved and ptd->downstream_num <= 0, every
 * waiting conn is failed with "SERVER_ERROR proxy out of downstreams".
 *
 * This function may be re-entered from within cproxy_forward();
 * ptd->downstream_assigns is compared against its value on entry to
 * detect that.
 */
void cproxy_assign_downstream(proxy_td *ptd) {
    uint64_t da;
    conn *tail;
    bool stop = false;

    cb_assert(ptd != NULL);

    if (settings.verbose > 2) {
        moxi_log_write("assign_downstream\n");
    }

    /* Bump the generation counter; a nested call changes it again, */
    /* which is how recursion is detected below. */
    ptd->downstream_assigns++;

    da = ptd->downstream_assigns;
    /* Key loop that tries to reserve any available, released */
    /* downstream resources to waiting upstream conns. */

    /* Remember the wait list tail when we start, in case more */
    /* upstream conns are tacked onto the wait list while we're */
    /* processing.  This helps avoid infinite loop where upstream */
    /* conns just keep on moving to the tail. */

    tail = ptd->waiting_any_downstream_tail;
    while (ptd->waiting_any_downstream_head != NULL && !stop) {
        conn *uc_last;
        downstream *d;

        /* Process the remembered tail, then stop after this pass. */
        if (ptd->waiting_any_downstream_head == tail) {
            stop = true;
        }

        d = cproxy_reserve_downstream(ptd);
        if (d == NULL) {
            if (ptd->downstream_num <= 0) {
                /* Absolutely no downstreams connected, so */
                /* might as well error out. */

                while (ptd->waiting_any_downstream_head != NULL) {
                    conn *uc;
                    ptd->stats.stats.tot_downstream_propagate_failed++;

                    /* Pop the head, keeping the tail consistent. */
                    uc = ptd->waiting_any_downstream_head;
                    ptd->waiting_any_downstream_head =
                        ptd->waiting_any_downstream_head->next;
                    if (ptd->waiting_any_downstream_head == NULL) {
                        ptd->waiting_any_downstream_tail = NULL;
                    }
                    uc->next = NULL;

                    upstream_error_msg(uc,
                                       "SERVER_ERROR proxy out of downstreams\r\n",
                                       PROTOCOL_BINARY_RESPONSE_EINTERNAL);
                }
            }

            break; /* If no downstreams are available, stop loop. */
        }

        /* A freshly reserved downstream must be completely idle. */
        cb_assert(d->upstream_conn == NULL);
        cb_assert(d->downstream_used == 0);
        cb_assert(d->downstream_used_start == 0);
        cb_assert(d->multiget == NULL);
        cb_assert(d->merger == NULL);
        cb_assert(d->timeout_tv.tv_sec == 0);
        cb_assert(d->timeout_tv.tv_usec == 0);

        /* We have a downstream reserved, so assign the first */
        /* waiting upstream conn to it. */

        d->upstream_conn = ptd->waiting_any_downstream_head;
        ptd->waiting_any_downstream_head =
            ptd->waiting_any_downstream_head->next;
        if (ptd->waiting_any_downstream_head == NULL) {
            ptd->waiting_any_downstream_tail = NULL;
        }
        d->upstream_conn->next = NULL;

        ptd->stats.stats.tot_assign_downstream++;
        ptd->stats.stats.tot_assign_upstream++;

        /* Add any compatible upstream conns to the downstream. */
        /* By compatible, for example, we mean multi-gets from */
        /* different upstreams so we can de-duplicate get keys. */
        uc_last = d->upstream_conn;

        while (is_compatible_request(uc_last,
                                     ptd->waiting_any_downstream_head)) {
            /* Move the wait-queue head onto the end of d's chain. */
            uc_last->next = ptd->waiting_any_downstream_head;

            ptd->waiting_any_downstream_head =
                ptd->waiting_any_downstream_head->next;
            if (ptd->waiting_any_downstream_head == NULL) {
                ptd->waiting_any_downstream_tail = NULL;
            }

            uc_last = uc_last->next;
            uc_last->next = NULL;

            /* Note: tot_assign_upstream - tot_assign_downstream */
            /* should get us how many requests we've piggybacked together. */

            ptd->stats.stats.tot_assign_upstream++;
        }

        if (settings.verbose > 2) {
            moxi_log_write("%d: assign_downstream, matched to upstream\n",
                    d->upstream_conn->sfd);
        }

        if (cproxy_forward(d) == false) {
            /* TODO: This stat is incorrect, as we might reach here */
            /* when we have entire front cache hit or talk-to-self */
            /* optimization hit on multiget. */

            ptd->stats.stats.tot_downstream_propagate_failed++;

            /* During cproxy_forward(), we might have recursed, */
            /* especially in error situation if a downstream */
            /* conn got closed and released.  Check for recursion */
            /* before we touch d anymore. */

            if (da != ptd->downstream_assigns) {
                ptd->stats.stats.tot_assign_recursion++;
                break;
            }

            propagate_error_msg(d, NULL, d->upstream_status);

            cproxy_release_downstream(d, false);
        }
    }

    if (settings.verbose > 2) {
        moxi_log_write("assign_downstream, done\n");
    }
}
1943
1944void propagate_error_msg(downstream *d, char *ascii_msg,
1945                         protocol_binary_response_status binary_status) {
1946    cb_assert(d != NULL);
1947
1948    if (ascii_msg == NULL &&
1949        d->upstream_conn != NULL &&
1950        d->target_host_ident != NULL) {
1951        char *s = add_conn_suffix(d->upstream_conn);
1952        if (s != NULL) {
1953            snprintf(s, SUFFIX_SIZE - 1,
1954                     "SERVER_ERROR proxy write to downstream %s\r\n",
1955                     d->target_host_ident);
1956            s[SUFFIX_SIZE - 1] = '\0';
1957            ascii_msg = s;
1958        }
1959    }
1960
1961    while (d->upstream_conn != NULL) {
1962        conn *uc = d->upstream_conn;
1963        conn *curr;
1964
1965        if (settings.verbose > 1) {
1966            moxi_log_write("%d: could not forward upstream to downstream\n",
1967                           uc->sfd);
1968        }
1969
1970        upstream_error_msg(uc, ascii_msg, binary_status);
1971
1972        curr = d->upstream_conn;
1973        d->upstream_conn = d->upstream_conn->next;
1974        curr->next = NULL;
1975    }
1976}
1977
1978bool cproxy_forward(downstream *d) {
1979    cb_assert(d != NULL);
1980    cb_assert(d->ptd != NULL);
1981    cb_assert(d->upstream_conn != NULL);
1982
1983    if (settings.verbose > 2) {
1984        moxi_log_write(
1985                "%d: cproxy_forward prot %d to prot %d\n",
1986                d->upstream_conn->sfd,
1987                d->upstream_conn->protocol,
1988                d->ptd->behavior_pool.base.downstream_protocol);
1989    }
1990
1991    if (IS_ASCII(d->upstream_conn->protocol)) {
1992        /* ASCII upstream. */
1993
1994        unsigned int peer_protocol =
1995            d->upstream_conn->peer_protocol ?
1996            d->upstream_conn->peer_protocol :
1997            d->ptd->behavior_pool.base.downstream_protocol;
1998
1999        if (IS_ASCII(peer_protocol)) {
2000            return cproxy_forward_a2a_downstream(d);
2001        } else {
2002            return cproxy_forward_a2b_downstream(d);
2003        }
2004    } else {
2005        /* BINARY upstream. */
2006
2007        if (IS_BINARY(d->ptd->behavior_pool.base.downstream_protocol)) {
2008            return cproxy_forward_b2b_downstream(d);
2009        } else {
2010            /* TODO: No binary upstream to ascii downstream support. */
2011
2012            cb_assert(0);
2013            return false;
2014        }
2015    }
2016}
2017
2018bool cproxy_forward_or_error(downstream *d) {
2019    if (cproxy_forward(d) == false) {
2020        d->ptd->stats.stats.tot_downstream_propagate_failed++;
2021        propagate_error_msg(d, NULL, d->upstream_status);
2022        cproxy_release_downstream(d, false);
2023
2024        return false;
2025    }
2026
2027    return true;
2028}
2029
2030void upstream_error_msg(conn *uc, char *ascii_msg,
2031                        protocol_binary_response_status binary_status) {
2032    proxy_td *ptd;
2033    cb_assert(uc);
2034    cb_assert(uc->state == conn_pause);
2035
2036    ptd = uc->extra;
2037    cb_assert(ptd != NULL);
2038
2039    if (IS_ASCII(uc->protocol)) {
2040        char *msg = ascii_msg;
2041        if (msg == NULL) {
2042            msg = "SERVER_ERROR proxy write to downstream\r\n";
2043        }
2044
2045        cb_mutex_enter(&ptd->proxy->proxy_lock);
2046        if (ptd->proxy->name != NULL &&
2047            strcmp(ptd->proxy->name, NULL_BUCKET) == 0) {
2048            msg = "SERVER_ERROR unauthorized, null bucket\r\n";
2049        }
2050        cb_mutex_exit(&ptd->proxy->proxy_lock);
2051
2052        /* Send an END on get/gets instead of generic SERVER_ERROR. */
2053
2054        if (uc->cmd == -1 &&
2055            uc->cmd_start != NULL &&
2056            strncmp(uc->cmd_start, "get", 3) == 0 &&
2057            (false == settings.enable_mcmux_mode) &&
2058            (0 != strncmp(ptd->behavior_pool.base.nodeLocator,
2059                          "vbucket",
2060                          sizeof(ptd->behavior_pool.base.nodeLocator) - 1))) {
2061            msg = "END\r\n";
2062        }
2063
2064        if (settings.verbose > 2) {
2065            moxi_log_write("%d: upstream_error: %s\n", uc->sfd, msg);
2066        }
2067
2068        if (add_iov(uc, msg, (int)strlen(msg)) == 0 &&
2069            update_event(uc, EV_WRITE | EV_PERSIST)) {
2070            conn_set_state(uc, conn_mwrite);
2071        } else {
2072            ptd->stats.stats.err_oom++;
2073            cproxy_close_conn(uc);
2074        }
2075    } else {
2076        cb_assert(IS_BINARY(uc->protocol));
2077
2078        if (binary_status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
2079            /* Default to our favorite catch-all binary protocol response. */
2080
2081            binary_status = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
2082        }
2083
2084        cb_mutex_enter(&ptd->proxy->proxy_lock);
2085        if (ptd->proxy->name != NULL &&
2086            strcmp(ptd->proxy->name, NULL_BUCKET) == 0) {
2087            binary_status = PROTOCOL_BINARY_RESPONSE_AUTH_ERROR;
2088        }
2089        cb_mutex_exit(&ptd->proxy->proxy_lock);
2090
2091        write_bin_error(uc, binary_status, 0);
2092
2093        update_event(uc, EV_WRITE | EV_PERSIST);
2094    }
2095}
2096
2097void cproxy_reset_upstream(conn *uc) {
2098    proxy_td *ptd;
2099    cb_assert(uc != NULL);
2100
2101    ptd = uc->extra;
2102    cb_assert(ptd != NULL);
2103
2104    conn_set_state(uc, conn_new_cmd);
2105
2106    if (uc->rbytes <= 0) {
2107        if (!update_event(uc, EV_READ | EV_PERSIST)) {
2108            ptd->stats.stats.err_oom++;
2109            cproxy_close_conn(uc);
2110        }
2111
2112        return; /* Return either way. */
2113    }
2114
2115    /* We may have already read incoming bytes into the uc's buffer, */
2116    /* so the issue is that libevent may never see (or expect) any */
2117    /* EV_READ events (and hence, won't fire event callbacks) for the */
2118    /* upstream connection.  This can leave the uc seemingly stuck, */
2119    /* never hitting drive_machine() loop. */
2120
2121    if (settings.verbose > 2) {
2122        moxi_log_write("%d: cproxy_reset_upstream with bytes available: %d\n",
2123                       uc->sfd, uc->rbytes);
2124    }
2125
2126    /* So, we copy the drive_machine()/conn_new_cmd handling to */
2127    /* schedule uc into drive_machine() execution, where the uc */
2128    /* conn is likely to be writable.  We need to do this */
2129    /* because we're currently on the drive_machine() execution */
2130    /* loop for the downstream connection, not for the uc. */
2131
2132    if (!update_event(uc, EV_WRITE | EV_PERSIST)) {
2133        if (settings.verbose > 0) {
2134            moxi_log_write("Couldn't update event\n");
2135        }
2136
2137        conn_set_state(uc, conn_closing);
2138    }
2139
2140    ptd->stats.stats.tot_reset_upstream_avail++;
2141}
2142
2143bool cproxy_dettach_if_noreply(downstream *d, conn *uc) {
2144    if (uc->noreply) {
2145        uc->noreply        = false;
2146        d->upstream_conn   = NULL;
2147        d->upstream_suffix = NULL;
2148        d->upstream_suffix_len = 0;
2149        d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
2150        d->upstream_retry  = 0;
2151        d->target_host_ident = NULL;
2152
2153        cproxy_reset_upstream(uc);
2154
2155        return true;
2156    }
2157
2158    return false;
2159}
2160
2161void cproxy_wait_any_downstream(proxy_td *ptd, conn *uc) {
2162    cb_assert(uc != NULL);
2163    cb_assert(uc->next == NULL);
2164    cb_assert(ptd != NULL);
2165    cb_assert(!ptd->waiting_any_downstream_tail ||
2166           !ptd->waiting_any_downstream_tail->next);
2167
2168    /* Add the upstream conn to the wait list. */
2169
2170    uc->next = NULL;
2171    if (ptd->waiting_any_downstream_tail != NULL) {
2172        ptd->waiting_any_downstream_tail->next = uc;
2173    }
2174    ptd->waiting_any_downstream_tail = uc;
2175    if (ptd->waiting_any_downstream_head == NULL) {
2176        ptd->waiting_any_downstream_head = uc;
2177    }
2178}
2179
2180void cproxy_release_downstream_conn(downstream *d, conn *c) {
2181    proxy_td *ptd;
2182    cb_assert(c != NULL);
2183    cb_assert(d != NULL);
2184
2185    ptd = d->ptd;
2186    cb_assert(ptd != NULL);
2187
2188    if (settings.verbose > 2) {
2189        moxi_log_write("%d: release_downstream_conn, downstream_used %d %d,"
2190                       " upstream %d\n",
2191                       c->sfd, d->downstream_used, d->downstream_used_start,
2192                       (d->upstream_conn != NULL ?
2193                        d->upstream_conn->sfd : 0));
2194    }
2195
2196    d->downstream_used--;
2197    if (d->downstream_used <= 0) {
2198        /* The downstream_used count might go < 0 when if there's */
2199        /* an early error and we decide to close the downstream */
2200        /* conn, before anything gets sent or before the */
2201        /* downstream_used was able to be incremented. */
2202
2203        cproxy_release_downstream(d, false);
2204        cproxy_assign_downstream(ptd);
2205    }
2206}
2207
2208void cproxy_on_pause_downstream_conn(conn *c) {
2209    downstream *d;
2210    cb_assert(c != NULL);
2211
2212    if (settings.verbose > 2) {
2213        moxi_log_write("<%d cproxy_on_pause_downstream_conn\n",
2214                c->sfd);
2215    }
2216
2217    d = c->extra;
2218
2219    if (!d || c->rbytes > 0) {
2220        zstored_downstream_conns *conns;
2221
2222        moxi_log_write("%d: Closed the downstream since got"
2223                       "an event on downstream or extra data on downstream."
2224                       " (rbytes: %u)\n", c->sfd, c->rbytes);
2225
2226        conns = zstored_get_downstream_conns(c->thread, c->host_ident);
2227        if (conns) {
2228            bool found = false;
2229            conns->dc = conn_list_remove(conns->dc, NULL, c, &found);
2230            if (!found) {
2231                moxi_log_write("<%d: %s:%d Not able to find connection"
2232                               " in zstore conns\n",
2233                               c->sfd, __FILE__, __LINE__);
2234            }
2235        } else {
2236            moxi_log_write("<%d %s:%d Not able to find zstore conns\n",
2237                           c->sfd, __FILE__, __LINE__);
2238        }
2239        cproxy_close_conn(c);
2240        return;
2241    }
2242
2243    cb_assert(d->ptd != NULL);
2244
2245    /* Must update_event() before releasing the downstream conn, */
2246    /* because the release might call udpate_event(), too, */
2247    /* and we don't want to override its work. */
2248
2249    if (update_event(c, EV_READ | EV_PERSIST)) {
2250        cproxy_release_downstream_conn(d, c);
2251    } else {
2252        d->ptd->stats.stats.err_oom++;
2253        cproxy_close_conn(c);
2254    }
2255}
2256
2257void cproxy_pause_upstream_for_downstream(proxy_td *ptd, conn *upstream) {
2258    cb_assert(ptd != NULL);
2259    cb_assert(upstream != NULL);
2260
2261    if (settings.verbose > 2) {
2262        moxi_log_write("%d: pause_upstream_for_downstream\n",
2263                upstream->sfd);
2264    }
2265
2266    conn_set_state(upstream, conn_pause);
2267
2268    cproxy_wait_any_downstream(ptd, upstream);
2269
2270    if (ptd->timeout_tv.tv_sec == 0 &&
2271        ptd->timeout_tv.tv_usec == 0) {
2272        cproxy_start_wait_queue_timeout(ptd, upstream);
2273    }
2274
2275    cproxy_assign_downstream(ptd);
2276}
2277
2278struct timeval cproxy_get_downstream_timeout(downstream *d, conn *c) {
2279
2280    struct timeval rv;
2281    proxy_td *ptd;
2282    cb_assert(d);
2283
2284    if (c != NULL) {
2285        int i;
2286        cb_assert(d->behaviors_num > 0);
2287        cb_assert(d->behaviors_arr != NULL);
2288        cb_assert(d->downstream_conns != NULL);
2289
2290        i = downstream_conn_index(d, c);
2291        if (i >= 0 && i < d->behaviors_num) {
2292            rv = d->behaviors_arr[i].downstream_timeout;
2293            if (rv.tv_sec != 0 || rv.tv_usec != 0) {
2294                return rv;
2295            }
2296        }
2297    }
2298
2299    ptd = d->ptd;
2300    cb_assert(ptd);
2301
2302    rv = ptd->behavior_pool.base.downstream_timeout;
2303
2304    return rv;
2305}
2306
2307bool cproxy_start_wait_queue_timeout(proxy_td *ptd, conn *uc) {
2308    cb_assert(ptd);
2309    cb_assert(uc);
2310    cb_assert(uc->thread);
2311    cb_assert(uc->thread->base);
2312
2313    ptd->timeout_tv = ptd->behavior_pool.base.wait_queue_timeout;
2314    if (ptd->timeout_tv.tv_sec != 0 ||
2315        ptd->timeout_tv.tv_usec != 0) {
2316        if (settings.verbose > 2) {
2317            moxi_log_write("wait_queue_timeout started\n");
2318        }
2319
2320        evtimer_set(&ptd->timeout_event, wait_queue_timeout, ptd);
2321
2322        event_base_set(uc->thread->base, &ptd->timeout_event);
2323
2324        return evtimer_add(&ptd->timeout_event, &ptd->timeout_tv) == 0;
2325    }
2326
2327    return true;
2328}
2329
2330static void wait_queue_timeout(evutil_socket_t fd,
2331                               const short which,
2332                               void *arg) {
2333    proxy_td *ptd = arg;
2334    cb_assert(ptd != NULL);
2335    (void)fd;
2336    (void)which;
2337
2338    if (settings.verbose > 2) {
2339        moxi_log_write("wait_queue_timeout\n");
2340    }
2341
2342    /* This timer callback is invoked when an upstream conn */
2343    /* has been in the wait queue for too long. */
2344
2345    if (ptd->timeout_tv.tv_sec != 0 || ptd->timeout_tv.tv_usec != 0) {
2346        struct timeval wqt;
2347        uint64_t wqt_msec;
2348        uint64_t cut_msec;
2349        conn *uc_curr;
2350
2351        evtimer_del(&ptd->timeout_event);
2352
2353        ptd->timeout_tv.tv_sec = 0;
2354        ptd->timeout_tv.tv_usec = 0;
2355
2356        if (settings.verbose > 2) {
2357            moxi_log_write("wait_queue_timeout cleared\n");
2358        }
2359
2360        wqt = ptd->behavior_pool.base.wait_queue_timeout;
2361        wqt_msec = (wqt.tv_sec * 1000) + (wqt.tv_usec / 1000);
2362        cut_msec = msec_current_time - wqt_msec;
2363
2364        /* Run through all the old upstream conn's in */
2365        /* the wait queue, remove them, and emit errors */
2366        /* on them.  And then start a new timer if needed. */
2367        uc_curr = ptd->waiting_any_downstream_head;
2368        while (uc_curr != NULL) {
2369            conn *uc = uc_curr;
2370
2371            uc_curr = uc_curr->next;
2372
2373            /* Check if upstream conn is old and should be removed. */
2374
2375            if (settings.verbose > 2) {
2376                moxi_log_write("wait_queue_timeout compare %u to %u cutoff\n",
2377                        uc->cmd_start_time, cut_msec);
2378            }
2379
2380            if (uc->cmd_start_time <= cut_msec) {
2381                if (settings.verbose > 1) {
2382                    moxi_log_write("proxy_td_timeout sending error %d\n",
2383                            uc->sfd);
2384                }
2385
2386                ptd->stats.stats.tot_wait_queue_timeout++;
2387
2388                ptd->waiting_any_downstream_head =
2389                    conn_list_remove(ptd->waiting_any_downstream_head,
2390                                     &ptd->waiting_any_downstream_tail,
2391                                     uc, NULL); /* TODO: O(N^2). */
2392
2393                upstream_error_msg(uc,
2394                                   "SERVER_ERROR proxy wait queue timeout",
2395                                   PROTOCOL_BINARY_RESPONSE_EBUSY);
2396            }
2397        }
2398
2399        if (ptd->waiting_any_downstream_head != NULL) {
2400            cproxy_start_wait_queue_timeout(ptd,
2401                                            ptd->waiting_any_downstream_head);
2402        }
2403    }
2404}
2405
2406rel_time_t cproxy_realtime(const time_t exptime) {
2407    /* Input is a long... */
2408
2409    /* 0       | (0...REALIME_MAXDELTA] | (REALTIME_MAXDELTA... */
2410    /* forever | delta                  | unix_time */
2411
2412    /* Storage is an unsigned int. */
2413
2414    /* TODO: Handle resolution loss. */
2415
2416    /* The cproxy version of realtime doesn't do any */
2417    /* time math munging, just pass through. */
2418
2419    return (rel_time_t) exptime;
2420}
2421
2422void cproxy_close_conn(conn *c) {
2423    cb_assert(c != NULL);
2424
2425    if (c == NULL_CONN) {
2426        return;
2427    }
2428
2429    conn_set_state(c, conn_closing);
2430
2431    update_event(c, 0);
2432
2433    /* Run through drive_machine just once, */
2434    /* to go through close code paths. */
2435
2436    drive_machine(c);
2437}
2438
2439bool add_conn_item(conn *c, item *it) {
2440    cb_assert(it != NULL);
2441    cb_assert(c != NULL);
2442    cb_assert(c->ilist != NULL);
2443    cb_assert(c->icurr != NULL);
2444    cb_assert(c->isize > 0);
2445
2446    if (c->ileft >= c->isize) {
2447        item **new_list =
2448            realloc(c->ilist, sizeof(item *) * c->isize * 2);
2449        if (new_list) {
2450            c->isize *= 2;
2451            c->ilist = new_list;
2452            c->icurr = new_list;
2453        }
2454    }
2455
2456    if (c->ileft < c->isize) {
2457        c->ilist[c->ileft] = it;
2458        c->ileft++;
2459
2460        return true;
2461    }
2462
2463    return false;
2464}
2465
2466char *add_conn_suffix(conn *c) {
2467    cb_assert(c != NULL);
2468    cb_assert(c->suffixlist != NULL);
2469    cb_assert(c->suffixcurr != NULL);
2470    cb_assert(c->suffixsize > 0);
2471
2472    if (c->suffixleft >= c->suffixsize) {
2473        char **new_suffix_list =
2474            realloc(c->suffixlist,
2475                    sizeof(char *) * c->suffixsize * 2);
2476        if (new_suffix_list) {
2477            c->suffixsize *= 2;
2478            c->suffixlist = new_suffix_list;
2479            c->suffixcurr = new_suffix_list;
2480        }
2481    }
2482
2483    if (c->suffixleft < c->suffixsize) {
2484        char *suffix = cache_alloc(c->thread->suffix_cache);
2485        if (suffix != NULL) {
2486            c->suffixlist[c->suffixleft] = suffix;
2487            c->suffixleft++;
2488
2489            return suffix;
2490        }
2491    }
2492
2493    return NULL;
2494}
2495
2496char *nread_text(short x) {
2497    char *rv = NULL;
2498    switch(x) {
2499    case NREAD_SET:
2500        rv = "set ";
2501        break;
2502    case NREAD_ADD:
2503        rv = "add ";
2504        break;
2505    case NREAD_REPLACE:
2506        rv = "replace ";
2507        break;
2508    case NREAD_APPEND:
2509        rv = "append ";
2510        break;
2511    case NREAD_PREPEND:
2512        rv = "prepend ";
2513        break;
2514    case NREAD_CAS:
2515        rv = "cas ";
2516        break;
2517    }
2518    return rv;
2519}
2520
2521/* Tokenize the command string by updating the token array
2522 * with pointers to start of each token and length.
2523 * Does not modify the input command string.
2524 *
2525 * Returns total number of tokens.  The last valid token is the terminal
2526 * token (value points to the first unprocessed character of the string and
2527 * length zero).
2528 *
2529 * Usage example:
2530 *
2531 *  while (scan_tokens(command, tokens, max_tokens, NULL) > 0) {
2532 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
2533 *          ...
2534 *      }
2535 *      command = tokens[ix].value;
2536 *  }
2537 */
2538size_t scan_tokens(char *command, token_t *tokens,
2539                   const size_t max_tokens,
2540                   int *command_len) {
2541    char *s, *e;
2542    size_t ntokens = 0;
2543
2544    if (command_len != NULL) {
2545        *command_len = 0;
2546    }
2547
2548    cb_assert(command != NULL && tokens != NULL && max_tokens > 1);
2549
2550    for (s = e = command; ntokens < max_tokens - 1; ++e) {
2551        if (*e == '\0' || *e == ' ') {
2552            if (s != e) {
2553                tokens[ntokens].value = s;
2554                tokens[ntokens].length = e - s;
2555                ntokens++;
2556            }
2557            if (*e == '\0') {
2558                if (command_len != NULL) {
2559                    *command_len = (int)(e - command);
2560                }
2561                break; /* string end */
2562            }
2563            s = e + 1;
2564        }
2565    }
2566
2567    /* If we scanned the whole string, the terminal value pointer is null,
2568     * otherwise it is the first unprocessed character.
2569     */
2570    tokens[ntokens].value = (*e == '\0' ? NULL : e);
2571    tokens[ntokens].length = 0;
2572    ntokens++;
2573
2574    return ntokens;
2575}
2576
2577/* Remove conn c from a conn list.
2578 * Returns the new head of the list.
2579 */
2580conn *conn_list_remove(conn *head, conn **tail, conn *c, bool *found) {
2581    conn *prev = NULL;
2582    conn *curr = head;
2583
2584    if (found != NULL) {
2585        *found = false;
2586    }
2587
2588    while (curr != NULL) {
2589        if (curr == c) {
2590            conn *r;
2591            if (found != NULL) {
2592                *found = true;
2593            }
2594
2595            if (tail != NULL && *tail == curr) {
2596                *tail = prev;
2597            }
2598
2599            if (prev != NULL) {
2600                cb_assert(curr != head);
2601                prev->next = curr->next;
2602                curr->next = NULL;
2603                return head;
2604            }
2605
2606            cb_assert(curr == head);
2607            r = curr->next;
2608            curr->next = NULL;
2609            return r;
2610        }
2611
2612        prev = curr;
2613        curr = curr ->next;
2614    }
2615
2616    return head;
2617}
2618
2619/* Returns the new head of the list.
2620 */
2621downstream *downstream_list_remove(downstream *head, downstream *d) {
2622    downstream *prev = NULL;
2623    downstream *curr = head;
2624
2625    while (curr != NULL) {
2626        if (curr == d) {
2627            downstream *r;
2628            if (prev != NULL) {
2629                cb_assert(curr != head);
2630                prev->next = curr->next;
2631                curr->next = NULL;
2632                return head;
2633            }
2634
2635            cb_assert(curr == head);
2636            r = curr->next;
2637            curr->next = NULL;
2638            return r;
2639        }
2640
2641        prev = curr;
2642        curr = curr ->next;
2643    }
2644
2645    return head;
2646}
2647
2648/* Returns the new head of the list.
2649 */
2650downstream *downstream_list_waiting_remove(downstream *head,
2651                                           downstream **tail,
2652                                           downstream *d) {
2653    downstream *prev = NULL;
2654    downstream *curr = head;
2655
2656    while (curr != NULL) {
2657        if (curr == d) {
2658            downstream *r;
2659            if (tail != NULL && *tail == curr) {
2660                *tail = prev;
2661            }
2662
2663            if (prev != NULL) {
2664                cb_assert(curr != head);
2665                prev->next_waiting = curr->next_waiting;
2666                curr->next_waiting = NULL;
2667                return head;
2668            }
2669
2670            cb_assert(curr == head);
2671            r = curr->next_waiting;
2672            curr->next_waiting = NULL;
2673            return r;
2674        }
2675
2676        prev = curr;
2677        curr = curr ->next_waiting;
2678    }
2679
2680    return head;
2681}
2682
2683/* Returns true if a candidate request is squashable
2684 * or de-duplicatable with an existing request, to
2685 * save on network hops.
2686 */
2687bool is_compatible_request(conn *existing, conn *candidate) {
2688    (void)existing;
2689    (void)candidate;
2690
2691    /* The not-my-vbucket error handling requires us to not */
2692    /* squash ascii multi-GET requests, due to reusing the */
2693    /* multiget-deduplication machinery during retries and */
2694    /* to simplify the later codepaths. */
2695    /*
2696    cb_assert(existing);
2697    cb_assert(existing->state == conn_pause);
2698    cb_assert(IS_PROXY(existing->protocol));
2699
2700    if (IS_BINARY(existing->protocol)) {
2701        // TODO: Revisit multi-get squashing for binary another day.
2702
2703        return false;
2704    }
2705
2706    cb_assert(IS_ASCII(existing->protocol));
2707
2708    if (candidate != NULL) {
2709        cb_assert(IS_ASCII(candidate->protocol));
2710        cb_assert(IS_PROXY(candidate->protocol));
2711        cb_assert(candidate->state == conn_pause);
2712
2713        // TODO: Allow gets (CAS) for de-duplication.
2714
2715        if (existing->cmd == -1 &&
2716            candidate->cmd == -1 &&
2717            existing->cmd_retries <= 0 &&
2718            candidate->cmd_retries <= 0 &&
2719            !existing->noreply &&
2720            !candidate->noreply &&
2721            strncmp(existing->cmd_start, "get ", 4) == 0 &&
2722            strncmp(candidate->cmd_start, "get ", 4) == 0) {
2723            cb_assert(existing->item == NULL);
2724            cb_assert(candidate->item == NULL);
2725
2726            return true;
2727        }
2728    }
2729    */
2730
2731    return false;
2732}
2733
2734void downstream_timeout(evutil_socket_t fd,
2735                        const short which,
2736                        void *arg) {
2737    downstream *d = arg;
2738    proxy_td *ptd;
2739
2740    cb_assert(d != NULL);
2741
2742    ptd = d->ptd;
2743    cb_assert(ptd != NULL);
2744
2745    /* This timer callback is invoked when one or more of */
2746    /* the downstream conns must be really slow.  Handle by */
2747    /* closing downstream conns, which might help by */
2748    /* freeing up downstream resources. */
2749
2750    if (cproxy_clear_timeout(d)) {
2751        char *m;
2752        int n;
2753        int i;
2754        /* The downstream_timeout() callback is invoked for */
2755        /* two cases (downstream_conn_queue_timeouts and */
2756        /* downstream_timeouts), so cleanup and track stats */
2757        /* accordingly. */
2758        bool was_conn_queue_waiting =
2759            zstored_downstream_waiting_remove(d);
2760
2761        if (was_conn_queue_waiting == true) {
2762            if (settings.verbose > 2) {
2763                moxi_log_write("conn_queue_timeout\n");
2764            }
2765
2766            ptd->stats.stats.tot_downstream_conn_queue_timeout++;
2767        } else {
2768            if (settings.verbose > 2) {
2769                moxi_log_write("downstream_timeout\n");
2770            }
2771
2772            ptd->stats.stats.tot_downstream_timeout++;
2773        }
2774
2775        m = "SERVER_ERROR proxy downstream timeout\r\n";
2776
2777        if (d->target_host_ident != NULL) {
2778            m = add_conn_suffix(d->upstream_conn);
2779            if (m != NULL) {
2780                char *s;
2781                snprintf(m, SUFFIX_SIZE - 1,
2782                         "SERVER_ERROR proxy downstream timeout %s\r\n",
2783                         d->target_host_ident);
2784                m[SUFFIX_SIZE - 1] = '\0';
2785
2786                s = strchr(m, ':'); /* Clip to avoid sending user/pswd. */
2787                if (s != NULL) {
2788                    *s++ = '\r';
2789                    *s++ = '\n';
2790                    *s = '\0';
2791                }
2792            }
2793        }
2794
2795        propagate_error_msg(d, m, PROTOCOL_BINARY_RESPONSE_EBUSY);
2796        n = mcs_server_count(&d->mst);
2797
2798        for (i = 0; i < n; i++) {
2799            conn *dc = d->downstream_conns[i];
2800            if (dc != NULL &&
2801                dc != NULL_CONN) {
2802                /* We have to de-link early, because we don't want */
2803                /* to have cproxy_close_conn() release the downstream */
2804                /* while we're in the middle of this loop. */
2805
2806                delink_from_downstream_conns(dc);
2807
2808                cproxy_close_conn(dc);
2809            }
2810        }
2811
2812        cproxy_release_downstream(d, false);
2813        cproxy_assign_downstream(ptd);
2814    }
2815
2816    (void)fd;
2817    (void)which;
2818}
2819
2820bool cproxy_start_downstream_timeout(downstream *d, conn *c) {
2821    cb_assert(d != NULL);
2822    cb_assert(d->behaviors_num > 0);
2823    cb_assert(d->behaviors_arr != NULL);
2824
2825    return cproxy_start_downstream_timeout_ex(d, c,
2826                cproxy_get_downstream_timeout(d, c));
2827}
2828
2829bool cproxy_start_downstream_timeout_ex(downstream *d, conn *c,
2830                                        struct timeval dt) {
2831    conn *uc;
2832
2833    cb_assert(d != NULL);
2834    cb_assert(d->behaviors_num > 0);
2835    cb_assert(d->behaviors_arr != NULL);
2836
2837    cproxy_clear_timeout(d);
2838
2839    if (dt.tv_sec == 0 &&
2840        dt.tv_usec == 0) {
2841        return true;
2842    }
2843
2844    uc = d->upstream_conn;
2845
2846    cb_assert(uc != NULL);
2847    cb_assert(uc->state == conn_pause);
2848    cb_assert(uc->thread != NULL);
2849    cb_assert(uc->thread->base != NULL);
2850    cb_assert(IS_PROXY(uc->protocol));
2851
2852    if (settings.verbose > 2) {
2853        moxi_log_write("%d: cproxy_start_downstream_timeout\n",
2854                       (c != NULL ? c->sfd : -1));
2855    }
2856
2857    evtimer_set(&d->timeout_event, downstream_timeout, d);
2858
2859    event_base_set(uc->thread->base, &d->timeout_event);
2860
2861    d->timeout_tv.tv_sec  = dt.tv_sec;
2862    d->timeout_tv.tv_usec = dt.tv_usec;
2863
2864    return (evtimer_add(&d->timeout_event, &d->timeout_tv) == 0);
2865}
2866
2867/* Return 0 on success, -1 on general failure, 1 on timeout failure. */
2868
2869int cproxy_auth_downstream(mcs_server_st *server,
2870                           proxy_behavior *behavior,
2871                           SOCKET fd) {
2872    protocol_binary_request_header req;
2873    protocol_binary_response_header res;
2874    struct timeval *timeout = NULL;
2875    char buf[3000];
2876    const char *usr;
2877    const char *pwd;
2878    int usr_len;
2879    int pwd_len;
2880    int buf_len;
2881    mcs_return mr;
2882
2883    cb_assert(server);
2884    cb_assert(behavior);
2885    cb_assert(fd != INVALID_SOCKET);
2886
2887
2888    if (!IS_BINARY(behavior->downstream_protocol)) {
2889        return 0;
2890    }
2891
2892    usr = mcs_server_st_usr(server) != NULL ?
2893        mcs_server_st_usr(server) : behavior->usr;
2894    pwd = mcs_server_st_pwd(server) != NULL ?
2895        mcs_server_st_pwd(server) : behavior->pwd;
2896
2897    usr_len = (int)strlen(usr);
2898    pwd_len = (int)strlen(pwd);
2899
2900    if (usr_len <= 0) {
2901        return 0;
2902    }
2903
2904    if (settings.verbose > 2) {
2905        moxi_log_write("cproxy_auth_downstream usr: %s pwd: (%d)\n",
2906                       usr, pwd_len);
2907    }
2908
2909    if (usr_len <= 0 ||
2910        !IS_PROXY(behavior->downstream_protocol) ||
2911        (usr_len + pwd_len + 50 > (int) sizeof(buf))) {
2912        if (settings.verbose > 1) {
2913            moxi_log_write("auth failure args\n");
2914        }
2915
2916        return -1; /* Probably misconfigured. */
2917    }
2918
2919    /* The key should look like "PLAIN", or the sasl mech string. */
2920    /* The data should look like "\0usr\0pwd".  So, the body buf */
2921    /* should look like "PLAIN\0usr\0pwd". */
2922
2923    /* TODO: Allow binary passwords. */
2924
2925    buf_len = snprintf(buf, sizeof(buf), "PLAIN%c%s%c%s",
2926                       0, usr,
2927                       0, pwd);
2928    cb_assert(buf_len == 7 + usr_len + pwd_len);
2929
2930    memset(req.bytes, 0, sizeof(req.bytes));
2931    memset(res.bytes, 0, sizeof(res.bytes));
2932    req.request.magic    = PROTOCOL_BINARY_REQ;
2933    req.request.opcode   = PROTOCOL_BINARY_CMD_SASL_AUTH;
2934    req.request.keylen   = htons((uint16_t) 5); /* 5 == strlen("PLAIN"). */
2935    req.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
2936    req.request.bodylen  = htonl(buf_len);
2937
2938    if (mcs_io_write(fd, (const char *) req.bytes,
2939                     sizeof(req.bytes)) != sizeof(req.bytes) ||
2940        mcs_io_write(fd, buf, buf_len) == -1) {
2941        mcs_io_reset(fd);
2942
2943        if (settings.verbose > 1) {
2944            moxi_log_write("auth failure during write for %s (%d)\n",
2945                           usr, buf_len);
2946        }
2947
2948        return -1;
2949    }
2950
2951    if (behavior->auth_timeout.tv_sec != 0 ||
2952        behavior->auth_timeout.tv_usec != 0) {
2953        timeout = &behavior->auth_timeout;
2954    }
2955
2956    mr = mcs_io_read(fd, &res.bytes, sizeof(res.bytes), timeout);
2957    if (mr == MCS_SUCCESS && res.response.magic == PROTOCOL_BINARY_RES) {
2958        int len;
2959        res.response.status  = ntohs(res.response.status);
2960        res.response.keylen  = ntohs(res.response.keylen);
2961        res.response.bodylen = ntohl(res.response.bodylen);
2962
2963        /* Swallow whatever body comes. */
2964        len = res.response.bodylen;
2965        while (len > 0) {
2966            int amt = (len > (int) sizeof(buf) ? (int) sizeof(buf) : len);
2967
2968            mr = mcs_io_read(fd, buf, amt, timeout);
2969            if (mr != MCS_SUCCESS) {
2970                if (settings.verbose > 1) {
2971                    moxi_log_write("auth could not read response body (%d) %d\n",
2972                                   usr, amt, mr);
2973                }
2974
2975                if (mr == MCS_TIMEOUT) {
2976                    return 1;
2977                }
2978
2979                return -1;
2980            }
2981
2982            len -= amt;
2983        }
2984
2985        /* The res status should be either... */
2986        /* - SUCCESS         - sasl aware server and good credentials. */
2987        /* - AUTH_ERROR      - wrong credentials. */
2988        /* - UNKNOWN_COMMAND - sasl-unaware server. */
2989
2990        if (res.response.status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
2991            if (settings.verbose > 2) {
2992                moxi_log_write("auth_downstream success for %s\n", usr);
2993            }
2994
2995            return 0;
2996        }
2997
2998        if (settings.verbose > 1) {
2999            moxi_log_write("auth_downstream failure for %s (%x)\n",
3000                           usr, res.response.status);
3001        }
3002    } else {
3003        if (settings.verbose > 1) {
3004            moxi_log_write("auth_downstream response error for %s, %d\n",
3005                           usr, mr);
3006        }
3007    }
3008
3009    if (mr == MCS_TIMEOUT) {
3010        return 1;
3011    }
3012
3013    return -1;
3014}
3015
3016/* Return 0 on success, -1 on general failure, 1 on timeout failure. */
3017
3018int cproxy_bucket_downstream(mcs_server_st *server,
3019                             proxy_behavior *behavior,
3020                             SOCKET fd) {
3021    protocol_binary_request_header req;
3022    protocol_binary_response_header res;
3023    struct timeval *timeout = NULL;
3024    int bucket_len;
3025    mcs_return mr;
3026
3027    cb_assert(server);
3028    cb_assert(behavior);
3029    cb_assert(IS_PROXY(behavior->downstream_protocol));
3030    cb_assert(fd != INVALID_SOCKET);
3031
3032    if (!IS_BINARY(behavior->downstream_protocol)) {
3033        return 0;
3034    }
3035
3036    bucket_len = (int)strlen(behavior->bucket);
3037    if (bucket_len <= 0) {
3038        return 0; /* When no bucket. */
3039    }
3040
3041    memset(req.bytes, 0, sizeof(req.bytes));
3042    memset(res.bytes, 0, sizeof(res.bytes));
3043
3044    req.request.magic    = PROTOCOL_BINARY_REQ;
3045    req.request.opcode   = PROTOCOL_BINARY_CMD_BUCKET;
3046    req.request.keylen   = htons((uint16_t) bucket_len);
3047    req.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
3048    req.request.bodylen  = htonl(bucket_len);
3049
3050    if (mcs_io_write(fd, (const char *) req.bytes,
3051                     sizeof(req.bytes)) != sizeof(req.bytes) ||
3052        mcs_io_write(fd, behavior->bucket, bucket_len) == -1) {
3053        mcs_io_reset(fd);
3054
3055        if (settings.verbose > 1) {
3056            moxi_log_write("bucket failure during write (%d)\n",
3057                    bucket_len);
3058        }
3059
3060        return -1;
3061    }
3062
3063
3064    if (behavior->auth_timeout.tv_sec != 0 ||
3065        behavior->auth_timeout.tv_usec != 0) {
3066        timeout = &behavior->auth_timeout;
3067    }
3068
3069    mr = mcs_io_read(fd, &res.bytes, sizeof(res.bytes), timeout);
3070    if (mr == MCS_SUCCESS && res.response.magic == PROTOCOL_BINARY_RES) {
3071        char buf[300];
3072        int len;
3073
3074        res.response.status  = ntohs(res.response.status);
3075        res.response.keylen  = ntohs(res.response.keylen);
3076        res.response.bodylen = ntohl(res.response.bodylen);
3077
3078        /* Swallow whatever body comes. */
3079        len = res.response.bodylen;
3080        while (len > 0) {
3081            int amt = (len > (int) sizeof(buf) ? (int) sizeof(buf) : len);
3082
3083            mr = mcs_io_read(fd, buf, amt, timeout);
3084            if (mr != MCS_SUCCESS) {
3085                if (mr == MCS_TIMEOUT) {
3086                    return 1;
3087                }
3088
3089                return -1;
3090            }
3091
3092            len -= amt;
3093        }
3094
3095        /* The res status should be either... */
3096        /* - SUCCESS         - we got the bucket. */
3097        /* - AUTH_ERROR      - not allowed to use that bucket. */
3098        /* - UNKNOWN_COMMAND - bucket-unaware server. */
3099
3100        if (res.response.status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
3101            if (settings.verbose > 2) {
3102                moxi_log_write("bucket_downstream success, %s\n",
3103                        behavior->bucket);
3104            }
3105
3106            return 0;
3107        }
3108
3109        if (settings.verbose > 1) {
3110            moxi_log_write("bucket_downstream failure, %s (%x)\n",
3111                    behavior->bucket,
3112                    res.response.status);
3113        }
3114    }
3115
3116    if (mr == MCS_TIMEOUT) {
3117        return 1;
3118    }
3119
3120    return -1;
3121}
3122
3123int cproxy_max_retries(downstream *d) {
3124    return mcs_server_count(&d->mst) * 2;
3125}
3126
3127int downstream_conn_index(downstream *d, conn *c) {
3128    int nconns;
3129    int i;
3130
3131    cb_assert(d);
3132
3133    nconns = mcs_server_count(&d->mst);
3134    for (i = 0; i < nconns; i++) {
3135        if (d->downstream_conns[i] == c) {
3136            return i;
3137        }
3138    }
3139
3140    return -1;
3141}
3142
3143void cproxy_upstream_state_change(conn *c, enum conn_states next_state) {
3144    proxy_td *ptd;
3145
3146    cb_assert(c != NULL);
3147    ptd = c->extra;
3148    if (ptd != NULL) {
3149        if (c->state == conn_pause) {
3150            ptd->stats.stats.tot_upstream_unpaused++;
3151            c->cmd_unpaused = true;
3152        }
3153        if (next_state == conn_pause) {
3154            ptd->stats.stats.tot_upstream_paused++;
3155        }
3156
3157        if (next_state == conn_parse_cmd && c->cmd_arrive_time == 0) {
3158            c->cmd_unpaused = false;
3159            c->hit_local = false;
3160            c->cmd_arrive_time = usec_now();
3161        }
3162
3163        if (next_state == conn_closing || next_state == conn_new_cmd) {
3164            uint64_t arrive_time = c->cmd_arrive_time;
3165            if (c->cmd_unpaused && arrive_time != 0) {
3166                uint64_t latency = usec_now() - c->cmd_arrive_time;
3167
3168                if (c->hit_local) {
3169                    ptd->stats.stats.tot_local_cmd_time += latency;
3170                    ptd->stats.stats.tot_local_cmd_count++;
3171                }
3172
3173                ptd->stats.stats.tot_cmd_time += latency;
3174                ptd->stats.stats.tot_cmd_count++;
3175                c->cmd_arrive_time = 0;
3176            }
3177        }
3178    }
3179}
3180
3181/* ------------------------------------------------- */
3182
3183bool cproxy_on_connect_downstream_conn(conn *c) {
3184    int error;
3185    socklen_t errsz = sizeof(error);
3186    int k;
3187    downstream *d;
3188
3189    cb_assert(c != NULL);
3190    cb_assert(c->host_ident);
3191
3192    d = c->extra;
3193    cb_assert(d != NULL);
3194
3195    if (settings.verbose > 2) {
3196        moxi_log_write("%d: cproxy_on_connect_downstream_conn for %s\n",
3197                       c->sfd, c->host_ident);
3198    }
3199
3200    if (c->which == EV_TIMEOUT) {
3201        d->ptd->stats.stats.tot_downstream_connect_timeout++;
3202
3203        if (settings.verbose) {
3204            moxi_log_write("%d: connection timed out: %s",
3205                           c->sfd, c->host_ident);
3206        }
3207        goto cleanup;
3208    }
3209
3210    /* Check if the connection completed */
3211    if (getsockopt(c->sfd, SOL_SOCKET, SO_ERROR, (void *) &error,
3212                   &errsz) == -1) {
3213        if (settings.verbose) {
3214            moxi_log_write("%d: connect error: %s, %s",
3215                           c->sfd, c->host_ident, strerror(error));
3216        }
3217        goto cleanup;
3218    }
3219
3220    if (error) {
3221        if (settings.verbose) {
3222            moxi_log_write("%d: connect failed: %s, %s",
3223                           c->sfd, c->host_ident, strerror(error));
3224        }
3225        goto cleanup;
3226    }
3227
3228    k = downstream_conn_index(d, c);
3229    if (k >= 0) {
3230        if (downstream_connect_init(d, mcs_server_index(&d->mst, k),
3231                                    &d->behaviors_arr[k], c)) {
3232            /* We are connected to the server now */
3233            if (settings.verbose > 2) {
3234                moxi_log_write("%d: connected to: %s\n",
3235                               c->sfd, c->host_ident);
3236            }
3237
3238            conn_set_state(c, conn_pause);
3239            update_event(c, 0);
3240            cproxy_forward_or_error(d);
3241
3242            return true;
3243        }
3244    }
3245
3246cleanup:
3247    d->ptd->stats.stats.tot_downstream_connect_failed++;
3248
3249    k = delink_from_downstream_conns(c);
3250    if (k >= 0) {
3251        cb_assert(d->downstream_conns[k] == NULL);
3252
3253        d->downstream_conns[k] = NULL_CONN;
3254    }
3255
3256    conn_set_state(c, conn_closing);
3257    update_event(c, 0);
3258    cproxy_forward_or_error(d);
3259
3260    return false;
3261}
3262
3263void downstream_reserved_time_sample(proxy_stats_td *pstd, uint64_t duration) {
3264    if (pstd->downstream_reserved_time_htgram == NULL) {
3265        pstd->downstream_reserved_time_htgram =
3266            cproxy_create_timing_histogram();
3267    }
3268
3269    if (pstd->downstream_reserved_time_htgram != NULL) {
3270        htgram_incr(pstd->downstream_reserved_time_htgram, duration, 1);
3271    }
3272}
3273
3274void downstream_connect_time_sample(proxy_stats_td *pstd, uint64_t duration) {
3275    if (pstd->downstream_connect_time_htgram == NULL) {
3276        pstd->downstream_connect_time_htgram =
3277            cproxy_create_timing_histogram();
3278    }
3279
3280    if (pstd->downstream_connect_time_htgram != NULL) {
3281        htgram_incr(pstd->downstream_connect_time_htgram, duration, 1);
3282    }
3283}
3284
3285/* A histogram for tracking timings, such as for usec request timings. */
3286
3287HTGRAM_HANDLE cproxy_create_timing_histogram(void) {
3288    /* TODO: Make histogram bins more configurable one day. */
3289
3290    HTGRAM_HANDLE h1 = htgram_mk(2000, 100, 2.0, 20, NULL);
3291    HTGRAM_HANDLE h0 = htgram_mk(0, 100, 1.0, 20, h1);
3292
3293    return h0;
3294}
3295
3296zstored_downstream_conns *zstored_get_downstream_conns(LIBEVENT_THREAD *thread,
3297                                                       const char *host_ident) {
3298
3299    genhash_t *conn_hash;
3300    zstored_downstream_conns *conns;
3301
3302    cb_assert(thread);
3303    cb_assert(thread->base);
3304
3305    conn_hash = thread->conn_hash;
3306    cb_assert(conn_hash != NULL);
3307
3308    conns = genhash_find(conn_hash, host_ident);
3309    if (conns == NULL) {
3310        conns = calloc(1, sizeof(zstored_downstream_conns));
3311        if (conns != NULL) {
3312            conns->host_ident = strdup(host_ident);
3313            if (conns->host_ident != NULL) {
3314                genhash_store(conn_hash, conns->host_ident, conns);
3315            } else {
3316                free(conns);
3317                conns = NULL;
3318            }
3319        }
3320    }
3321
3322    return conns;
3323}
3324
3325void zstored_error_count(LIBEVENT_THREAD *thread,
3326                         const char *host_ident,
3327                         bool has_error) {
3328    zstored_downstream_conns *conns;
3329
3330    cb_assert(thread != NULL);
3331    cb_assert(host_ident != NULL);
3332
3333    conns = zstored_get_downstream_conns(thread, host_ident);
3334    if (conns != NULL) {
3335        if (has_error) {
3336            conns->error_count++;
3337            conns->error_time = msec_current_time;
3338        } else {
3339            conns->error_count = 0;
3340            conns->error_time = 0;
3341        }
3342
3343        if (settings.verbose > 2) {
3344            moxi_log_write("z_error, %s, %d, %d, %d, %d\n",
3345                           host_ident,
3346                           has_error,
3347                           conns->dc_acquired,
3348                           conns->error_count,
3349                           conns->error_time);
3350        }
3351
3352        if (has_error) {
3353            /* We reach here when a non-blocking connect() has failed */
3354            /* or when an acquired downstream conn had an error. */
3355            /* The downstream conn is just going to be closed */
3356            /* rather than be released back to the thread->conn_hash, */
3357            /* so update the dc_acquired here. */
3358
3359            if (conns->dc_acquired > 0) {
3360                conns->dc_acquired--;
3361            }
3362
3363            /* When zero downstream conns are available, wake up all */
3364            /* waiting downstreams so they can proceed (possibly by */
3365            /* just returning ERROR's to upstream clients). */
3366
3367            if (conns->dc_acquired <= 0 && conns->dc == NULL) {
3368                downstream *head = conns->downstream_waiting_head;
3369
3370                conns->downstream_waiting_head = NULL;
3371                conns->downstream_waiting_tail = NULL;
3372
3373                while (head != NULL) {
3374                    downstream *prev;
3375
3376                    head->ptd->stats.stats.tot_downstream_waiting_errors++;
3377                    head->ptd->stats.stats.tot_downstream_conn_queue_remove++;
3378
3379                    prev = head;
3380                    head = head->next_waiting;
3381                    prev->next_waiting = NULL;
3382
3383                    cproxy_forward_or_error(prev);
3384                }
3385            }
3386        }
3387    }
3388}
3389
/* Obtains a downstream conn to msst's server on behalf of downstream d,
 * using the given thread's per-host pool (thread->conn_hash, keyed by
 * host_ident).
 *
 * If the pool has an idle conn, it is popped, attached to d
 * (dc->extra = d) and returned.  Otherwise a new conn is opened with
 * cproxy_connect_downstream_conn(), unless one of two throttles applies
 * (both only checked when the pool record exists):
 *  - error back-off: if behavior->connect_max_errors > 0, the host's
 *    error_count exceeds it, behavior->cycle > 0, and fewer than
 *    behavior->connect_retry_interval msecs have passed since the last
 *    recorded error, NULL is returned without trying to connect;
 *  - conn cap: if behavior->downstream_conn_max > 0 and at least that many
 *    conns are already acquired for the host, NULL is returned and
 *    *downstream_conn_max_reached is set to true.
 *
 * Returns the conn, or NULL.  Whenever a conn is handed out and the pool
 * record exists, the pool's dc_acquired is incremented.
 * *downstream_conn_max_reached is false unless the cap was hit.  A failed
 * connect bumps the host's error_count and error_time.
 */
conn *zstored_acquire_downstream_conn(downstream *d,
                                      LIBEVENT_THREAD *thread,
                                      mcs_server_st *msst,
                                      proxy_behavior *behavior,
                                      bool *downstream_conn_max_reached) {
    enum protocol downstream_protocol;
    char *host_ident;
    conn *dc;
    zstored_downstream_conns *conns;

    cb_assert(d);
    cb_assert(d->ptd);
    cb_assert(d->ptd->downstream_released != d); /* Should not be in free list. */
    cb_assert(thread);
    cb_assert(msst);
    cb_assert(behavior);
    cb_assert(mcs_server_st_hostname(msst) != NULL);
    cb_assert(mcs_server_st_port(msst) > 0);
    cb_assert(mcs_server_st_fd(msst) == -1);

    *downstream_conn_max_reached = false;

    d->ptd->stats.stats.tot_downstream_conn_acquired++;

    /* Effective downstream protocol: the upstream conn's peer protocol */
    /* when set, else the configured one.  It decides the is-ascii flag */
    /* passed to mcs_server_st_ident(), i.e. which pool key is used. */
    downstream_protocol =
        d->upstream_conn->peer_protocol ?
        d->upstream_conn->peer_protocol :
        behavior->downstream_protocol;

    host_ident = mcs_server_st_ident(msst, IS_ASCII(downstream_protocol));
    conns = zstored_get_downstream_conns(thread, host_ident);
    if (conns != NULL) {
        dc = conns->dc;
        if (dc != NULL) {
            /* Reuse: pop the head of the pool's idle list (linked via */
            /* dc->next) and attach it to d. */
            cb_assert(dc->thread == thread);
            cb_assert(strcmp(host_ident, dc->host_ident) == 0);

            conns->dc_acquired++;
            conns->dc = dc->next;
            dc->next = NULL;

            cb_assert(dc->extra == NULL);
            dc->extra = d;

            return dc;
        }

        /* Error back-off: too many recent errors for this host. */
        if (behavior->connect_max_errors > 0 &&
            behavior->connect_max_errors < conns->error_count) {
            rel_time_t msecs_since_error =
                (rel_time_t)(msec_current_time - conns->error_time);

            if (settings.verbose > 2) {
                moxi_log_write("zacquire_dc, %s, %d, %"PRIu64", (%d)\n",
                               host_ident,
                               conns->error_count,
                               (uint64_t)conns->error_time,
                               msecs_since_error);
            }

            if ((behavior->cycle > 0) &&
                (behavior->connect_retry_interval > msecs_since_error)) {
                d->ptd->stats.stats.tot_downstream_connect_interval++;

                return NULL;
            }
        }

        /* Conn cap: the caller learns via the out-param that it hit */
        /* the limit rather than a connect problem. */
        if (behavior->downstream_conn_max > 0 &&
            behavior->downstream_conn_max <= conns->dc_acquired) {
            d->ptd->stats.stats.tot_downstream_connect_max_reached++;

            *downstream_conn_max_reached = true;

            return NULL;
        }
    }

    dc = cproxy_connect_downstream_conn(d, thread, msst, behavior);
    if (dc != NULL) {
        cb_assert(dc->host_ident == NULL);
        /* NOTE(review): strdup() result is not checked; */
        /* zstored_release_downstream_conn() asserts host_ident != NULL. */
        dc->host_ident = strdup(host_ident);
        if (conns != NULL) {
            conns->dc_acquired++;

            /* Only reset error tracking when the conn is not still in */
            /* conn_connecting; a pending connect may still fail later. */
            if (dc->state != conn_connecting) {
                conns->error_count = 0;
                conns->error_time = 0;
            }
        }
    } else {
        if (conns != NULL) {
            conns->error_count++;
            conns->error_time = msec_current_time;
        }
    }

    return dc;
}
3489
/* Originally added by jsh.
 *
 * Gives downstream conn dc back to its thread's per-host pool.  The
 * NULL_CONN placeholder is ignored.  dc is detached from its downstream
 * (dc->extra = NULL), and the pool's dc_acquired is decremented (never
 * below 0).
 *
 * If dc is in conn_pause, it is pushed onto the pool's idle list.  If any
 * downstream is waiting for a conn to this host, the first waiter is then
 * dequeued, its timeout cleared, and it is driven through
 * cproxy_forward_or_error().  In every other case, including when the
 * pool record cannot be obtained, dc is closed with cproxy_close_conn().
 *
 * closing is only used for verbose logging here.
 */
void zstored_release_downstream_conn(conn *dc, bool closing) {
    bool keep;
    zstored_downstream_conns *conns;
    downstream *d;

    cb_assert(dc != NULL);
    if (dc == NULL_CONN) {
        return;
    }

    d = dc->extra;
    cb_assert(d != NULL);

    d->ptd->stats.stats.tot_downstream_conn_released++;

    if (settings.verbose > 2) {
        moxi_log_write("%d: release_downstream_conn, %s, (%d)"
                       " upstream %d\n",
                       dc->sfd, state_text(dc->state), closing,
                       (d->upstream_conn != NULL ?
                        d->upstream_conn->sfd : -1));
    }

    cb_assert(dc->next == NULL);
    cb_assert(dc->thread != NULL);
    cb_assert(dc->host_ident != NULL);

    /* Only a conn parked in conn_pause is returned to the idle pool; */
    /* anything else is closed at the bottom. */
    keep = dc->state == conn_pause;
    dc->extra = NULL;
    conns = zstored_get_downstream_conns(dc->thread, dc->host_ident);
    if (conns != NULL) {
        if (conns->dc_acquired > 0) {
            conns->dc_acquired--;
        }

        if (keep) {
            downstream *d_head;
            cb_assert(dc->next == NULL);
            /* Push dc onto the front of the idle list. */
            dc->next = conns->dc;
            conns->dc = dc;

            /* Since one downstream conn was released, process a single */
            /* waiting downstream, if any. */

            d_head = conns->downstream_waiting_head;
            if (d_head != NULL) {
                cb_assert(conns->downstream_waiting_tail != NULL);

                /* Dequeue the head; clear tail when the queue empties. */
                conns->downstream_waiting_head =
                    conns->downstream_waiting_head->next_waiting;
                if (conns->downstream_waiting_head == NULL) {
                    conns->downstream_waiting_tail = NULL;
                }
                d_head->next_waiting = NULL;

                d_head->ptd->stats.stats.tot_downstream_conn_queue_remove++;

                cproxy_clear_timeout(d_head);

                cproxy_forward_or_error(d_head);
            }

            return;
        }
    }

    cproxy_close_conn(dc);
}
3559
/* Returns true if the downstream was found on any */
/* conns->downstream_waiting_head/tail queues and was removed. */
/* */
/* Uses the pools of the calling thread (looked up from cb_thread_self(); */
/* asserts one is found).  For every server in d->mst it picks the pool */
/* key the same way zstored_downstream_waiting_add() does (upstream peer */
/* protocol if set, else that server's behavior protocol) and unlinks d */
/* from that pool's waiting queue, fixing up head/tail and bumping */
/* tot_downstream_conn_queue_remove for each removal.  Note that */
/* zstored_get_downstream_conns() may create an empty pool record as a */
/* side effect of the lookup. */

bool zstored_downstream_waiting_remove(downstream *d) {
    bool found = false;
    int i;
    int n;
    LIBEVENT_THREAD *thread = thread_by_index(thread_index(cb_thread_self()));
    cb_assert(thread != NULL);

    n = mcs_server_count(&d->mst);
    for (i = 0; i < n; i++) {
        char *host_ident;
        zstored_downstream_conns *conns;
        mcs_server_st *msst = mcs_server_index(&d->mst, i);

        enum protocol downstream_protocol =
            d->upstream_conn && d->upstream_conn->peer_protocol ?
            d->upstream_conn->peer_protocol :
            d->behaviors_arr[i].downstream_protocol;

        cb_assert(IS_PROXY(downstream_protocol));

        host_ident = mcs_server_st_ident(msst, IS_ASCII(downstream_protocol));
        conns = zstored_get_downstream_conns(thread, host_ident);

        if (conns != NULL) {
            /* Linked-list removal, on the next_waiting pointer, */
            /* and keep head and tail pointers updated. */

            downstream *prev = NULL;
            downstream *curr = conns->downstream_waiting_head;

            while (curr != NULL) {
                if (curr == d) {
                    found = true;

                    if (conns->downstream_waiting_head == curr) {
                        cb_assert(conns->downstream_waiting_tail != NULL);
                        conns->downstream_waiting_head = curr->next_waiting;
                    }

                    if (conns->downstream_waiting_tail == curr) {
                        conns->downstream_waiting_tail = prev;
                    }

                    if (prev != NULL) {
                        prev->next_waiting = curr->next_waiting;
                    }

                    curr->next_waiting = NULL;

                    d->ptd->stats.stats.tot_downstream_conn_queue_remove++;

                    /* Leaves only this pool's scan; other servers' */
                    /* pools are still checked. */
                    break;
                }

                prev = curr;
                curr = curr->next_waiting;
            }
        }
    }

    return found;
}
3625
3626bool zstored_downstream_waiting_add(downstream *d, LIBEVENT_THREAD *thread,
3627                                    mcs_server_st *msst,
3628                                    proxy_behavior *behavior) {
3629    enum protocol downstream_protocol;
3630    char *host_ident;
3631    zstored_downstream_conns *conns;
3632
3633    cb_assert(thread != NULL);
3634    cb_assert(d != NULL);
3635    cb_assert(d->upstream_conn != NULL);
3636    cb_assert(d->next_waiting == NULL);
3637
3638    downstream_protocol = d->upstream_conn->peer_protocol ?
3639        d->upstream_conn->peer_protocol :
3640        behavior->downstream_protocol;
3641
3642    host_ident = mcs_server_st_ident(msst, IS_ASCII(downstream_protocol));
3643    conns = zstored_get_downstream_conns(thread, host_ident);
3644    if (conns != NULL) {
3645        cb_assert(conns->dc == NULL);
3646
3647        if (conns->downstream_waiting_head == NULL) {
3648            cb_assert(conns->downstream_waiting_tail == NULL);
3649            conns->downstream_waiting_head = d;
3650        }
3651        if (conns->downstream_waiting_tail != NULL) {
3652            cb_assert(conns->downstream_waiting_tail->next_waiting == NULL);
3653            conns->downstream_waiting_tail->next_waiting = d;
3654        }
3655        conns->downstream_waiting_tail = d;
3656
3657        d->ptd->stats.stats.tot_downstream_conn_queue_add++;
3658
3659        return true;
3660    }
3661
3662    return false;
3663}
3664
3665/* Find an appropriate proxy struct or NULL. */
3666
3667proxy *cproxy_find_proxy_by_auth(proxy_main *m,
3668                                 const char *usr,
3669                                 const char *pwd) {
3670    proxy *found = NULL;
3671    proxy *p;
3672
3673    cb_mutex_enter(&m->proxy_main_lock);
3674
3675    for (p = m->proxy_head; p != NULL && found == NULL; p = p->next) {
3676        cb_mutex_enter(&p->proxy_lock);
3677        if (strcmp(p->behavior_pool.base.usr, usr) == 0 &&
3678            strcmp(p->behavior_pool.base.pwd, pwd) == 0) {
3679            found = p;
3680        }
3681        cb_mutex_exit(&p->proxy_lock);
3682    }
3683
3684    cb_mutex_exit(&m->proxy_main_lock);
3685
3686    return found;
3687}
3688
3689int cproxy_num_active_proxies(proxy_main *m) {
3690    int n = 0;
3691    proxy *p;
3692
3693    cb_mutex_enter(&m->proxy_main_lock);
3694
3695    for (p = m->proxy_head; p != NULL; p = p->next) {
3696        cb_mutex_enter(&p->proxy_lock);
3697        if (p->name != NULL &&
3698            p->config != NULL &&
3699            p->config[0] != '\0') {
3700            n++;
3701        }
3702        cb_mutex_exit(&p->proxy_lock);
3703    }
3704
3705    cb_mutex_exit(&m->proxy_main_lock);
3706
3707    return n;
3708}
3709
3710static
3711void diag_single_connection(FILE *out, conn *c) {
3712    fprintf(out, "%p (%d), ev: 0x%04x, state: 0x%x, substate: 0x%x - %s\n",
3713            (void *)c, c->sfd, c->ev_flags,
3714            c->state, c->substate,
3715            c->update_diag ? c->update_diag : "(none)");
3716}
3717
3718static
3719void diag_connections(FILE *out, conn *head, int indent) {
3720    static char *blank = "";
3721    for (; head != NULL; head = head->next) {
3722        fprintf(out, "%*s" "connection: ", indent, blank);
3723        diag_single_connection(out, head);
3724    }
3725}
3726
3727static
3728void diag_single_downstream(FILE *out, downstream *d, int indent) {
3729    static char *blank = "";
3730    int n;
3731    int i;
3732
3733    conn *upstream = d->upstream_conn;
3734    fprintf(out, "%*s" "upstream: ", indent, blank);
3735    if (upstream) {
3736        diag_single_connection(out, upstream);
3737    } else {
3738        fprintf(out, "none\n");
3739    }
3740
3741    fprintf(out, "%*s" "downstream_used: %d\n", indent, blank, d->downstream_used);
3742    fprintf(out, "%*s" "downstream_used_start: %d\n", indent, blank, d->downstream_used_start);
3743    fprintf(out, "%*s" "downstream_conns:\n", indent, blank);
3744
3745    n = mcs_server_count(&d->mst);
3746    for (i = 0; i < n; i++) {
3747        if (d->downstream_conns[i] == NULL)
3748            continue;
3749        fprintf(out, "%*s", indent+2, blank);
3750        diag_single_connection(out, d->downstream_conns[i]);
3751    }
3752}
3753
3754static
3755void diag_downstream_chain(FILE *out, downstream *head, int indent) {
3756    for (; head != NULL; head = head->next) {
3757        diag_single_downstream(out, head, indent);
3758    }
3759}
3760
3761/* this is supposed to be called from gdb when other threads are suspended */
3762void connections_diag(FILE *out);
3763
3764void connections_diag(FILE *out) {
3765    extern proxy_main *diag_last_proxy_main;
3766
3767    proxy *cur_proxy;
3768    proxy_main *m = diag_last_proxy_main;
3769    if (!m) {
3770        fputs("no proxy_main!\n", out);
3771        return;
3772    }
3773
3774    for (cur_proxy = m->proxy_head; cur_proxy != NULL ; cur_proxy = cur_proxy->next) {
3775        int ti;
3776        fprintf(out, "proxy: name='%s', port=%d, cfg=%s (%u)\n",
3777                cur_proxy->name ? cur_proxy->name : "(null)",
3778                cur_proxy->port,
3779                cur_proxy->config, cur_proxy->config_ver);
3780        for (ti = 0; ti < cur_proxy->thread_data_num; ti++) {
3781            proxy_td *td = cur_proxy->thread_data + ti;
3782            fprintf(out, "  thread:%d\n", ti);
3783            fprintf(out, "    waiting_any_downstream:\n");
3784            diag_connections(out, td->waiting_any_downstream_head, 6);
3785
3786            fprintf(out, "    downstream_reserved:\n");
3787            diag_downstream_chain(out, td->downstream_reserved, 6);
3788            fprintf(out, "    downstream_released:\n");
3789            diag_downstream_chain(out, td->downstream_released, 6);
3790        }
3791    }
3792}
3793
3794bool cproxy_front_cache_key(proxy_td *ptd, char *key, int key_len) {
3795    return (key != NULL &&
3796            key_len > 0 &&
3797            ptd->behavior_pool.base.front_cache_lifespan > 0 &&
3798            matcher_check(&ptd->proxy->front_cache_matcher, key, key_len, false) == true &&
3799            matcher_check(&ptd->proxy->front_cache_unmatcher, key, key_len, false) == false);
3800}
3801
3802void cproxy_front_cache_delete(proxy_td *ptd, char *key, int key_len) {
3803    if (cproxy_front_cache_key(ptd, key, key_len) == true) {
3804        mcache_delete(&ptd->proxy->front_cache, key, key_len);
3805
3806        if (settings.verbose > 1) {
3807            moxi_log_write("front_cache del %s\n", key);
3808        }
3809    }
3810}
3811