xref: /5.5.2/moxi/src/cproxy.c (revision d0366df5)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3#include "src/config.h"
4#include <stdio.h>
5#include <stdlib.h>
6#include <string.h>
7#include <errno.h>
8#include <platform/cbassert.h>
9#include <fcntl.h>
10#include <math.h>
11#include "memcached.h"
12#include "cproxy.h"
13#include "work.h"
14#include "log.h"
15
/* Compile-time default for MOXI_BLOCKING_CONNECT; a build may override it */
/* with -DMOXI_BLOCKING_CONNECT=true.  NOTE(review): the name suggests it */
/* selects blocking vs. non-blocking downstream connects -- confirm at its */
/* use site, which is not in this part of the file. */
#ifndef MOXI_BLOCKING_CONNECT
#define MOXI_BLOCKING_CONNECT false
#endif
19
/* Internal forward declarations. */
/* NOTE(review): most of these have external linkage yet are declared here */
/* rather than in cproxy.h; confirm where each one is defined before */
/* changing its linkage. */

/* Linked-list helpers for downstreams. */

downstream *downstream_list_remove(downstream *head, downstream *d);
downstream *downstream_list_waiting_remove(downstream *head,
                                           downstream **tail,
                                           downstream *d);

/* libevent-style (fd, which, arg) callbacks; the names suggest timeout */
/* handlers for downstreams and for the upstream wait queue. */

static void downstream_timeout(evutil_socket_t fd,
                        const short which,
                        void *arg);
static void wait_queue_timeout(evutil_socket_t fd,
                        const short which,
                        void *arg);

/* Removes c from the conn list starting at head and returns the new head. */
/* tail (kept up to date when given) and found (reports whether c was in */
/* the list) may each be NULL, as cproxy_on_close_upstream_conn() shows. */

conn *conn_list_remove(conn *head, conn **tail,
                       conn *c, bool *found);

bool is_compatible_request(conn *existing, conn *candidate);

void propagate_error_msg(downstream *d, char *ascii_msg,
                         protocol_binary_response_status binary_status);

/* Record a duration sample into ptds; the names suggest reservation-time */
/* and connect-time statistics respectively. */

void downstream_reserved_time_sample(proxy_stats_td *ptds, uint64_t duration);
void downstream_connect_time_sample(proxy_stats_td *ptds, uint64_t duration);

bool downstream_connect_init(downstream *d, mcs_server_st *msst,
                             proxy_behavior *behavior, conn *c);

int init_mcs_st(mcs_st *mst, char *config,
                const char *default_usr,
                const char *default_pwd,
                const char *opts);

/* conn_connect hook installed in cproxy_downstream_funcs. */

bool cproxy_on_connect_downstream_conn(conn *c);

/* "zstored" helpers: a pool of downstream conns kept per worker thread and */
/* host_ident.  zstored_error_count() is called with has_error == true */
/* whenever a downstream conn closes (see cproxy_on_close_downstream_conn). */

conn *zstored_acquire_downstream_conn(downstream *d,
                                      LIBEVENT_THREAD *thread,
                                      mcs_server_st *msst,
                                      proxy_behavior *behavior,
                                      bool *downstream_conn_max_reached);

void zstored_release_downstream_conn(conn *dc, bool closing);

void zstored_error_count(LIBEVENT_THREAD *thread,
                         const char *host_ident,
                         bool has_error);

bool zstored_downstream_waiting_add(downstream *d, LIBEVENT_THREAD *thread,
                                    mcs_server_st *msst,
                                    proxy_behavior *behavior);

bool zstored_downstream_waiting_remove(downstream *d);
72
/* Bookkeeping for pooled downstream conns to one host.  It is looked up via
 * zstored_get_downstream_conns(thread, host_ident), so presumably there is
 * one instance per (worker thread, host_ident) pair -- NOTE(review): confirm
 * against that function's definition.
 */
typedef struct {
    conn      *dc;          /* Linked-list of available downstream conns. */
    uint32_t   dc_acquired; /* Count of acquired (in-use) downstream conns. */
    char      *host_ident;  /* Host identity this pool belongs to */
                            /* (presumably the lookup key). */
    uint32_t   error_count; /* NOTE(review): presumably maintained by */
                            /* zstored_error_count(); confirm semantics. */
    uint64_t   error_time;  /* NOTE(review): presumably time of the last */
                            /* recorded error; confirm units. */

    /* Head & tail of singly linked-list/queue, using */
    /* downstream->next_waiting pointers, where we've reached */
    /* downstream_conn_max, so there are waiting downstreams. */

    downstream *downstream_waiting_head;
    downstream *downstream_waiting_tail;
} zstored_downstream_conns;
87
/* Returns the pooled-conn bookkeeping for host_ident on the given thread. */
/* NOTE(review): whether it creates a missing entry is not visible here. */
zstored_downstream_conns *zstored_get_downstream_conns(LIBEVENT_THREAD *thread,
                                                       const char *host_ident);

bool cproxy_forward_or_error(downstream *d);

/* Detaches a downstream conn from its downstream's slots; returns the */
/* slot index, or -1 (defined below). */
int delink_from_downstream_conns(conn *c);

/* Used by cproxy_init_upstream_conn() to refuse clients when <= 0. */
int cproxy_num_active_proxies(proxy_main *m);
96
/* Function tables. */

/* Installed on listening conns by cproxy_listen() (through
 * cproxy_listen_port()).  Its only hook, conn_init, is
 * cproxy_init_upstream_conn(), which switches each accepted client conn
 * over to cproxy_upstream_funcs.
 */
conn_funcs cproxy_listen_funcs = {
    .conn_init                   = cproxy_init_upstream_conn,
    .conn_close                  = NULL,
    .conn_connect                = NULL,
    .conn_process_ascii_command  = NULL,
    .conn_process_binary_command = NULL,
    .conn_complete_nread_ascii   = NULL,
    .conn_complete_nread_binary  = NULL,
    .conn_pause                  = NULL,
    .conn_realtime               = NULL,
    .conn_state_change           = NULL,
    .conn_binary_command_magic   = 0
};

/* Hooks for client (upstream) conns; binary input is recognized by the */
/* request magic byte. */
conn_funcs cproxy_upstream_funcs = {
    .conn_init                   = NULL,
    .conn_close                  = cproxy_on_close_upstream_conn,
    .conn_connect                = NULL,
    .conn_process_ascii_command  = cproxy_process_upstream_ascii,
    .conn_process_binary_command = cproxy_process_upstream_binary,
    .conn_complete_nread_ascii   = cproxy_process_upstream_ascii_nread,
    .conn_complete_nread_binary  = cproxy_process_upstream_binary_nread,
    .conn_pause                  = NULL,
    .conn_realtime               = cproxy_realtime,
    .conn_state_change           = cproxy_upstream_state_change,
    .conn_binary_command_magic   = PROTOCOL_BINARY_REQ
};

/* Hooks for downstream (proxy-to-server) conns.  These conns read server */
/* replies, hence the response magic byte. */
conn_funcs cproxy_downstream_funcs = {
    .conn_init                   = cproxy_init_downstream_conn,
    .conn_close                  = cproxy_on_close_downstream_conn,
    .conn_connect                = cproxy_on_connect_downstream_conn,
    .conn_process_ascii_command  = cproxy_process_downstream_ascii,
    .conn_process_binary_command = cproxy_process_downstream_binary,
    .conn_complete_nread_ascii   = cproxy_process_downstream_ascii_nread,
    .conn_complete_nread_binary  = cproxy_process_downstream_binary_nread,
    .conn_pause                  = cproxy_on_pause_downstream_conn,
    .conn_realtime               = cproxy_realtime,
    .conn_state_change           = NULL,
    .conn_binary_command_magic   = PROTOCOL_BINARY_RES
};
140
141/* Main function to create a proxy struct.
142 */
143proxy *cproxy_create(proxy_main *main,
144                     char    *name,
145                     int      port,
146                     char    *config,
147                     uint32_t config_ver,
148                     proxy_behavior_pool *behavior_pool,
149                     int nthreads) {
150    if (settings.verbose > 1) {
151        moxi_log_write("cproxy_create on port %d, name %s, config %s\n",
152                       port, name, config);
153    }
154
155    cb_assert(name != NULL);
156    cb_assert(port > 0 || settings.socketpath != NULL);
157    cb_assert(config != NULL);
158    cb_assert(behavior_pool);
159    cb_assert(nthreads > 1); /* Main thread + at least one worker. */
160    cb_assert(nthreads == settings.num_threads);
161
162    proxy *p = (proxy *) calloc(1, sizeof(proxy));
163    if (p != NULL) {
164        p->main       = main;
165        p->name       = trimstrdup(name);
166        p->port       = port;
167        p->config     = trimstrdup(config);
168        p->config_ver = config_ver;
169
170        p->behavior_pool.base = behavior_pool->base;
171        p->behavior_pool.num  = behavior_pool->num;
172        p->behavior_pool.arr  = cproxy_copy_behaviors(behavior_pool->num,
173                                                      behavior_pool->arr);
174
175        p->listening        = 0;
176        p->listening_failed = 0;
177
178        p->next = NULL;
179
180        cb_mutex_initialize(&p->proxy_lock);
181
182        mcache_init(&p->front_cache, true, &mcache_item_funcs, true);
183        matcher_init(&p->front_cache_matcher, true);
184        matcher_init(&p->front_cache_unmatcher, true);
185
186        matcher_init(&p->optimize_set_matcher, true);
187
188        if (behavior_pool->base.front_cache_max > 0 &&
189            behavior_pool->base.front_cache_lifespan > 0) {
190            mcache_start(&p->front_cache,
191                         behavior_pool->base.front_cache_max);
192
193            if (strlen(behavior_pool->base.front_cache_spec) > 0) {
194                matcher_start(&p->front_cache_matcher,
195                              behavior_pool->base.front_cache_spec);
196            }
197
198            if (strlen(behavior_pool->base.front_cache_unspec) > 0) {
199                matcher_start(&p->front_cache_unmatcher,
200                              behavior_pool->base.front_cache_unspec);
201            }
202        }
203
204        if (strlen(behavior_pool->base.optimize_set) > 0) {
205            matcher_start(&p->optimize_set_matcher,
206                          behavior_pool->base.optimize_set);
207        }
208
209        p->thread_data_num = nthreads;
210        p->thread_data = (proxy_td *) calloc(p->thread_data_num,
211                                             sizeof(proxy_td));
212        if (p->thread_data != NULL &&
213            p->name != NULL &&
214            p->config != NULL &&
215            p->behavior_pool.arr != NULL) {
216            /* We start at 1, because thread[0] is the main listen/accept */
217            /* thread, and not a true worker thread.  Too lazy to save */
218            /* the wasted thread[0] slot memory. */
219            int i;
220            for (i = 1; i < p->thread_data_num; i++) {
221                proxy_td *ptd = &p->thread_data[i];
222                ptd->proxy = p;
223
224                ptd->config     = strdup(p->config);
225                ptd->config_ver = p->config_ver;
226
227                ptd->behavior_pool.base = behavior_pool->base;
228                ptd->behavior_pool.num  = behavior_pool->num;
229                ptd->behavior_pool.arr =
230                    cproxy_copy_behaviors(behavior_pool->num,
231                                          behavior_pool->arr);
232
233                ptd->waiting_any_downstream_head = NULL;
234                ptd->waiting_any_downstream_tail = NULL;
235                ptd->downstream_reserved = NULL;
236                ptd->downstream_released = NULL;
237                ptd->downstream_tot = 0;
238                ptd->downstream_num = 0;
239                ptd->downstream_max = behavior_pool->base.downstream_max;
240                ptd->downstream_assigns = 0;
241                ptd->timeout_tv.tv_sec = 0;
242                ptd->timeout_tv.tv_usec = 0;
243                ptd->stats.stats.num_upstream = 0;
244                ptd->stats.stats.num_downstream_conn = 0;
245
246                cproxy_reset_stats_td(&ptd->stats);
247
248                mcache_init(&ptd->key_stats, true,
249                            &mcache_key_stats_funcs, false);
250                matcher_init(&ptd->key_stats_matcher, false);
251                matcher_init(&ptd->key_stats_unmatcher, false);
252
253                if (behavior_pool->base.key_stats_max > 0 &&
254                    behavior_pool->base.key_stats_lifespan > 0) {
255                    mcache_start(&ptd->key_stats,
256                                 behavior_pool->base.key_stats_max);
257
258                    if (strlen(behavior_pool->base.key_stats_spec) > 0) {
259                        matcher_start(&ptd->key_stats_matcher,
260                                      behavior_pool->base.key_stats_spec);
261                    }
262
263                    if (strlen(behavior_pool->base.key_stats_unspec) > 0) {
264                        matcher_start(&ptd->key_stats_unmatcher,
265                                      behavior_pool->base.key_stats_unspec);
266                    }
267                }
268            }
269
270            return p;
271        }
272
273        free(p->name);
274        free(p->config);
275        free(p->behavior_pool.arr);
276        free(p->thread_data);
277        free(p);
278    }
279
280    return NULL;
281}
282
283/* Must be called on the main listener thread.
284 */
285int cproxy_listen(proxy *p) {
286    cb_assert(p != NULL);
287    cb_assert(is_listen_thread());
288
289    if (settings.verbose > 1) {
290        moxi_log_write("cproxy_listen on port %d, downstream %s\n",
291                p->port, p->config);
292    }
293
294    /* Idempotent, remembers if it already created listening socket(s). */
295
296    if (p->listening == 0) {
297        int listening;
298        enum protocol listen_protocol = negotiating_proxy_prot;
299
300        if (IS_ASCII(settings.binding_protocol)) {
301            listen_protocol = proxy_upstream_ascii_prot;
302        }
303        if (IS_BINARY(settings.binding_protocol)) {
304            listen_protocol = proxy_upstream_binary_prot;
305        }
306
307        listening = cproxy_listen_port(p->port, listen_protocol,
308                                       tcp_transport,
309                                       p,
310                                       &cproxy_listen_funcs);
311        if (listening > 0) {
312            p->listening += listening;
313        } else {
314            p->listening_failed++;
315
316            if (settings.enable_mcmux_mode && settings.socketpath) {
317#ifdef HAVE_SYS_UN_H
318                moxi_log_write("error: could not access UNIX socket: %s\n",
319                               settings.socketpath);
320                if (ml->log_mode != ERRORLOG_STDERR) {
321                    fprintf(stderr, "error: could not access UNIX socket: %s\n",
322                            settings.socketpath);
323                }
324                exit(EXIT_FAILURE);
325#endif
326            }
327
328            moxi_log_write("ERROR: could not listen on port %d. "
329                           "Please use -Z port_listen=PORT_NUM "
330                           "to specify a different port number.\n", p->port);
331            if (ml->log_mode != ERRORLOG_STDERR) {
332                fprintf(stderr, "ERROR: could not listen on port %d. "
333                        "Please use -Z port_listen=PORT_NUM "
334                        "to specify a different port number.\n", p->port);
335            }
336            exit(EXIT_FAILURE);
337        }
338    }
339
340    return (int)p->listening;
341}
342
343int cproxy_listen_port(int port,
344                       enum protocol protocol,
345                       enum network_transport transport,
346                       void       *conn_extra,
347                       conn_funcs *funcs) {
348
349    int   listening;
350    conn *listen_conn_orig;
351    conn *x;
352
353    cb_assert(port > 0 || settings.socketpath != NULL);
354    cb_assert(conn_extra);
355    cb_assert(funcs);
356    cb_assert(is_listen_thread());
357
358    listening = 0;
359    listen_conn_orig = listen_conn;
360
361    x = listen_conn_orig;
362    while (x != NULL) {
363        if (x->extra != NULL && x->funcs == funcs) {
364            struct sockaddr_in s_in;
365            socklen_t sin_len = sizeof(s_in);
366            memset(&s_in, 0, sizeof(s_in));
367
368            if (getsockname(x->sfd, (struct sockaddr *) &s_in, &sin_len) == 0) {
369                int x_port = ntohs(s_in.sin_port);
370                if (x_port == port) {
371                    if (settings.verbose > 1) {
372                        moxi_log_write(
373                                "<%d cproxy listening reusing listener on port %d\n",
374                                x->sfd, port);
375                    }
376
377                    listening++;
378                }
379            }
380        }
381
382        x = x->next;
383    }
384
385    if (listening > 0) {
386        /* If we're already listening on the required port, then */
387        /* we don't need to start a new server_socket().  This happens */
388        /* in the multi-bucket case with binary protocol buckets. */
389        /* There will be multiple proxy struct's (one per bucket), but */
390        /* only one proxy struct will actually be pointed at by a */
391        /* listening conn->extra (usually 11211). */
392
393        /* TODO: Add a refcount to handle shutdown properly? */
394
395        return listening;
396    }
397#ifdef HAVE_SYS_UN_H
398    if (settings.socketpath ?
399        (server_socket_unix(settings.socketpath, settings.access) == 0) :
400        (server_socket(port, transport, NULL) == 0)) {
401#else
402    if (server_socket(port, transport, NULL) == 0) {
403#endif
404        conn *c;
405        cb_assert(listen_conn != NULL);
406
407        /* The listen_conn global list is changed by server_socket(), */
408        /* which adds a new listening conn on port for each bindable */
409        /* host address. */
410
411        /* For example, after the call to server_socket(), there */
412        /* might be two new listening conn's -- one for localhost, */
413        /* another for 127.0.0.1. */
414
415        c = listen_conn;
416        while (c != NULL &&
417               c != listen_conn_orig) {
418            if (settings.verbose > 1) {
419                moxi_log_write(
420                        "<%d cproxy listening on port %d\n",
421                        c->sfd, port);
422            }
423
424            listening++;
425
426            /* TODO: Listening conn's never seem to close, */
427            /*       but need to handle cleanup if they do, */
428            /*       such as if we handle graceful shutdown one day. */
429
430            c->extra = conn_extra;
431            c->funcs = funcs;
432            c->protocol = protocol;
433            c = c->next;
434        }
435    }
436
437    return listening;
438}
439
440/* Finds the proxy_td associated with a worker thread.
441 */
442proxy_td *cproxy_find_thread_data(proxy *p, cb_thread_t thread_id) {
443    if (p != NULL) {
444        int i = thread_index(thread_id);
445
446        /* 0 is the main listen thread, not a worker thread. */
447        cb_assert(i > 0);
448        cb_assert(i < p->thread_data_num);
449
450        if (i > 0 && i < p->thread_data_num) {
451            return &p->thread_data[i];
452        }
453    }
454
455    return NULL;
456}
457
458bool cproxy_init_upstream_conn(conn *c) {
459    char *default_name;
460    proxy *p;
461    int n;
462    proxy_td *ptd;
463
464    cb_assert(c != NULL);
465
466    /* We're called once per client/upstream conn early in its */
467    /* lifecycle, on the worker thread, so it's a good place */
468    /* to record the proxy_td into the conn->extra. */
469
470    cb_assert(!is_listen_thread());
471
472    p = c->extra;
473    cb_assert(p != NULL);
474    cb_assert(p->main != NULL);
475
476    n = cproxy_num_active_proxies(p->main);
477    if (n <= 0) {
478        if (settings.verbose > 2) {
479            moxi_log_write("<%d disallowing upstream conn due to no buckets\n",
480                           c->sfd);
481        }
482
483        return false;
484    }
485
486    ptd = cproxy_find_thread_data(p, cb_thread_self());
487    cb_assert(ptd != NULL);
488
489    /* Reassign the client/upstream conn to a different bucket */
490    /* if the default_bucket_name isn't the special FIRST_BUCKET */
491    /* value. */
492
493    default_name = ptd->behavior_pool.base.default_bucket_name;
494    if (strcmp(default_name, FIRST_BUCKET) != 0) {
495        proxy *default_proxy;
496        if (settings.verbose > 2) {
497            moxi_log_write("<%d assigning to default bucket: %s\n",
498                           c->sfd, default_name);
499        }
500
501        default_proxy =
502            cproxy_find_proxy_by_auth(p->main, default_name, "");
503
504        /* If the ostensible default bucket is missing (possibly deleted), */
505        /* assign the client/upstream conn to the NULL BUCKET. */
506
507        if (default_proxy == NULL) {
508            default_proxy =
509                cproxy_find_proxy_by_auth(p->main, NULL_BUCKET, "");
510
511            if (settings.verbose > 2) {
512                moxi_log_write("<%d assigning to null bucket, "
513                               "default bucket missing: %s\n",
514                               c->sfd, default_name);
515            }
516        }
517
518        if (default_proxy != NULL) {
519            proxy_td *default_ptd =
520                cproxy_find_thread_data(default_proxy, cb_thread_self());
521            if (default_ptd != NULL) {
522                ptd = default_ptd;
523            }
524        } else {
525            if (settings.verbose > 2) {
526                moxi_log_write("<%d assigning to first bucket, "
527                               "missing default/null bucket: %s\n",
528                               c->sfd, default_name);
529            }
530        }
531    } else {
532        if (settings.verbose > 2) {
533            moxi_log_write("<%d assigning to first bucket\n", c->sfd);
534        }
535    }
536
537    ptd->stats.stats.num_upstream++;
538    ptd->stats.stats.tot_upstream++;
539
540    c->extra = ptd;
541    c->funcs = &cproxy_upstream_funcs;
542
543    return true;
544}
545
546bool cproxy_init_downstream_conn(conn *c) {
547    downstream *d = c->extra;
548    cb_assert(d != NULL);
549
550    d->ptd->stats.stats.num_downstream_conn++;
551    d->ptd->stats.stats.tot_downstream_conn++;
552
553    return true;
554}
555
556void cproxy_on_close_upstream_conn(conn *c) {
557    downstream *d;
558    proxy_td *ptd;
559
560    cb_assert(c != NULL);
561
562    if (settings.verbose > 2) {
563        moxi_log_write("<%d cproxy_on_close_upstream_conn\n", c->sfd);
564    }
565
566    ptd = c->extra;
567    cb_assert(ptd != NULL);
568    c->extra = NULL;
569
570    if (ptd->stats.stats.num_upstream > 0) {
571        ptd->stats.stats.num_upstream--;
572    }
573
574    /* Delink from any reserved downstream. */
575
576    for (d = ptd->downstream_reserved; d != NULL; d = d->next) {
577        bool found = false;
578
579        d->upstream_conn = conn_list_remove(d->upstream_conn, NULL,
580                                            c, &found);
581        if (d->upstream_conn == NULL) {
582            d->upstream_suffix = NULL;
583            d->upstream_suffix_len = 0;
584            d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
585            d->upstream_retry = 0;
586            d->target_host_ident = NULL;
587
588            /* Don't need to do anything else, as we'll now just */
589            /* read and drop any remaining inflight downstream replies. */
590            /* Eventually, the downstream will be released. */
591        }
592
593        /* If the downstream was reserved for this upstream conn, */
594        /* also clear the upstream from any multiget de-duplication */
595        /* tracking structures. */
596
597        if (found) {
598            int n, i;
599            if (d->multiget != NULL) {
600                genhash_iter(d->multiget, multiget_remove_upstream, c);
601            }
602
603            /* The downstream conn's might have iov's that */
604            /* point to the upstream conn's buffers.  Also, the */
605            /* downstream conn might be in all sorts of states */
606            /* (conn_read, write, mwrite, pause), and we want */
607            /* to be careful about the downstream channel being */
608            /* half written. */
609
610            /* The safest, but inefficient, thing to do then is */
611            /* to close any conn_mwrite downstream conns. */
612
613            ptd->stats.stats.tot_downstream_close_on_upstream_close++;
614
615            n = mcs_server_count(&d->mst);
616
617            for (i = 0; i < n; i++) {
618                conn *downstream_conn = d->downstream_conns[i];
619                if (downstream_conn != NULL &&
620                    downstream_conn != NULL_CONN &&
621                    downstream_conn->state == conn_mwrite) {
622                    downstream_conn->msgcurr = 0;
623                    downstream_conn->msgused = 0;
624                    downstream_conn->iovused = 0;
625
626                    cproxy_close_conn(downstream_conn);
627                }
628            }
629        }
630    }
631
632    /* Delink from wait queue. */
633
634    ptd->waiting_any_downstream_head =
635        conn_list_remove(ptd->waiting_any_downstream_head,
636                         &ptd->waiting_any_downstream_tail,
637                         c, NULL);
638}
639
640int delink_from_downstream_conns(conn *c) {
641    int n, k, i;
642    downstream *d = c->extra;
643    if (d == NULL) {
644        if (settings.verbose > 2) {
645            moxi_log_write("%d: delink_from_downstream_conn no-op\n",
646                           c->sfd);
647        }
648
649        return -1;
650    }
651
652    n = mcs_server_count(&d->mst);
653    k = -1; /* Index of conn. */
654
655    for (i = 0; i < n; i++) {
656        if (d->downstream_conns[i] == c) {
657            d->downstream_conns[i] = NULL;
658
659            if (settings.verbose > 2) {
660                moxi_log_write(
661                        "<%d cproxy_on_close_downstream_conn quit_server\n",
662                        c->sfd);
663            }
664
665            d->ptd->stats.stats.tot_downstream_quit_server++;
666
667            mcs_server_st_quit(mcs_server_index(&d->mst, i), 1);
668            cb_assert(mcs_server_st_fd(mcs_server_index(&d->mst, i)) == -1);
669
670            k = i;
671        }
672    }
673
674    return k;
675}
676
/* conn_close hook for a downstream conn; c->state must be conn_closing.
 *
 * When c still belongs to a downstream (c->extra != NULL), this:
 *   - detaches c from the downstream's conn slots (quitting that server) and
 *     clears c->extra.  It also reports an error for c->host_ident via
 *     zstored_error_count() when c has a thread and host_ident;
 *   - decrements the proxy_td's num_downstream_conn (never below 0);
 *   - stops there if c was no longer in any slot;
 *   - otherwise, if an upstream is attached and this is the downstream's
 *     last outstanding conn (downstream_used == 1), prepares an error reply
 *     for the upstream unless one is already set: an ASCII "SERVER_ERROR
 *     proxy downstream closed" suffix, or EINTERNAL for binary.  The reply
 *     is replaced by a retry when c has no unread bytes, the request went to
 *     a single server, there is a single upstream, and that server's
 *     behavior still allows another downstream_retry;
 *   - releases c from the downstream via cproxy_release_downstream_conn().
 * A chosen retry is queued on the upstream's thread work_queue and runs
 * later in upstream_retry(), after conn_close() has finished with the fd.
 */
void cproxy_on_close_downstream_conn(conn *c) {
    conn *uc_retry = NULL;
    downstream *d;
    int k;
    proxy_td *ptd;

    cb_assert(c != NULL);
    cb_assert(c->sfd >= 0);
    cb_assert(c->state == conn_closing);

    if (settings.verbose > 2) {
        moxi_log_write("<%d cproxy_on_close_downstream_conn\n", c->sfd);
    }

    d = c->extra;

    /* Might have been set to NULL during cproxy_free_downstream(). */
    /* Or, when a downstream conn is in the thread-based free pool, it */
    /* is not associated with any particular downstream. */

    if (d == NULL) {
        /* TODO: See if we need to remove the downstream conn from the */
        /* thread-based free pool.  This shouldn't happen, but we */
        /* should then figure out how to put an cb_assert() here. */

        return;
    }

    /* k is the index of c's slot in d->downstream_conns[], or -1. */
    k = delink_from_downstream_conns(c);

    c->extra = NULL;

    /* Count this close as an error against the conn's host. */
    if (c->thread != NULL &&
        c->host_ident != NULL) {
        zstored_error_count(c->thread, c->host_ident, true);
    }

    ptd = d->ptd;
    cb_assert(ptd);

    if (ptd->stats.stats.num_downstream_conn > 0) {
        ptd->stats.stats.num_downstream_conn--;
    }

    if (k < 0) {
        /* If this downstream conn wasn't linked into the */
        /* downstream, it was delinked already during connect error */
        /* handling (where its slot was set to NULL_CONN already), */
        /* or during downstream_timeout/conn_queue_timeout. */

        if (settings.verbose > 2) {
            moxi_log_write("%d: skipping release dc in on_close_dc\n",
                           c->sfd);
        }

        return;
    }

    if (d->upstream_conn != NULL &&
        d->downstream_used == 1) {
        /* TODO: Revisit downstream close error handling. */
        /*       Should we propagate error when... */
        /*       - any downstream conn closes? */
        /*       - all downstream conns closes? */
        /*       - last downstream conn closes?  Current behavior. */

        /* Only set an error reply if none has been prepared yet. */
        if (d->upstream_suffix == NULL) {
            if (settings.verbose > 2) {
                moxi_log_write("<%d proxy downstream closed, upstream %d (%x)\n",
                               c->sfd,
                               d->upstream_conn->sfd,
                               d->upstream_conn->protocol);
            }

            if (IS_ASCII(d->upstream_conn->protocol)) {
                d->upstream_suffix = "SERVER_ERROR proxy downstream closed\r\n";

                /* Prefer a message naming the host, built in a suffix */
                /* buffer owned by the upstream conn. */
                if (c->host_ident != NULL) {
                    char *s = add_conn_suffix(d->upstream_conn);
                    if (s != NULL) {
                        snprintf(s, SUFFIX_SIZE - 1,
                                 "SERVER_ERROR proxy downstream closed %s\r\n",
                                 c->host_ident);
                        s[SUFFIX_SIZE - 1] = '\0';
                        d->upstream_suffix = s;

                        s = strchr(s, ':'); /* Clip to avoid sending user/pswd. */
                        if (s != NULL) {
                            *s++ = '\r';
                            *s++ = '\n';
                            *s = '\0';
                        }
                    }
                }

                d->upstream_suffix_len = 0;
            } else {
                d->upstream_status = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
            }

            d->upstream_retry = 0;
            d->target_host_ident = NULL;
        }

        /* We sometimes see that drive_machine/transmit will not see */
        /* a closed connection error during conn_mwrite, possibly */
        /* due to non-blocking sockets.  Because of this, drive_machine */
        /* thinks it has a successful downstream request send and */
        /* moves the state forward trying to read a response from */
        /* the downstream conn (conn_new_cmd, conn_read, etc), and */
        /* only then do we finally see the conn close situation, */
        /* ending up here.  That is, drive_machine only */
        /* seems to move to conn_closing from conn_read. */

        /* If we haven't received any reply yet, we retry based */
        /* on our cmd_retries counter. */

        /* TODO: Reconsider retry behavior, is it right in all situations? */

        if (c->rcurr != NULL &&
            c->rbytes == 0 &&
            d->downstream_used_start == d->downstream_used &&
            d->downstream_used_start == 1 &&
            d->upstream_conn->next == NULL &&
            d->behaviors_arr != NULL) {
            /* k also indexes the per-server behaviors array. */
            if (k >= 0 && k < d->behaviors_num) {
                int retry_max = d->behaviors_arr[k].downstream_retry;
                if (d->upstream_conn->cmd_retries < retry_max) {
                    /* Retrying: discard the error reply prepared above. */
                    d->upstream_conn->cmd_retries++;
                    uc_retry = d->upstream_conn;
                    d->upstream_suffix = NULL;
                    d->upstream_suffix_len = 0;
                    d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
                    d->upstream_retry = 0;
                    d->target_host_ident = NULL;
                }
            }
        }

        /* Binary upstreams need an actual response header as the suffix. */
        if (uc_retry == NULL &&
            d->upstream_suffix == NULL &&
            IS_BINARY(d->upstream_conn->protocol)) {
            protocol_binary_response_header *rh =
                cproxy_make_bin_error(d->upstream_conn,
                                      PROTOCOL_BINARY_RESPONSE_EINTERNAL);
            if (rh != NULL) {
                d->upstream_suffix = (char *) rh;
                d->upstream_suffix_len = sizeof(protocol_binary_response_header);
            } else {
                /* Cannot build the error reply: drop the client conn. */
                d->ptd->stats.stats.err_oom++;
                cproxy_close_conn(d->upstream_conn);
            }
        }
    }

    /* Are we over-decrementing here, and in handling conn_pause? */

    /* Case 1: we're in conn_pause, and socket is closed concurrently. */
    /* We unpause due to reserve, we move to conn_write/conn_mwrite, */
    /* fail and move to conn_closing.  So, no over-decrement. */

    /* Case 2: we're waiting for a downstream response in conn_new_cmd, */
    /* and socket is closed concurrently.  State goes to conn_closing, */
    /* so, no over-decrement. */

    /* Case 3: we've finished processing downstream response (in */
    /* conn_parse_cmd or conn_nread), and the downstream socket */
    /* is closed concurrently.  We then move to conn_pause, */
    /* and same as Case 1. */

    cproxy_release_downstream_conn(d, c);

    /* Setup a retry after unwinding the call stack. */
    /* We use the work_queue, because our caller, conn_close(), */
    /* is likely to blow away our fd if we try to reconnect */
    /* right now. */

    if (uc_retry != NULL) {
        if (settings.verbose > 2) {
            moxi_log_write("%d cproxy retrying\n", uc_retry->sfd);
        }

        ptd->stats.stats.tot_retry++;

        cb_assert(uc_retry->thread);
        cb_assert(uc_retry->thread->work_queue);

        work_send(uc_retry->thread->work_queue,
                  upstream_retry, ptd, uc_retry);
    }
}
868
869void upstream_retry(void *data0, void *data1) {
870    proxy_td *ptd = data0;
871    conn *uc = data1;
872
873    cb_assert(ptd);
874    cb_assert(uc);
875
876    cproxy_pause_upstream_for_downstream(ptd, uc);
877}
878
879void cproxy_add_downstream(proxy_td *ptd) {
880    cb_assert(ptd != NULL);
881    cb_assert(ptd->proxy != NULL);
882
883    if (ptd->downstream_max == 0 ||
884        ptd->downstream_num < ptd->downstream_max) {
885        if (settings.verbose > 2) {
886            moxi_log_write("cproxy_add_downstream %d %d\n",
887                    ptd->downstream_num,
888                    ptd->downstream_max);
889        }
890
891        /* The config/behaviors will be NULL if the */
892        /* proxy is shutting down. */
893
894        if (ptd->config != NULL &&
895            ptd->behavior_pool.arr != NULL) {
896            downstream *d =
897                cproxy_create_downstream(ptd->config,
898                                         ptd->config_ver,
899                                         &ptd->behavior_pool);
900            if (d != NULL) {
901                d->ptd = ptd;
902                ptd->downstream_tot++;
903                ptd->downstream_num++;
904                cproxy_release_downstream(d, true);
905            } else {
906                ptd->stats.stats.tot_downstream_create_failed++;
907            }
908        }
909    } else {
910        ptd->stats.stats.tot_downstream_max_reached++;
911    }
912}
913
914downstream *cproxy_reserve_downstream(proxy_td *ptd) {
915    cb_assert(ptd != NULL);
916
917    /* Loop in case we need to clear out downstreams */
918    /* that have outdated configs. */
919
920    while (true) {
921        downstream *d;
922
923        d = ptd->downstream_released;
924        if (d == NULL) {
925            cproxy_add_downstream(ptd);
926        }
927
928        d = ptd->downstream_released;
929        if (d == NULL) {
930            return NULL;
931        }
932
933        ptd->downstream_released = d->next;
934
935        cb_assert(d->upstream_conn == NULL);
936        cb_assert(d->upstream_suffix == NULL);
937        cb_assert(d->upstream_suffix_len == 0);
938        cb_assert(d->upstream_status == PROTOCOL_BINARY_RESPONSE_SUCCESS);
939        cb_assert(d->upstream_retry == 0);
940        cb_assert(d->target_host_ident == NULL);
941        cb_assert(d->downstream_used == 0);
942        cb_assert(d->downstream_used_start == 0);
943        cb_assert(d->merger == NULL);
944        cb_assert(d->timeout_tv.tv_sec == 0);
945        cb_assert(d->timeout_tv.tv_usec == 0);
946        cb_assert(d->next_waiting == NULL);
947
948        d->upstream_conn = NULL;
949        d->upstream_suffix = NULL;
950        d->upstream_suffix_len = 0;
951        d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
952        d->upstream_retry = 0;
953        d->upstream_retries = 0;
954        d->target_host_ident = NULL;
955        d->usec_start = 0;
956        d->downstream_used = 0;
957        d->downstream_used_start = 0;
958        d->merger = NULL;
959        d->timeout_tv.tv_sec = 0;
960        d->timeout_tv.tv_usec = 0;
961        d->next_waiting = NULL;
962
963        if (cproxy_check_downstream_config(d)) {
964            bool found;
965            ptd->downstream_reserved =
966                downstream_list_remove(ptd->downstream_reserved, d);
967            ptd->downstream_released =
968                downstream_list_remove(ptd->downstream_released, d);
969
970            found = zstored_downstream_waiting_remove(d);
971            cb_assert(!found);
972
973            d->next = ptd->downstream_reserved;
974            ptd->downstream_reserved = d;
975
976            ptd->stats.stats.tot_downstream_reserved++;
977
978            return d;
979        }
980
981        cproxy_free_downstream(d);
982    }
983}
984
985bool cproxy_clear_timeout(downstream *d) {
986    bool rv = false;
987
988    if (d->timeout_tv.tv_sec != 0 ||
989        d->timeout_tv.tv_usec != 0) {
990        evtimer_del(&d->timeout_event);
991        rv = true;
992    }
993
994    d->timeout_tv.tv_sec = 0;
995    d->timeout_tv.tv_usec = 0;
996
997    return rv;
998}
999
1000bool cproxy_release_downstream(downstream *d, bool force) {
1001    int i;
1002    int n;
1003    bool found;
1004
1005    cb_assert(d != NULL);
1006    cb_assert(d->ptd != NULL);
1007
1008    if (settings.verbose > 2) {
1009        moxi_log_write("%d: release_downstream\n",
1010                       d->upstream_conn != NULL ?
1011                       d->upstream_conn->sfd : -1);
1012    }
1013
1014    /* Always release the timeout_event, even if we're going to retry, */
1015    /* to avoid pegging CPU with leaked timeout_events. */
1016
1017    cproxy_clear_timeout(d);
1018
1019    /* If we need to retry the command, we do so here, */
1020    /* keeping the same downstream that would otherwise */
1021    /* be released. */
1022
1023    if (!force && d->upstream_conn != NULL && d->upstream_retry > 0) {
1024        int max_retries;
1025        d->upstream_retry = 0;
1026        d->upstream_retries++;
1027
1028        /* But, we can stop retrying if we've tried each server twice. */
1029        max_retries = cproxy_max_retries(d);
1030
1031        if (d->upstream_retries <= max_retries) {
1032            if (settings.verbose > 2) {
1033                moxi_log_write("%d: release_downstream,"
1034                               " instead retrying %d, %d <= %d, %d\n",
1035                               d->upstream_conn->sfd,
1036                               d->upstream_retry,
1037                               d->upstream_retries, max_retries,
1038                               d->ptd->stats.stats.tot_retry);
1039            }
1040
1041            d->ptd->stats.stats.tot_retry++;
1042
1043            if (cproxy_forward(d) == true) {
1044                return true;
1045            } else {
1046                d->ptd->stats.stats.tot_downstream_propagate_failed++;
1047
1048                propagate_error_msg(d, NULL, d->upstream_status);
1049            }
1050        } else {
1051            if (settings.verbose > 2) {
1052                moxi_log_write("%d: release_downstream,"
1053                               " skipping retry %d, %d > %d\n",
1054                               d->upstream_conn->sfd,
1055                               d->upstream_retry,
1056                               d->upstream_retries, max_retries);
1057            }
1058        }
1059    }
1060
1061    /* Record reserved_time histogram timings. */
1062
1063    if (d->usec_start > 0) {
1064        uint64_t ux = usec_now() - d->usec_start;
1065
1066        d->ptd->stats.stats.tot_downstream_reserved_time += ux;
1067
1068        if (d->ptd->stats.stats.max_downstream_reserved_time < ux) {
1069            d->ptd->stats.stats.max_downstream_reserved_time = ux;
1070        }
1071
1072        if (d->upstream_retries > 0) {
1073            d->ptd->stats.stats.tot_retry_time += ux;
1074
1075            if (d->ptd->stats.stats.max_retry_time < ux) {
1076                d->ptd->stats.stats.max_retry_time = ux;
1077            }
1078        }
1079
1080        downstream_reserved_time_sample(&d->ptd->stats, ux);
1081    }
1082
1083    d->ptd->stats.stats.tot_downstream_released++;
1084
1085    /* Delink upstream conns. */
1086
1087    while (d->upstream_conn != NULL) {
1088        conn *curr;
1089        if (d->merger != NULL) {
1090            /* TODO: Allow merger callback to be func pointer. */
1091
1092            genhash_iter(d->merger,
1093                        protocol_stats_foreach_write,
1094                        d->upstream_conn);
1095
1096            if (update_event(d->upstream_conn, EV_WRITE | EV_PERSIST)) {
1097                conn_set_state(d->upstream_conn, conn_mwrite);
1098            } else {
1099                d->ptd->stats.stats.err_oom++;
1100                cproxy_close_conn(d->upstream_conn);
1101            }
1102        }
1103
1104        if (settings.verbose > 2) {
1105            moxi_log_write("%d: release_downstream upstream_suffix %s status %x\n",
1106                           d->upstream_conn->sfd,
1107                           d->upstream_suffix_len == 0 ?
1108                           d->upstream_suffix : "(binary)",
1109                           d->upstream_status);
1110        }
1111
1112        if (d->upstream_suffix != NULL) {
1113            /* Do a last write on the upstream.  For example, */
1114            /* the upstream_suffix might be "END\r\n" or other */
1115            /* way to mark the end of a scatter-gather or */
1116            /* multiline response. */
1117            int suffix_len;
1118
1119            if (settings.verbose > 2) {
1120                if (d->upstream_suffix_len > 0) {
1121                    moxi_log_write("%d: release_downstream"
1122                                   " writing suffix binary: %d\n",
1123                                   d->upstream_conn->sfd,
1124                                   d->upstream_suffix_len);
1125
1126                    cproxy_dump_header(d->upstream_conn->sfd,
1127                                       d->upstream_suffix);
1128                } else {
1129                    moxi_log_write("%d: release_downstream"
1130                                   " writing suffix ascii: %s\n",
1131                                   d->upstream_conn->sfd,
1132                                   d->upstream_suffix);
1133                }
1134            }
1135
1136            suffix_len = d->upstream_suffix_len;
1137            if (suffix_len == 0) {
1138                suffix_len = (int)strlen(d->upstream_suffix);
1139            }
1140
1141            if (add_iov(d->upstream_conn,
1142                        d->upstream_suffix,
1143                        suffix_len) == 0 &&
1144                update_event(d->upstream_conn, EV_WRITE | EV_PERSIST)) {
1145                conn_set_state(d->upstream_conn, conn_mwrite);
1146            } else {
1147                d->ptd->stats.stats.err_oom++;
1148                cproxy_close_conn(d->upstream_conn);
1149            }
1150        }
1151
1152        curr = d->upstream_conn;
1153        d->upstream_conn = d->upstream_conn->next;
1154        curr->next = NULL;
1155    }
1156
1157    /* Free extra hash tables. */
1158
1159    if (d->multiget != NULL) {
1160        genhash_iter(d->multiget, multiget_foreach_free, d);
1161        genhash_free(d->multiget);
1162        d->multiget = NULL;
1163    }
1164
1165    if (d->merger != NULL) {
1166        genhash_iter(d->merger, protocol_stats_foreach_free, NULL);
1167        genhash_free(d->merger);
1168        d->merger = NULL;
1169    }
1170
1171    d->upstream_conn = NULL;
1172    d->upstream_suffix = NULL; /* No free(), expecting a static string. */
1173    d->upstream_suffix_len = 0;
1174    d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1175    d->upstream_retry = 0;
1176    d->upstream_retries = 0;
1177    d->target_host_ident = NULL;
1178    d->usec_start = 0;
1179    d->downstream_used = 0;
1180    d->downstream_used_start = 0;
1181    d->multiget = NULL;
1182    d->merger = NULL;
1183
1184    /* TODO: Consider adding a downstream->prev backpointer */
1185    /*       or doubly-linked list to save on this scan. */
1186
1187    d->ptd->downstream_reserved =
1188        downstream_list_remove(d->ptd->downstream_reserved, d);
1189    d->ptd->downstream_released =
1190        downstream_list_remove(d->ptd->downstream_released, d);
1191
1192    found = zstored_downstream_waiting_remove(d);
1193    cb_assert(!found);
1194
1195    n = mcs_server_count(&d->mst);
1196
1197    for (i = 0; i < n; i++) {
1198        conn *dc = d->downstream_conns[i];
1199        d->downstream_conns[i] = NULL;
1200        if (dc != NULL) {
1201            zstored_release_downstream_conn(dc, false);
1202        }
1203    }
1204
1205    cproxy_clear_timeout(d); /* For MB-4334. */
1206
1207    cb_assert(d->timeout_tv.tv_sec == 0);
1208    cb_assert(d->timeout_tv.tv_usec == 0);
1209
1210    /* If this downstream still has the same configuration as our top-level */
1211    /* proxy config, go back onto the available, released downstream list. */
1212
1213    if (cproxy_check_downstream_config(d) || force) {
1214        d->next = d->ptd->downstream_released;
1215        d->ptd->downstream_released = d;
1216
1217        return true;
1218    }
1219
1220    cproxy_free_downstream(d);
1221
1222    return false;
1223}
1224
1225void cproxy_free_downstream(downstream *d) {
1226    bool found;
1227    int n;
1228
1229    cb_assert(d != NULL);
1230    cb_assert(d->ptd != NULL);
1231    cb_assert(d->upstream_conn == NULL);
1232    cb_assert(d->multiget == NULL);
1233    cb_assert(d->merger == NULL);
1234    cb_assert(d->timeout_tv.tv_sec == 0);
1235    cb_assert(d->timeout_tv.tv_usec == 0);
1236
1237    if (settings.verbose > 2) {
1238        moxi_log_write("cproxy_free_downstream\n");
1239    }
1240
1241    d->ptd->stats.stats.tot_downstream_freed++;
1242
1243    d->ptd->downstream_reserved =
1244        downstream_list_remove(d->ptd->downstream_reserved, d);
1245    d->ptd->downstream_released =
1246        downstream_list_remove(d->ptd->downstream_released, d);
1247
1248    found = zstored_downstream_waiting_remove(d);
1249    cb_assert(!found);
1250
1251    d->ptd->downstream_num--;
1252    cb_assert(d->ptd->downstream_num >= 0);
1253
1254    n = mcs_server_count(&d->mst);
1255
1256    if (d->downstream_conns != NULL) {
1257        int i;
1258        for (i = 0; i < n; i++) {
1259            if (d->downstream_conns[i] != NULL &&
1260                d->downstream_conns[i] != NULL_CONN) {
1261                d->downstream_conns[i]->extra = NULL;
1262            }
1263        }
1264    }
1265
1266    /* This will close sockets, which will force associated conn's */
1267    /* to go to conn_closing state.  Since we've already cleared */
1268    /* the conn->extra pointers, there's no extra release/free. */
1269
1270    mcs_free(&d->mst);
1271
1272    cproxy_clear_timeout(d);
1273
1274    if (d->downstream_conns != NULL) {
1275        free(d->downstream_conns);
1276    }
1277
1278    if (d->config != NULL) {
1279        free(d->config);
1280    }
1281
1282    free(d->behaviors_arr);
1283    free(d);
1284}
1285
1286/* The config input is something libmemcached can parse.
1287 * See mcs_server_st_parse().
1288 */
1289downstream *cproxy_create_downstream(char *config,
1290                                     uint32_t config_ver,
1291                                     proxy_behavior_pool *behavior_pool) {
1292    downstream *d = calloc(1, sizeof(downstream));
1293    cb_assert(config != NULL);
1294    cb_assert(behavior_pool != NULL);
1295
1296    if (d != NULL &&
1297        config != NULL &&
1298        config[0] != '\0') {
1299        d->config        = strdup(config);
1300        d->config_ver    = config_ver;
1301        d->behaviors_num = behavior_pool->num;
1302        d->behaviors_arr = cproxy_copy_behaviors(behavior_pool->num,
1303                                                 behavior_pool->arr);
1304
1305        /* TODO: Handle non-uniform downstream protocols. */
1306
1307        cb_assert(IS_PROXY(behavior_pool->base.downstream_protocol));
1308
1309        if (settings.verbose > 2) {
1310            moxi_log_write(
1311                    "cproxy_create_downstream: %s, %u, %u\n",
1312                    config, config_ver,
1313                    behavior_pool->base.downstream_protocol);
1314        }
1315
1316        if (d->config != NULL &&
1317            d->behaviors_arr != NULL) {
1318            char *usr = behavior_pool->base.usr[0] != '\0' ?
1319                behavior_pool->base.usr :
1320                NULL;
1321            char *pwd = behavior_pool->base.pwd[0] != '\0' ?
1322                behavior_pool->base.pwd :
1323                NULL;
1324
1325            int nconns = init_mcs_st(&d->mst, d->config, usr, pwd,
1326                                     behavior_pool->base.mcs_opts);
1327            if (nconns > 0) {
1328                d->downstream_conns = (conn **)
1329                    calloc(nconns, sizeof(conn *));
1330                if (d->downstream_conns != NULL) {
1331                    return d;
1332                }
1333
1334                mcs_free(&d->mst);
1335            }
1336        }
1337
1338        free(d->config);
1339        free(d->behaviors_arr);
1340        free(d);
1341    }
1342
1343    return NULL;
1344}
1345
1346int init_mcs_st(mcs_st *mst, char *config,
1347                const char *default_usr,
1348                const char *default_pwd,
1349                const char *opts) {
1350    cb_assert(mst);
1351    cb_assert(config);
1352
1353    if (mcs_create(mst, config,
1354                   default_usr, default_pwd, opts) != NULL) {
1355        return mcs_server_count(mst);
1356    } else {
1357        if (settings.verbose > 1) {
1358            moxi_log_write("mcs_create failed: %s\n",
1359                    config);
1360        }
1361    }
1362
1363    return 0;
1364}
1365
1366/* See if the downstream config matches the top-level proxy config.
1367 */
1368bool cproxy_check_downstream_config(downstream *d) {
1369    int rv = false;
1370
1371    cb_assert(d != NULL);
1372    cb_assert(d->ptd != NULL);
1373    cb_assert(d->ptd->proxy != NULL);
1374
1375    if (d->config_ver == d->ptd->config_ver) {
1376        rv = true;
1377    } else if (d->config != NULL &&
1378               d->ptd->config != NULL &&
1379               cproxy_equal_behaviors(d->behaviors_num,
1380                                      d->behaviors_arr,
1381                                      d->ptd->behavior_pool.num,
1382                                      d->ptd->behavior_pool.arr)) {
1383        /* Parse the proxy/parent's config to see if we can */
1384        /* reuse our existing downstream connections. */
1385
1386        char *usr = d->ptd->behavior_pool.base.usr[0] != '\0' ?
1387            d->ptd->behavior_pool.base.usr :
1388            NULL;
1389        char *pwd = d->ptd->behavior_pool.base.pwd[0] != '\0' ?
1390            d->ptd->behavior_pool.base.pwd :
1391            NULL;
1392
1393        mcs_st next;
1394
1395        int n = init_mcs_st(&next, d->ptd->config, usr, pwd,
1396                            d->ptd->behavior_pool.base.mcs_opts);
1397        if (n > 0) {
1398            if (mcs_stable_update(&d->mst, &next)) {
1399                if (settings.verbose > 2) {
1400                    moxi_log_write("check_downstream_config stable update\n");
1401                }
1402
1403                free(d->config);
1404                d->config     = strdup(d->ptd->config);
1405                d->config_ver = d->ptd->config_ver;
1406                rv = true;
1407            }
1408
1409            mcs_free(&next);
1410        }
1411    }
1412
1413    if (settings.verbose > 2) {
1414        moxi_log_write("check_downstream_config %u\n", rv);
1415    }
1416
1417    return rv;
1418}
1419
1420/* Returns -1 if the connections aren't fully assigned and ready. */
1421/* In that case, the downstream has to wait for a downstream connection */
1422/* to get out of the conn_connecting state. */
1423
1424/* The downstream connection might leave the conn_connecting state */
1425/* with an error (unable to connect).  That case is handled by */
1426/* tracking a NULL_CONN sentinel value. */
1427
1428/* Also, in the -1 result case, the d->upstream_conn should remain in */
1429/* conn_pause state. */
1430
1431/* A server_index of -1 means to connect all downstreams, as the */
1432/* caller probably needs to proxy a broadcast command. */
1433
1434int cproxy_connect_downstream(downstream *d, LIBEVENT_THREAD *thread,
1435                              int server_index) {
1436    int i = 0;
1437    int s = 0; /* Number connected. */
1438    int n;
1439    mcs_server_st *msst_actual;
1440
1441    cb_assert(d != NULL);
1442    cb_assert(d->ptd != NULL);
1443    cb_assert(d->ptd->downstream_released != d); /* Should not be in free list. */
1444    cb_assert(d->downstream_conns != NULL);
1445    cb_assert(mcs_server_count(&d->mst) > 0);
1446    cb_assert(thread != NULL);
1447    cb_assert(thread->base != NULL);
1448
1449    n = mcs_server_count(&d->mst);
1450
1451    cb_assert(d->behaviors_num >= n);
1452    cb_assert(d->behaviors_arr != NULL);
1453
1454    if (settings.verbose > 2) {
1455        moxi_log_write("%d: cproxy_connect_downstream server_index %d in %d\n",
1456                       d->upstream_conn->sfd, server_index, n);
1457    }
1458
1459
1460    if (server_index >= 0) {
1461        cb_assert(server_index < n);
1462        i = server_index;
1463        n = server_index + 1;
1464    }
1465
1466    for (; i < n; i++) {
1467        cb_assert(IS_PROXY(d->behaviors_arr[i].downstream_protocol));
1468
1469        msst_actual = mcs_server_index(&d->mst, i);
1470
1471        /* Connect to downstream servers, if not already. */
1472
1473        /* A NULL_CONN means that we tried to connect, but failed, */
1474        /* which is different than NULL (which means that we haven't */
1475        /* tried to connect yet). */
1476
1477        if (d->downstream_conns[i] == NULL) {
1478            bool downstream_conn_max_reached = false;
1479            conn *c = d->upstream_conn;
1480            /*
1481             * mcmux compatiblity mode, one downstream struct will be associated
1482             * with downstream connection. So overwrite the default msst with the
1483             * value.
1484             */
1485            if (c && c->peer_host && c->peer_port) {
1486                cb_assert(i == 0);
1487                strncpy(msst_actual->hostname, c->peer_host, MCS_HOSTNAME_SIZE);
1488                msst_actual->port = c->peer_port;
1489                msst_actual->fd = -1;
1490                msst_actual->ident_a[0] = msst_actual->ident_b[0] = 0;
1491            }
1492
1493            d->downstream_conns[i] =
1494                zstored_acquire_downstream_conn(d, thread,
1495                                                msst_actual,
1496                                                &d->behaviors_arr[i],
1497                                                &downstream_conn_max_reached);
1498            if (c != NULL &&
1499                i == server_index &&
1500                d->downstream_conns[i] != NULL &&
1501                d->downstream_conns[i] != NULL_CONN &&
1502                d->target_host_ident == NULL) {
1503                d->target_host_ident = add_conn_suffix(c);
1504                if (d->target_host_ident != NULL) {
1505                    strncpy(d->target_host_ident,
1506                            msst_actual->hostname,
1507                            SUFFIX_SIZE);
1508                    d->target_host_ident[SUFFIX_SIZE - 1] = '\0';
1509                }
1510            }
1511
1512            if (d->downstream_conns[i] != NULL &&
1513                d->downstream_conns[i] != NULL_CONN &&
1514                d->downstream_conns[i]->state == conn_connecting) {
1515                return -1;
1516            }
1517
1518            if (d->downstream_conns[i] == NULL &&
1519                downstream_conn_max_reached == true) {
1520                if (settings.verbose > 2) {
1521                    moxi_log_write("%d: downstream_conn_max reached\n",
1522                                   d->upstream_conn->sfd);
1523                }
1524
1525                if (zstored_downstream_waiting_add(d, thread,
1526                                                   msst_actual,
1527                                                   &d->behaviors_arr[i]) == true) {
1528                    /* Since we're waiting on the downstream conn queue, */
1529                    /* start a downstream timer per configuration. */
1530
1531                    cproxy_start_downstream_timeout_ex(d, c,
1532                        d->behaviors_arr[i].downstream_conn_queue_timeout);
1533
1534                    return -1;
1535                }
1536            }
1537        }
1538
1539        if (d->downstream_conns[i] != NULL &&
1540            d->downstream_conns[i] != NULL_CONN) {
1541            s++;
1542        } else {
1543            mcs_server_st_quit(msst_actual, 1);
1544            d->downstream_conns[i] = NULL_CONN;
1545        }
1546    }
1547
1548    return s;
1549}
1550
1551conn *cproxy_connect_downstream_conn(downstream *d,
1552                                     LIBEVENT_THREAD *thread,
1553                                     mcs_server_st *msst,
1554                                     proxy_behavior *behavior) {
1555    uint64_t start = 0;
1556    int err = -1;
1557    SOCKET fd;
1558
1559    cb_assert(d);
1560    cb_assert(d->ptd);
1561    cb_assert(d->ptd->downstream_released != d); /* Should not be in free list. */
1562    cb_assert(thread);
1563    cb_assert(thread->base);
1564    cb_assert(msst);
1565    cb_assert(behavior);
1566    cb_assert(mcs_server_st_hostname(msst) != NULL);
1567    cb_assert(mcs_server_st_port(msst) > 0);
1568    cb_assert(mcs_server_st_fd(msst) == -1);
1569
1570    if (d->ptd->behavior_pool.base.time_stats) {
1571        start = usec_now();
1572    }
1573
1574    d->ptd->stats.stats.tot_downstream_connect_started++;
1575
1576    fd = mcs_connect(mcs_server_st_hostname(msst),
1577                     mcs_server_st_port(msst), &err,
1578                     MOXI_BLOCKING_CONNECT);
1579
1580    if (settings.verbose > 2) {
1581        moxi_log_write("%d: cproxy_connect_downstream_conn %s:%d %d %d\n", fd,
1582                       mcs_server_st_hostname(msst),
1583                       mcs_server_st_port(msst),
1584                       MOXI_BLOCKING_CONNECT, err);
1585    }
1586
1587    if (fd != -1) {
1588        conn *c = conn_new(fd, conn_pause, 0,
1589                           DATA_BUFFER_SIZE,
1590                           tcp_transport,
1591                           thread->base,
1592                           &cproxy_downstream_funcs, d);
1593        if (c != NULL ) {
1594            c->protocol = (d->upstream_conn->peer_protocol ?
1595                           d->upstream_conn->peer_protocol :
1596                           behavior->downstream_protocol);
1597            c->thread = thread;
1598            c->cmd_start_time = start;
1599
1600#ifdef WIN32
1601            if (err == WSAEINPROGRESS) {
1602                err = EINPROGRESS;
1603            } else if (err == WSAEWOULDBLOCK) {
1604                err = EWOULDBLOCK;
1605            }
1606#endif
1607
1608            if (err == EINPROGRESS ||
1609                err == EWOULDBLOCK) {
1610                if (update_event_timed(c, EV_WRITE | EV_PERSIST,
1611                                       &behavior->connect_timeout)) {
1612                    conn_set_state(c, conn_connecting);
1613
1614                    d->ptd->stats.stats.tot_downstream_connect_wait++;
1615
1616                    return c;
1617                } else {
1618                    d->ptd->stats.stats.err_oom++;
1619                }
1620            } else {
1621                if (downstream_connect_init(d, msst, behavior, c)) {
1622                    return c;
1623                }
1624            }
1625
1626            cproxy_close_conn(c);
1627        } else {
1628            d->ptd->stats.stats.err_oom++;
1629        }
1630    }
1631
1632    d->ptd->stats.stats.tot_downstream_connect_failed++;
1633
1634    return NULL;
1635}
1636
1637bool downstream_connect_init(downstream *d, mcs_server_st *msst,
1638                             proxy_behavior *behavior, conn *c) {
1639    char *host_ident;
1640    int rv;
1641
1642    cb_assert(c->thread != NULL);
1643
1644    host_ident = c->host_ident;
1645    if (host_ident == NULL) {
1646        host_ident = mcs_server_st_ident(msst, IS_ASCII(c->protocol));
1647    }
1648
1649    if (c->cmd_start_time != 0 &&
1650        d->ptd->behavior_pool.base.time_stats) {
1651        downstream_connect_time_sample(&d->ptd->stats,
1652                                       usec_now() - c->cmd_start_time);
1653    }
1654
1655    rv = cproxy_auth_downstream(msst, behavior, c->sfd);
1656    if (rv == 0) {
1657        d->ptd->stats.stats.tot_downstream_auth++;
1658
1659        rv = cproxy_bucket_downstream(msst, behavior, c->sfd);
1660        if (rv == 0) {
1661            d->ptd->stats.stats.tot_downstream_bucket++;
1662
1663            zstored_error_count(c->thread, host_ident, false);
1664
1665            d->ptd->stats.stats.tot_downstream_connect++;
1666
1667            return true;
1668        } else {
1669            d->ptd->stats.stats.tot_downstream_bucket_failed++;
1670        }
1671    } else {
1672        d->ptd->stats.stats.tot_downstream_auth_failed++;
1673        if (rv == 1) {
1674            d->ptd->stats.stats.tot_auth_timeout++;
1675        }
1676    }
1677
1678    /* Treat a auth/bucket error as a blacklistable error. */
1679
1680    zstored_error_count(c->thread, host_ident, true);
1681
1682    return false;
1683}
1684
1685conn *cproxy_find_downstream_conn(downstream *d,
1686                                  char *key, int key_length,
1687                                  bool *local) {
1688    return cproxy_find_downstream_conn_ex(d, key, key_length, local, NULL);
1689}
1690
1691conn *cproxy_find_downstream_conn_ex(downstream *d,
1692                                     char *key, int key_length,
1693                                     bool *local,
1694                                     int *vbucket) {
1695    int v;
1696    int s;
1697
1698    cb_assert(d != NULL);
1699    cb_assert(d->downstream_conns != NULL);
1700    cb_assert(key != NULL);
1701    cb_assert(key_length > 0);
1702
1703    if (local != NULL) {
1704        *local = false;
1705    }
1706
1707    v = -1;
1708    s = cproxy_server_index(d, key, key_length, &v);
1709
1710    if (settings.verbose > 2 && s >= 0) {
1711        moxi_log_write("%d: server_index %d, vbucket %d, conn %d\n", s, v,
1712                       (d->upstream_conn != NULL ?
1713                        d->upstream_conn->sfd : 0),
1714                       (d->downstream_conns[s] == NULL ?
1715                        0 :
1716                        (d->downstream_conns[s] == NULL_CONN ?
1717                         -1 :
1718                         d->downstream_conns[s]->sfd)));
1719    }
1720
1721    if (s >= 0 &&
1722        s < (int) mcs_server_count(&d->mst) &&
1723        d->downstream_conns[s] != NULL &&
1724        d->downstream_conns[s] != NULL_CONN) {
1725
1726        if (local != NULL && s == 0) {
1727            *local = true;
1728        }
1729
1730        if (vbucket != NULL) {
1731            *vbucket = v;
1732        }
1733
1734        return d->downstream_conns[s];
1735    }
1736
1737    return NULL;
1738}
1739
1740bool cproxy_prep_conn_for_write(conn *c) {
1741    if (c != NULL) {
1742        cb_assert(c->item == NULL);
1743        cb_assert(IS_PROXY(c->protocol));
1744        cb_assert(c->ilist != NULL);
1745        cb_assert(c->isize > 0);
1746        cb_assert(c->suffixlist != NULL);
1747        cb_assert(c->suffixsize > 0);
1748
1749        c->icurr      = c->ilist;
1750        c->ileft      = 0;
1751        c->suffixcurr = c->suffixlist;
1752        c->suffixleft = 0;
1753
1754        c->msgcurr = 0; /* TODO: Mem leak just by blowing these to 0? */
1755        c->msgused = 0;
1756        c->iovused = 0;
1757
1758        if (add_msghdr(c) == 0) {
1759            return true;
1760        }
1761
1762        if (settings.verbose > 1) {
1763            moxi_log_write("%d: cproxy_prep_conn_for_write failed\n",
1764                           c->sfd);
1765        }
1766    }
1767
1768    return false;
1769}
1770
1771bool cproxy_update_event_write(downstream *d, conn *c) {
1772    cb_assert(d != NULL);
1773    cb_assert(d->ptd != NULL);
1774    cb_assert(c != NULL);
1775
1776    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
1777        d->ptd->stats.stats.err_oom++;
1778        cproxy_close_conn(c);
1779
1780        return false;
1781    }
1782
1783    return true;
1784}
1785
1786/**
1787 * Do a hash through libmemcached to see which server (by index)
1788 * should hold a given key.
1789 */
1790int cproxy_server_index(downstream *d, char *key, size_t key_length,
1791                        int *vbucket) {
1792    cb_assert(d != NULL);
1793    cb_assert(key != NULL);
1794    cb_assert(key_length > 0);
1795
1796    if (mcs_server_count(&d->mst) <= 0) {
1797        return -1;
1798    }
1799
1800    return (int) mcs_key_hash(&d->mst, key, key_length, vbucket);
1801}
1802
/*
 * Pair upstream conns waiting on ptd's waiting_any_downstream list with
 * reserved downstreams and forward their requests.
 *
 * For each waiting upstream conn (head first):
 *   - reserve a downstream with cproxy_reserve_downstream();
 *   - attach the conn, plus any following conns accepted by
 *     is_compatible_request(), to it;
 *   - call cproxy_forward().
 *
 * The loop stops when any of these happens:
 *   - the wait list is empty;
 *   - the tail snapshot taken on entry has been processed;
 *   - no downstream can be reserved.
 * In the last case, if ptd->downstream_num <= 0, every waiting upstream
 * conn is removed from the list and sent a "SERVER_ERROR proxy out of
 * downstreams" error.
 *
 * ptd->downstream_assigns is incremented on entry and re-checked after a
 * failed cproxy_forward(). A changed value means this function was
 * re-entered (e.g. via cproxy_release_downstream_conn(), which calls it),
 * in which case d is not touched again and the loop exits.
 */
void cproxy_assign_downstream(proxy_td *ptd) {
    uint64_t da;
    conn *tail;
    bool stop = false;

    cb_assert(ptd != NULL);

    if (settings.verbose > 2) {
        moxi_log_write("assign_downstream\n");
    }

    ptd->downstream_assigns++;

    /* Snapshot of the assign counter, used for the recursion check. */
    da = ptd->downstream_assigns;
    /* Key loop that tries to reserve any available, released */
    /* downstream resources to waiting upstream conns. */

    /* Remember the wait list tail when we start, in case more */
    /* upstream conns are tacked onto the wait list while we're */
    /* processing.  This helps avoid infinite loop where upstream */
    /* conns just keep on moving to the tail. */

    tail = ptd->waiting_any_downstream_tail;
    while (ptd->waiting_any_downstream_head != NULL && !stop) {
        conn *uc_last;
        downstream *d;

        /* Processing the snapshot tail makes this the last pass. */
        if (ptd->waiting_any_downstream_head == tail) {
            stop = true;
        }

        d = cproxy_reserve_downstream(ptd);
        if (d == NULL) {
            if (ptd->downstream_num <= 0) {
                /* Absolutely no downstreams connected, so */
                /* might as well error out. */

                while (ptd->waiting_any_downstream_head != NULL) {
                    conn *uc;
                    ptd->stats.stats.tot_downstream_propagate_failed++;

                    uc = ptd->waiting_any_downstream_head;
                    ptd->waiting_any_downstream_head =
                        ptd->waiting_any_downstream_head->next;
                    if (ptd->waiting_any_downstream_head == NULL) {
                        ptd->waiting_any_downstream_tail = NULL;
                    }
                    uc->next = NULL;

                    upstream_error_msg(uc,
                                       "SERVER_ERROR proxy out of downstreams\r\n",
                                       PROTOCOL_BINARY_RESPONSE_EINTERNAL);
                }
            }

            break; /* If no downstreams are available, stop loop. */
        }

        /* A freshly reserved downstream must be completely idle. */
        cb_assert(d->upstream_conn == NULL);
        cb_assert(d->downstream_used == 0);
        cb_assert(d->downstream_used_start == 0);
        cb_assert(d->multiget == NULL);
        cb_assert(d->merger == NULL);
        cb_assert(d->timeout_tv.tv_sec == 0);
        cb_assert(d->timeout_tv.tv_usec == 0);

        /* We have a downstream reserved, so assign the first */
        /* waiting upstream conn to it. */

        d->upstream_conn = ptd->waiting_any_downstream_head;
        ptd->waiting_any_downstream_head =
            ptd->waiting_any_downstream_head->next;
        if (ptd->waiting_any_downstream_head == NULL) {
            ptd->waiting_any_downstream_tail = NULL;
        }
        d->upstream_conn->next = NULL;

        ptd->stats.stats.tot_assign_downstream++;
        ptd->stats.stats.tot_assign_upstream++;

        /* Add any compatible upstream conns to the downstream. */
        /* By compatible, for example, we mean multi-gets from */
        /* different upstreams so we can de-deplicate get keys. */
        uc_last = d->upstream_conn;

        while (is_compatible_request(uc_last,
                                     ptd->waiting_any_downstream_head)) {
            uc_last->next = ptd->waiting_any_downstream_head;

            ptd->waiting_any_downstream_head =
                ptd->waiting_any_downstream_head->next;
            if (ptd->waiting_any_downstream_head == NULL) {
                ptd->waiting_any_downstream_tail = NULL;
            }

            uc_last = uc_last->next;
            uc_last->next = NULL;

            /* Note: tot_assign_upstream - tot_assign_downstream */
            /* should get us how many requests we've piggybacked together. */

            ptd->stats.stats.tot_assign_upstream++;
        }

        if (settings.verbose > 2) {
            moxi_log_write("%d: assign_downstream, matched to upstream\n",
                    d->upstream_conn->sfd);
        }

        if (cproxy_forward(d) == false) {
            /* TODO: This stat is incorrect, as we might reach here */
            /* when we have entire front cache hit or talk-to-self */
            /* optimization hit on multiget. */

            ptd->stats.stats.tot_downstream_propagate_failed++;

            /* During cproxy_forward(), we might have recursed, */
            /* especially in error situation if a downstream */
            /* conn got closed and released.  Check for recursion */
            /* before we touch d anymore. */

            if (da != ptd->downstream_assigns) {
                ptd->stats.stats.tot_assign_recursion++;
                break;
            }

            propagate_error_msg(d, NULL, d->upstream_status);

            cproxy_release_downstream(d, false);
        }
    }

    if (settings.verbose > 2) {
        moxi_log_write("assign_downstream, done\n");
    }
}
1939
1940void propagate_error_msg(downstream *d, char *ascii_msg,
1941                         protocol_binary_response_status binary_status) {
1942    cb_assert(d != NULL);
1943
1944    if (ascii_msg == NULL &&
1945        d->upstream_conn != NULL &&
1946        d->target_host_ident != NULL) {
1947        char *s = add_conn_suffix(d->upstream_conn);
1948        if (s != NULL) {
1949            snprintf(s, SUFFIX_SIZE - 1,
1950                     "SERVER_ERROR proxy write to downstream %s\r\n",
1951                     d->target_host_ident);
1952            s[SUFFIX_SIZE - 1] = '\0';
1953            ascii_msg = s;
1954        }
1955    }
1956
1957    while (d->upstream_conn != NULL) {
1958        conn *uc = d->upstream_conn;
1959        conn *curr;
1960
1961        if (settings.verbose > 1) {
1962            moxi_log_write("%d: could not forward upstream to downstream\n",
1963                           uc->sfd);
1964        }
1965
1966        upstream_error_msg(uc, ascii_msg, binary_status);
1967
1968        curr = d->upstream_conn;
1969        d->upstream_conn = d->upstream_conn->next;
1970        curr->next = NULL;
1971    }
1972}
1973
1974bool cproxy_forward(downstream *d) {
1975    cb_assert(d != NULL);
1976    cb_assert(d->ptd != NULL);
1977    cb_assert(d->upstream_conn != NULL);
1978
1979    if (settings.verbose > 2) {
1980        moxi_log_write(
1981                "%d: cproxy_forward prot %d to prot %d\n",
1982                d->upstream_conn->sfd,
1983                d->upstream_conn->protocol,
1984                d->ptd->behavior_pool.base.downstream_protocol);
1985    }
1986
1987    if (IS_ASCII(d->upstream_conn->protocol)) {
1988        /* ASCII upstream. */
1989
1990        unsigned int peer_protocol =
1991            d->upstream_conn->peer_protocol ?
1992            d->upstream_conn->peer_protocol :
1993            d->ptd->behavior_pool.base.downstream_protocol;
1994
1995        if (IS_ASCII(peer_protocol)) {
1996            return cproxy_forward_a2a_downstream(d);
1997        } else {
1998            return cproxy_forward_a2b_downstream(d);
1999        }
2000    } else {
2001        /* BINARY upstream. */
2002
2003        if (IS_BINARY(d->ptd->behavior_pool.base.downstream_protocol)) {
2004            return cproxy_forward_b2b_downstream(d);
2005        } else {
2006            /* TODO: No binary upstream to ascii downstream support. */
2007
2008            cb_assert(0);
2009            return false;
2010        }
2011    }
2012}
2013
2014bool cproxy_forward_or_error(downstream *d) {
2015    if (cproxy_forward(d) == false) {
2016        d->ptd->stats.stats.tot_downstream_propagate_failed++;
2017        propagate_error_msg(d, NULL, d->upstream_status);
2018        cproxy_release_downstream(d, false);
2019
2020        return false;
2021    }
2022
2023    return true;
2024}
2025
2026void upstream_error_msg(conn *uc, char *ascii_msg,
2027                        protocol_binary_response_status binary_status) {
2028    proxy_td *ptd;
2029    cb_assert(uc);
2030    cb_assert(uc->state == conn_pause);
2031
2032    ptd = uc->extra;
2033    cb_assert(ptd != NULL);
2034
2035    if (IS_ASCII(uc->protocol)) {
2036        char *msg = ascii_msg;
2037        if (msg == NULL) {
2038            msg = "SERVER_ERROR proxy write to downstream\r\n";
2039        }
2040
2041        cb_mutex_enter(&ptd->proxy->proxy_lock);
2042        if (ptd->proxy->name != NULL &&
2043            strcmp(ptd->proxy->name, NULL_BUCKET) == 0) {
2044            msg = "SERVER_ERROR unauthorized, null bucket\r\n";
2045        }
2046        cb_mutex_exit(&ptd->proxy->proxy_lock);
2047
2048        /* Send an END on get/gets instead of generic SERVER_ERROR. */
2049
2050        if (uc->cmd == -1 &&
2051            uc->cmd_start != NULL &&
2052            strncmp(uc->cmd_start, "get", 3) == 0 &&
2053            (false == settings.enable_mcmux_mode) &&
2054            (0 != strncmp(ptd->behavior_pool.base.nodeLocator,
2055                          "vbucket",
2056                          sizeof(ptd->behavior_pool.base.nodeLocator) - 1))) {
2057            msg = "END\r\n";
2058        }
2059
2060        if (settings.verbose > 2) {
2061            moxi_log_write("%d: upstream_error: %s\n", uc->sfd, msg);
2062        }
2063
2064        if (add_iov(uc, msg, (int)strlen(msg)) == 0 &&
2065            update_event(uc, EV_WRITE | EV_PERSIST)) {
2066            conn_set_state(uc, conn_mwrite);
2067        } else {
2068            ptd->stats.stats.err_oom++;
2069            cproxy_close_conn(uc);
2070        }
2071    } else {
2072        cb_assert(IS_BINARY(uc->protocol));
2073
2074        if (binary_status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
2075            /* Default to our favorite catch-all binary protocol response. */
2076
2077            binary_status = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
2078        }
2079
2080        cb_mutex_enter(&ptd->proxy->proxy_lock);
2081        if (ptd->proxy->name != NULL &&
2082            strcmp(ptd->proxy->name, NULL_BUCKET) == 0) {
2083            binary_status = PROTOCOL_BINARY_RESPONSE_AUTH_ERROR;
2084        }
2085        cb_mutex_exit(&ptd->proxy->proxy_lock);
2086
2087        write_bin_error(uc, binary_status, 0);
2088
2089        update_event(uc, EV_WRITE | EV_PERSIST);
2090    }
2091}
2092
2093void cproxy_reset_upstream(conn *uc) {
2094    proxy_td *ptd;
2095    cb_assert(uc != NULL);
2096
2097    ptd = uc->extra;
2098    cb_assert(ptd != NULL);
2099
2100    conn_set_state(uc, conn_new_cmd);
2101
2102    if (uc->rbytes <= 0) {
2103        if (!update_event(uc, EV_READ | EV_PERSIST)) {
2104            ptd->stats.stats.err_oom++;
2105            cproxy_close_conn(uc);
2106        }
2107
2108        return; /* Return either way. */
2109    }
2110
2111    /* We may have already read incoming bytes into the uc's buffer, */
2112    /* so the issue is that libevent may never see (or expect) any */
2113    /* EV_READ events (and hence, won't fire event callbacks) for the */
2114    /* upstream connection.  This can leave the uc seemingly stuck, */
2115    /* never hitting drive_machine() loop. */
2116
2117    if (settings.verbose > 2) {
2118        moxi_log_write("%d: cproxy_reset_upstream with bytes available: %d\n",
2119                       uc->sfd, uc->rbytes);
2120    }
2121
2122    /* So, we copy the drive_machine()/conn_new_cmd handling to */
2123    /* schedule uc into drive_machine() execution, where the uc */
2124    /* conn is likely to be writable.  We need to do this */
2125    /* because we're currently on the drive_machine() execution */
2126    /* loop for the downstream connection, not for the uc. */
2127
2128    if (!update_event(uc, EV_WRITE | EV_PERSIST)) {
2129        if (settings.verbose > 0) {
2130            moxi_log_write("Couldn't update event\n");
2131        }
2132
2133        conn_set_state(uc, conn_closing);
2134    }
2135
2136    ptd->stats.stats.tot_reset_upstream_avail++;
2137}
2138
2139bool cproxy_dettach_if_noreply(downstream *d, conn *uc) {
2140    if (uc->noreply) {
2141        uc->noreply        = false;
2142        d->upstream_conn   = NULL;
2143        d->upstream_suffix = NULL;
2144        d->upstream_suffix_len = 0;
2145        d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
2146        d->upstream_retry  = 0;
2147        d->target_host_ident = NULL;
2148
2149        cproxy_reset_upstream(uc);
2150
2151        return true;
2152    }
2153
2154    return false;
2155}
2156
2157void cproxy_wait_any_downstream(proxy_td *ptd, conn *uc) {
2158    cb_assert(uc != NULL);
2159    cb_assert(uc->next == NULL);
2160    cb_assert(ptd != NULL);
2161    cb_assert(!ptd->waiting_any_downstream_tail ||
2162           !ptd->waiting_any_downstream_tail->next);
2163
2164    /* Add the upstream conn to the wait list. */
2165
2166    uc->next = NULL;
2167    if (ptd->waiting_any_downstream_tail != NULL) {
2168        ptd->waiting_any_downstream_tail->next = uc;
2169    }
2170    ptd->waiting_any_downstream_tail = uc;
2171    if (ptd->waiting_any_downstream_head == NULL) {
2172        ptd->waiting_any_downstream_head = uc;
2173    }
2174}
2175
2176void cproxy_release_downstream_conn(downstream *d, conn *c) {
2177    proxy_td *ptd;
2178    cb_assert(c != NULL);
2179    cb_assert(d != NULL);
2180
2181    ptd = d->ptd;
2182    cb_assert(ptd != NULL);
2183
2184    if (settings.verbose > 2) {
2185        moxi_log_write("%d: release_downstream_conn, downstream_used %d %d,"
2186                       " upstream %d\n",
2187                       c->sfd, d->downstream_used, d->downstream_used_start,
2188                       (d->upstream_conn != NULL ?
2189                        d->upstream_conn->sfd : 0));
2190    }
2191
2192    d->downstream_used--;
2193    if (d->downstream_used <= 0) {
2194        /* The downstream_used count might go < 0 when if there's */
2195        /* an early error and we decide to close the downstream */
2196        /* conn, before anything gets sent or before the */
2197        /* downstream_used was able to be incremented. */
2198
2199        cproxy_release_downstream(d, false);
2200        cproxy_assign_downstream(ptd);
2201    }
2202}
2203
2204void cproxy_on_pause_downstream_conn(conn *c) {
2205    downstream *d;
2206    cb_assert(c != NULL);
2207
2208    if (settings.verbose > 2) {
2209        moxi_log_write("<%d cproxy_on_pause_downstream_conn\n",
2210                c->sfd);
2211    }
2212
2213    d = c->extra;
2214
2215    if (!d || c->rbytes > 0) {
2216        zstored_downstream_conns *conns;
2217
2218        if (settings.verbose) {
2219            moxi_log_write("%d: Closed the downstream since got"
2220                    "an event on downstream or extra data on downstream\n",
2221                    c->sfd);
2222        }
2223
2224        conns = zstored_get_downstream_conns(c->thread, c->host_ident);
2225        if (conns) {
2226            bool found = false;
2227            conns->dc = conn_list_remove(conns->dc, NULL, c, &found);
2228            if (!found) {
2229                cb_assert(0);
2230                if (settings.verbose) {
2231                    moxi_log_write("<%d Not able to find in zstore conns\n",
2232                            c->sfd);
2233                }
2234            }
2235        } else {
2236            cb_assert(0);
2237            if (settings.verbose) {
2238                moxi_log_write("<%d Not able to find zstore conns\n",
2239                        c->sfd);
2240            }
2241        }
2242        cproxy_close_conn(c);
2243        return;
2244    }
2245
2246    cb_assert(d->ptd != NULL);
2247
2248    /* Must update_event() before releasing the downstream conn, */
2249    /* because the release might call udpate_event(), too, */
2250    /* and we don't want to override its work. */
2251
2252    if (update_event(c, EV_READ | EV_PERSIST)) {
2253        cproxy_release_downstream_conn(d, c);
2254    } else {
2255        d->ptd->stats.stats.err_oom++;
2256        cproxy_close_conn(c);
2257    }
2258}
2259
2260void cproxy_pause_upstream_for_downstream(proxy_td *ptd, conn *upstream) {
2261    cb_assert(ptd != NULL);
2262    cb_assert(upstream != NULL);
2263
2264    if (settings.verbose > 2) {
2265        moxi_log_write("%d: pause_upstream_for_downstream\n",
2266                upstream->sfd);
2267    }
2268
2269    conn_set_state(upstream, conn_pause);
2270
2271    cproxy_wait_any_downstream(ptd, upstream);
2272
2273    if (ptd->timeout_tv.tv_sec == 0 &&
2274        ptd->timeout_tv.tv_usec == 0) {
2275        cproxy_start_wait_queue_timeout(ptd, upstream);
2276    }
2277
2278    cproxy_assign_downstream(ptd);
2279}
2280
2281struct timeval cproxy_get_downstream_timeout(downstream *d, conn *c) {
2282
2283    struct timeval rv;
2284    proxy_td *ptd;
2285    cb_assert(d);
2286
2287    if (c != NULL) {
2288        int i;
2289        cb_assert(d->behaviors_num > 0);
2290        cb_assert(d->behaviors_arr != NULL);
2291        cb_assert(d->downstream_conns != NULL);
2292
2293        i = downstream_conn_index(d, c);
2294        if (i >= 0 && i < d->behaviors_num) {
2295            rv = d->behaviors_arr[i].downstream_timeout;
2296            if (rv.tv_sec != 0 || rv.tv_usec != 0) {
2297                return rv;
2298            }
2299        }
2300    }
2301
2302    ptd = d->ptd;
2303    cb_assert(ptd);
2304
2305    rv = ptd->behavior_pool.base.downstream_timeout;
2306
2307    return rv;
2308}
2309
2310bool cproxy_start_wait_queue_timeout(proxy_td *ptd, conn *uc) {
2311    cb_assert(ptd);
2312    cb_assert(uc);
2313    cb_assert(uc->thread);
2314    cb_assert(uc->thread->base);
2315
2316    ptd->timeout_tv = ptd->behavior_pool.base.wait_queue_timeout;
2317    if (ptd->timeout_tv.tv_sec != 0 ||
2318        ptd->timeout_tv.tv_usec != 0) {
2319        if (settings.verbose > 2) {
2320            moxi_log_write("wait_queue_timeout started\n");
2321        }
2322
2323        evtimer_set(&ptd->timeout_event, wait_queue_timeout, ptd);
2324
2325        event_base_set(uc->thread->base, &ptd->timeout_event);
2326
2327        return evtimer_add(&ptd->timeout_event, &ptd->timeout_tv) == 0;
2328    }
2329
2330    return true;
2331}
2332
2333static void wait_queue_timeout(evutil_socket_t fd,
2334                               const short which,
2335                               void *arg) {
2336    proxy_td *ptd = arg;
2337    cb_assert(ptd != NULL);
2338    (void)fd;
2339    (void)which;
2340
2341    if (settings.verbose > 2) {
2342        moxi_log_write("wait_queue_timeout\n");
2343    }
2344
2345    /* This timer callback is invoked when an upstream conn */
2346    /* has been in the wait queue for too long. */
2347
2348    if (ptd->timeout_tv.tv_sec != 0 || ptd->timeout_tv.tv_usec != 0) {
2349        struct timeval wqt;
2350        uint64_t wqt_msec;
2351        uint64_t cut_msec;
2352        conn *uc_curr;
2353
2354        evtimer_del(&ptd->timeout_event);
2355
2356        ptd->timeout_tv.tv_sec = 0;
2357        ptd->timeout_tv.tv_usec = 0;
2358
2359        if (settings.verbose > 2) {
2360            moxi_log_write("wait_queue_timeout cleared\n");
2361        }
2362
2363        wqt = ptd->behavior_pool.base.wait_queue_timeout;
2364        wqt_msec = (wqt.tv_sec * 1000) + (wqt.tv_usec / 1000);
2365        cut_msec = msec_current_time - wqt_msec;
2366
2367        /* Run through all the old upstream conn's in */
2368        /* the wait queue, remove them, and emit errors */
2369        /* on them.  And then start a new timer if needed. */
2370        uc_curr = ptd->waiting_any_downstream_head;
2371        while (uc_curr != NULL) {
2372            conn *uc = uc_curr;
2373
2374            uc_curr = uc_curr->next;
2375
2376            /* Check if upstream conn is old and should be removed. */
2377
2378            if (settings.verbose > 2) {
2379                moxi_log_write("wait_queue_timeout compare %u to %u cutoff\n",
2380                        uc->cmd_start_time, cut_msec);
2381            }
2382
2383            if (uc->cmd_start_time <= cut_msec) {
2384                if (settings.verbose > 1) {
2385                    moxi_log_write("proxy_td_timeout sending error %d\n",
2386                            uc->sfd);
2387                }
2388
2389                ptd->stats.stats.tot_wait_queue_timeout++;
2390
2391                ptd->waiting_any_downstream_head =
2392                    conn_list_remove(ptd->waiting_any_downstream_head,
2393                                     &ptd->waiting_any_downstream_tail,
2394                                     uc, NULL); /* TODO: O(N^2). */
2395
2396                upstream_error_msg(uc,
2397                                   "SERVER_ERROR proxy wait queue timeout",
2398                                   PROTOCOL_BINARY_RESPONSE_EBUSY);
2399            }
2400        }
2401
2402        if (ptd->waiting_any_downstream_head != NULL) {
2403            cproxy_start_wait_queue_timeout(ptd,
2404                                            ptd->waiting_any_downstream_head);
2405        }
2406    }
2407}
2408
2409rel_time_t cproxy_realtime(const time_t exptime) {
2410    /* Input is a long... */
2411
2412    /* 0       | (0...REALIME_MAXDELTA] | (REALTIME_MAXDELTA... */
2413    /* forever | delta                  | unix_time */
2414
2415    /* Storage is an unsigned int. */
2416
2417    /* TODO: Handle resolution loss. */
2418
2419    /* The cproxy version of realtime doesn't do any */
2420    /* time math munging, just pass through. */
2421
2422    return (rel_time_t) exptime;
2423}
2424
2425void cproxy_close_conn(conn *c) {
2426    cb_assert(c != NULL);
2427
2428    if (c == NULL_CONN) {
2429        return;
2430    }
2431
2432    conn_set_state(c, conn_closing);
2433
2434    update_event(c, 0);
2435
2436    /* Run through drive_machine just once, */
2437    /* to go through close code paths. */
2438
2439    drive_machine(c);
2440}
2441
2442bool add_conn_item(conn *c, item *it) {
2443    cb_assert(it != NULL);
2444    cb_assert(c != NULL);
2445    cb_assert(c->ilist != NULL);
2446    cb_assert(c->icurr != NULL);
2447    cb_assert(c->isize > 0);
2448
2449    if (c->ileft >= c->isize) {
2450        item **new_list =
2451            realloc(c->ilist, sizeof(item *) * c->isize * 2);
2452        if (new_list) {
2453            c->isize *= 2;
2454            c->ilist = new_list;
2455            c->icurr = new_list;
2456        }
2457    }
2458
2459    if (c->ileft < c->isize) {
2460        c->ilist[c->ileft] = it;
2461        c->ileft++;
2462
2463        return true;
2464    }
2465
2466    return false;
2467}
2468
2469char *add_conn_suffix(conn *c) {
2470    cb_assert(c != NULL);
2471    cb_assert(c->suffixlist != NULL);
2472    cb_assert(c->suffixcurr != NULL);
2473    cb_assert(c->suffixsize > 0);
2474
2475    if (c->suffixleft >= c->suffixsize) {
2476        char **new_suffix_list =
2477            realloc(c->suffixlist,
2478                    sizeof(char *) * c->suffixsize * 2);
2479        if (new_suffix_list) {
2480            c->suffixsize *= 2;
2481            c->suffixlist = new_suffix_list;
2482            c->suffixcurr = new_suffix_list;
2483        }
2484    }
2485
2486    if (c->suffixleft < c->suffixsize) {
2487        char *suffix = cache_alloc(c->thread->suffix_cache);
2488        if (suffix != NULL) {
2489            c->suffixlist[c->suffixleft] = suffix;
2490            c->suffixleft++;
2491
2492            return suffix;
2493        }
2494    }
2495
2496    return NULL;
2497}
2498
2499char *nread_text(short x) {
2500    char *rv = NULL;
2501    switch(x) {
2502    case NREAD_SET:
2503        rv = "set ";
2504        break;
2505    case NREAD_ADD:
2506        rv = "add ";
2507        break;
2508    case NREAD_REPLACE:
2509        rv = "replace ";
2510        break;
2511    case NREAD_APPEND:
2512        rv = "append ";
2513        break;
2514    case NREAD_PREPEND:
2515        rv = "prepend ";
2516        break;
2517    case NREAD_CAS:
2518        rv = "cas ";
2519        break;
2520    }
2521    return rv;
2522}
2523
/* Tokenize the command string by updating the token array
 * with pointers to start of each token and length.
 * Tokens are separated by ' ' characters only; consecutive spaces
 * do not produce empty tokens.  Does not modify the input command string.
 *
 * At most max_tokens - 1 real tokens are produced, because one slot is
 * always used for the terminal token.  tokens must therefore hold
 * max_tokens entries, and max_tokens must be greater than 1.
 *
 * Returns total number of tokens, including the terminal token.  The
 * terminal token has length zero.  Its value is NULL if the whole string
 * was consumed; otherwise it points to the first unprocessed character
 * (scanning stopped because the token array filled up).
 *
 * If command_len is non-NULL it is set to the length of command when the
 * terminating '\0' is reached.  It is left at 0 when scanning stops early
 * because the token array is full.
 *
 * Usage example:
 *
 *  size_t ix;
 *  scan_tokens(command, tokens, max_tokens, NULL);
 *  for (ix = 0; tokens[ix].length != 0; ix++) {
 *      ...
 *  }
 *  if (tokens[ix].value != NULL) {
 *      ... unscanned input remains at tokens[ix].value ...
 *  }
 */
size_t scan_tokens(char *command, token_t *tokens,
                   const size_t max_tokens,
                   int *command_len) {
    char *s, *e;
    size_t ntokens = 0;

    if (command_len != NULL) {
        *command_len = 0;
    }

    cb_assert(command != NULL && tokens != NULL && max_tokens > 1);

    /* s marks the start of the current token; e scans ahead to its end. */
    for (s = e = command; ntokens < max_tokens - 1; ++e) {
        if (*e == '\0' || *e == ' ') {
            if (s != e) {
                tokens[ntokens].value = s;
                tokens[ntokens].length = e - s;
                ntokens++;
            }
            if (*e == '\0') {
                if (command_len != NULL) {
                    *command_len = (int)(e - command);
                }
                break; /* string end */
            }
            /* Next token starts just after this separator. */
            s = e + 1;
        }
    }

    /* If we scanned the whole string, the terminal value pointer is null,
     * otherwise it is the first unprocessed character.
     */
    tokens[ntokens].value = (*e == '\0' ? NULL : e);
    tokens[ntokens].length = 0;
    ntokens++;

    return ntokens;
}
2579
2580/* Remove conn c from a conn list.
2581 * Returns the new head of the list.
2582 */
2583conn *conn_list_remove(conn *head, conn **tail, conn *c, bool *found) {
2584    conn *prev = NULL;
2585    conn *curr = head;
2586
2587    if (found != NULL) {
2588        *found = false;
2589    }
2590
2591    while (curr != NULL) {
2592        if (curr == c) {
2593            conn *r;
2594            if (found != NULL) {
2595                *found = true;
2596            }
2597
2598            if (tail != NULL && *tail == curr) {
2599                *tail = prev;
2600            }
2601
2602            if (prev != NULL) {
2603                cb_assert(curr != head);
2604                prev->next = curr->next;
2605                curr->next = NULL;
2606                return head;
2607            }
2608
2609            cb_assert(curr == head);
2610            r = curr->next;
2611            curr->next = NULL;
2612            return r;
2613        }
2614
2615        prev = curr;
2616        curr = curr ->next;
2617    }
2618
2619    return head;
2620}
2621
2622/* Returns the new head of the list.
2623 */
2624downstream *downstream_list_remove(downstream *head, downstream *d) {
2625    downstream *prev = NULL;
2626    downstream *curr = head;
2627
2628    while (curr != NULL) {
2629        if (curr == d) {
2630            downstream *r;
2631            if (prev != NULL) {
2632                cb_assert(curr != head);
2633                prev->next = curr->next;
2634                curr->next = NULL;
2635                return head;
2636            }
2637
2638            cb_assert(curr == head);
2639            r = curr->next;
2640            curr->next = NULL;
2641            return r;
2642        }
2643
2644        prev = curr;
2645        curr = curr ->next;
2646    }
2647
2648    return head;
2649}
2650
2651/* Returns the new head of the list.
2652 */
2653downstream *downstream_list_waiting_remove(downstream *head,
2654                                           downstream **tail,
2655                                           downstream *d) {
2656    downstream *prev = NULL;
2657    downstream *curr = head;
2658
2659    while (curr != NULL) {
2660        if (curr == d) {
2661            downstream *r;
2662            if (tail != NULL && *tail == curr) {
2663                *tail = prev;
2664            }
2665
2666            if (prev != NULL) {
2667                cb_assert(curr != head);
2668                prev->next_waiting = curr->next_waiting;
2669                curr->next_waiting = NULL;
2670                return head;
2671            }
2672
2673            cb_assert(curr == head);
2674            r = curr->next_waiting;
2675            curr->next_waiting = NULL;
2676            return r;
2677        }
2678
2679        prev = curr;
2680        curr = curr ->next_waiting;
2681    }
2682
2683    return head;
2684}
2685
2686/* Returns true if a candidate request is squashable
2687 * or de-duplicatable with an existing request, to
2688 * save on network hops.
2689 */
2690bool is_compatible_request(conn *existing, conn *candidate) {
2691    (void)existing;
2692    (void)candidate;
2693
2694    /* The not-my-vbucket error handling requires us to not */
2695    /* squash ascii multi-GET requests, due to reusing the */
2696    /* multiget-deduplication machinery during retries and */
2697    /* to simplify the later codepaths. */
2698    /*
2699    cb_assert(existing);
2700    cb_assert(existing->state == conn_pause);
2701    cb_assert(IS_PROXY(existing->protocol));
2702
2703    if (IS_BINARY(existing->protocol)) {
2704        // TODO: Revisit multi-get squashing for binary another day.
2705
2706        return false;
2707    }
2708
2709    cb_assert(IS_ASCII(existing->protocol));
2710
2711    if (candidate != NULL) {
2712        cb_assert(IS_ASCII(candidate->protocol));
2713        cb_assert(IS_PROXY(candidate->protocol));
2714        cb_assert(candidate->state == conn_pause);
2715
2716        // TODO: Allow gets (CAS) for de-duplication.
2717
2718        if (existing->cmd == -1 &&
2719            candidate->cmd == -1 &&
2720            existing->cmd_retries <= 0 &&
2721            candidate->cmd_retries <= 0 &&
2722            !existing->noreply &&
2723            !candidate->noreply &&
2724            strncmp(existing->cmd_start, "get ", 4) == 0 &&
2725            strncmp(candidate->cmd_start, "get ", 4) == 0) {
2726            cb_assert(existing->item == NULL);
2727            cb_assert(candidate->item == NULL);
2728
2729            return true;
2730        }
2731    }
2732    */
2733
2734    return false;
2735}
2736
2737void downstream_timeout(evutil_socket_t fd,
2738                        const short which,
2739                        void *arg) {
2740    downstream *d = arg;
2741    proxy_td *ptd;
2742
2743    cb_assert(d != NULL);
2744
2745    ptd = d->ptd;
2746    cb_assert(ptd != NULL);
2747
2748    /* This timer callback is invoked when one or more of */
2749    /* the downstream conns must be really slow.  Handle by */
2750    /* closing downstream conns, which might help by */
2751    /* freeing up downstream resources. */
2752
2753    if (cproxy_clear_timeout(d)) {
2754        char *m;
2755        int n;
2756        int i;
2757        /* The downstream_timeout() callback is invoked for */
2758        /* two cases (downstream_conn_queue_timeouts and */
2759        /* downstream_timeouts), so cleanup and track stats */
2760        /* accordingly. */
2761        bool was_conn_queue_waiting =
2762            zstored_downstream_waiting_remove(d);
2763
2764        if (was_conn_queue_waiting == true) {
2765            if (settings.verbose > 2) {
2766                moxi_log_write("conn_queue_timeout\n");
2767            }
2768
2769            ptd->stats.stats.tot_downstream_conn_queue_timeout++;
2770        } else {
2771            if (settings.verbose > 2) {
2772                moxi_log_write("downstream_timeout\n");
2773            }
2774
2775            ptd->stats.stats.tot_downstream_timeout++;
2776        }
2777
2778        m = "SERVER_ERROR proxy downstream timeout\r\n";
2779
2780        if (d->target_host_ident != NULL) {
2781            m = add_conn_suffix(d->upstream_conn);
2782            if (m != NULL) {
2783                char *s;
2784                snprintf(m, SUFFIX_SIZE - 1,
2785                         "SERVER_ERROR proxy downstream timeout %s\r\n",
2786                         d->target_host_ident);
2787                m[SUFFIX_SIZE - 1] = '\0';
2788
2789                s = strchr(m, ':'); /* Clip to avoid sending user/pswd. */
2790                if (s != NULL) {
2791                    *s++ = '\r';
2792                    *s++ = '\n';
2793                    *s = '\0';
2794                }
2795            }
2796        }
2797
2798        propagate_error_msg(d, m, PROTOCOL_BINARY_RESPONSE_EBUSY);
2799        n = mcs_server_count(&d->mst);
2800
2801        for (i = 0; i < n; i++) {
2802            conn *dc = d->downstream_conns[i];
2803            if (dc != NULL &&
2804                dc != NULL_CONN) {
2805                /* We have to de-link early, because we don't want */
2806                /* to have cproxy_close_conn() release the downstream */
2807                /* while we're in the middle of this loop. */
2808
2809                delink_from_downstream_conns(dc);
2810
2811                cproxy_close_conn(dc);
2812            }
2813        }
2814
2815        cproxy_release_downstream(d, false);
2816        cproxy_assign_downstream(ptd);
2817    }
2818
2819    (void)fd;
2820    (void)which;
2821}
2822
2823bool cproxy_start_downstream_timeout(downstream *d, conn *c) {
2824    cb_assert(d != NULL);
2825    cb_assert(d->behaviors_num > 0);
2826    cb_assert(d->behaviors_arr != NULL);
2827
2828    return cproxy_start_downstream_timeout_ex(d, c,
2829                cproxy_get_downstream_timeout(d, c));
2830}
2831
2832bool cproxy_start_downstream_timeout_ex(downstream *d, conn *c,
2833                                        struct timeval dt) {
2834    conn *uc;
2835
2836    cb_assert(d != NULL);
2837    cb_assert(d->behaviors_num > 0);
2838    cb_assert(d->behaviors_arr != NULL);
2839
2840    cproxy_clear_timeout(d);
2841
2842    if (dt.tv_sec == 0 &&
2843        dt.tv_usec == 0) {
2844        return true;
2845    }
2846
2847    uc = d->upstream_conn;
2848
2849    cb_assert(uc != NULL);
2850    cb_assert(uc->state == conn_pause);
2851    cb_assert(uc->thread != NULL);
2852    cb_assert(uc->thread->base != NULL);
2853    cb_assert(IS_PROXY(uc->protocol));
2854
2855    if (settings.verbose > 2) {
2856        moxi_log_write("%d: cproxy_start_downstream_timeout\n",
2857                       (c != NULL ? c->sfd : -1));
2858    }
2859
2860    evtimer_set(&d->timeout_event, downstream_timeout, d);
2861
2862    event_base_set(uc->thread->base, &d->timeout_event);
2863
2864    d->timeout_tv.tv_sec  = dt.tv_sec;
2865    d->timeout_tv.tv_usec = dt.tv_usec;
2866
2867    return (evtimer_add(&d->timeout_event, &d->timeout_tv) == 0);
2868}
2869
2870/* Return 0 on success, -1 on general failure, 1 on timeout failure. */
2871
2872int cproxy_auth_downstream(mcs_server_st *server,
2873                           proxy_behavior *behavior,
2874                           SOCKET fd) {
2875    protocol_binary_request_header req;
2876    protocol_binary_response_header res;
2877    struct timeval *timeout = NULL;
2878    char buf[3000];
2879    const char *usr;
2880    const char *pwd;
2881    int usr_len;
2882    int pwd_len;
2883    int buf_len;
2884    mcs_return mr;
2885
2886    cb_assert(server);
2887    cb_assert(behavior);
2888    cb_assert(fd != INVALID_SOCKET);
2889
2890
2891    if (!IS_BINARY(behavior->downstream_protocol)) {
2892        return 0;
2893    }
2894
2895    usr = mcs_server_st_usr(server) != NULL ?
2896        mcs_server_st_usr(server) : behavior->usr;
2897    pwd = mcs_server_st_pwd(server) != NULL ?
2898        mcs_server_st_pwd(server) : behavior->pwd;
2899
2900    usr_len = (int)strlen(usr);
2901    pwd_len = (int)strlen(pwd);
2902
2903    if (usr_len <= 0) {
2904        return 0;
2905    }
2906
2907    if (settings.verbose > 2) {
2908        moxi_log_write("cproxy_auth_downstream usr: %s pwd: (%d)\n",
2909                       usr, pwd_len);
2910    }
2911
2912    if (usr_len <= 0 ||
2913        !IS_PROXY(behavior->downstream_protocol) ||
2914        (usr_len + pwd_len + 50 > (int) sizeof(buf))) {
2915        if (settings.verbose > 1) {
2916            moxi_log_write("auth failure args\n");
2917        }
2918
2919        return -1; /* Probably misconfigured. */
2920    }
2921
2922    /* The key should look like "PLAIN", or the sasl mech string. */
2923    /* The data should look like "\0usr\0pwd".  So, the body buf */
2924    /* should look like "PLAIN\0usr\0pwd". */
2925
2926    /* TODO: Allow binary passwords. */
2927
2928    buf_len = snprintf(buf, sizeof(buf), "PLAIN%c%s%c%s",
2929                       0, usr,
2930                       0, pwd);
2931    cb_assert(buf_len == 7 + usr_len + pwd_len);
2932
2933    memset(req.bytes, 0, sizeof(req.bytes));
2934    memset(res.bytes, 0, sizeof(res.bytes));
2935    req.request.magic    = PROTOCOL_BINARY_REQ;
2936    req.request.opcode   = PROTOCOL_BINARY_CMD_SASL_AUTH;
2937    req.request.keylen   = htons((uint16_t) 5); /* 5 == strlen("PLAIN"). */
2938    req.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
2939    req.request.bodylen  = htonl(buf_len);
2940
2941    if (mcs_io_write(fd, (const char *) req.bytes,
2942                     sizeof(req.bytes)) != sizeof(req.bytes) ||
2943        mcs_io_write(fd, buf, buf_len) == -1) {
2944        mcs_io_reset(fd);
2945
2946        if (settings.verbose > 1) {
2947            moxi_log_write("auth failure during write for %s (%d)\n",
2948                           usr, buf_len);
2949        }
2950
2951        return -1;
2952    }
2953
2954    if (behavior->auth_timeout.tv_sec != 0 ||
2955        behavior->auth_timeout.tv_usec != 0) {
2956        timeout = &behavior->auth_timeout;
2957    }
2958
2959    mr = mcs_io_read(fd, &res.bytes, sizeof(res.bytes), timeout);
2960    if (mr == MCS_SUCCESS && res.response.magic == PROTOCOL_BINARY_RES) {
2961        int len;
2962        res.response.status  = ntohs(res.response.status);
2963        res.response.keylen  = ntohs(res.response.keylen);
2964        res.response.bodylen = ntohl(res.response.bodylen);
2965
2966        /* Swallow whatever body comes. */
2967        len = res.response.bodylen;
2968        while (len > 0) {
2969            int amt = (len > (int) sizeof(buf) ? (int) sizeof(buf) : len);
2970
2971            mr = mcs_io_read(fd, buf, amt, timeout);
2972            if (mr != MCS_SUCCESS) {
2973                if (settings.verbose > 1) {
2974                    moxi_log_write("auth could not read response body (%d) %d\n",
2975                                   usr, amt, mr);
2976                }
2977
2978                if (mr == MCS_TIMEOUT) {
2979                    return 1;
2980                }
2981
2982                return -1;
2983            }
2984
2985            len -= amt;
2986        }
2987
2988        /* The res status should be either... */
2989        /* - SUCCESS         - sasl aware server and good credentials. */
2990        /* - AUTH_ERROR      - wrong credentials. */
2991        /* - UNKNOWN_COMMAND - sasl-unaware server. */
2992
2993        if (res.response.status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
2994            if (settings.verbose > 2) {
2995                moxi_log_write("auth_downstream success for %s\n", usr);
2996            }
2997
2998            return 0;
2999        }
3000
3001        if (settings.verbose > 1) {
3002            moxi_log_write("auth_downstream failure for %s (%x)\n",
3003                           usr, res.response.status);
3004        }
3005    } else {
3006        if (settings.verbose > 1) {
3007            moxi_log_write("auth_downstream response error for %s, %d\n",
3008                           usr, mr);
3009        }
3010    }
3011
3012    if (mr == MCS_TIMEOUT) {
3013        return 1;
3014    }
3015
3016    return -1;
3017}
3018
3019/* Return 0 on success, -1 on general failure, 1 on timeout failure. */
3020
3021int cproxy_bucket_downstream(mcs_server_st *server,
3022                             proxy_behavior *behavior,
3023                             SOCKET fd) {
3024    protocol_binary_request_header req;
3025    protocol_binary_response_header res;
3026    struct timeval *timeout = NULL;
3027    int bucket_len;
3028    mcs_return mr;
3029
3030    cb_assert(server);
3031    cb_assert(behavior);
3032    cb_assert(IS_PROXY(behavior->downstream_protocol));
3033    cb_assert(fd != INVALID_SOCKET);
3034
3035    if (!IS_BINARY(behavior->downstream_protocol)) {
3036        return 0;
3037    }
3038
3039    bucket_len = (int)strlen(behavior->bucket);
3040    if (bucket_len <= 0) {
3041        return 0; /* When no bucket. */
3042    }
3043
3044    memset(req.bytes, 0, sizeof(req.bytes));
3045    memset(res.bytes, 0, sizeof(res.bytes));
3046
3047    req.request.magic    = PROTOCOL_BINARY_REQ;
3048    req.request.opcode   = PROTOCOL_BINARY_CMD_BUCKET;
3049    req.request.keylen   = htons((uint16_t) bucket_len);
3050    req.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
3051    req.request.bodylen  = htonl(bucket_len);
3052
3053    if (mcs_io_write(fd, (const char *) req.bytes,
3054                     sizeof(req.bytes)) != sizeof(req.bytes) ||
3055        mcs_io_write(fd, behavior->bucket, bucket_len) == -1) {
3056        mcs_io_reset(fd);
3057
3058        if (settings.verbose > 1) {
3059            moxi_log_write("bucket failure during write (%d)\n",
3060                    bucket_len);
3061        }
3062
3063        return -1;
3064    }
3065
3066
3067    if (behavior->auth_timeout.tv_sec != 0 ||
3068        behavior->auth_timeout.tv_usec != 0) {
3069        timeout = &behavior->auth_timeout;
3070    }
3071
3072    mr = mcs_io_read(fd, &res.bytes, sizeof(res.bytes), timeout);
3073    if (mr == MCS_SUCCESS && res.response.magic == PROTOCOL_BINARY_RES) {
3074        char buf[300];
3075        int len;
3076
3077        res.response.status  = ntohs(res.response.status);
3078        res.response.keylen  = ntohs(res.response.keylen);
3079        res.response.bodylen = ntohl(res.response.bodylen);
3080
3081        /* Swallow whatever body comes. */
3082        len = res.response.bodylen;
3083        while (len > 0) {
3084            int amt = (len > (int) sizeof(buf) ? (int) sizeof(buf) : len);
3085
3086            mr = mcs_io_read(fd, buf, amt, timeout);
3087            if (mr != MCS_SUCCESS) {
3088                if (mr == MCS_TIMEOUT) {
3089                    return 1;
3090                }
3091
3092                return -1;
3093            }
3094
3095            len -= amt;
3096        }
3097
3098        /* The res status should be either... */
3099        /* - SUCCESS         - we got the bucket. */
3100        /* - AUTH_ERROR      - not allowed to use that bucket. */
3101        /* - UNKNOWN_COMMAND - bucket-unaware server. */
3102
3103        if (res.response.status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
3104            if (settings.verbose > 2) {
3105                moxi_log_write("bucket_downstream success, %s\n",
3106                        behavior->bucket);
3107            }
3108
3109            return 0;
3110        }
3111
3112        if (settings.verbose > 1) {
3113            moxi_log_write("bucket_downstream failure, %s (%x)\n",
3114                    behavior->bucket,
3115                    res.response.status);
3116        }
3117    }
3118
3119    if (mr == MCS_TIMEOUT) {
3120        return 1;
3121    }
3122
3123    return -1;
3124}
3125
3126int cproxy_max_retries(downstream *d) {
3127    return mcs_server_count(&d->mst) * 2;
3128}
3129
3130int downstream_conn_index(downstream *d, conn *c) {
3131    int nconns;
3132    int i;
3133
3134    cb_assert(d);
3135
3136    nconns = mcs_server_count(&d->mst);
3137    for (i = 0; i < nconns; i++) {
3138        if (d->downstream_conns[i] == c) {
3139            return i;
3140        }
3141    }
3142
3143    return -1;
3144}
3145
3146void cproxy_upstream_state_change(conn *c, enum conn_states next_state) {
3147    proxy_td *ptd;
3148
3149    cb_assert(c != NULL);
3150    ptd = c->extra;
3151    if (ptd != NULL) {
3152        if (c->state == conn_pause) {
3153            ptd->stats.stats.tot_upstream_unpaused++;
3154            c->cmd_unpaused = true;
3155        }
3156        if (next_state == conn_pause) {
3157            ptd->stats.stats.tot_upstream_paused++;
3158        }
3159
3160        if (next_state == conn_parse_cmd && c->cmd_arrive_time == 0) {
3161            c->cmd_unpaused = false;
3162            c->hit_local = false;
3163            c->cmd_arrive_time = usec_now();
3164        }
3165
3166        if (next_state == conn_closing || next_state == conn_new_cmd) {
3167            uint64_t arrive_time = c->cmd_arrive_time;
3168            if (c->cmd_unpaused && arrive_time != 0) {
3169                uint64_t latency = usec_now() - c->cmd_arrive_time;
3170
3171                if (c->hit_local) {
3172                    ptd->stats.stats.tot_local_cmd_time += latency;
3173                    ptd->stats.stats.tot_local_cmd_count++;
3174                }
3175
3176                ptd->stats.stats.tot_cmd_time += latency;
3177                ptd->stats.stats.tot_cmd_count++;
3178                c->cmd_arrive_time = 0;
3179            }
3180        }
3181    }
3182}
3183
3184/* ------------------------------------------------- */
3185
3186bool cproxy_on_connect_downstream_conn(conn *c) {
3187    int error;
3188    socklen_t errsz = sizeof(error);
3189    int k;
3190    downstream *d;
3191
3192    cb_assert(c != NULL);
3193    cb_assert(c->host_ident);
3194
3195    d = c->extra;
3196    cb_assert(d != NULL);
3197
3198    if (settings.verbose > 2) {
3199        moxi_log_write("%d: cproxy_on_connect_downstream_conn for %s\n",
3200                       c->sfd, c->host_ident);
3201    }
3202
3203    if (c->which == EV_TIMEOUT) {
3204        d->ptd->stats.stats.tot_downstream_connect_timeout++;
3205
3206        if (settings.verbose) {
3207            moxi_log_write("%d: connection timed out: %s",
3208                           c->sfd, c->host_ident);
3209        }
3210        goto cleanup;
3211    }
3212
3213    /* Check if the connection completed */
3214    if (getsockopt(c->sfd, SOL_SOCKET, SO_ERROR, (void *) &error,
3215                   &errsz) == -1) {
3216        if (settings.verbose) {
3217            moxi_log_write("%d: connect error: %s, %s",
3218                           c->sfd, c->host_ident, strerror(error));
3219        }
3220        goto cleanup;
3221    }
3222
3223    if (error) {
3224        if (settings.verbose) {
3225            moxi_log_write("%d: connect failed: %s, %s",
3226                           c->sfd, c->host_ident, strerror(error));
3227        }
3228        goto cleanup;
3229    }
3230
3231    k = downstream_conn_index(d, c);
3232    if (k >= 0) {
3233        if (downstream_connect_init(d, mcs_server_index(&d->mst, k),
3234                                    &d->behaviors_arr[k], c)) {
3235            /* We are connected to the server now */
3236            if (settings.verbose > 2) {
3237                moxi_log_write("%d: connected to: %s\n",
3238                               c->sfd, c->host_ident);
3239            }
3240
3241            conn_set_state(c, conn_pause);
3242            update_event(c, 0);
3243            cproxy_forward_or_error(d);
3244
3245            return true;
3246        }
3247    }
3248
3249cleanup:
3250    d->ptd->stats.stats.tot_downstream_connect_failed++;
3251
3252    k = delink_from_downstream_conns(c);
3253    if (k >= 0) {
3254        cb_assert(d->downstream_conns[k] == NULL);
3255
3256        d->downstream_conns[k] = NULL_CONN;
3257    }
3258
3259    conn_set_state(c, conn_closing);
3260    update_event(c, 0);
3261    cproxy_forward_or_error(d);
3262
3263    return false;
3264}
3265
3266void downstream_reserved_time_sample(proxy_stats_td *pstd, uint64_t duration) {
3267    if (pstd->downstream_reserved_time_htgram == NULL) {
3268        pstd->downstream_reserved_time_htgram =
3269            cproxy_create_timing_histogram();
3270    }
3271
3272    if (pstd->downstream_reserved_time_htgram != NULL) {
3273        htgram_incr(pstd->downstream_reserved_time_htgram, duration, 1);
3274    }
3275}
3276
3277void downstream_connect_time_sample(proxy_stats_td *pstd, uint64_t duration) {
3278    if (pstd->downstream_connect_time_htgram == NULL) {
3279        pstd->downstream_connect_time_htgram =
3280            cproxy_create_timing_histogram();
3281    }
3282
3283    if (pstd->downstream_connect_time_htgram != NULL) {
3284        htgram_incr(pstd->downstream_connect_time_htgram, duration, 1);
3285    }
3286}
3287
3288/* A histogram for tracking timings, such as for usec request timings. */
3289
3290HTGRAM_HANDLE cproxy_create_timing_histogram(void) {
3291    /* TODO: Make histogram bins more configurable one day. */
3292
3293    HTGRAM_HANDLE h1 = htgram_mk(2000, 100, 2.0, 20, NULL);
3294    HTGRAM_HANDLE h0 = htgram_mk(0, 100, 1.0, 20, h1);
3295
3296    return h0;
3297}
3298
3299zstored_downstream_conns *zstored_get_downstream_conns(LIBEVENT_THREAD *thread,
3300                                                       const char *host_ident) {
3301
3302    genhash_t *conn_hash;
3303    zstored_downstream_conns *conns;
3304
3305    cb_assert(thread);
3306    cb_assert(thread->base);
3307
3308    conn_hash = thread->conn_hash;
3309    cb_assert(conn_hash != NULL);
3310
3311    conns = genhash_find(conn_hash, host_ident);
3312    if (conns == NULL) {
3313        conns = calloc(1, sizeof(zstored_downstream_conns));
3314        if (conns != NULL) {
3315            conns->host_ident = strdup(host_ident);
3316            if (conns->host_ident != NULL) {
3317                genhash_store(conn_hash, conns->host_ident, conns);
3318            } else {
3319                free(conns);
3320                conns = NULL;
3321            }
3322        }
3323    }
3324
3325    return conns;
3326}
3327
3328void zstored_error_count(LIBEVENT_THREAD *thread,
3329                         const char *host_ident,
3330                         bool has_error) {
3331    zstored_downstream_conns *conns;
3332
3333    cb_assert(thread != NULL);
3334    cb_assert(host_ident != NULL);
3335
3336    conns = zstored_get_downstream_conns(thread, host_ident);
3337    if (conns != NULL) {
3338        if (has_error) {
3339            conns->error_count++;
3340            conns->error_time = msec_current_time;
3341        } else {
3342            conns->error_count = 0;
3343            conns->error_time = 0;
3344        }
3345
3346        if (settings.verbose > 2) {
3347            moxi_log_write("z_error, %s, %d, %d, %d, %d\n",
3348                           host_ident,
3349                           has_error,
3350                           conns->dc_acquired,
3351                           conns->error_count,
3352                           conns->error_time);
3353        }
3354
3355        if (has_error) {
3356            /* We reach here when a non-blocking connect() has failed */
3357            /* or when an acquired downstream conn had an error. */
3358            /* The downstream conn is just going to be closed */
3359            /* rather than be released back to the thread->conn_hash, */
3360            /* so update the dc_acquired here. */
3361
3362            if (conns->dc_acquired > 0) {
3363                conns->dc_acquired--;
3364            }
3365
3366            /* When zero downstream conns are available, wake up all */
3367            /* waiting downstreams so they can proceed (possibly by */
3368            /* just returning ERROR's to upstream clients). */
3369
3370            if (conns->dc_acquired <= 0 && conns->dc == NULL) {
3371                downstream *head = conns->downstream_waiting_head;
3372
3373                conns->downstream_waiting_head = NULL;
3374                conns->downstream_waiting_tail = NULL;
3375
3376                while (head != NULL) {
3377                    downstream *prev;
3378
3379                    head->ptd->stats.stats.tot_downstream_waiting_errors++;
3380                    head->ptd->stats.stats.tot_downstream_conn_queue_remove++;
3381
3382                    prev = head;
3383                    head = head->next_waiting;
3384                    prev->next_waiting = NULL;
3385
3386                    cproxy_forward_or_error(prev);
3387                }
3388            }
3389        }
3390    }
3391}
3392
/* Acquires a downstream conn to server msst on behalf of downstream d,
 * on the given libevent thread.
 *
 * 1. Reuses an idle pooled conn for msst's host_ident when one exists
 *    (the pool lives in thread->conn_hash).  Its extra is set to d.
 * 2. Otherwise it may refuse to connect (and return NULL):
 *    - connect backoff: error_count exceeds behavior->connect_max_errors,
 *      behavior->cycle > 0, and fewer than connect_retry_interval msecs
 *      have passed since the last error;
 *    - limit: behavior->downstream_conn_max conns are already acquired
 *      for this host.  In this case *downstream_conn_max_reached is set
 *      to true.
 * 3. Otherwise it opens a new conn via cproxy_connect_downstream_conn().
 *
 * If the pool entry cannot be obtained (conns == NULL), steps 1 and 2
 * are skipped.
 *
 * Returns the conn, or NULL.  *downstream_conn_max_reached is false
 * unless the limit in step 2 was hit.
 */
conn *zstored_acquire_downstream_conn(downstream *d,
                                      LIBEVENT_THREAD *thread,
                                      mcs_server_st *msst,
                                      proxy_behavior *behavior,
                                      bool *downstream_conn_max_reached) {
    enum protocol downstream_protocol;
    char *host_ident;
    conn *dc;
    zstored_downstream_conns *conns;

    cb_assert(d);
    cb_assert(d->ptd);
    cb_assert(d->ptd->downstream_released != d); /* Should not be in free list. */
    cb_assert(thread);
    cb_assert(msst);
    cb_assert(behavior);
    cb_assert(mcs_server_st_hostname(msst) != NULL);
    cb_assert(mcs_server_st_port(msst) > 0);
    cb_assert(mcs_server_st_fd(msst) == -1);

    *downstream_conn_max_reached = false;

    d->ptd->stats.stats.tot_downstream_conn_acquired++;

    /* Prefer the upstream conn's peer_protocol when it is set. */
    downstream_protocol =
        d->upstream_conn->peer_protocol ?
        d->upstream_conn->peer_protocol :
        behavior->downstream_protocol;

    /* The pool key depends on whether the protocol is ascii. */
    host_ident = mcs_server_st_ident(msst, IS_ASCII(downstream_protocol));
    conns = zstored_get_downstream_conns(thread, host_ident);
    if (conns != NULL) {
        /* Fast path: pop an idle conn off the head of the pool list. */
        dc = conns->dc;
        if (dc != NULL) {
            cb_assert(dc->thread == thread);
            cb_assert(strcmp(host_ident, dc->host_ident) == 0);

            conns->dc_acquired++;
            conns->dc = dc->next;
            dc->next = NULL;

            cb_assert(dc->extra == NULL);
            dc->extra = d;

            return dc;
        }

        /* Connect backoff after too many errors for this host. */
        if (behavior->connect_max_errors > 0 &&
            behavior->connect_max_errors < conns->error_count) {
            rel_time_t msecs_since_error =
                (rel_time_t)(msec_current_time - conns->error_time);

            if (settings.verbose > 2) {
                moxi_log_write("zacquire_dc, %s, %d, %"PRIu64", (%d)\n",
                               host_ident,
                               conns->error_count,
                               (uint64_t)conns->error_time,
                               msecs_since_error);
            }

            if ((behavior->cycle > 0) &&
                (behavior->connect_retry_interval > msecs_since_error)) {
                d->ptd->stats.stats.tot_downstream_connect_interval++;

                return NULL;
            }
        }

        /* Per-host cap on concurrently acquired conns. */
        if (behavior->downstream_conn_max > 0 &&
            behavior->downstream_conn_max <= conns->dc_acquired) {
            d->ptd->stats.stats.tot_downstream_connect_max_reached++;

            *downstream_conn_max_reached = true;

            return NULL;
        }
    }

    dc = cproxy_connect_downstream_conn(d, thread, msst, behavior);
    if (dc != NULL) {
        cb_assert(dc->host_ident == NULL);
        /* NOTE(review): strdup() is unchecked; a NULL host_ident would */
        /* trip cb_assert(dc->host_ident != NULL) in */
        /* zstored_release_downstream_conn(). */
        dc->host_ident = strdup(host_ident);
        if (conns != NULL) {
            conns->dc_acquired++;

            /* A conn still in conn_connecting has not completed yet, */
            /* so only clear the error history for the other states. */
            if (dc->state != conn_connecting) {
                conns->error_count = 0;
                conns->error_time = 0;
            }
        }
    } else {
        /* Connect failed outright; feed the backoff above. */
        if (conns != NULL) {
            conns->error_count++;
            conns->error_time = msec_current_time;
        }
    }

    return dc;
}
3492
/* Releases a downstream conn obtained from
 * zstored_acquire_downstream_conn().
 *
 * NULL_CONN placeholders are ignored.  Otherwise the conn is detached
 * from its downstream (extra = NULL) and its host's dc_acquired count
 * is decremented.  What happens next depends on its state:
 *   - conn_pause: the conn is pushed onto the host's idle list in
 *     dc->thread->conn_hash.  The head of the host's waiting queue (the
 *     oldest waiter, since zstored_downstream_waiting_add() appends at the
 *     tail) is dequeued, its timeout is cleared, and it is handed to
 *     cproxy_forward_or_error().
 *   - any other state, or no pool entry: the conn is closed.
 *
 * `closing` is only used in the verbose log line.
 */
void zstored_release_downstream_conn(conn *dc, bool closing) {
    bool keep;
    zstored_downstream_conns *conns;
    downstream *d;

    cb_assert(dc != NULL);
    if (dc == NULL_CONN) {
        return;
    }

    d = dc->extra;
    cb_assert(d != NULL);

    d->ptd->stats.stats.tot_downstream_conn_released++;

    if (settings.verbose > 2) {
        moxi_log_write("%d: release_downstream_conn, %s, (%d)"
                       " upstream %d\n",
                       dc->sfd, state_text(dc->state), closing,
                       (d->upstream_conn != NULL ?
                        d->upstream_conn->sfd : -1));
    }

    cb_assert(dc->next == NULL);
    cb_assert(dc->thread != NULL);
    cb_assert(dc->host_ident != NULL);

    /* Only conns that are idle in conn_pause are reusable. */
    keep = dc->state == conn_pause;
    dc->extra = NULL;
    conns = zstored_get_downstream_conns(dc->thread, dc->host_ident);
    if (conns != NULL) {
        if (conns->dc_acquired > 0) {
            conns->dc_acquired--;
        }

        if (keep) {
            downstream *d_head;
            cb_assert(dc->next == NULL);
            dc->next = conns->dc;
            conns->dc = dc;

            /* Since one downstream conn was released, process a single */
            /* waiting downstream, if any. */

            d_head = conns->downstream_waiting_head;
            if (d_head != NULL) {
                cb_assert(conns->downstream_waiting_tail != NULL);

                conns->downstream_waiting_head =
                    conns->downstream_waiting_head->next_waiting;
                if (conns->downstream_waiting_head == NULL) {
                    conns->downstream_waiting_tail = NULL;
                }
                d_head->next_waiting = NULL;

                d_head->ptd->stats.stats.tot_downstream_conn_queue_remove++;

                cproxy_clear_timeout(d_head);

                cproxy_forward_or_error(d_head);
            }

            return;
        }
    }

    cproxy_close_conn(dc);
}
3562
/* Removes downstream d from the downstream_waiting queues of the
 * calling thread, checking the per-host queue of every server in d->mst.
 * In each queue it unlinks at most the first occurrence of d and keeps
 * the head/tail pointers consistent.
 *
 * Note: the lookup goes through zstored_get_downstream_conns(), which
 * creates a pool entry for any host that does not have one yet.
 *
 * Returns true if the downstream was found on any */
/* conns->downstream_waiting_head/tail queues and was removed. */

bool zstored_downstream_waiting_remove(downstream *d) {
    bool found = false;
    int i;
    int n;
    /* The pool is per-thread; use the thread we are running on. */
    LIBEVENT_THREAD *thread = thread_by_index(thread_index(cb_thread_self()));
    cb_assert(thread != NULL);

    n = mcs_server_count(&d->mst);
    for (i = 0; i < n; i++) {
        char *host_ident;
        zstored_downstream_conns *conns;
        mcs_server_st *msst = mcs_server_index(&d->mst, i);

        /* Same protocol choice as the acquire/add paths, so the */
        /* host_ident key matches the one d was queued under. */
        enum protocol downstream_protocol =
            d->upstream_conn && d->upstream_conn->peer_protocol ?
            d->upstream_conn->peer_protocol :
            d->behaviors_arr[i].downstream_protocol;

        cb_assert(IS_PROXY(downstream_protocol));

        host_ident = mcs_server_st_ident(msst, IS_ASCII(downstream_protocol));
        conns = zstored_get_downstream_conns(thread, host_ident);

        if (conns != NULL) {
            /* Linked-list removal, on the next_waiting pointer, */
            /* and keep head and tail pointers updated. */

            downstream *prev = NULL;
            downstream *curr = conns->downstream_waiting_head;

            while (curr != NULL) {
                if (curr == d) {
                    found = true;

                    if (conns->downstream_waiting_head == curr) {
                        cb_assert(conns->downstream_waiting_tail != NULL);
                        conns->downstream_waiting_head = curr->next_waiting;
                    }

                    if (conns->downstream_waiting_tail == curr) {
                        conns->downstream_waiting_tail = prev;
                    }

                    if (prev != NULL) {
                        prev->next_waiting = curr->next_waiting;
                    }

                    curr->next_waiting = NULL;

                    d->ptd->stats.stats.tot_downstream_conn_queue_remove++;

                    break;
                }

                prev = curr;
                curr = curr->next_waiting;
            }
        }
    }

    return found;
}
3628
3629bool zstored_downstream_waiting_add(downstream *d, LIBEVENT_THREAD *thread,
3630                                    mcs_server_st *msst,
3631                                    proxy_behavior *behavior) {
3632    enum protocol downstream_protocol;
3633    char *host_ident;
3634    zstored_downstream_conns *conns;
3635
3636    cb_assert(thread != NULL);
3637    cb_assert(d != NULL);
3638    cb_assert(d->upstream_conn != NULL);
3639    cb_assert(d->next_waiting == NULL);
3640
3641    downstream_protocol = d->upstream_conn->peer_protocol ?
3642        d->upstream_conn->peer_protocol :
3643        behavior->downstream_protocol;
3644
3645    host_ident = mcs_server_st_ident(msst, IS_ASCII(downstream_protocol));
3646    conns = zstored_get_downstream_conns(thread, host_ident);
3647    if (conns != NULL) {
3648        cb_assert(conns->dc == NULL);
3649
3650        if (conns->downstream_waiting_head == NULL) {
3651            cb_assert(conns->downstream_waiting_tail == NULL);
3652            conns->downstream_waiting_head = d;
3653        }
3654        if (conns->downstream_waiting_tail != NULL) {
3655            cb_assert(conns->downstream_waiting_tail->next_waiting == NULL);
3656            conns->downstream_waiting_tail->next_waiting = d;
3657        }
3658        conns->downstream_waiting_tail = d;
3659
3660        d->ptd->stats.stats.tot_downstream_conn_queue_add++;
3661
3662        return true;
3663    }
3664
3665    return false;
3666}
3667
3668/* Find an appropriate proxy struct or NULL. */
3669
3670proxy *cproxy_find_proxy_by_auth(proxy_main *m,
3671                                 const char *usr,
3672                                 const char *pwd) {
3673    proxy *found = NULL;
3674    proxy *p;
3675
3676    cb_mutex_enter(&m->proxy_main_lock);
3677
3678    for (p = m->proxy_head; p != NULL && found == NULL; p = p->next) {
3679        cb_mutex_enter(&p->proxy_lock);
3680        if (strcmp(p->behavior_pool.base.usr, usr) == 0 &&
3681            strcmp(p->behavior_pool.base.pwd, pwd) == 0) {
3682            found = p;
3683        }
3684        cb_mutex_exit(&p->proxy_lock);
3685    }
3686
3687    cb_mutex_exit(&m->proxy_main_lock);
3688
3689    return found;
3690}
3691
3692int cproxy_num_active_proxies(proxy_main *m) {
3693    int n = 0;
3694    proxy *p;
3695
3696    cb_mutex_enter(&m->proxy_main_lock);
3697
3698    for (p = m->proxy_head; p != NULL; p = p->next) {
3699        cb_mutex_enter(&p->proxy_lock);
3700        if (p->name != NULL &&
3701            p->config != NULL &&
3702            p->config[0] != '\0') {
3703            n++;
3704        }
3705        cb_mutex_exit(&p->proxy_lock);
3706    }
3707
3708    cb_mutex_exit(&m->proxy_main_lock);
3709
3710    return n;
3711}
3712
3713static
3714void diag_single_connection(FILE *out, conn *c) {
3715    fprintf(out, "%p (%d), ev: 0x%04x, state: 0x%x, substate: 0x%x - %s\n",
3716            (void *)c, c->sfd, c->ev_flags,
3717            c->state, c->substate,
3718            c->update_diag ? c->update_diag : "(none)");
3719}
3720
3721static
3722void diag_connections(FILE *out, conn *head, int indent) {
3723    static char *blank = "";
3724    for (; head != NULL; head = head->next) {
3725        fprintf(out, "%*s" "connection: ", indent, blank);
3726        diag_single_connection(out, head);
3727    }
3728}
3729
3730static
3731void diag_single_downstream(FILE *out, downstream *d, int indent) {
3732    static char *blank = "";
3733    int n;
3734    int i;
3735
3736    conn *upstream = d->upstream_conn;
3737    fprintf(out, "%*s" "upstream: ", indent, blank);
3738    if (upstream) {
3739        diag_single_connection(out, upstream);
3740    } else {
3741        fprintf(out, "none\n");
3742    }
3743
3744    fprintf(out, "%*s" "downstream_used: %d\n", indent, blank, d->downstream_used);
3745    fprintf(out, "%*s" "downstream_used_start: %d\n", indent, blank, d->downstream_used_start);
3746    fprintf(out, "%*s" "downstream_conns:\n", indent, blank);
3747
3748    n = mcs_server_count(&d->mst);
3749    for (i = 0; i < n; i++) {
3750        if (d->downstream_conns[i] == NULL)
3751            continue;
3752        fprintf(out, "%*s", indent+2, blank);
3753        diag_single_connection(out, d->downstream_conns[i]);
3754    }
3755}
3756
3757static
3758void diag_downstream_chain(FILE *out, downstream *head, int indent) {
3759    for (; head != NULL; head = head->next) {
3760        diag_single_downstream(out, head, indent);
3761    }
3762}
3763
3764/* this is supposed to be called from gdb when other threads are suspended */
3765void connections_diag(FILE *out);
3766
3767void connections_diag(FILE *out) {
3768    extern proxy_main *diag_last_proxy_main;
3769
3770    proxy *cur_proxy;
3771    proxy_main *m = diag_last_proxy_main;
3772    if (!m) {
3773        fputs("no proxy_main!\n", out);
3774        return;
3775    }
3776
3777    for (cur_proxy = m->proxy_head; cur_proxy != NULL ; cur_proxy = cur_proxy->next) {
3778        int ti;
3779        fprintf(out, "proxy: name='%s', port=%d, cfg=%s (%u)\n",
3780                cur_proxy->name ? cur_proxy->name : "(null)",
3781                cur_proxy->port,
3782                cur_proxy->config, cur_proxy->config_ver);
3783        for (ti = 0; ti < cur_proxy->thread_data_num; ti++) {
3784            proxy_td *td = cur_proxy->thread_data + ti;
3785            fprintf(out, "  thread:%d\n", ti);
3786            fprintf(out, "    waiting_any_downstream:\n");
3787            diag_connections(out, td->waiting_any_downstream_head, 6);
3788
3789            fprintf(out, "    downstream_reserved:\n");
3790            diag_downstream_chain(out, td->downstream_reserved, 6);
3791            fprintf(out, "    downstream_released:\n");
3792            diag_downstream_chain(out, td->downstream_released, 6);
3793        }
3794    }
3795}
3796
3797bool cproxy_front_cache_key(proxy_td *ptd, char *key, int key_len) {
3798    return (key != NULL &&
3799            key_len > 0 &&
3800            ptd->behavior_pool.base.front_cache_lifespan > 0 &&
3801            matcher_check(&ptd->proxy->front_cache_matcher, key, key_len, false) == true &&
3802            matcher_check(&ptd->proxy->front_cache_unmatcher, key, key_len, false) == false);
3803}
3804
3805void cproxy_front_cache_delete(proxy_td *ptd, char *key, int key_len) {
3806    if (cproxy_front_cache_key(ptd, key, key_len) == true) {
3807        mcache_delete(&ptd->proxy->front_cache, key, key_len);
3808
3809        if (settings.verbose > 1) {
3810            moxi_log_write("front_cache del %s\n", key);
3811        }
3812    }
3813}
3814