xref: /5.5.2/moxi/src/cproxy.c (revision c9de98e2)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3#include "src/config.h"
4#include <stdio.h>
5#include <stdlib.h>
6#include <string.h>
7#include <errno.h>
8#include <platform/cbassert.h>
9#include <fcntl.h>
10#include <math.h>
11#include "memcached.h"
12#include "cproxy.h"
13#include "work.h"
14#include "log.h"
15
/* Build-time knob; a build may predefine it, otherwise it is false.
 * NOTE(review): presumably selects blocking downstream connects --
 * it is not referenced in this part of the file, confirm at its users. */
#if !defined(MOXI_BLOCKING_CONNECT)
#define MOXI_BLOCKING_CONNECT false
#endif
19
20/* Internal forward declarations. */
21
22downstream *downstream_list_remove(downstream *head, downstream *d);
23downstream *downstream_list_waiting_remove(downstream *head,
24                                           downstream **tail,
25                                           downstream *d);
26
27static void downstream_timeout(evutil_socket_t fd,
28                        const short which,
29                        void *arg);
30static void wait_queue_timeout(evutil_socket_t fd,
31                        const short which,
32                        void *arg);
33
34conn *conn_list_remove(conn *head, conn **tail,
35                       conn *c, bool *found);
36
37bool is_compatible_request(conn *existing, conn *candidate);
38
39void propagate_error_msg(downstream *d, char *ascii_msg,
40                         protocol_binary_response_status binary_status);
41
42void downstream_reserved_time_sample(proxy_stats_td *ptds, uint64_t duration);
43void downstream_connect_time_sample(proxy_stats_td *ptds, uint64_t duration);
44
45bool downstream_connect_init(downstream *d, mcs_server_st *msst,
46                             proxy_behavior *behavior, conn *c);
47
48int init_mcs_st(mcs_st *mst, char *config,
49                const char *default_usr,
50                const char *default_pwd,
51                const char *opts);
52
53bool cproxy_on_connect_downstream_conn(conn *c);
54
55conn *zstored_acquire_downstream_conn(downstream *d,
56                                      LIBEVENT_THREAD *thread,
57                                      mcs_server_st *msst,
58                                      proxy_behavior *behavior,
59                                      bool *downstream_conn_max_reached);
60
61void zstored_release_downstream_conn(conn *dc, bool closing);
62
63void zstored_error_count(LIBEVENT_THREAD *thread,
64                         const char *host_ident,
65                         bool has_error);
66
67bool zstored_downstream_waiting_add(downstream *d, LIBEVENT_THREAD *thread,
68                                    mcs_server_st *msst,
69                                    proxy_behavior *behavior);
70
71bool zstored_downstream_waiting_remove(downstream *d);
72
73typedef struct {
74    conn      *dc;          /* Linked-list of available downstream conns. */
75    uint32_t   dc_acquired; /* Count of acquired (in-use) downstream conns. */
76    char      *host_ident;
77    uint32_t   error_count;
78    uint64_t   error_time;
79
80    /* Head & tail of singly linked-list/queue, using */
81    /* downstream->next_waiting pointers, where we've reached */
82    /* downstream_conn_max, so there are waiting downstreams. */
83
84    downstream *downstream_waiting_head;
85    downstream *downstream_waiting_tail;
86} zstored_downstream_conns;
87
88zstored_downstream_conns *zstored_get_downstream_conns(LIBEVENT_THREAD *thread,
89                                                       const char *host_ident);
90
91bool cproxy_forward_or_error(downstream *d);
92
93int delink_from_downstream_conns(conn *c);
94
95int cproxy_num_active_proxies(proxy_main *m);
96
97/* Function tables. */
98
99conn_funcs cproxy_listen_funcs = {
100    .conn_init                   = cproxy_init_upstream_conn,
101    .conn_close                  = NULL,
102    .conn_connect                = NULL,
103    .conn_process_ascii_command  = NULL,
104    .conn_process_binary_command = NULL,
105    .conn_complete_nread_ascii   = NULL,
106    .conn_complete_nread_binary  = NULL,
107    .conn_pause                  = NULL,
108    .conn_realtime               = NULL,
109    .conn_state_change           = NULL,
110    .conn_binary_command_magic   = 0
111};
112
113conn_funcs cproxy_upstream_funcs = {
114    .conn_init                   = NULL,
115    .conn_close                  = cproxy_on_close_upstream_conn,
116    .conn_connect                = NULL,
117    .conn_process_ascii_command  = cproxy_process_upstream_ascii,
118    .conn_process_binary_command = cproxy_process_upstream_binary,
119    .conn_complete_nread_ascii   = cproxy_process_upstream_ascii_nread,
120    .conn_complete_nread_binary  = cproxy_process_upstream_binary_nread,
121    .conn_pause                  = NULL,
122    .conn_realtime               = cproxy_realtime,
123    .conn_state_change           = cproxy_upstream_state_change,
124    .conn_binary_command_magic   = PROTOCOL_BINARY_REQ
125};
126
127conn_funcs cproxy_downstream_funcs = {
128    .conn_init                   = cproxy_init_downstream_conn,
129    .conn_close                  = cproxy_on_close_downstream_conn,
130    .conn_connect                = cproxy_on_connect_downstream_conn,
131    .conn_process_ascii_command  = cproxy_process_downstream_ascii,
132    .conn_process_binary_command = cproxy_process_downstream_binary,
133    .conn_complete_nread_ascii   = cproxy_process_downstream_ascii_nread,
134    .conn_complete_nread_binary  = cproxy_process_downstream_binary_nread,
135    .conn_pause                  = cproxy_on_pause_downstream_conn,
136    .conn_realtime               = cproxy_realtime,
137    .conn_state_change           = NULL,
138    .conn_binary_command_magic   = PROTOCOL_BINARY_RES
139};
140
141/* Main function to create a proxy struct.
142 */
143proxy *cproxy_create(proxy_main *main,
144                     char    *name,
145                     int      port,
146                     char    *config,
147                     uint32_t config_ver,
148                     proxy_behavior_pool *behavior_pool,
149                     int nthreads) {
150    if (settings.verbose > 1) {
151        moxi_log_write("cproxy_create on port %d, name %s, config %s\n",
152                       port, name, config);
153    }
154
155    cb_assert(name != NULL);
156    cb_assert(port > 0 || settings.socketpath != NULL);
157    cb_assert(config != NULL);
158    cb_assert(behavior_pool);
159    cb_assert(nthreads > 1); /* Main thread + at least one worker. */
160    cb_assert(nthreads == settings.num_threads);
161
162    proxy *p = (proxy *) calloc(1, sizeof(proxy));
163    if (p != NULL) {
164        p->main       = main;
165        p->name       = trimstrdup(name);
166        p->port       = port;
167        p->config     = trimstrdup(config);
168        p->config_ver = config_ver;
169
170        p->behavior_pool.base = behavior_pool->base;
171        p->behavior_pool.num  = behavior_pool->num;
172        p->behavior_pool.arr  = cproxy_copy_behaviors(behavior_pool->num,
173                                                      behavior_pool->arr);
174
175        p->listening        = 0;
176        p->listening_failed = 0;
177
178        p->next = NULL;
179
180        cb_mutex_initialize(&p->proxy_lock);
181
182        mcache_init(&p->front_cache, true, &mcache_item_funcs, true);
183        matcher_init(&p->front_cache_matcher, true);
184        matcher_init(&p->front_cache_unmatcher, true);
185
186        matcher_init(&p->optimize_set_matcher, true);
187
188        if (behavior_pool->base.front_cache_max > 0 &&
189            behavior_pool->base.front_cache_lifespan > 0) {
190            mcache_start(&p->front_cache,
191                         behavior_pool->base.front_cache_max);
192
193            if (strlen(behavior_pool->base.front_cache_spec) > 0) {
194                matcher_start(&p->front_cache_matcher,
195                              behavior_pool->base.front_cache_spec);
196            }
197
198            if (strlen(behavior_pool->base.front_cache_unspec) > 0) {
199                matcher_start(&p->front_cache_unmatcher,
200                              behavior_pool->base.front_cache_unspec);
201            }
202        }
203
204        if (strlen(behavior_pool->base.optimize_set) > 0) {
205            matcher_start(&p->optimize_set_matcher,
206                          behavior_pool->base.optimize_set);
207        }
208
209        p->thread_data_num = nthreads;
210        p->thread_data = (proxy_td *) calloc(p->thread_data_num,
211                                             sizeof(proxy_td));
212        if (p->thread_data != NULL &&
213            p->name != NULL &&
214            p->config != NULL &&
215            p->behavior_pool.arr != NULL) {
216            /* We start at 1, because thread[0] is the main listen/accept */
217            /* thread, and not a true worker thread.  Too lazy to save */
218            /* the wasted thread[0] slot memory. */
219            int i;
220            for (i = 1; i < p->thread_data_num; i++) {
221                proxy_td *ptd = &p->thread_data[i];
222                ptd->proxy = p;
223
224                ptd->config     = strdup(p->config);
225                ptd->config_ver = p->config_ver;
226
227                ptd->behavior_pool.base = behavior_pool->base;
228                ptd->behavior_pool.num  = behavior_pool->num;
229                ptd->behavior_pool.arr =
230                    cproxy_copy_behaviors(behavior_pool->num,
231                                          behavior_pool->arr);
232
233                ptd->waiting_any_downstream_head = NULL;
234                ptd->waiting_any_downstream_tail = NULL;
235                ptd->downstream_reserved = NULL;
236                ptd->downstream_released = NULL;
237                ptd->downstream_tot = 0;
238                ptd->downstream_num = 0;
239                ptd->downstream_max = behavior_pool->base.downstream_max;
240                ptd->downstream_assigns = 0;
241                ptd->timeout_tv.tv_sec = 0;
242                ptd->timeout_tv.tv_usec = 0;
243                ptd->stats.stats.num_upstream = 0;
244                ptd->stats.stats.num_downstream_conn = 0;
245
246                cproxy_reset_stats_td(&ptd->stats);
247
248                mcache_init(&ptd->key_stats, true,
249                            &mcache_key_stats_funcs, false);
250                matcher_init(&ptd->key_stats_matcher, false);
251                matcher_init(&ptd->key_stats_unmatcher, false);
252
253                if (behavior_pool->base.key_stats_max > 0 &&
254                    behavior_pool->base.key_stats_lifespan > 0) {
255                    mcache_start(&ptd->key_stats,
256                                 behavior_pool->base.key_stats_max);
257
258                    if (strlen(behavior_pool->base.key_stats_spec) > 0) {
259                        matcher_start(&ptd->key_stats_matcher,
260                                      behavior_pool->base.key_stats_spec);
261                    }
262
263                    if (strlen(behavior_pool->base.key_stats_unspec) > 0) {
264                        matcher_start(&ptd->key_stats_unmatcher,
265                                      behavior_pool->base.key_stats_unspec);
266                    }
267                }
268            }
269
270            return p;
271        }
272
273        free(p->name);
274        free(p->config);
275        free(p->behavior_pool.arr);
276        free(p->thread_data);
277        free(p);
278    }
279
280    return NULL;
281}
282
283/* Must be called on the main listener thread.
284 */
285int cproxy_listen(proxy *p) {
286    cb_assert(p != NULL);
287    cb_assert(is_listen_thread());
288
289    if (settings.verbose > 1) {
290        moxi_log_write("cproxy_listen on port %d, downstream %s\n",
291                p->port, p->config);
292    }
293
294    /* Idempotent, remembers if it already created listening socket(s). */
295
296    if (p->listening == 0) {
297        int listening;
298        enum protocol listen_protocol = negotiating_proxy_prot;
299
300        if (IS_ASCII(settings.binding_protocol)) {
301            listen_protocol = proxy_upstream_ascii_prot;
302        }
303        if (IS_BINARY(settings.binding_protocol)) {
304            listen_protocol = proxy_upstream_binary_prot;
305        }
306
307        listening = cproxy_listen_port(p->port, listen_protocol,
308                                       tcp_transport,
309                                       p,
310                                       &cproxy_listen_funcs);
311        if (listening > 0) {
312            p->listening += listening;
313        } else {
314            p->listening_failed++;
315
316            if (settings.enable_mcmux_mode && settings.socketpath) {
317#ifdef HAVE_SYS_UN_H
318                moxi_log_write("error: could not access UNIX socket: %s\n",
319                               settings.socketpath);
320                if (ml->log_mode != ERRORLOG_STDERR) {
321                    fprintf(stderr, "error: could not access UNIX socket: %s\n",
322                            settings.socketpath);
323                }
324                exit(EXIT_FAILURE);
325#endif
326            }
327
328            moxi_log_write("ERROR: could not listen on port %d. "
329                           "Please use -Z port_listen=PORT_NUM "
330                           "to specify a different port number.\n", p->port);
331            if (ml->log_mode != ERRORLOG_STDERR) {
332                fprintf(stderr, "ERROR: could not listen on port %d. "
333                        "Please use -Z port_listen=PORT_NUM "
334                        "to specify a different port number.\n", p->port);
335            }
336            exit(EXIT_FAILURE);
337        }
338    }
339
340    return (int)p->listening;
341}
342
343int cproxy_listen_port(int port,
344                       enum protocol protocol,
345                       enum network_transport transport,
346                       void       *conn_extra,
347                       conn_funcs *funcs) {
348
349    int   listening;
350    conn *listen_conn_orig;
351    conn *x;
352
353    cb_assert(port > 0 || settings.socketpath != NULL);
354    cb_assert(conn_extra);
355    cb_assert(funcs);
356    cb_assert(is_listen_thread());
357
358    listening = 0;
359    listen_conn_orig = listen_conn;
360
361    x = listen_conn_orig;
362    while (x != NULL) {
363        if (x->extra != NULL && x->funcs == funcs) {
364            struct sockaddr_in s_in;
365            socklen_t sin_len = sizeof(s_in);
366            memset(&s_in, 0, sizeof(s_in));
367
368            if (getsockname(x->sfd, (struct sockaddr *) &s_in, &sin_len) == 0) {
369                int x_port = ntohs(s_in.sin_port);
370                if (x_port == port) {
371                    if (settings.verbose > 1) {
372                        moxi_log_write(
373                                "<%d cproxy listening reusing listener on port %d\n",
374                                x->sfd, port);
375                    }
376
377                    listening++;
378                }
379            }
380        }
381
382        x = x->next;
383    }
384
385    if (listening > 0) {
386        /* If we're already listening on the required port, then */
387        /* we don't need to start a new server_socket().  This happens */
388        /* in the multi-bucket case with binary protocol buckets. */
389        /* There will be multiple proxy struct's (one per bucket), but */
390        /* only one proxy struct will actually be pointed at by a */
391        /* listening conn->extra (usually 11211). */
392
393        /* TODO: Add a refcount to handle shutdown properly? */
394
395        return listening;
396    }
397#ifdef HAVE_SYS_UN_H
398    if (settings.socketpath ?
399        (server_socket_unix(settings.socketpath, settings.access) == 0) :
400        (server_socket(port, transport, NULL) == 0)) {
401#else
402    if (server_socket(port, transport, NULL) == 0) {
403#endif
404        conn *c;
405        cb_assert(listen_conn != NULL);
406
407        /* The listen_conn global list is changed by server_socket(), */
408        /* which adds a new listening conn on port for each bindable */
409        /* host address. */
410
411        /* For example, after the call to server_socket(), there */
412        /* might be two new listening conn's -- one for localhost, */
413        /* another for 127.0.0.1. */
414
415        c = listen_conn;
416        while (c != NULL &&
417               c != listen_conn_orig) {
418            if (settings.verbose > 1) {
419                moxi_log_write(
420                        "<%d cproxy listening on port %d\n",
421                        c->sfd, port);
422            }
423
424            listening++;
425
426            /* TODO: Listening conn's never seem to close, */
427            /*       but need to handle cleanup if they do, */
428            /*       such as if we handle graceful shutdown one day. */
429
430            c->extra = conn_extra;
431            c->funcs = funcs;
432            c->protocol = protocol;
433            c = c->next;
434        }
435    }
436
437    return listening;
438}
439
440/* Finds the proxy_td associated with a worker thread.
441 */
442proxy_td *cproxy_find_thread_data(proxy *p, cb_thread_t thread_id) {
443    if (p != NULL) {
444        int i = thread_index(thread_id);
445
446        /* 0 is the main listen thread, not a worker thread. */
447        cb_assert(i > 0);
448        cb_assert(i < p->thread_data_num);
449
450        if (i > 0 && i < p->thread_data_num) {
451            return &p->thread_data[i];
452        }
453    }
454
455    return NULL;
456}
457
458bool cproxy_init_upstream_conn(conn *c) {
459    char *default_name;
460    proxy *p;
461    int n;
462    proxy_td *ptd;
463
464    cb_assert(c != NULL);
465
466    /* We're called once per client/upstream conn early in its */
467    /* lifecycle, on the worker thread, so it's a good place */
468    /* to record the proxy_td into the conn->extra. */
469
470    cb_assert(!is_listen_thread());
471
472    p = c->extra;
473    cb_assert(p != NULL);
474    cb_assert(p->main != NULL);
475
476    n = cproxy_num_active_proxies(p->main);
477    if (n <= 0) {
478        if (settings.verbose > 2) {
479            moxi_log_write("<%d disallowing upstream conn due to no buckets\n",
480                           c->sfd);
481        }
482
483        return false;
484    }
485
486    ptd = cproxy_find_thread_data(p, cb_thread_self());
487    cb_assert(ptd != NULL);
488
489    /* Reassign the client/upstream conn to a different bucket */
490    /* if the default_bucket_name isn't the special FIRST_BUCKET */
491    /* value. */
492
493    default_name = ptd->behavior_pool.base.default_bucket_name;
494    if (strcmp(default_name, FIRST_BUCKET) != 0) {
495        proxy *default_proxy;
496        if (settings.verbose > 2) {
497            moxi_log_write("<%d assigning to default bucket: %s\n",
498                           c->sfd, default_name);
499        }
500
501        default_proxy =
502            cproxy_find_proxy_by_auth(p->main, default_name, "");
503
504        /* If the ostensible default bucket is missing (possibly deleted), */
505        /* assign the client/upstream conn to the NULL BUCKET. */
506
507        if (default_proxy == NULL) {
508            default_proxy =
509                cproxy_find_proxy_by_auth(p->main, NULL_BUCKET, "");
510
511            if (settings.verbose > 2) {
512                moxi_log_write("<%d assigning to null bucket, "
513                               "default bucket missing: %s\n",
514                               c->sfd, default_name);
515            }
516        }
517
518        if (default_proxy != NULL) {
519            proxy_td *default_ptd =
520                cproxy_find_thread_data(default_proxy, cb_thread_self());
521            if (default_ptd != NULL) {
522                ptd = default_ptd;
523            }
524        } else {
525            if (settings.verbose > 2) {
526                moxi_log_write("<%d assigning to first bucket, "
527                               "missing default/null bucket: %s\n",
528                               c->sfd, default_name);
529            }
530        }
531    } else {
532        if (settings.verbose > 2) {
533            moxi_log_write("<%d assigning to first bucket\n", c->sfd);
534        }
535    }
536
537    ptd->stats.stats.num_upstream++;
538    ptd->stats.stats.tot_upstream++;
539
540    c->extra = ptd;
541    c->funcs = &cproxy_upstream_funcs;
542
543    return true;
544}
545
546bool cproxy_init_downstream_conn(conn *c) {
547    downstream *d = c->extra;
548    cb_assert(d != NULL);
549
550    d->ptd->stats.stats.num_downstream_conn++;
551    d->ptd->stats.stats.tot_downstream_conn++;
552
553    return true;
554}
555
556void cproxy_on_close_upstream_conn(conn *c) {
557    downstream *d;
558    proxy_td *ptd;
559
560    cb_assert(c != NULL);
561
562    if (settings.verbose > 2) {
563        moxi_log_write("<%d cproxy_on_close_upstream_conn\n", c->sfd);
564    }
565
566    ptd = c->extra;
567    if (c->extra == NULL) {
568        moxi_log_write("<%d cproxy_on_close_upstream_conn already closed\n",
569                       c->sfd);
570        return;
571    }
572    c->extra = NULL;
573
574    if (ptd->stats.stats.num_upstream > 0) {
575        ptd->stats.stats.num_upstream--;
576    }
577
578    /* Delink from any reserved downstream. */
579
580    for (d = ptd->downstream_reserved; d != NULL; d = d->next) {
581        bool found = false;
582
583        d->upstream_conn = conn_list_remove(d->upstream_conn, NULL,
584                                            c, &found);
585        if (d->upstream_conn == NULL) {
586            d->upstream_suffix = NULL;
587            d->upstream_suffix_len = 0;
588            d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
589            d->upstream_retry = 0;
590            d->target_host_ident = NULL;
591
592            /* Don't need to do anything else, as we'll now just */
593            /* read and drop any remaining inflight downstream replies. */
594            /* Eventually, the downstream will be released. */
595        }
596
597        /* If the downstream was reserved for this upstream conn, */
598        /* also clear the upstream from any multiget de-duplication */
599        /* tracking structures. */
600
601        if (found) {
602            int n, i;
603            if (d->multiget != NULL) {
604                genhash_iter(d->multiget, multiget_remove_upstream, c);
605            }
606
607            /* The downstream conn's might have iov's that */
608            /* point to the upstream conn's buffers.  Also, the */
609            /* downstream conn might be in all sorts of states */
610            /* (conn_read, write, mwrite, pause), and we want */
611            /* to be careful about the downstream channel being */
612            /* half written. */
613
614            /* The safest, but inefficient, thing to do then is */
615            /* to close any conn_mwrite downstream conns. */
616
617            ptd->stats.stats.tot_downstream_close_on_upstream_close++;
618
619            n = mcs_server_count(&d->mst);
620
621            for (i = 0; i < n; i++) {
622                conn *downstream_conn = d->downstream_conns[i];
623                if (downstream_conn != NULL &&
624                    downstream_conn != NULL_CONN &&
625                    downstream_conn->state == conn_mwrite) {
626                    downstream_conn->msgcurr = 0;
627                    downstream_conn->msgused = 0;
628                    downstream_conn->iovused = 0;
629
630                    cproxy_close_conn(downstream_conn);
631                }
632            }
633        }
634    }
635
636    /* Delink from wait queue. */
637
638    ptd->waiting_any_downstream_head =
639        conn_list_remove(ptd->waiting_any_downstream_head,
640                         &ptd->waiting_any_downstream_tail,
641                         c, NULL);
642}
643
644int delink_from_downstream_conns(conn *c) {
645    int n, k, i;
646    downstream *d = c->extra;
647    if (d == NULL) {
648        if (settings.verbose > 2) {
649            moxi_log_write("%d: delink_from_downstream_conn no-op\n",
650                           c->sfd);
651        }
652
653        return -1;
654    }
655
656    n = mcs_server_count(&d->mst);
657    k = -1; /* Index of conn. */
658
659    for (i = 0; i < n; i++) {
660        if (d->downstream_conns[i] == c) {
661            d->downstream_conns[i] = NULL;
662
663            if (settings.verbose > 2) {
664                moxi_log_write(
665                        "<%d cproxy_on_close_downstream_conn quit_server\n",
666                        c->sfd);
667            }
668
669            d->ptd->stats.stats.tot_downstream_quit_server++;
670
671            mcs_server_st_quit(mcs_server_index(&d->mst, i), 1);
672            cb_assert(mcs_server_st_fd(mcs_server_index(&d->mst, i)) == -1);
673
674            k = i;
675        }
676    }
677
678    return k;
679}
680
681void cproxy_on_close_downstream_conn(conn *c) {
682    conn *uc_retry = NULL;
683    downstream *d;
684    int k;
685    proxy_td *ptd;
686
687    cb_assert(c != NULL);
688    cb_assert(c->sfd >= 0);
689    cb_assert(c->state == conn_closing);
690
691    if (settings.verbose > 2) {
692        moxi_log_write("<%d cproxy_on_close_downstream_conn\n", c->sfd);
693    }
694
695    d = c->extra;
696
697    /* Might have been set to NULL during cproxy_free_downstream(). */
698    /* Or, when a downstream conn is in the thread-based free pool, it */
699    /* is not associated with any particular downstream. */
700
701    if (d == NULL) {
702        /* TODO: See if we need to remove the downstream conn from the */
703        /* thread-based free pool.  This shouldn't happen, but we */
704        /* should then figure out how to put an cb_assert() here. */
705
706        return;
707    }
708
709    k = delink_from_downstream_conns(c);
710
711    c->extra = NULL;
712
713    if (c->thread != NULL &&
714        c->host_ident != NULL) {
715        zstored_error_count(c->thread, c->host_ident, true);
716    }
717
718    ptd = d->ptd;
719    cb_assert(ptd);
720
721    if (ptd->stats.stats.num_downstream_conn > 0) {
722        ptd->stats.stats.num_downstream_conn--;
723    }
724
725    if (k < 0) {
726        /* If this downstream conn wasn't linked into the */
727        /* downstream, it was delinked already during connect error */
728        /* handling (where its slot was set to NULL_CONN already), */
729        /* or during downstream_timeout/conn_queue_timeout. */
730
731        if (settings.verbose > 2) {
732            moxi_log_write("%d: skipping release dc in on_close_dc\n",
733                           c->sfd);
734        }
735
736        return;
737    }
738
739    if (d->upstream_conn != NULL &&
740        d->downstream_used == 1) {
741        /* TODO: Revisit downstream close error handling. */
742        /*       Should we propagate error when... */
743        /*       - any downstream conn closes? */
744        /*       - all downstream conns closes? */
745        /*       - last downstream conn closes?  Current behavior. */
746
747        if (d->upstream_suffix == NULL) {
748            if (settings.verbose > 2) {
749                moxi_log_write("<%d proxy downstream closed, upstream %d (%x)\n",
750                               c->sfd,
751                               d->upstream_conn->sfd,
752                               d->upstream_conn->protocol);
753            }
754
755            if (IS_ASCII(d->upstream_conn->protocol)) {
756                d->upstream_suffix = "SERVER_ERROR proxy downstream closed\r\n";
757
758                if (c->host_ident != NULL) {
759                    char *s = add_conn_suffix(d->upstream_conn);
760                    if (s != NULL) {
761                        snprintf(s, SUFFIX_SIZE - 1,
762                                 "SERVER_ERROR proxy downstream closed %s\r\n",
763                                 c->host_ident);
764                        s[SUFFIX_SIZE - 1] = '\0';
765                        d->upstream_suffix = s;
766
767                        s = strchr(s, ':'); /* Clip to avoid sending user/pswd. */
768                        if (s != NULL) {
769                            *s++ = '\r';
770                            *s++ = '\n';
771                            *s = '\0';
772                        }
773                    }
774                }
775
776                d->upstream_suffix_len = 0;
777            } else {
778                d->upstream_status = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
779            }
780
781            d->upstream_retry = 0;
782            d->target_host_ident = NULL;
783        }
784
785        /* We sometimes see that drive_machine/transmit will not see */
786        /* a closed connection error during conn_mwrite, possibly */
787        /* due to non-blocking sockets.  Because of this, drive_machine */
788        /* thinks it has a successful downstream request send and */
789        /* moves the state forward trying to read a response from */
790        /* the downstream conn (conn_new_cmd, conn_read, etc), and */
791        /* only then do we finally see the conn close situation, */
792        /* ending up here.  That is, drive_machine only */
793        /* seems to move to conn_closing from conn_read. */
794
795        /* If we haven't received any reply yet, we retry based */
796        /* on our cmd_retries counter. */
797
798        /* TODO: Reconsider retry behavior, is it right in all situations? */
799
800        if (c->rcurr != NULL &&
801            c->rbytes == 0 &&
802            d->downstream_used_start == d->downstream_used &&
803            d->downstream_used_start == 1 &&
804            d->upstream_conn->next == NULL &&
805            d->behaviors_arr != NULL) {
806            if (k >= 0 && k < d->behaviors_num) {
807                int retry_max = d->behaviors_arr[k].downstream_retry;
808                if (d->upstream_conn->cmd_retries < retry_max) {
809                    d->upstream_conn->cmd_retries++;
810                    uc_retry = d->upstream_conn;
811                    d->upstream_suffix = NULL;
812                    d->upstream_suffix_len = 0;
813                    d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
814                    d->upstream_retry = 0;
815                    d->target_host_ident = NULL;
816                }
817            }
818        }
819
820        if (uc_retry == NULL &&
821            d->upstream_suffix == NULL &&
822            IS_BINARY(d->upstream_conn->protocol)) {
823            protocol_binary_response_header *rh =
824                cproxy_make_bin_error(d->upstream_conn,
825                                      PROTOCOL_BINARY_RESPONSE_EINTERNAL);
826            if (rh != NULL) {
827                d->upstream_suffix = (char *) rh;
828                d->upstream_suffix_len = sizeof(protocol_binary_response_header);
829            } else {
830                d->ptd->stats.stats.err_oom++;
831                cproxy_close_conn(d->upstream_conn);
832            }
833        }
834    }
835
836    /* Are we over-decrementing here, and in handling conn_pause? */
837
838    /* Case 1: we're in conn_pause, and socket is closed concurrently. */
839    /* We unpause due to reserve, we move to conn_write/conn_mwrite, */
840    /* fail and move to conn_closing.  So, no over-decrement. */
841
842    /* Case 2: we're waiting for a downstream response in conn_new_cmd, */
843    /* and socket is closed concurrently.  State goes to conn_closing, */
844    /* so, no over-decrement. */
845
846    /* Case 3: we've finished processing downstream response (in */
847    /* conn_parse_cmd or conn_nread), and the downstream socket */
848    /* is closed concurrently.  We then move to conn_pause, */
849    /* and same as Case 1. */
850
851    cproxy_release_downstream_conn(d, c);
852
853    /* Setup a retry after unwinding the call stack. */
854    /* We use the work_queue, because our caller, conn_close(), */
855    /* is likely to blow away our fd if we try to reconnect */
856    /* right now. */
857
858    if (uc_retry != NULL) {
859        if (settings.verbose > 2) {
860            moxi_log_write("%d cproxy retrying\n", uc_retry->sfd);
861        }
862
863        ptd->stats.stats.tot_retry++;
864
865        cb_assert(uc_retry->thread);
866        cb_assert(uc_retry->thread->work_queue);
867
868        work_send(uc_retry->thread->work_queue,
869                  upstream_retry, ptd, uc_retry);
870    }
871}
872
873void upstream_retry(void *data0, void *data1) {
874    proxy_td *ptd = data0;
875    conn *uc = data1;
876
877    cb_assert(ptd);
878    cb_assert(uc);
879
880    cproxy_pause_upstream_for_downstream(ptd, uc);
881}
882
883void cproxy_add_downstream(proxy_td *ptd) {
884    cb_assert(ptd != NULL);
885    cb_assert(ptd->proxy != NULL);
886
887    if (ptd->downstream_max == 0 ||
888        ptd->downstream_num < ptd->downstream_max) {
889        if (settings.verbose > 2) {
890            moxi_log_write("cproxy_add_downstream %d %d\n",
891                    ptd->downstream_num,
892                    ptd->downstream_max);
893        }
894
895        /* The config/behaviors will be NULL if the */
896        /* proxy is shutting down. */
897
898        if (ptd->config != NULL &&
899            ptd->behavior_pool.arr != NULL) {
900            downstream *d =
901                cproxy_create_downstream(ptd->config,
902                                         ptd->config_ver,
903                                         &ptd->behavior_pool);
904            if (d != NULL) {
905                d->ptd = ptd;
906                ptd->downstream_tot++;
907                ptd->downstream_num++;
908                cproxy_release_downstream(d, true);
909            } else {
910                ptd->stats.stats.tot_downstream_create_failed++;
911            }
912        }
913    } else {
914        ptd->stats.stats.tot_downstream_max_reached++;
915    }
916}
917
/* Hand out a downstream for exclusive use by the caller. */
/* */
/* Pops the head of ptd->downstream_released, first trying to grow the */
/* pool with cproxy_add_downstream() when that list is empty.  The */
/* popped downstream is checked with cproxy_check_downstream_config(); */
/* on success it is pushed onto ptd->downstream_reserved and returned, */
/* otherwise it is freed and the loop tries the next one. */
/* */
/* Returns NULL when no downstream could be obtained (pool exhausted, */
/* creation failed, or the proxy is shutting down). */
downstream *cproxy_reserve_downstream(proxy_td *ptd) {
    cb_assert(ptd != NULL);

    /* Loop in case we need to clear out downstreams */
    /* that have outdated configs. */

    while (true) {
        downstream *d;

        d = ptd->downstream_released;
        if (d == NULL) {
            /* Empty free list: try to create one (may be a no-op if */
            /* downstream_max is reached or config is unavailable). */
            cproxy_add_downstream(ptd);
        }

        d = ptd->downstream_released;
        if (d == NULL) {
            return NULL;
        }

        /* Unlink from the head of the released list. */
        ptd->downstream_released = d->next;

        /* A released downstream must have been fully reset by */
        /* cproxy_release_downstream(). */
        cb_assert(d->upstream_conn == NULL);
        cb_assert(d->upstream_suffix == NULL);
        cb_assert(d->upstream_suffix_len == 0);
        cb_assert(d->upstream_status == PROTOCOL_BINARY_RESPONSE_SUCCESS);
        cb_assert(d->upstream_retry == 0);
        cb_assert(d->target_host_ident == NULL);
        cb_assert(d->downstream_used == 0);
        cb_assert(d->downstream_used_start == 0);
        cb_assert(d->merger == NULL);
        cb_assert(d->timeout_tv.tv_sec == 0);
        cb_assert(d->timeout_tv.tv_usec == 0);
        cb_assert(d->next_waiting == NULL);

        /* Reset anyway, in case asserts are compiled out. */
        d->upstream_conn = NULL;
        d->upstream_suffix = NULL;
        d->upstream_suffix_len = 0;
        d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
        d->upstream_retry = 0;
        d->upstream_retries = 0;
        d->target_host_ident = NULL;
        d->usec_start = 0;
        d->downstream_used = 0;
        d->downstream_used_start = 0;
        d->merger = NULL;
        d->timeout_tv.tv_sec = 0;
        d->timeout_tv.tv_usec = 0;
        d->next_waiting = NULL;

        if (cproxy_check_downstream_config(d)) {
            bool found;
            /* Defensive: make sure d is on neither list before */
            /* pushing it onto the reserved list. */
            ptd->downstream_reserved =
                downstream_list_remove(ptd->downstream_reserved, d);
            ptd->downstream_released =
                downstream_list_remove(ptd->downstream_released, d);

            found = zstored_downstream_waiting_remove(d);
            cb_assert(!found);

            d->next = ptd->downstream_reserved;
            ptd->downstream_reserved = d;

            ptd->stats.stats.tot_downstream_reserved++;

            return d;
        }

        /* Stale config that could not be updated in place: discard */
        /* this downstream and try again. */
        cproxy_free_downstream(d);
    }
}
988
989bool cproxy_clear_timeout(downstream *d) {
990    bool rv = false;
991
992    if (d->timeout_tv.tv_sec != 0 ||
993        d->timeout_tv.tv_usec != 0) {
994        evtimer_del(&d->timeout_event);
995        rv = true;
996    }
997
998    d->timeout_tv.tv_sec = 0;
999    d->timeout_tv.tv_usec = 0;
1000
1001    return rv;
1002}
1003
1004bool cproxy_release_downstream(downstream *d, bool force) {
1005    int i;
1006    int n;
1007    bool found;
1008
1009    cb_assert(d != NULL);
1010    cb_assert(d->ptd != NULL);
1011
1012    if (settings.verbose > 2) {
1013        moxi_log_write("%d: release_downstream\n",
1014                       d->upstream_conn != NULL ?
1015                       d->upstream_conn->sfd : -1);
1016    }
1017
1018    /* Always release the timeout_event, even if we're going to retry, */
1019    /* to avoid pegging CPU with leaked timeout_events. */
1020
1021    cproxy_clear_timeout(d);
1022
1023    /* If we need to retry the command, we do so here, */
1024    /* keeping the same downstream that would otherwise */
1025    /* be released. */
1026
1027    if (!force && d->upstream_conn != NULL && d->upstream_retry > 0) {
1028        int max_retries;
1029        d->upstream_retry = 0;
1030        d->upstream_retries++;
1031
1032        /* But, we can stop retrying if we've tried each server twice. */
1033        max_retries = cproxy_max_retries(d);
1034
1035        if (d->upstream_retries <= max_retries) {
1036            if (settings.verbose > 2) {
1037                moxi_log_write("%d: release_downstream,"
1038                               " instead retrying %d, %d <= %d, %d\n",
1039                               d->upstream_conn->sfd,
1040                               d->upstream_retry,
1041                               d->upstream_retries, max_retries,
1042                               d->ptd->stats.stats.tot_retry);
1043            }
1044
1045            d->ptd->stats.stats.tot_retry++;
1046
1047            if (cproxy_forward(d) == true) {
1048                return true;
1049            } else {
1050                d->ptd->stats.stats.tot_downstream_propagate_failed++;
1051
1052                propagate_error_msg(d, NULL, d->upstream_status);
1053            }
1054        } else {
1055            if (settings.verbose > 2) {
1056                moxi_log_write("%d: release_downstream,"
1057                               " skipping retry %d, %d > %d\n",
1058                               d->upstream_conn->sfd,
1059                               d->upstream_retry,
1060                               d->upstream_retries, max_retries);
1061            }
1062        }
1063    }
1064
1065    /* Record reserved_time histogram timings. */
1066
1067    if (d->usec_start > 0) {
1068        uint64_t ux = usec_now() - d->usec_start;
1069
1070        d->ptd->stats.stats.tot_downstream_reserved_time += ux;
1071
1072        if (d->ptd->stats.stats.max_downstream_reserved_time < ux) {
1073            d->ptd->stats.stats.max_downstream_reserved_time = ux;
1074        }
1075
1076        if (d->upstream_retries > 0) {
1077            d->ptd->stats.stats.tot_retry_time += ux;
1078
1079            if (d->ptd->stats.stats.max_retry_time < ux) {
1080                d->ptd->stats.stats.max_retry_time = ux;
1081            }
1082        }
1083
1084        downstream_reserved_time_sample(&d->ptd->stats, ux);
1085    }
1086
1087    d->ptd->stats.stats.tot_downstream_released++;
1088
1089    /* Delink upstream conns. */
1090
1091    while (d->upstream_conn != NULL) {
1092        conn *curr;
1093        if (d->merger != NULL) {
1094            /* TODO: Allow merger callback to be func pointer. */
1095
1096            genhash_iter(d->merger,
1097                        protocol_stats_foreach_write,
1098                        d->upstream_conn);
1099
1100            if (update_event(d->upstream_conn, EV_WRITE | EV_PERSIST)) {
1101                conn_set_state(d->upstream_conn, conn_mwrite);
1102            } else {
1103                d->ptd->stats.stats.err_oom++;
1104                cproxy_close_conn(d->upstream_conn);
1105            }
1106        }
1107
1108        if (settings.verbose > 2) {
1109            moxi_log_write("%d: release_downstream upstream_suffix %s status %x\n",
1110                           d->upstream_conn->sfd,
1111                           d->upstream_suffix_len == 0 ?
1112                           d->upstream_suffix : "(binary)",
1113                           d->upstream_status);
1114        }
1115
1116        if (d->upstream_suffix != NULL) {
1117            /* Do a last write on the upstream.  For example, */
1118            /* the upstream_suffix might be "END\r\n" or other */
1119            /* way to mark the end of a scatter-gather or */
1120            /* multiline response. */
1121            int suffix_len;
1122
1123            if (settings.verbose > 2) {
1124                if (d->upstream_suffix_len > 0) {
1125                    moxi_log_write("%d: release_downstream"
1126                                   " writing suffix binary: %d\n",
1127                                   d->upstream_conn->sfd,
1128                                   d->upstream_suffix_len);
1129
1130                    cproxy_dump_header(d->upstream_conn->sfd,
1131                                       d->upstream_suffix);
1132                } else {
1133                    moxi_log_write("%d: release_downstream"
1134                                   " writing suffix ascii: %s\n",
1135                                   d->upstream_conn->sfd,
1136                                   d->upstream_suffix);
1137                }
1138            }
1139
1140            suffix_len = d->upstream_suffix_len;
1141            if (suffix_len == 0) {
1142                suffix_len = (int)strlen(d->upstream_suffix);
1143            }
1144
1145            if (add_iov(d->upstream_conn,
1146                        d->upstream_suffix,
1147                        suffix_len) == 0 &&
1148                update_event(d->upstream_conn, EV_WRITE | EV_PERSIST)) {
1149                conn_set_state(d->upstream_conn, conn_mwrite);
1150            } else {
1151                d->ptd->stats.stats.err_oom++;
1152                cproxy_close_conn(d->upstream_conn);
1153            }
1154        }
1155
1156        curr = d->upstream_conn;
1157        d->upstream_conn = d->upstream_conn->next;
1158        curr->next = NULL;
1159    }
1160
1161    /* Free extra hash tables. */
1162
1163    if (d->multiget != NULL) {
1164        genhash_iter(d->multiget, multiget_foreach_free, d);
1165        genhash_free(d->multiget);
1166        d->multiget = NULL;
1167    }
1168
1169    if (d->merger != NULL) {
1170        genhash_iter(d->merger, protocol_stats_foreach_free, NULL);
1171        genhash_free(d->merger);
1172        d->merger = NULL;
1173    }
1174
1175    d->upstream_conn = NULL;
1176    d->upstream_suffix = NULL; /* No free(), expecting a static string. */
1177    d->upstream_suffix_len = 0;
1178    d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1179    d->upstream_retry = 0;
1180    d->upstream_retries = 0;
1181    d->target_host_ident = NULL;
1182    d->usec_start = 0;
1183    d->downstream_used = 0;
1184    d->downstream_used_start = 0;
1185    d->multiget = NULL;
1186    d->merger = NULL;
1187
1188    /* TODO: Consider adding a downstream->prev backpointer */
1189    /*       or doubly-linked list to save on this scan. */
1190
1191    d->ptd->downstream_reserved =
1192        downstream_list_remove(d->ptd->downstream_reserved, d);
1193    d->ptd->downstream_released =
1194        downstream_list_remove(d->ptd->downstream_released, d);
1195
1196    found = zstored_downstream_waiting_remove(d);
1197    cb_assert(!found);
1198
1199    n = mcs_server_count(&d->mst);
1200
1201    for (i = 0; i < n; i++) {
1202        conn *dc = d->downstream_conns[i];
1203        d->downstream_conns[i] = NULL;
1204        if (dc != NULL) {
1205            zstored_release_downstream_conn(dc, false);
1206        }
1207    }
1208
1209    cproxy_clear_timeout(d); /* For MB-4334. */
1210
1211    cb_assert(d->timeout_tv.tv_sec == 0);
1212    cb_assert(d->timeout_tv.tv_usec == 0);
1213
1214    /* If this downstream still has the same configuration as our top-level */
1215    /* proxy config, go back onto the available, released downstream list. */
1216
1217    if (cproxy_check_downstream_config(d) || force) {
1218        d->next = d->ptd->downstream_released;
1219        d->ptd->downstream_released = d;
1220
1221        return true;
1222    }
1223
1224    cproxy_free_downstream(d);
1225
1226    return false;
1227}
1228
1229void cproxy_free_downstream(downstream *d) {
1230    bool found;
1231    int n;
1232
1233    cb_assert(d != NULL);
1234    cb_assert(d->ptd != NULL);
1235    cb_assert(d->upstream_conn == NULL);
1236    cb_assert(d->multiget == NULL);
1237    cb_assert(d->merger == NULL);
1238    cb_assert(d->timeout_tv.tv_sec == 0);
1239    cb_assert(d->timeout_tv.tv_usec == 0);
1240
1241    if (settings.verbose > 2) {
1242        moxi_log_write("cproxy_free_downstream\n");
1243    }
1244
1245    d->ptd->stats.stats.tot_downstream_freed++;
1246
1247    d->ptd->downstream_reserved =
1248        downstream_list_remove(d->ptd->downstream_reserved, d);
1249    d->ptd->downstream_released =
1250        downstream_list_remove(d->ptd->downstream_released, d);
1251
1252    found = zstored_downstream_waiting_remove(d);
1253    cb_assert(!found);
1254
1255    d->ptd->downstream_num--;
1256    cb_assert(d->ptd->downstream_num >= 0);
1257
1258    n = mcs_server_count(&d->mst);
1259
1260    if (d->downstream_conns != NULL) {
1261        int i;
1262        for (i = 0; i < n; i++) {
1263            if (d->downstream_conns[i] != NULL &&
1264                d->downstream_conns[i] != NULL_CONN) {
1265                d->downstream_conns[i]->extra = NULL;
1266            }
1267        }
1268    }
1269
1270    /* This will close sockets, which will force associated conn's */
1271    /* to go to conn_closing state.  Since we've already cleared */
1272    /* the conn->extra pointers, there's no extra release/free. */
1273
1274    mcs_free(&d->mst);
1275
1276    cproxy_clear_timeout(d);
1277
1278    if (d->downstream_conns != NULL) {
1279        free(d->downstream_conns);
1280    }
1281
1282    if (d->config != NULL) {
1283        free(d->config);
1284    }
1285
1286    free(d->behaviors_arr);
1287    free(d);
1288}
1289
1290/* The config input is something libmemcached can parse.
1291 * See mcs_server_st_parse().
1292 */
1293downstream *cproxy_create_downstream(char *config,
1294                                     uint32_t config_ver,
1295                                     proxy_behavior_pool *behavior_pool) {
1296    downstream *d = calloc(1, sizeof(downstream));
1297    cb_assert(config != NULL);
1298    cb_assert(behavior_pool != NULL);
1299
1300    if (d != NULL &&
1301        config != NULL &&
1302        config[0] != '\0') {
1303        d->config        = strdup(config);
1304        d->config_ver    = config_ver;
1305        d->behaviors_num = behavior_pool->num;
1306        d->behaviors_arr = cproxy_copy_behaviors(behavior_pool->num,
1307                                                 behavior_pool->arr);
1308
1309        /* TODO: Handle non-uniform downstream protocols. */
1310
1311        cb_assert(IS_PROXY(behavior_pool->base.downstream_protocol));
1312
1313        if (settings.verbose > 2) {
1314            moxi_log_write(
1315                    "cproxy_create_downstream: %s, %u, %u\n",
1316                    config, config_ver,
1317                    behavior_pool->base.downstream_protocol);
1318        }
1319
1320        if (d->config != NULL &&
1321            d->behaviors_arr != NULL) {
1322            char *usr = behavior_pool->base.usr[0] != '\0' ?
1323                behavior_pool->base.usr :
1324                NULL;
1325            char *pwd = behavior_pool->base.pwd[0] != '\0' ?
1326                behavior_pool->base.pwd :
1327                NULL;
1328
1329            int nconns = init_mcs_st(&d->mst, d->config, usr, pwd,
1330                                     behavior_pool->base.mcs_opts);
1331            if (nconns > 0) {
1332                d->downstream_conns = (conn **)
1333                    calloc(nconns, sizeof(conn *));
1334                if (d->downstream_conns != NULL) {
1335                    return d;
1336                }
1337
1338                mcs_free(&d->mst);
1339            }
1340        }
1341
1342        free(d->config);
1343        free(d->behaviors_arr);
1344        free(d);
1345    }
1346
1347    return NULL;
1348}
1349
1350int init_mcs_st(mcs_st *mst, char *config,
1351                const char *default_usr,
1352                const char *default_pwd,
1353                const char *opts) {
1354    cb_assert(mst);
1355    cb_assert(config);
1356
1357    if (mcs_create(mst, config,
1358                   default_usr, default_pwd, opts) != NULL) {
1359        return mcs_server_count(mst);
1360    } else {
1361        if (settings.verbose > 1) {
1362            moxi_log_write("mcs_create failed: %s\n",
1363                    config);
1364        }
1365    }
1366
1367    return 0;
1368}
1369
1370/* See if the downstream config matches the top-level proxy config.
1371 */
1372bool cproxy_check_downstream_config(downstream *d) {
1373    int rv = false;
1374
1375    cb_assert(d != NULL);
1376    cb_assert(d->ptd != NULL);
1377    cb_assert(d->ptd->proxy != NULL);
1378
1379    if (d->config_ver == d->ptd->config_ver) {
1380        rv = true;
1381    } else if (d->config != NULL &&
1382               d->ptd->config != NULL &&
1383               cproxy_equal_behaviors(d->behaviors_num,
1384                                      d->behaviors_arr,
1385                                      d->ptd->behavior_pool.num,
1386                                      d->ptd->behavior_pool.arr)) {
1387        /* Parse the proxy/parent's config to see if we can */
1388        /* reuse our existing downstream connections. */
1389
1390        char *usr = d->ptd->behavior_pool.base.usr[0] != '\0' ?
1391            d->ptd->behavior_pool.base.usr :
1392            NULL;
1393        char *pwd = d->ptd->behavior_pool.base.pwd[0] != '\0' ?
1394            d->ptd->behavior_pool.base.pwd :
1395            NULL;
1396
1397        mcs_st next;
1398
1399        int n = init_mcs_st(&next, d->ptd->config, usr, pwd,
1400                            d->ptd->behavior_pool.base.mcs_opts);
1401        if (n > 0) {
1402            if (mcs_stable_update(&d->mst, &next)) {
1403                if (settings.verbose > 2) {
1404                    moxi_log_write("check_downstream_config stable update\n");
1405                }
1406
1407                free(d->config);
1408                d->config     = strdup(d->ptd->config);
1409                d->config_ver = d->ptd->config_ver;
1410                rv = true;
1411            }
1412
1413            mcs_free(&next);
1414        }
1415    }
1416
1417    if (settings.verbose > 2) {
1418        moxi_log_write("check_downstream_config %u\n", rv);
1419    }
1420
1421    return rv;
1422}
1423
1424/* Returns -1 if the connections aren't fully assigned and ready. */
1425/* In that case, the downstream has to wait for a downstream connection */
1426/* to get out of the conn_connecting state. */
1427
1428/* The downstream connection might leave the conn_connecting state */
1429/* with an error (unable to connect).  That case is handled by */
1430/* tracking a NULL_CONN sentinel value. */
1431
1432/* Also, in the -1 result case, the d->upstream_conn should remain in */
1433/* conn_pause state. */
1434
1435/* A server_index of -1 means to connect all downstreams, as the */
1436/* caller probably needs to proxy a broadcast command. */
1437
1438int cproxy_connect_downstream(downstream *d, LIBEVENT_THREAD *thread,
1439                              int server_index) {
1440    int i = 0;
1441    int s = 0; /* Number connected. */
1442    int n;
1443    mcs_server_st *msst_actual;
1444
1445    cb_assert(d != NULL);
1446    cb_assert(d->ptd != NULL);
1447    cb_assert(d->ptd->downstream_released != d); /* Should not be in free list. */
1448    cb_assert(d->downstream_conns != NULL);
1449    cb_assert(mcs_server_count(&d->mst) > 0);
1450    cb_assert(thread != NULL);
1451    cb_assert(thread->base != NULL);
1452
1453    n = mcs_server_count(&d->mst);
1454
1455    cb_assert(d->behaviors_num >= n);
1456    cb_assert(d->behaviors_arr != NULL);
1457
1458    if (settings.verbose > 2) {
1459        moxi_log_write("%d: cproxy_connect_downstream server_index %d in %d\n",
1460                       d->upstream_conn->sfd, server_index, n);
1461    }
1462
1463
1464    if (server_index >= 0) {
1465        cb_assert(server_index < n);
1466        i = server_index;
1467        n = server_index + 1;
1468    }
1469
1470    for (; i < n; i++) {
1471        cb_assert(IS_PROXY(d->behaviors_arr[i].downstream_protocol));
1472
1473        msst_actual = mcs_server_index(&d->mst, i);
1474
1475        /* Connect to downstream servers, if not already. */
1476
1477        /* A NULL_CONN means that we tried to connect, but failed, */
1478        /* which is different than NULL (which means that we haven't */
1479        /* tried to connect yet). */
1480
1481        if (d->downstream_conns[i] == NULL) {
1482            bool downstream_conn_max_reached = false;
1483            conn *c = d->upstream_conn;
1484            /*
1485             * mcmux compatiblity mode, one downstream struct will be associated
1486             * with downstream connection. So overwrite the default msst with the
1487             * value.
1488             */
1489            if (c && c->peer_host && c->peer_port) {
1490                cb_assert(i == 0);
1491                strncpy(msst_actual->hostname, c->peer_host, MCS_HOSTNAME_SIZE);
1492                msst_actual->port = c->peer_port;
1493                msst_actual->fd = -1;
1494                msst_actual->ident_a[0] = msst_actual->ident_b[0] = 0;
1495            }
1496
1497            d->downstream_conns[i] =
1498                zstored_acquire_downstream_conn(d, thread,
1499                                                msst_actual,
1500                                                &d->behaviors_arr[i],
1501                                                &downstream_conn_max_reached);
1502            if (c != NULL &&
1503                i == server_index &&
1504                d->downstream_conns[i] != NULL &&
1505                d->downstream_conns[i] != NULL_CONN &&
1506                d->target_host_ident == NULL) {
1507                d->target_host_ident = add_conn_suffix(c);
1508                if (d->target_host_ident != NULL) {
1509                    strncpy(d->target_host_ident,
1510                            msst_actual->hostname,
1511                            SUFFIX_SIZE);
1512                    d->target_host_ident[SUFFIX_SIZE - 1] = '\0';
1513                }
1514            }
1515
1516            if (d->downstream_conns[i] != NULL &&
1517                d->downstream_conns[i] != NULL_CONN &&
1518                d->downstream_conns[i]->state == conn_connecting) {
1519                return -1;
1520            }
1521
1522            if (d->downstream_conns[i] == NULL &&
1523                downstream_conn_max_reached == true) {
1524                if (settings.verbose > 2) {
1525                    moxi_log_write("%d: downstream_conn_max reached\n",
1526                                   d->upstream_conn->sfd);
1527                }
1528
1529                if (zstored_downstream_waiting_add(d, thread,
1530                                                   msst_actual,
1531                                                   &d->behaviors_arr[i]) == true) {
1532                    /* Since we're waiting on the downstream conn queue, */
1533                    /* start a downstream timer per configuration. */
1534
1535                    cproxy_start_downstream_timeout_ex(d, c,
1536                        d->behaviors_arr[i].downstream_conn_queue_timeout);
1537
1538                    return -1;
1539                }
1540            }
1541        }
1542
1543        if (d->downstream_conns[i] != NULL &&
1544            d->downstream_conns[i] != NULL_CONN) {
1545            s++;
1546        } else {
1547            mcs_server_st_quit(msst_actual, 1);
1548            d->downstream_conns[i] = NULL_CONN;
1549        }
1550    }
1551
1552    return s;
1553}
1554
1555conn *cproxy_connect_downstream_conn(downstream *d,
1556                                     LIBEVENT_THREAD *thread,
1557                                     mcs_server_st *msst,
1558                                     proxy_behavior *behavior) {
1559    uint64_t start = 0;
1560    int err = -1;
1561    SOCKET fd;
1562
1563    cb_assert(d);
1564    cb_assert(d->ptd);
1565    cb_assert(d->ptd->downstream_released != d); /* Should not be in free list. */
1566    cb_assert(thread);
1567    cb_assert(thread->base);
1568    cb_assert(msst);
1569    cb_assert(behavior);
1570    cb_assert(mcs_server_st_hostname(msst) != NULL);
1571    cb_assert(mcs_server_st_port(msst) > 0);
1572    cb_assert(mcs_server_st_fd(msst) == -1);
1573
1574    if (d->ptd->behavior_pool.base.time_stats) {
1575        start = usec_now();
1576    }
1577
1578    d->ptd->stats.stats.tot_downstream_connect_started++;
1579
1580    fd = mcs_connect(mcs_server_st_hostname(msst),
1581                     mcs_server_st_port(msst), &err,
1582                     MOXI_BLOCKING_CONNECT);
1583
1584    if (settings.verbose > 2) {
1585        moxi_log_write("%d: cproxy_connect_downstream_conn %s:%d %d %d\n", fd,
1586                       mcs_server_st_hostname(msst),
1587                       mcs_server_st_port(msst),
1588                       MOXI_BLOCKING_CONNECT, err);
1589    }
1590
1591    if (fd != -1) {
1592        conn *c = conn_new(fd, conn_pause, 0,
1593                           DATA_BUFFER_SIZE,
1594                           tcp_transport,
1595                           thread->base,
1596                           &cproxy_downstream_funcs, d);
1597        if (c != NULL ) {
1598            c->protocol = (d->upstream_conn->peer_protocol ?
1599                           d->upstream_conn->peer_protocol :
1600                           behavior->downstream_protocol);
1601            c->thread = thread;
1602            c->cmd_start_time = start;
1603
1604#ifdef WIN32
1605            if (err == WSAEINPROGRESS) {
1606                err = EINPROGRESS;
1607            } else if (err == WSAEWOULDBLOCK) {
1608                err = EWOULDBLOCK;
1609            }
1610#endif
1611
1612            if (err == EINPROGRESS ||
1613                err == EWOULDBLOCK) {
1614                if (update_event_timed(c, EV_WRITE | EV_PERSIST,
1615                                       &behavior->connect_timeout)) {
1616                    conn_set_state(c, conn_connecting);
1617
1618                    d->ptd->stats.stats.tot_downstream_connect_wait++;
1619
1620                    return c;
1621                } else {
1622                    d->ptd->stats.stats.err_oom++;
1623                }
1624            } else {
1625                if (downstream_connect_init(d, msst, behavior, c)) {
1626                    return c;
1627                }
1628            }
1629
1630            cproxy_close_conn(c);
1631        } else {
1632            d->ptd->stats.stats.err_oom++;
1633        }
1634    }
1635
1636    d->ptd->stats.stats.tot_downstream_connect_failed++;
1637
1638    return NULL;
1639}
1640
1641bool downstream_connect_init(downstream *d, mcs_server_st *msst,
1642                             proxy_behavior *behavior, conn *c) {
1643    char *host_ident;
1644    int rv;
1645
1646    cb_assert(c->thread != NULL);
1647
1648    host_ident = c->host_ident;
1649    if (host_ident == NULL) {
1650        host_ident = mcs_server_st_ident(msst, IS_ASCII(c->protocol));
1651    }
1652
1653    if (c->cmd_start_time != 0 &&
1654        d->ptd->behavior_pool.base.time_stats) {
1655        downstream_connect_time_sample(&d->ptd->stats,
1656                                       usec_now() - c->cmd_start_time);
1657    }
1658
1659    rv = cproxy_auth_downstream(msst, behavior, c->sfd);
1660    if (rv == 0) {
1661        d->ptd->stats.stats.tot_downstream_auth++;
1662
1663        rv = cproxy_bucket_downstream(msst, behavior, c->sfd);
1664        if (rv == 0) {
1665            d->ptd->stats.stats.tot_downstream_bucket++;
1666
1667            zstored_error_count(c->thread, host_ident, false);
1668
1669            d->ptd->stats.stats.tot_downstream_connect++;
1670
1671            return true;
1672        } else {
1673            d->ptd->stats.stats.tot_downstream_bucket_failed++;
1674        }
1675    } else {
1676        d->ptd->stats.stats.tot_downstream_auth_failed++;
1677        if (rv == 1) {
1678            d->ptd->stats.stats.tot_auth_timeout++;
1679        }
1680    }
1681
1682    /* Treat a auth/bucket error as a blacklistable error. */
1683
1684    zstored_error_count(c->thread, host_ident, true);
1685
1686    return false;
1687}
1688
1689conn *cproxy_find_downstream_conn(downstream *d,
1690                                  char *key, int key_length,
1691                                  bool *local) {
1692    return cproxy_find_downstream_conn_ex(d, key, key_length, local, NULL);
1693}
1694
1695conn *cproxy_find_downstream_conn_ex(downstream *d,
1696                                     char *key, int key_length,
1697                                     bool *local,
1698                                     int *vbucket) {
1699    int v;
1700    int s;
1701
1702    cb_assert(d != NULL);
1703    cb_assert(d->downstream_conns != NULL);
1704    cb_assert(key != NULL);
1705    cb_assert(key_length > 0);
1706
1707    if (local != NULL) {
1708        *local = false;
1709    }
1710
1711    v = -1;
1712    s = cproxy_server_index(d, key, key_length, &v);
1713
1714    if (settings.verbose > 2 && s >= 0) {
1715        moxi_log_write("%d: server_index %d, vbucket %d, conn %d\n", s, v,
1716                       (d->upstream_conn != NULL ?
1717                        d->upstream_conn->sfd : 0),
1718                       (d->downstream_conns[s] == NULL ?
1719                        0 :
1720                        (d->downstream_conns[s] == NULL_CONN ?
1721                         -1 :
1722                         d->downstream_conns[s]->sfd)));
1723    }
1724
1725    if (s >= 0 &&
1726        s < (int) mcs_server_count(&d->mst) &&
1727        d->downstream_conns[s] != NULL &&
1728        d->downstream_conns[s] != NULL_CONN) {
1729
1730        if (local != NULL && s == 0) {
1731            *local = true;
1732        }
1733
1734        if (vbucket != NULL) {
1735            *vbucket = v;
1736        }
1737
1738        return d->downstream_conns[s];
1739    }
1740
1741    return NULL;
1742}
1743
1744bool cproxy_prep_conn_for_write(conn *c) {
1745    if (c != NULL) {
1746        cb_assert(c->item == NULL);
1747        cb_assert(IS_PROXY(c->protocol));
1748        cb_assert(c->ilist != NULL);
1749        cb_assert(c->isize > 0);
1750        cb_assert(c->suffixlist != NULL);
1751        cb_assert(c->suffixsize > 0);
1752
1753        c->icurr      = c->ilist;
1754        c->ileft      = 0;
1755        c->suffixcurr = c->suffixlist;
1756        c->suffixleft = 0;
1757
1758        c->msgcurr = 0; /* TODO: Mem leak just by blowing these to 0? */
1759        c->msgused = 0;
1760        c->iovused = 0;
1761
1762        if (add_msghdr(c) == 0) {
1763            return true;
1764        }
1765
1766        if (settings.verbose > 1) {
1767            moxi_log_write("%d: cproxy_prep_conn_for_write failed\n",
1768                           c->sfd);
1769        }
1770    }
1771
1772    return false;
1773}
1774
1775bool cproxy_update_event_write(downstream *d, conn *c) {
1776    cb_assert(d != NULL);
1777    cb_assert(d->ptd != NULL);
1778    cb_assert(c != NULL);
1779
1780    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
1781        d->ptd->stats.stats.err_oom++;
1782        cproxy_close_conn(c);
1783
1784        return false;
1785    }
1786
1787    return true;
1788}
1789
1790/**
1791 * Do a hash through libmemcached to see which server (by index)
1792 * should hold a given key.
1793 */
1794int cproxy_server_index(downstream *d, char *key, size_t key_length,
1795                        int *vbucket) {
1796    cb_assert(d != NULL);
1797    cb_assert(key != NULL);
1798    cb_assert(key_length > 0);
1799
1800    if (mcs_server_count(&d->mst) <= 0) {
1801        return -1;
1802    }
1803
1804    return (int) mcs_key_hash(&d->mst, key, key_length, vbucket);
1805}
1806
1807void cproxy_assign_downstream(proxy_td *ptd) {
1808    uint64_t da;
1809    conn *tail;
1810    bool stop = false;
1811
1812    cb_assert(ptd != NULL);
1813
1814    if (settings.verbose > 2) {
1815        moxi_log_write("assign_downstream\n");
1816    }
1817
1818    ptd->downstream_assigns++;
1819
1820    da = ptd->downstream_assigns;
1821    /* Key loop that tries to reserve any available, released */
1822    /* downstream resources to waiting upstream conns. */
1823
1824    /* Remember the wait list tail when we start, in case more */
1825    /* upstream conns are tacked onto the wait list while we're */
1826    /* processing.  This helps avoid infinite loop where upstream */
1827    /* conns just keep on moving to the tail. */
1828
1829    tail = ptd->waiting_any_downstream_tail;
1830    while (ptd->waiting_any_downstream_head != NULL && !stop) {
1831        conn *uc_last;
1832        downstream *d;
1833
1834        if (ptd->waiting_any_downstream_head == tail) {
1835            stop = true;
1836        }
1837
1838        d = cproxy_reserve_downstream(ptd);
1839        if (d == NULL) {
1840            if (ptd->downstream_num <= 0) {
1841                /* Absolutely no downstreams connected, so */
1842                /* might as well error out. */
1843
1844                while (ptd->waiting_any_downstream_head != NULL) {
1845                    conn *uc;
1846                    ptd->stats.stats.tot_downstream_propagate_failed++;
1847
1848                    uc = ptd->waiting_any_downstream_head;
1849                    ptd->waiting_any_downstream_head =
1850                        ptd->waiting_any_downstream_head->next;
1851                    if (ptd->waiting_any_downstream_head == NULL) {
1852                        ptd->waiting_any_downstream_tail = NULL;
1853                    }
1854                    uc->next = NULL;
1855
1856                    upstream_error_msg(uc,
1857                                       "SERVER_ERROR proxy out of downstreams\r\n",
1858                                       PROTOCOL_BINARY_RESPONSE_EINTERNAL);
1859                }
1860            }
1861
1862            break; /* If no downstreams are available, stop loop. */
1863        }
1864
1865        cb_assert(d->upstream_conn == NULL);
1866        cb_assert(d->downstream_used == 0);
1867        cb_assert(d->downstream_used_start == 0);
1868        cb_assert(d->multiget == NULL);
1869        cb_assert(d->merger == NULL);
1870        cb_assert(d->timeout_tv.tv_sec == 0);
1871        cb_assert(d->timeout_tv.tv_usec == 0);
1872
1873        /* We have a downstream reserved, so assign the first */
1874        /* waiting upstream conn to it. */
1875
1876        d->upstream_conn = ptd->waiting_any_downstream_head;
1877        ptd->waiting_any_downstream_head =
1878            ptd->waiting_any_downstream_head->next;
1879        if (ptd->waiting_any_downstream_head == NULL) {
1880            ptd->waiting_any_downstream_tail = NULL;
1881        }
1882        d->upstream_conn->next = NULL;
1883
1884        ptd->stats.stats.tot_assign_downstream++;
1885        ptd->stats.stats.tot_assign_upstream++;
1886
1887        /* Add any compatible upstream conns to the downstream. */
1888        /* By compatible, for example, we mean multi-gets from */
1889        /* different upstreams so we can de-deplicate get keys. */
1890        uc_last = d->upstream_conn;
1891
1892        while (is_compatible_request(uc_last,
1893                                     ptd->waiting_any_downstream_head)) {
1894            uc_last->next = ptd->waiting_any_downstream_head;
1895
1896            ptd->waiting_any_downstream_head =
1897                ptd->waiting_any_downstream_head->next;
1898            if (ptd->waiting_any_downstream_head == NULL) {
1899                ptd->waiting_any_downstream_tail = NULL;
1900            }
1901
1902            uc_last = uc_last->next;
1903            uc_last->next = NULL;
1904
1905            /* Note: tot_assign_upstream - tot_assign_downstream */
1906            /* should get us how many requests we've piggybacked together. */
1907
1908            ptd->stats.stats.tot_assign_upstream++;
1909        }
1910
1911        if (settings.verbose > 2) {
1912            moxi_log_write("%d: assign_downstream, matched to upstream\n",
1913                    d->upstream_conn->sfd);
1914        }
1915
1916        if (cproxy_forward(d) == false) {
1917            /* TODO: This stat is incorrect, as we might reach here */
1918            /* when we have entire front cache hit or talk-to-self */
1919            /* optimization hit on multiget. */
1920
1921            ptd->stats.stats.tot_downstream_propagate_failed++;
1922
1923            /* During cproxy_forward(), we might have recursed, */
1924            /* especially in error situation if a downstream */
1925            /* conn got closed and released.  Check for recursion */
1926            /* before we touch d anymore. */
1927
1928            if (da != ptd->downstream_assigns) {
1929                ptd->stats.stats.tot_assign_recursion++;
1930                break;
1931            }
1932
1933            propagate_error_msg(d, NULL, d->upstream_status);
1934
1935            cproxy_release_downstream(d, false);
1936        }
1937    }
1938
1939    if (settings.verbose > 2) {
1940        moxi_log_write("assign_downstream, done\n");
1941    }
1942}
1943
1944void propagate_error_msg(downstream *d, char *ascii_msg,
1945                         protocol_binary_response_status binary_status) {
1946    cb_assert(d != NULL);
1947
1948    if (ascii_msg == NULL &&
1949        d->upstream_conn != NULL &&
1950        d->target_host_ident != NULL) {
1951        char *s = add_conn_suffix(d->upstream_conn);
1952        if (s != NULL) {
1953            snprintf(s, SUFFIX_SIZE - 1,
1954                     "SERVER_ERROR proxy write to downstream %s\r\n",
1955                     d->target_host_ident);
1956            s[SUFFIX_SIZE - 1] = '\0';
1957            ascii_msg = s;
1958        }
1959    }
1960
1961    while (d->upstream_conn != NULL) {
1962        conn *uc = d->upstream_conn;
1963        conn *curr;
1964
1965        if (settings.verbose > 1) {
1966            moxi_log_write("%d: could not forward upstream to downstream\n",
1967                           uc->sfd);
1968        }
1969
1970        upstream_error_msg(uc, ascii_msg, binary_status);
1971
1972        curr = d->upstream_conn;
1973        d->upstream_conn = d->upstream_conn->next;
1974        curr->next = NULL;
1975    }
1976}
1977
1978bool cproxy_forward(downstream *d) {
1979    cb_assert(d != NULL);
1980    cb_assert(d->ptd != NULL);
1981    cb_assert(d->upstream_conn != NULL);
1982
1983    if (settings.verbose > 2) {
1984        moxi_log_write(
1985                "%d: cproxy_forward prot %d to prot %d\n",
1986                d->upstream_conn->sfd,
1987                d->upstream_conn->protocol,
1988                d->ptd->behavior_pool.base.downstream_protocol);
1989    }
1990
1991    if (IS_ASCII(d->upstream_conn->protocol)) {
1992        /* ASCII upstream. */
1993
1994        unsigned int peer_protocol =
1995            d->upstream_conn->peer_protocol ?
1996            d->upstream_conn->peer_protocol :
1997            d->ptd->behavior_pool.base.downstream_protocol;
1998
1999        if (IS_ASCII(peer_protocol)) {
2000            return cproxy_forward_a2a_downstream(d);
2001        } else {
2002            return cproxy_forward_a2b_downstream(d);
2003        }
2004    } else {
2005        /* BINARY upstream. */
2006
2007        if (IS_BINARY(d->ptd->behavior_pool.base.downstream_protocol)) {
2008            return cproxy_forward_b2b_downstream(d);
2009        } else {
2010            /* TODO: No binary upstream to ascii downstream support. */
2011
2012            cb_assert(0);
2013            return false;
2014        }
2015    }
2016}
2017
2018bool cproxy_forward_or_error(downstream *d) {
2019    if (cproxy_forward(d) == false) {
2020        d->ptd->stats.stats.tot_downstream_propagate_failed++;
2021        propagate_error_msg(d, NULL, d->upstream_status);
2022        cproxy_release_downstream(d, false);
2023
2024        return false;
2025    }
2026
2027    return true;
2028}
2029
2030void upstream_error_msg(conn *uc, char *ascii_msg,
2031                        protocol_binary_response_status binary_status) {
2032    proxy_td *ptd;
2033    cb_assert(uc);
2034    cb_assert(uc->state == conn_pause);
2035
2036    ptd = uc->extra;
2037    cb_assert(ptd != NULL);
2038
2039    if (IS_ASCII(uc->protocol)) {
2040        char *msg = ascii_msg;
2041        if (msg == NULL) {
2042            msg = "SERVER_ERROR proxy write to downstream\r\n";
2043        }
2044
2045        cb_mutex_enter(&ptd->proxy->proxy_lock);
2046        if (ptd->proxy->name != NULL &&
2047            strcmp(ptd->proxy->name, NULL_BUCKET) == 0) {
2048            msg = "SERVER_ERROR unauthorized, null bucket\r\n";
2049        }
2050        cb_mutex_exit(&ptd->proxy->proxy_lock);
2051
2052        /* Send an END on get/gets instead of generic SERVER_ERROR. */
2053
2054        if (uc->cmd == -1 &&
2055            uc->cmd_start != NULL &&
2056            strncmp(uc->cmd_start, "get", 3) == 0 &&
2057            (false == settings.enable_mcmux_mode) &&
2058            (0 != strncmp(ptd->behavior_pool.base.nodeLocator,
2059                          "vbucket",
2060                          sizeof(ptd->behavior_pool.base.nodeLocator) - 1))) {
2061            msg = "END\r\n";
2062        }
2063
2064        if (settings.verbose > 2) {
2065            moxi_log_write("%d: upstream_error: %s\n", uc->sfd, msg);
2066        }
2067
2068        if (add_iov(uc, msg, (int)strlen(msg)) == 0 &&
2069            update_event(uc, EV_WRITE | EV_PERSIST)) {
2070            conn_set_state(uc, conn_mwrite);
2071        } else {
2072            ptd->stats.stats.err_oom++;
2073            cproxy_close_conn(uc);
2074        }
2075    } else {
2076        cb_assert(IS_BINARY(uc->protocol));
2077
2078        if (binary_status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
2079            /* Default to our favorite catch-all binary protocol response. */
2080
2081            binary_status = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
2082        }
2083
2084        cb_mutex_enter(&ptd->proxy->proxy_lock);
2085        if (ptd->proxy->name != NULL &&
2086            strcmp(ptd->proxy->name, NULL_BUCKET) == 0) {
2087            binary_status = PROTOCOL_BINARY_RESPONSE_AUTH_ERROR;
2088        }
2089        cb_mutex_exit(&ptd->proxy->proxy_lock);
2090
2091        write_bin_error(uc, binary_status, 0);
2092
2093        update_event(uc, EV_WRITE | EV_PERSIST);
2094    }
2095}
2096
2097void cproxy_reset_upstream(conn *uc) {
2098    proxy_td *ptd;
2099    cb_assert(uc != NULL);
2100
2101    ptd = uc->extra;
2102    cb_assert(ptd != NULL);
2103
2104    conn_set_state(uc, conn_new_cmd);
2105
2106    if (uc->rbytes <= 0) {
2107        if (!update_event(uc, EV_READ | EV_PERSIST)) {
2108            ptd->stats.stats.err_oom++;
2109            cproxy_close_conn(uc);
2110        }
2111
2112        return; /* Return either way. */
2113    }
2114
2115    /* We may have already read incoming bytes into the uc's buffer, */
2116    /* so the issue is that libevent may never see (or expect) any */
2117    /* EV_READ events (and hence, won't fire event callbacks) for the */
2118    /* upstream connection.  This can leave the uc seemingly stuck, */
2119    /* never hitting drive_machine() loop. */
2120
2121    if (settings.verbose > 2) {
2122        moxi_log_write("%d: cproxy_reset_upstream with bytes available: %d\n",
2123                       uc->sfd, uc->rbytes);
2124    }
2125
2126    /* So, we copy the drive_machine()/conn_new_cmd handling to */
2127    /* schedule uc into drive_machine() execution, where the uc */
2128    /* conn is likely to be writable.  We need to do this */
2129    /* because we're currently on the drive_machine() execution */
2130    /* loop for the downstream connection, not for the uc. */
2131
2132    if (!update_event(uc, EV_WRITE | EV_PERSIST)) {
2133        if (settings.verbose > 0) {
2134            moxi_log_write("Couldn't update event\n");
2135        }
2136
2137        conn_set_state(uc, conn_closing);
2138    }
2139
2140    ptd->stats.stats.tot_reset_upstream_avail++;
2141}
2142
2143bool cproxy_dettach_if_noreply(downstream *d, conn *uc) {
2144    if (uc->noreply) {
2145        uc->noreply        = false;
2146        d->upstream_conn   = NULL;
2147        d->upstream_suffix = NULL;
2148        d->upstream_suffix_len = 0;
2149        d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
2150        d->upstream_retry  = 0;
2151        d->target_host_ident = NULL;
2152
2153        cproxy_reset_upstream(uc);
2154
2155        return true;
2156    }
2157
2158    return false;
2159}
2160
2161void cproxy_wait_any_downstream(proxy_td *ptd, conn *uc) {
2162    cb_assert(uc != NULL);
2163    cb_assert(uc->next == NULL);
2164    cb_assert(ptd != NULL);
2165    cb_assert(!ptd->waiting_any_downstream_tail ||
2166           !ptd->waiting_any_downstream_tail->next);
2167
2168    /* Add the upstream conn to the wait list. */
2169
2170    uc->next = NULL;
2171    if (ptd->waiting_any_downstream_tail != NULL) {
2172        ptd->waiting_any_downstream_tail->next = uc;
2173    }
2174    ptd->waiting_any_downstream_tail = uc;
2175    if (ptd->waiting_any_downstream_head == NULL) {
2176        ptd->waiting_any_downstream_head = uc;
2177    }
2178}
2179
2180void cproxy_release_downstream_conn(downstream *d, conn *c) {
2181    proxy_td *ptd;
2182    cb_assert(c != NULL);
2183    cb_assert(d != NULL);
2184
2185    ptd = d->ptd;
2186    cb_assert(ptd != NULL);
2187
2188    if (settings.verbose > 2) {
2189        moxi_log_write("%d: release_downstream_conn, downstream_used %d %d,"
2190                       " upstream %d\n",
2191                       c->sfd, d->downstream_used, d->downstream_used_start,
2192                       (d->upstream_conn != NULL ?
2193                        d->upstream_conn->sfd : 0));
2194    }
2195
2196    d->downstream_used--;
2197    if (d->downstream_used <= 0) {
2198        /* The downstream_used count might go < 0 when if there's */
2199        /* an early error and we decide to close the downstream */
2200        /* conn, before anything gets sent or before the */
2201        /* downstream_used was able to be incremented. */
2202
2203        cproxy_release_downstream(d, false);
2204        cproxy_assign_downstream(ptd);
2205    }
2206}
2207
2208void cproxy_on_pause_downstream_conn(conn *c) {
2209    downstream *d;
2210    cb_assert(c != NULL);
2211
2212    if (settings.verbose > 2) {
2213        moxi_log_write("<%d cproxy_on_pause_downstream_conn\n",
2214                c->sfd);
2215    }
2216
2217    d = c->extra;
2218
2219    if (!d || c->rbytes > 0) {
2220        zstored_downstream_conns *conns;
2221
2222        if (settings.verbose) {
2223            moxi_log_write("%d: Closed the downstream since got"
2224                    "an event on downstream or extra data on downstream\n",
2225                    c->sfd);
2226        }
2227
2228        conns = zstored_get_downstream_conns(c->thread, c->host_ident);
2229        if (conns) {
2230            bool found = false;
2231            conns->dc = conn_list_remove(conns->dc, NULL, c, &found);
2232            if (!found) {
2233                cb_assert(0);
2234                if (settings.verbose) {
2235                    moxi_log_write("<%d Not able to find in zstore conns\n",
2236                            c->sfd);
2237                }
2238            }
2239        } else {
2240            cb_assert(0);
2241            if (settings.verbose) {
2242                moxi_log_write("<%d Not able to find zstore conns\n",
2243                        c->sfd);
2244            }
2245        }
2246        cproxy_close_conn(c);
2247        return;
2248    }
2249
2250    cb_assert(d->ptd != NULL);
2251
2252    /* Must update_event() before releasing the downstream conn, */
2253    /* because the release might call udpate_event(), too, */
2254    /* and we don't want to override its work. */
2255
2256    if (update_event(c, EV_READ | EV_PERSIST)) {
2257        cproxy_release_downstream_conn(d, c);
2258    } else {
2259        d->ptd->stats.stats.err_oom++;
2260        cproxy_close_conn(c);
2261    }
2262}
2263
2264void cproxy_pause_upstream_for_downstream(proxy_td *ptd, conn *upstream) {
2265    cb_assert(ptd != NULL);
2266    cb_assert(upstream != NULL);
2267
2268    if (settings.verbose > 2) {
2269        moxi_log_write("%d: pause_upstream_for_downstream\n",
2270                upstream->sfd);
2271    }
2272
2273    conn_set_state(upstream, conn_pause);
2274
2275    cproxy_wait_any_downstream(ptd, upstream);
2276
2277    if (ptd->timeout_tv.tv_sec == 0 &&
2278        ptd->timeout_tv.tv_usec == 0) {
2279        cproxy_start_wait_queue_timeout(ptd, upstream);
2280    }
2281
2282    cproxy_assign_downstream(ptd);
2283}
2284
2285struct timeval cproxy_get_downstream_timeout(downstream *d, conn *c) {
2286
2287    struct timeval rv;
2288    proxy_td *ptd;
2289    cb_assert(d);
2290
2291    if (c != NULL) {
2292        int i;
2293        cb_assert(d->behaviors_num > 0);
2294        cb_assert(d->behaviors_arr != NULL);
2295        cb_assert(d->downstream_conns != NULL);
2296
2297        i = downstream_conn_index(d, c);
2298        if (i >= 0 && i < d->behaviors_num) {
2299            rv = d->behaviors_arr[i].downstream_timeout;
2300            if (rv.tv_sec != 0 || rv.tv_usec != 0) {
2301                return rv;
2302            }
2303        }
2304    }
2305
2306    ptd = d->ptd;
2307    cb_assert(ptd);
2308
2309    rv = ptd->behavior_pool.base.downstream_timeout;
2310
2311    return rv;
2312}
2313
2314bool cproxy_start_wait_queue_timeout(proxy_td *ptd, conn *uc) {
2315    cb_assert(ptd);
2316    cb_assert(uc);
2317    cb_assert(uc->thread);
2318    cb_assert(uc->thread->base);
2319
2320    ptd->timeout_tv = ptd->behavior_pool.base.wait_queue_timeout;
2321    if (ptd->timeout_tv.tv_sec != 0 ||
2322        ptd->timeout_tv.tv_usec != 0) {
2323        if (settings.verbose > 2) {
2324            moxi_log_write("wait_queue_timeout started\n");
2325        }
2326
2327        evtimer_set(&ptd->timeout_event, wait_queue_timeout, ptd);
2328
2329        event_base_set(uc->thread->base, &ptd->timeout_event);
2330
2331        return evtimer_add(&ptd->timeout_event, &ptd->timeout_tv) == 0;
2332    }
2333
2334    return true;
2335}
2336
2337static void wait_queue_timeout(evutil_socket_t fd,
2338                               const short which,
2339                               void *arg) {
2340    proxy_td *ptd = arg;
2341    cb_assert(ptd != NULL);
2342    (void)fd;
2343    (void)which;
2344
2345    if (settings.verbose > 2) {
2346        moxi_log_write("wait_queue_timeout\n");
2347    }
2348
2349    /* This timer callback is invoked when an upstream conn */
2350    /* has been in the wait queue for too long. */
2351
2352    if (ptd->timeout_tv.tv_sec != 0 || ptd->timeout_tv.tv_usec != 0) {
2353        struct timeval wqt;
2354        uint64_t wqt_msec;
2355        uint64_t cut_msec;
2356        conn *uc_curr;
2357
2358        evtimer_del(&ptd->timeout_event);
2359
2360        ptd->timeout_tv.tv_sec = 0;
2361        ptd->timeout_tv.tv_usec = 0;
2362
2363        if (settings.verbose > 2) {
2364            moxi_log_write("wait_queue_timeout cleared\n");
2365        }
2366
2367        wqt = ptd->behavior_pool.base.wait_queue_timeout;
2368        wqt_msec = (wqt.tv_sec * 1000) + (wqt.tv_usec / 1000);
2369        cut_msec = msec_current_time - wqt_msec;
2370
2371        /* Run through all the old upstream conn's in */
2372        /* the wait queue, remove them, and emit errors */
2373        /* on them.  And then start a new timer if needed. */
2374        uc_curr = ptd->waiting_any_downstream_head;
2375        while (uc_curr != NULL) {
2376            conn *uc = uc_curr;
2377
2378            uc_curr = uc_curr->next;
2379
2380            /* Check if upstream conn is old and should be removed. */
2381
2382            if (settings.verbose > 2) {
2383                moxi_log_write("wait_queue_timeout compare %u to %u cutoff\n",
2384                        uc->cmd_start_time, cut_msec);
2385            }
2386
2387            if (uc->cmd_start_time <= cut_msec) {
2388                if (settings.verbose > 1) {
2389                    moxi_log_write("proxy_td_timeout sending error %d\n",
2390                            uc->sfd);
2391                }
2392
2393                ptd->stats.stats.tot_wait_queue_timeout++;
2394
2395                ptd->waiting_any_downstream_head =
2396                    conn_list_remove(ptd->waiting_any_downstream_head,
2397                                     &ptd->waiting_any_downstream_tail,
2398                                     uc, NULL); /* TODO: O(N^2). */
2399
2400                upstream_error_msg(uc,
2401                                   "SERVER_ERROR proxy wait queue timeout",
2402                                   PROTOCOL_BINARY_RESPONSE_EBUSY);
2403            }
2404        }
2405
2406        if (ptd->waiting_any_downstream_head != NULL) {
2407            cproxy_start_wait_queue_timeout(ptd,
2408                                            ptd->waiting_any_downstream_head);
2409        }
2410    }
2411}
2412
2413rel_time_t cproxy_realtime(const time_t exptime) {
2414    /* Input is a long... */
2415
2416    /* 0       | (0...REALIME_MAXDELTA] | (REALTIME_MAXDELTA... */
2417    /* forever | delta                  | unix_time */
2418
2419    /* Storage is an unsigned int. */
2420
2421    /* TODO: Handle resolution loss. */
2422
2423    /* The cproxy version of realtime doesn't do any */
2424    /* time math munging, just pass through. */
2425
2426    return (rel_time_t) exptime;
2427}
2428
2429void cproxy_close_conn(conn *c) {
2430    cb_assert(c != NULL);
2431
2432    if (c == NULL_CONN) {
2433        return;
2434    }
2435
2436    conn_set_state(c, conn_closing);
2437
2438    update_event(c, 0);
2439
2440    /* Run through drive_machine just once, */
2441    /* to go through close code paths. */
2442
2443    drive_machine(c);
2444}
2445
2446bool add_conn_item(conn *c, item *it) {
2447    cb_assert(it != NULL);
2448    cb_assert(c != NULL);
2449    cb_assert(c->ilist != NULL);
2450    cb_assert(c->icurr != NULL);
2451    cb_assert(c->isize > 0);
2452
2453    if (c->ileft >= c->isize) {
2454        item **new_list =
2455            realloc(c->ilist, sizeof(item *) * c->isize * 2);
2456        if (new_list) {
2457            c->isize *= 2;
2458            c->ilist = new_list;
2459            c->icurr = new_list;
2460        }
2461    }
2462
2463    if (c->ileft < c->isize) {
2464        c->ilist[c->ileft] = it;
2465        c->ileft++;
2466
2467        return true;
2468    }
2469
2470    return false;
2471}
2472
2473char *add_conn_suffix(conn *c) {
2474    cb_assert(c != NULL);
2475    cb_assert(c->suffixlist != NULL);
2476    cb_assert(c->suffixcurr != NULL);
2477    cb_assert(c->suffixsize > 0);
2478
2479    if (c->suffixleft >= c->suffixsize) {
2480        char **new_suffix_list =
2481            realloc(c->suffixlist,
2482                    sizeof(char *) * c->suffixsize * 2);
2483        if (new_suffix_list) {
2484            c->suffixsize *= 2;
2485            c->suffixlist = new_suffix_list;
2486            c->suffixcurr = new_suffix_list;
2487        }
2488    }
2489
2490    if (c->suffixleft < c->suffixsize) {
2491        char *suffix = cache_alloc(c->thread->suffix_cache);
2492        if (suffix != NULL) {
2493            c->suffixlist[c->suffixleft] = suffix;
2494            c->suffixleft++;
2495
2496            return suffix;
2497        }
2498    }
2499
2500    return NULL;
2501}
2502
2503char *nread_text(short x) {
2504    char *rv = NULL;
2505    switch(x) {
2506    case NREAD_SET:
2507        rv = "set ";
2508        break;
2509    case NREAD_ADD:
2510        rv = "add ";
2511        break;
2512    case NREAD_REPLACE:
2513        rv = "replace ";
2514        break;
2515    case NREAD_APPEND:
2516        rv = "append ";
2517        break;
2518    case NREAD_PREPEND:
2519        rv = "prepend ";
2520        break;
2521    case NREAD_CAS:
2522        rv = "cas ";
2523        break;
2524    }
2525    return rv;
2526}
2527
2528/* Tokenize the command string by updating the token array
2529 * with pointers to start of each token and length.
2530 * Does not modify the input command string.
2531 *
2532 * Returns total number of tokens.  The last valid token is the terminal
2533 * token (value points to the first unprocessed character of the string and
2534 * length zero).
2535 *
2536 * Usage example:
2537 *
2538 *  while (scan_tokens(command, tokens, max_tokens, NULL) > 0) {
2539 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
2540 *          ...
2541 *      }
2542 *      command = tokens[ix].value;
2543 *  }
2544 */
2545size_t scan_tokens(char *command, token_t *tokens,
2546                   const size_t max_tokens,
2547                   int *command_len) {
2548    char *s, *e;
2549    size_t ntokens = 0;
2550
2551    if (command_len != NULL) {
2552        *command_len = 0;
2553    }
2554
2555    cb_assert(command != NULL && tokens != NULL && max_tokens > 1);
2556
2557    for (s = e = command; ntokens < max_tokens - 1; ++e) {
2558        if (*e == '\0' || *e == ' ') {
2559            if (s != e) {
2560                tokens[ntokens].value = s;
2561                tokens[ntokens].length = e - s;
2562                ntokens++;
2563            }
2564            if (*e == '\0') {
2565                if (command_len != NULL) {
2566                    *command_len = (int)(e - command);
2567                }
2568                break; /* string end */
2569            }
2570            s = e + 1;
2571        }
2572    }
2573
2574    /* If we scanned the whole string, the terminal value pointer is null,
2575     * otherwise it is the first unprocessed character.
2576     */
2577    tokens[ntokens].value = (*e == '\0' ? NULL : e);
2578    tokens[ntokens].length = 0;
2579    ntokens++;
2580
2581    return ntokens;
2582}
2583
2584/* Remove conn c from a conn list.
2585 * Returns the new head of the list.
2586 */
2587conn *conn_list_remove(conn *head, conn **tail, conn *c, bool *found) {
2588    conn *prev = NULL;
2589    conn *curr = head;
2590
2591    if (found != NULL) {
2592        *found = false;
2593    }
2594
2595    while (curr != NULL) {
2596        if (curr == c) {
2597            conn *r;
2598            if (found != NULL) {
2599                *found = true;
2600            }
2601
2602            if (tail != NULL && *tail == curr) {
2603                *tail = prev;
2604            }
2605
2606            if (prev != NULL) {
2607                cb_assert(curr != head);
2608                prev->next = curr->next;
2609                curr->next = NULL;
2610                return head;
2611            }
2612
2613            cb_assert(curr == head);
2614            r = curr->next;
2615            curr->next = NULL;
2616            return r;
2617        }
2618
2619        prev = curr;
2620        curr = curr ->next;
2621    }
2622
2623    return head;
2624}
2625
2626/* Returns the new head of the list.
2627 */
2628downstream *downstream_list_remove(downstream *head, downstream *d) {
2629    downstream *prev = NULL;
2630    downstream *curr = head;
2631
2632    while (curr != NULL) {
2633        if (curr == d) {
2634            downstream *r;
2635            if (prev != NULL) {
2636                cb_assert(curr != head);
2637                prev->next = curr->next;
2638                curr->next = NULL;
2639                return head;
2640            }
2641
2642            cb_assert(curr == head);
2643            r = curr->next;
2644            curr->next = NULL;
2645            return r;
2646        }
2647
2648        prev = curr;
2649        curr = curr ->next;
2650    }
2651
2652    return head;
2653}
2654
2655/* Returns the new head of the list.
2656 */
2657downstream *downstream_list_waiting_remove(downstream *head,
2658                                           downstream **tail,
2659                                           downstream *d) {
2660    downstream *prev = NULL;
2661    downstream *curr = head;
2662
2663    while (curr != NULL) {
2664        if (curr == d) {
2665            downstream *r;
2666            if (tail != NULL && *tail == curr) {
2667                *tail = prev;
2668            }
2669
2670            if (prev != NULL) {
2671                cb_assert(curr != head);
2672                prev->next_waiting = curr->next_waiting;
2673                curr->next_waiting = NULL;
2674                return head;
2675            }
2676
2677            cb_assert(curr == head);
2678            r = curr->next_waiting;
2679            curr->next_waiting = NULL;
2680            return r;
2681        }
2682
2683        prev = curr;
2684        curr = curr ->next_waiting;
2685    }
2686
2687    return head;
2688}
2689
2690/* Returns true if a candidate request is squashable
2691 * or de-duplicatable with an existing request, to
2692 * save on network hops.
2693 */
2694bool is_compatible_request(conn *existing, conn *candidate) {
2695    (void)existing;
2696    (void)candidate;
2697
2698    /* The not-my-vbucket error handling requires us to not */
2699    /* squash ascii multi-GET requests, due to reusing the */
2700    /* multiget-deduplication machinery during retries and */
2701    /* to simplify the later codepaths. */
2702    /*
2703    cb_assert(existing);
2704    cb_assert(existing->state == conn_pause);
2705    cb_assert(IS_PROXY(existing->protocol));
2706
2707    if (IS_BINARY(existing->protocol)) {
2708        // TODO: Revisit multi-get squashing for binary another day.
2709
2710        return false;
2711    }
2712
2713    cb_assert(IS_ASCII(existing->protocol));
2714
2715    if (candidate != NULL) {
2716        cb_assert(IS_ASCII(candidate->protocol));
2717        cb_assert(IS_PROXY(candidate->protocol));
2718        cb_assert(candidate->state == conn_pause);
2719
2720        // TODO: Allow gets (CAS) for de-duplication.
2721
2722        if (existing->cmd == -1 &&
2723            candidate->cmd == -1 &&
2724            existing->cmd_retries <= 0 &&
2725            candidate->cmd_retries <= 0 &&
2726            !existing->noreply &&
2727            !candidate->noreply &&
2728            strncmp(existing->cmd_start, "get ", 4) == 0 &&
2729            strncmp(candidate->cmd_start, "get ", 4) == 0) {
2730            cb_assert(existing->item == NULL);
2731            cb_assert(candidate->item == NULL);
2732
2733            return true;
2734        }
2735    }
2736    */
2737
2738    return false;
2739}
2740
2741void downstream_timeout(evutil_socket_t fd,
2742                        const short which,
2743                        void *arg) {
2744    downstream *d = arg;
2745    proxy_td *ptd;
2746
2747    cb_assert(d != NULL);
2748
2749    ptd = d->ptd;
2750    cb_assert(ptd != NULL);
2751
2752    /* This timer callback is invoked when one or more of */
2753    /* the downstream conns must be really slow.  Handle by */
2754    /* closing downstream conns, which might help by */
2755    /* freeing up downstream resources. */
2756
2757    if (cproxy_clear_timeout(d)) {
2758        char *m;
2759        int n;
2760        int i;
2761        /* The downstream_timeout() callback is invoked for */
2762        /* two cases (downstream_conn_queue_timeouts and */
2763        /* downstream_timeouts), so cleanup and track stats */
2764        /* accordingly. */
2765        bool was_conn_queue_waiting =
2766            zstored_downstream_waiting_remove(d);
2767
2768        if (was_conn_queue_waiting == true) {
2769            if (settings.verbose > 2) {
2770                moxi_log_write("conn_queue_timeout\n");
2771            }
2772
2773            ptd->stats.stats.tot_downstream_conn_queue_timeout++;
2774        } else {
2775            if (settings.verbose > 2) {
2776                moxi_log_write("downstream_timeout\n");
2777            }
2778
2779            ptd->stats.stats.tot_downstream_timeout++;
2780        }
2781
2782        m = "SERVER_ERROR proxy downstream timeout\r\n";
2783
2784        if (d->target_host_ident != NULL) {
2785            m = add_conn_suffix(d->upstream_conn);
2786            if (m != NULL) {
2787                char *s;
2788                snprintf(m, SUFFIX_SIZE - 1,
2789                         "SERVER_ERROR proxy downstream timeout %s\r\n",
2790                         d->target_host_ident);
2791                m[SUFFIX_SIZE - 1] = '\0';
2792
2793                s = strchr(m, ':'); /* Clip to avoid sending user/pswd. */
2794                if (s != NULL) {
2795                    *s++ = '\r';
2796                    *s++ = '\n';
2797                    *s = '\0';
2798                }
2799            }
2800        }
2801
2802        propagate_error_msg(d, m, PROTOCOL_BINARY_RESPONSE_EBUSY);
2803        n = mcs_server_count(&d->mst);
2804
2805        for (i = 0; i < n; i++) {
2806            conn *dc = d->downstream_conns[i];
2807            if (dc != NULL &&
2808                dc != NULL_CONN) {
2809                /* We have to de-link early, because we don't want */
2810                /* to have cproxy_close_conn() release the downstream */
2811                /* while we're in the middle of this loop. */
2812
2813                delink_from_downstream_conns(dc);
2814
2815                cproxy_close_conn(dc);
2816            }
2817        }
2818
2819        cproxy_release_downstream(d, false);
2820        cproxy_assign_downstream(ptd);
2821    }
2822
2823    (void)fd;
2824    (void)which;
2825}
2826
2827bool cproxy_start_downstream_timeout(downstream *d, conn *c) {
2828    cb_assert(d != NULL);
2829    cb_assert(d->behaviors_num > 0);
2830    cb_assert(d->behaviors_arr != NULL);
2831
2832    return cproxy_start_downstream_timeout_ex(d, c,
2833                cproxy_get_downstream_timeout(d, c));
2834}
2835
2836bool cproxy_start_downstream_timeout_ex(downstream *d, conn *c,
2837                                        struct timeval dt) {
2838    conn *uc;
2839
2840    cb_assert(d != NULL);
2841    cb_assert(d->behaviors_num > 0);
2842    cb_assert(d->behaviors_arr != NULL);
2843
2844    cproxy_clear_timeout(d);
2845
2846    if (dt.tv_sec == 0 &&
2847        dt.tv_usec == 0) {
2848        return true;
2849    }
2850
2851    uc = d->upstream_conn;
2852
2853    cb_assert(uc != NULL);
2854    cb_assert(uc->state == conn_pause);
2855    cb_assert(uc->thread != NULL);
2856    cb_assert(uc->thread->base != NULL);
2857    cb_assert(IS_PROXY(uc->protocol));
2858
2859    if (settings.verbose > 2) {
2860        moxi_log_write("%d: cproxy_start_downstream_timeout\n",
2861                       (c != NULL ? c->sfd : -1));
2862    }
2863
2864    evtimer_set(&d->timeout_event, downstream_timeout, d);
2865
2866    event_base_set(uc->thread->base, &d->timeout_event);
2867
2868    d->timeout_tv.tv_sec  = dt.tv_sec;
2869    d->timeout_tv.tv_usec = dt.tv_usec;
2870
2871    return (evtimer_add(&d->timeout_event, &d->timeout_tv) == 0);
2872}
2873
2874/* Return 0 on success, -1 on general failure, 1 on timeout failure. */
2875
2876int cproxy_auth_downstream(mcs_server_st *server,
2877                           proxy_behavior *behavior,
2878                           SOCKET fd) {
2879    protocol_binary_request_header req;
2880    protocol_binary_response_header res;
2881    struct timeval *timeout = NULL;
2882    char buf[3000];
2883    const char *usr;
2884    const char *pwd;
2885    int usr_len;
2886    int pwd_len;
2887    int buf_len;
2888    mcs_return mr;
2889
2890    cb_assert(server);
2891    cb_assert(behavior);
2892    cb_assert(fd != INVALID_SOCKET);
2893
2894
2895    if (!IS_BINARY(behavior->downstream_protocol)) {
2896        return 0;
2897    }
2898
2899    usr = mcs_server_st_usr(server) != NULL ?
2900        mcs_server_st_usr(server) : behavior->usr;
2901    pwd = mcs_server_st_pwd(server) != NULL ?
2902        mcs_server_st_pwd(server) : behavior->pwd;
2903
2904    usr_len = (int)strlen(usr);
2905    pwd_len = (int)strlen(pwd);
2906
2907    if (usr_len <= 0) {
2908        return 0;
2909    }
2910
2911    if (settings.verbose > 2) {
2912        moxi_log_write("cproxy_auth_downstream usr: %s pwd: (%d)\n",
2913                       usr, pwd_len);
2914    }
2915
2916    if (usr_len <= 0 ||
2917        !IS_PROXY(behavior->downstream_protocol) ||
2918        (usr_len + pwd_len + 50 > (int) sizeof(buf))) {
2919        if (settings.verbose > 1) {
2920            moxi_log_write("auth failure args\n");
2921        }
2922
2923        return -1; /* Probably misconfigured. */
2924    }
2925
2926    /* The key should look like "PLAIN", or the sasl mech string. */
2927    /* The data should look like "\0usr\0pwd".  So, the body buf */
2928    /* should look like "PLAIN\0usr\0pwd". */
2929
2930    /* TODO: Allow binary passwords. */
2931
2932    buf_len = snprintf(buf, sizeof(buf), "PLAIN%c%s%c%s",
2933                       0, usr,
2934                       0, pwd);
2935    cb_assert(buf_len == 7 + usr_len + pwd_len);
2936
2937    memset(req.bytes, 0, sizeof(req.bytes));
2938    memset(res.bytes, 0, sizeof(res.bytes));
2939    req.request.magic    = PROTOCOL_BINARY_REQ;
2940    req.request.opcode   = PROTOCOL_BINARY_CMD_SASL_AUTH;
2941    req.request.keylen   = htons((uint16_t) 5); /* 5 == strlen("PLAIN"). */
2942    req.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
2943    req.request.bodylen  = htonl(buf_len);
2944
2945    if (mcs_io_write(fd, (const char *) req.bytes,
2946                     sizeof(req.bytes)) != sizeof(req.bytes) ||
2947        mcs_io_write(fd, buf, buf_len) == -1) {
2948        mcs_io_reset(fd);
2949
2950        if (settings.verbose > 1) {
2951            moxi_log_write("auth failure during write for %s (%d)\n",
2952                           usr, buf_len);
2953        }
2954
2955        return -1;
2956    }
2957
2958    if (behavior->auth_timeout.tv_sec != 0 ||
2959        behavior->auth_timeout.tv_usec != 0) {
2960        timeout = &behavior->auth_timeout;
2961    }
2962
2963    mr = mcs_io_read(fd, &res.bytes, sizeof(res.bytes), timeout);
2964    if (mr == MCS_SUCCESS && res.response.magic == PROTOCOL_BINARY_RES) {
2965        int len;
2966        res.response.status  = ntohs(res.response.status);
2967        res.response.keylen  = ntohs(res.response.keylen);
2968        res.response.bodylen = ntohl(res.response.bodylen);
2969
2970        /* Swallow whatever body comes. */
2971        len = res.response.bodylen;
2972        while (len > 0) {
2973            int amt = (len > (int) sizeof(buf) ? (int) sizeof(buf) : len);
2974
2975            mr = mcs_io_read(fd, buf, amt, timeout);
2976            if (mr != MCS_SUCCESS) {
2977                if (settings.verbose > 1) {
2978                    moxi_log_write("auth could not read response body (%d) %d\n",
2979                                   usr, amt, mr);
2980                }
2981
2982                if (mr == MCS_TIMEOUT) {
2983                    return 1;
2984                }
2985
2986                return -1;
2987            }
2988
2989            len -= amt;
2990        }
2991
2992        /* The res status should be either... */
2993        /* - SUCCESS         - sasl aware server and good credentials. */
2994        /* - AUTH_ERROR      - wrong credentials. */
2995        /* - UNKNOWN_COMMAND - sasl-unaware server. */
2996
2997        if (res.response.status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
2998            if (settings.verbose > 2) {
2999                moxi_log_write("auth_downstream success for %s\n", usr);
3000            }
3001
3002            return 0;
3003        }
3004
3005        if (settings.verbose > 1) {
3006            moxi_log_write("auth_downstream failure for %s (%x)\n",
3007                           usr, res.response.status);
3008        }
3009    } else {
3010        if (settings.verbose > 1) {
3011            moxi_log_write("auth_downstream response error for %s, %d\n",
3012                           usr, mr);
3013        }
3014    }
3015
3016    if (mr == MCS_TIMEOUT) {
3017        return 1;
3018    }
3019
3020    return -1;
3021}
3022
3023/* Return 0 on success, -1 on general failure, 1 on timeout failure. */
3024
3025int cproxy_bucket_downstream(mcs_server_st *server,
3026                             proxy_behavior *behavior,
3027                             SOCKET fd) {
3028    protocol_binary_request_header req;
3029    protocol_binary_response_header res;
3030    struct timeval *timeout = NULL;
3031    int bucket_len;
3032    mcs_return mr;
3033
3034    cb_assert(server);
3035    cb_assert(behavior);
3036    cb_assert(IS_PROXY(behavior->downstream_protocol));
3037    cb_assert(fd != INVALID_SOCKET);
3038
3039    if (!IS_BINARY(behavior->downstream_protocol)) {
3040        return 0;
3041    }
3042
3043    bucket_len = (int)strlen(behavior->bucket);
3044    if (bucket_len <= 0) {
3045        return 0; /* When no bucket. */
3046    }
3047
3048    memset(req.bytes, 0, sizeof(req.bytes));
3049    memset(res.bytes, 0, sizeof(res.bytes));
3050
3051    req.request.magic    = PROTOCOL_BINARY_REQ;
3052    req.request.opcode   = PROTOCOL_BINARY_CMD_BUCKET;
3053    req.request.keylen   = htons((uint16_t) bucket_len);
3054    req.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
3055    req.request.bodylen  = htonl(bucket_len);
3056
3057    if (mcs_io_write(fd, (const char *) req.bytes,
3058                     sizeof(req.bytes)) != sizeof(req.bytes) ||
3059        mcs_io_write(fd, behavior->bucket, bucket_len) == -1) {
3060        mcs_io_reset(fd);
3061
3062        if (settings.verbose > 1) {
3063            moxi_log_write("bucket failure during write (%d)\n",
3064                    bucket_len);
3065        }
3066
3067        return -1;
3068    }
3069
3070
3071    if (behavior->auth_timeout.tv_sec != 0 ||
3072        behavior->auth_timeout.tv_usec != 0) {
3073        timeout = &behavior->auth_timeout;
3074    }
3075
3076    mr = mcs_io_read(fd, &res.bytes, sizeof(res.bytes), timeout);
3077    if (mr == MCS_SUCCESS && res.response.magic == PROTOCOL_BINARY_RES) {
3078        char buf[300];
3079        int len;
3080
3081        res.response.status  = ntohs(res.response.status);
3082        res.response.keylen  = ntohs(res.response.keylen);
3083        res.response.bodylen = ntohl(res.response.bodylen);
3084
3085        /* Swallow whatever body comes. */
3086        len = res.response.bodylen;
3087        while (len > 0) {
3088            int amt = (len > (int) sizeof(buf) ? (int) sizeof(buf) : len);
3089
3090            mr = mcs_io_read(fd, buf, amt, timeout);
3091            if (mr != MCS_SUCCESS) {
3092                if (mr == MCS_TIMEOUT) {
3093                    return 1;
3094                }
3095
3096                return -1;
3097            }
3098
3099            len -= amt;
3100        }
3101
3102        /* The res status should be either... */
3103        /* - SUCCESS         - we got the bucket. */
3104        /* - AUTH_ERROR      - not allowed to use that bucket. */
3105        /* - UNKNOWN_COMMAND - bucket-unaware server. */
3106
3107        if (res.response.status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
3108            if (settings.verbose > 2) {
3109                moxi_log_write("bucket_downstream success, %s\n",
3110                        behavior->bucket);
3111            }
3112
3113            return 0;
3114        }
3115
3116        if (settings.verbose > 1) {
3117            moxi_log_write("bucket_downstream failure, %s (%x)\n",
3118                    behavior->bucket,
3119                    res.response.status);
3120        }
3121    }
3122
3123    if (mr == MCS_TIMEOUT) {
3124        return 1;
3125    }
3126
3127    return -1;
3128}
3129
3130int cproxy_max_retries(downstream *d) {
3131    return mcs_server_count(&d->mst) * 2;
3132}
3133
3134int downstream_conn_index(downstream *d, conn *c) {
3135    int nconns;
3136    int i;
3137
3138    cb_assert(d);
3139
3140    nconns = mcs_server_count(&d->mst);
3141    for (i = 0; i < nconns; i++) {
3142        if (d->downstream_conns[i] == c) {
3143            return i;
3144        }
3145    }
3146
3147    return -1;
3148}
3149
3150void cproxy_upstream_state_change(conn *c, enum conn_states next_state) {
3151    proxy_td *ptd;
3152
3153    cb_assert(c != NULL);
3154    ptd = c->extra;
3155    if (ptd != NULL) {
3156        if (c->state == conn_pause) {
3157            ptd->stats.stats.tot_upstream_unpaused++;
3158            c->cmd_unpaused = true;
3159        }
3160        if (next_state == conn_pause) {
3161            ptd->stats.stats.tot_upstream_paused++;
3162        }
3163
3164        if (next_state == conn_parse_cmd && c->cmd_arrive_time == 0) {
3165            c->cmd_unpaused = false;
3166            c->hit_local = false;
3167            c->cmd_arrive_time = usec_now();
3168        }
3169
3170        if (next_state == conn_closing || next_state == conn_new_cmd) {
3171            uint64_t arrive_time = c->cmd_arrive_time;
3172            if (c->cmd_unpaused && arrive_time != 0) {
3173                uint64_t latency = usec_now() - c->cmd_arrive_time;
3174
3175                if (c->hit_local) {
3176                    ptd->stats.stats.tot_local_cmd_time += latency;
3177                    ptd->stats.stats.tot_local_cmd_count++;
3178                }
3179
3180                ptd->stats.stats.tot_cmd_time += latency;
3181                ptd->stats.stats.tot_cmd_count++;
3182                c->cmd_arrive_time = 0;
3183            }
3184        }
3185    }
3186}
3187
3188/* ------------------------------------------------- */
3189
3190bool cproxy_on_connect_downstream_conn(conn *c) {
3191    int error;
3192    socklen_t errsz = sizeof(error);
3193    int k;
3194    downstream *d;
3195
3196    cb_assert(c != NULL);
3197    cb_assert(c->host_ident);
3198
3199    d = c->extra;
3200    cb_assert(d != NULL);
3201
3202    if (settings.verbose > 2) {
3203        moxi_log_write("%d: cproxy_on_connect_downstream_conn for %s\n",
3204                       c->sfd, c->host_ident);
3205    }
3206
3207    if (c->which == EV_TIMEOUT) {
3208        d->ptd->stats.stats.tot_downstream_connect_timeout++;
3209
3210        if (settings.verbose) {
3211            moxi_log_write("%d: connection timed out: %s",
3212                           c->sfd, c->host_ident);
3213        }
3214        goto cleanup;
3215    }
3216
3217    /* Check if the connection completed */
3218    if (getsockopt(c->sfd, SOL_SOCKET, SO_ERROR, (void *) &error,
3219                   &errsz) == -1) {
3220        if (settings.verbose) {
3221            moxi_log_write("%d: connect error: %s, %s",
3222                           c->sfd, c->host_ident, strerror(error));
3223        }
3224        goto cleanup;
3225    }
3226
3227    if (error) {
3228        if (settings.verbose) {
3229            moxi_log_write("%d: connect failed: %s, %s",
3230                           c->sfd, c->host_ident, strerror(error));
3231        }
3232        goto cleanup;
3233    }
3234
3235    k = downstream_conn_index(d, c);
3236    if (k >= 0) {
3237        if (downstream_connect_init(d, mcs_server_index(&d->mst, k),
3238                                    &d->behaviors_arr[k], c)) {
3239            /* We are connected to the server now */
3240            if (settings.verbose > 2) {
3241                moxi_log_write("%d: connected to: %s\n",
3242                               c->sfd, c->host_ident);
3243            }
3244
3245            conn_set_state(c, conn_pause);
3246            update_event(c, 0);
3247            cproxy_forward_or_error(d);
3248
3249            return true;
3250        }
3251    }
3252
3253cleanup:
3254    d->ptd->stats.stats.tot_downstream_connect_failed++;
3255
3256    k = delink_from_downstream_conns(c);
3257    if (k >= 0) {
3258        cb_assert(d->downstream_conns[k] == NULL);
3259
3260        d->downstream_conns[k] = NULL_CONN;
3261    }
3262
3263    conn_set_state(c, conn_closing);
3264    update_event(c, 0);
3265    cproxy_forward_or_error(d);
3266
3267    return false;
3268}
3269
3270void downstream_reserved_time_sample(proxy_stats_td *pstd, uint64_t duration) {
3271    if (pstd->downstream_reserved_time_htgram == NULL) {
3272        pstd->downstream_reserved_time_htgram =
3273            cproxy_create_timing_histogram();
3274    }
3275
3276    if (pstd->downstream_reserved_time_htgram != NULL) {
3277        htgram_incr(pstd->downstream_reserved_time_htgram, duration, 1);
3278    }
3279}
3280
3281void downstream_connect_time_sample(proxy_stats_td *pstd, uint64_t duration) {
3282    if (pstd->downstream_connect_time_htgram == NULL) {
3283        pstd->downstream_connect_time_htgram =
3284            cproxy_create_timing_histogram();
3285    }
3286
3287    if (pstd->downstream_connect_time_htgram != NULL) {
3288        htgram_incr(pstd->downstream_connect_time_htgram, duration, 1);
3289    }
3290}
3291
3292/* A histogram for tracking timings, such as for usec request timings. */
3293
3294HTGRAM_HANDLE cproxy_create_timing_histogram(void) {
3295    /* TODO: Make histogram bins more configurable one day. */
3296
3297    HTGRAM_HANDLE h1 = htgram_mk(2000, 100, 2.0, 20, NULL);
3298    HTGRAM_HANDLE h0 = htgram_mk(0, 100, 1.0, 20, h1);
3299
3300    return h0;
3301}
3302
3303zstored_downstream_conns *zstored_get_downstream_conns(LIBEVENT_THREAD *thread,
3304                                                       const char *host_ident) {
3305
3306    genhash_t *conn_hash;
3307    zstored_downstream_conns *conns;
3308
3309    cb_assert(thread);
3310    cb_assert(thread->base);
3311
3312    conn_hash = thread->conn_hash;
3313    cb_assert(conn_hash != NULL);
3314
3315    conns = genhash_find(conn_hash, host_ident);
3316    if (conns == NULL) {
3317        conns = calloc(1, sizeof(zstored_downstream_conns));
3318        if (conns != NULL) {
3319            conns->host_ident = strdup(host_ident);
3320            if (conns->host_ident != NULL) {
3321                genhash_store(conn_hash, conns->host_ident, conns);
3322            } else {
3323                free(conns);
3324                conns = NULL;
3325            }
3326        }
3327    }
3328
3329    return conns;
3330}
3331
3332void zstored_error_count(LIBEVENT_THREAD *thread,
3333                         const char *host_ident,
3334                         bool has_error) {
3335    zstored_downstream_conns *conns;
3336
3337    cb_assert(thread != NULL);
3338    cb_assert(host_ident != NULL);
3339
3340    conns = zstored_get_downstream_conns(thread, host_ident);
3341    if (conns != NULL) {
3342        if (has_error) {
3343            conns->error_count++;
3344            conns->error_time = msec_current_time;
3345        } else {
3346            conns->error_count = 0;
3347            conns->error_time = 0;
3348        }
3349
3350        if (settings.verbose > 2) {
3351            moxi_log_write("z_error, %s, %d, %d, %d, %d\n",
3352                           host_ident,
3353                           has_error,
3354                           conns->dc_acquired,
3355                           conns->error_count,
3356                           conns->error_time);
3357        }
3358
3359        if (has_error) {
3360            /* We reach here when a non-blocking connect() has failed */
3361            /* or when an acquired downstream conn had an error. */
3362            /* The downstream conn is just going to be closed */
3363            /* rather than be released back to the thread->conn_hash, */
3364            /* so update the dc_acquired here. */
3365
3366            if (conns->dc_acquired > 0) {
3367                conns->dc_acquired--;
3368            }
3369
3370            /* When zero downstream conns are available, wake up all */
3371            /* waiting downstreams so they can proceed (possibly by */
3372            /* just returning ERROR's to upstream clients). */
3373
3374            if (conns->dc_acquired <= 0 && conns->dc == NULL) {
3375                downstream *head = conns->downstream_waiting_head;
3376
3377                conns->downstream_waiting_head = NULL;
3378                conns->downstream_waiting_tail = NULL;
3379
3380                while (head != NULL) {
3381                    downstream *prev;
3382
3383                    head->ptd->stats.stats.tot_downstream_waiting_errors++;
3384                    head->ptd->stats.stats.tot_downstream_conn_queue_remove++;
3385
3386                    prev = head;
3387                    head = head->next_waiting;
3388                    prev->next_waiting = NULL;
3389
3390                    cproxy_forward_or_error(prev);
3391                }
3392            }
3393        }
3394    }
3395}
3396
/* Acquires a downstream conn to server msst for downstream d on thread.
 *
 * Conns are pooled per thread and keyed by the server's host_ident
 * (see zstored_get_downstream_conns()).  In order, this:
 *   1. reuses an idle pooled conn from the conns->dc list, if any;
 *   2. returns NULL while the host is backing off after connect errors.
 *      This applies when error_count exceeds behavior->connect_max_errors,
 *      behavior->cycle > 0, and less than connect_retry_interval has
 *      passed since error_time;
 *   3. returns NULL and sets *downstream_conn_max_reached when
 *      behavior->downstream_conn_max conns are already acquired;
 *   4. otherwise opens a new conn via cproxy_connect_downstream_conn().
 *
 * A returned conn is counted in conns->dc_acquired.  A NULL from step 4
 * counts as a connect error for the host.
 */
conn *zstored_acquire_downstream_conn(downstream *d,
                                      LIBEVENT_THREAD *thread,
                                      mcs_server_st *msst,
                                      proxy_behavior *behavior,
                                      bool *downstream_conn_max_reached) {
    enum protocol downstream_protocol;
    char *host_ident;
    conn *dc;
    zstored_downstream_conns *conns;

    cb_assert(d);
    cb_assert(d->ptd);
    cb_assert(d->ptd->downstream_released != d); /* Should not be in free list. */
    cb_assert(thread);
    cb_assert(msst);
    cb_assert(behavior);
    cb_assert(mcs_server_st_hostname(msst) != NULL);
    cb_assert(mcs_server_st_port(msst) > 0);
    cb_assert(mcs_server_st_fd(msst) == -1);

    *downstream_conn_max_reached = false;

    d->ptd->stats.stats.tot_downstream_conn_acquired++;

    /* A peer_protocol set on the upstream conn overrides the */
    /* configured downstream protocol.  Whether it is ascii is passed */
    /* to mcs_server_st_ident(), so it also affects the pool key. */
    downstream_protocol =
        d->upstream_conn->peer_protocol ?
        d->upstream_conn->peer_protocol :
        behavior->downstream_protocol;

    host_ident = mcs_server_st_ident(msst, IS_ASCII(downstream_protocol));
    conns = zstored_get_downstream_conns(thread, host_ident);
    if (conns != NULL) {
        /* Fast path: pop an idle pooled conn off the conns->dc list. */
        dc = conns->dc;
        if (dc != NULL) {
            cb_assert(dc->thread == thread);
            cb_assert(strcmp(host_ident, dc->host_ident) == 0);

            conns->dc_acquired++;
            conns->dc = dc->next;
            dc->next = NULL;

            cb_assert(dc->extra == NULL);
            dc->extra = d;

            return dc;
        }

        /* Connect back-off: once error_count exceeds */
        /* connect_max_errors, refuse new connects until */
        /* connect_retry_interval has passed since the last error */
        /* (only enforced when behavior->cycle > 0). */
        if (behavior->connect_max_errors > 0 &&
            behavior->connect_max_errors < conns->error_count) {
            rel_time_t msecs_since_error =
                (rel_time_t)(msec_current_time - conns->error_time);

            if (settings.verbose > 2) {
                moxi_log_write("zacquire_dc, %s, %d, %"PRIu64", (%d)\n",
                               host_ident,
                               conns->error_count,
                               (uint64_t)conns->error_time,
                               msecs_since_error);
            }

            if ((behavior->cycle > 0) &&
                (behavior->connect_retry_interval > msecs_since_error)) {
                d->ptd->stats.stats.tot_downstream_connect_interval++;

                return NULL;
            }
        }

        /* Cap on concurrently acquired conns to this host; the caller */
        /* is told via *downstream_conn_max_reached. */
        if (behavior->downstream_conn_max > 0 &&
            behavior->downstream_conn_max <= conns->dc_acquired) {
            d->ptd->stats.stats.tot_downstream_connect_max_reached++;

            *downstream_conn_max_reached = true;

            return NULL;
        }
    }

    dc = cproxy_connect_downstream_conn(d, thread, msst, behavior);
    if (dc != NULL) {
        cb_assert(dc->host_ident == NULL);
        /* NOTE(review): strdup() failure is not handled here, while */
        /* zstored_release_downstream_conn() asserts host_ident != NULL. */
        dc->host_ident = strdup(host_ident);
        if (conns != NULL) {
            conns->dc_acquired++;

            /* Only a conn that is no longer in conn_connecting clears */
            /* the host's error state; a pending connect may still fail. */
            if (dc->state != conn_connecting) {
                conns->error_count = 0;
                conns->error_time = 0;
            }
        }
    } else {
        /* A failed connect counts as an error for this host. */
        if (conns != NULL) {
            conns->error_count++;
            conns->error_time = msec_current_time;
        }
    }

    return dc;
}
3496
/* Returns downstream conn dc, acquired earlier for the downstream stored
 * in dc->extra, to its thread's per-host pool.
 *
 * A conn in conn_pause is considered reusable.  It is pushed onto the
 * host's idle list (conns->dc).  If downstreams are queued waiting for a
 * conn to this host, the one at the head of the queue is dequeued, its
 * timeout is cleared, and it is resumed via cproxy_forward_or_error().
 *
 * Any other conn is closed with cproxy_close_conn().  This includes a
 * paused conn whose pool record cannot be obtained.  The NULL_CONN
 * placeholder is ignored.  The `closing` argument is only reported in
 * the verbose log.
 */
void zstored_release_downstream_conn(conn *dc, bool closing) {
    bool keep;
    zstored_downstream_conns *conns;
    downstream *d;

    cb_assert(dc != NULL);
    if (dc == NULL_CONN) {
        return;
    }

    d = dc->extra;
    cb_assert(d != NULL);

    d->ptd->stats.stats.tot_downstream_conn_released++;

    if (settings.verbose > 2) {
        moxi_log_write("%d: release_downstream_conn, %s, (%d)"
                       " upstream %d\n",
                       dc->sfd, state_text(dc->state), closing,
                       (d->upstream_conn != NULL ?
                        d->upstream_conn->sfd : -1));
    }

    cb_assert(dc->next == NULL);
    cb_assert(dc->thread != NULL);
    cb_assert(dc->host_ident != NULL);

    /* Only a paused (idle) conn can be reused; detach it from d either way. */
    keep = dc->state == conn_pause;
    dc->extra = NULL;
    conns = zstored_get_downstream_conns(dc->thread, dc->host_ident);
    if (conns != NULL) {
        if (conns->dc_acquired > 0) {
            conns->dc_acquired--;
        }

        if (keep) {
            downstream *d_head;
            cb_assert(dc->next == NULL);
            dc->next = conns->dc;
            conns->dc = dc;

            /* Since one downstream conn was released, process a single */
            /* waiting downstream, if any. */

            d_head = conns->downstream_waiting_head;
            if (d_head != NULL) {
                cb_assert(conns->downstream_waiting_tail != NULL);

                conns->downstream_waiting_head =
                    conns->downstream_waiting_head->next_waiting;
                if (conns->downstream_waiting_head == NULL) {
                    conns->downstream_waiting_tail = NULL;
                }
                d_head->next_waiting = NULL;

                d_head->ptd->stats.stats.tot_downstream_conn_queue_remove++;

                cproxy_clear_timeout(d_head);

                cproxy_forward_or_error(d_head);
            }

            return;
        }
    }

    /* Not reusable (or no pool record): close it. */
    cproxy_close_conn(dc);
}
3566
/* Returns true if the downstream was found on any */
/* conns->downstream_waiting_head/tail queues and was removed. */
/* */
/* Checks the waiting queue of every server in d->mst, using the pool */
/* of the thread running this code (found via cb_thread_self()). */
/* Note that looking a host up through zstored_get_downstream_conns() */
/* creates an empty pool record for it if none exists yet. */

bool zstored_downstream_waiting_remove(downstream *d) {
    bool found = false;
    int i;
    int n;
    LIBEVENT_THREAD *thread = thread_by_index(thread_index(cb_thread_self()));
    cb_assert(thread != NULL);

    n = mcs_server_count(&d->mst);
    for (i = 0; i < n; i++) {
        char *host_ident;
        zstored_downstream_conns *conns;
        mcs_server_st *msst = mcs_server_index(&d->mst, i);

        /* Same pool-key choice as zstored_downstream_waiting_add(), */
        /* except that upstream_conn may be NULL here. */
        enum protocol downstream_protocol =
            d->upstream_conn && d->upstream_conn->peer_protocol ?
            d->upstream_conn->peer_protocol :
            d->behaviors_arr[i].downstream_protocol;

        cb_assert(IS_PROXY(downstream_protocol));

        host_ident = mcs_server_st_ident(msst, IS_ASCII(downstream_protocol));
        conns = zstored_get_downstream_conns(thread, host_ident);

        if (conns != NULL) {
            /* Linked-list removal, on the next_waiting pointer, */
            /* and keep head and tail pointers updated. */

            downstream *prev = NULL;
            downstream *curr = conns->downstream_waiting_head;

            while (curr != NULL) {
                if (curr == d) {
                    found = true;

                    /* Removing the head: advance head past d. */
                    if (conns->downstream_waiting_head == curr) {
                        cb_assert(conns->downstream_waiting_tail != NULL);
                        conns->downstream_waiting_head = curr->next_waiting;
                    }

                    /* Removing the tail: the predecessor (or NULL) */
                    /* becomes the new tail. */
                    if (conns->downstream_waiting_tail == curr) {
                        conns->downstream_waiting_tail = prev;
                    }

                    if (prev != NULL) {
                        prev->next_waiting = curr->next_waiting;
                    }

                    curr->next_waiting = NULL;

                    d->ptd->stats.stats.tot_downstream_conn_queue_remove++;

                    /* Stop scanning this host's queue; other hosts */
                    /* are still checked by the outer loop. */
                    break;
                }

                prev = curr;
                curr = curr->next_waiting;
            }
        }
    }

    return found;
}
3632
3633bool zstored_downstream_waiting_add(downstream *d, LIBEVENT_THREAD *thread,
3634                                    mcs_server_st *msst,
3635                                    proxy_behavior *behavior) {
3636    enum protocol downstream_protocol;
3637    char *host_ident;
3638    zstored_downstream_conns *conns;
3639
3640    cb_assert(thread != NULL);
3641    cb_assert(d != NULL);
3642    cb_assert(d->upstream_conn != NULL);
3643    cb_assert(d->next_waiting == NULL);
3644
3645    downstream_protocol = d->upstream_conn->peer_protocol ?
3646        d->upstream_conn->peer_protocol :
3647        behavior->downstream_protocol;
3648
3649    host_ident = mcs_server_st_ident(msst, IS_ASCII(downstream_protocol));
3650    conns = zstored_get_downstream_conns(thread, host_ident);
3651    if (conns != NULL) {
3652        cb_assert(conns->dc == NULL);
3653
3654        if (conns->downstream_waiting_head == NULL) {
3655            cb_assert(conns->downstream_waiting_tail == NULL);
3656            conns->downstream_waiting_head = d;
3657        }
3658        if (conns->downstream_waiting_tail != NULL) {
3659            cb_assert(conns->downstream_waiting_tail->next_waiting == NULL);
3660            conns->downstream_waiting_tail->next_waiting = d;
3661        }
3662        conns->downstream_waiting_tail = d;
3663
3664        d->ptd->stats.stats.tot_downstream_conn_queue_add++;
3665
3666        return true;
3667    }
3668
3669    return false;
3670}
3671
3672/* Find an appropriate proxy struct or NULL. */
3673
3674proxy *cproxy_find_proxy_by_auth(proxy_main *m,
3675                                 const char *usr,
3676                                 const char *pwd) {
3677    proxy *found = NULL;
3678    proxy *p;
3679
3680    cb_mutex_enter(&m->proxy_main_lock);
3681
3682    for (p = m->proxy_head; p != NULL && found == NULL; p = p->next) {
3683        cb_mutex_enter(&p->proxy_lock);
3684        if (strcmp(p->behavior_pool.base.usr, usr) == 0 &&
3685            strcmp(p->behavior_pool.base.pwd, pwd) == 0) {
3686            found = p;
3687        }
3688        cb_mutex_exit(&p->proxy_lock);
3689    }
3690
3691    cb_mutex_exit(&m->proxy_main_lock);
3692
3693    return found;
3694}
3695
3696int cproxy_num_active_proxies(proxy_main *m) {
3697    int n = 0;
3698    proxy *p;
3699
3700    cb_mutex_enter(&m->proxy_main_lock);
3701
3702    for (p = m->proxy_head; p != NULL; p = p->next) {
3703        cb_mutex_enter(&p->proxy_lock);
3704        if (p->name != NULL &&
3705            p->config != NULL &&
3706            p->config[0] != '\0') {
3707            n++;
3708        }
3709        cb_mutex_exit(&p->proxy_lock);
3710    }
3711
3712    cb_mutex_exit(&m->proxy_main_lock);
3713
3714    return n;
3715}
3716
3717static
3718void diag_single_connection(FILE *out, conn *c) {
3719    fprintf(out, "%p (%d), ev: 0x%04x, state: 0x%x, substate: 0x%x - %s\n",
3720            (void *)c, c->sfd, c->ev_flags,
3721            c->state, c->substate,
3722            c->update_diag ? c->update_diag : "(none)");
3723}
3724
3725static
3726void diag_connections(FILE *out, conn *head, int indent) {
3727    static char *blank = "";
3728    for (; head != NULL; head = head->next) {
3729        fprintf(out, "%*s" "connection: ", indent, blank);
3730        diag_single_connection(out, head);
3731    }
3732}
3733
3734static
3735void diag_single_downstream(FILE *out, downstream *d, int indent) {
3736    static char *blank = "";
3737    int n;
3738    int i;
3739
3740    conn *upstream = d->upstream_conn;
3741    fprintf(out, "%*s" "upstream: ", indent, blank);
3742    if (upstream) {
3743        diag_single_connection(out, upstream);
3744    } else {
3745        fprintf(out, "none\n");
3746    }
3747
3748    fprintf(out, "%*s" "downstream_used: %d\n", indent, blank, d->downstream_used);
3749    fprintf(out, "%*s" "downstream_used_start: %d\n", indent, blank, d->downstream_used_start);
3750    fprintf(out, "%*s" "downstream_conns:\n", indent, blank);
3751
3752    n = mcs_server_count(&d->mst);
3753    for (i = 0; i < n; i++) {
3754        if (d->downstream_conns[i] == NULL)
3755            continue;
3756        fprintf(out, "%*s", indent+2, blank);
3757        diag_single_connection(out, d->downstream_conns[i]);
3758    }
3759}
3760
3761static
3762void diag_downstream_chain(FILE *out, downstream *head, int indent) {
3763    for (; head != NULL; head = head->next) {
3764        diag_single_downstream(out, head, indent);
3765    }
3766}
3767
3768/* this is supposed to be called from gdb when other threads are suspended */
3769void connections_diag(FILE *out);
3770
3771void connections_diag(FILE *out) {
3772    extern proxy_main *diag_last_proxy_main;
3773
3774    proxy *cur_proxy;
3775    proxy_main *m = diag_last_proxy_main;
3776    if (!m) {
3777        fputs("no proxy_main!\n", out);
3778        return;
3779    }
3780
3781    for (cur_proxy = m->proxy_head; cur_proxy != NULL ; cur_proxy = cur_proxy->next) {
3782        int ti;
3783        fprintf(out, "proxy: name='%s', port=%d, cfg=%s (%u)\n",
3784                cur_proxy->name ? cur_proxy->name : "(null)",
3785                cur_proxy->port,
3786                cur_proxy->config, cur_proxy->config_ver);
3787        for (ti = 0; ti < cur_proxy->thread_data_num; ti++) {
3788            proxy_td *td = cur_proxy->thread_data + ti;
3789            fprintf(out, "  thread:%d\n", ti);
3790            fprintf(out, "    waiting_any_downstream:\n");
3791            diag_connections(out, td->waiting_any_downstream_head, 6);
3792
3793            fprintf(out, "    downstream_reserved:\n");
3794            diag_downstream_chain(out, td->downstream_reserved, 6);
3795            fprintf(out, "    downstream_released:\n");
3796            diag_downstream_chain(out, td->downstream_released, 6);
3797        }
3798    }
3799}
3800
3801bool cproxy_front_cache_key(proxy_td *ptd, char *key, int key_len) {
3802    return (key != NULL &&
3803            key_len > 0 &&
3804            ptd->behavior_pool.base.front_cache_lifespan > 0 &&
3805            matcher_check(&ptd->proxy->front_cache_matcher, key, key_len, false) == true &&
3806            matcher_check(&ptd->proxy->front_cache_unmatcher, key, key_len, false) == false);
3807}
3808
3809void cproxy_front_cache_delete(proxy_td *ptd, char *key, int key_len) {
3810    if (cproxy_front_cache_key(ptd, key, key_len) == true) {
3811        mcache_delete(&ptd->proxy->front_cache, key, key_len);
3812
3813        if (settings.verbose > 1) {
3814            moxi_log_write("front_cache del %s\n", key);
3815        }
3816    }
3817}
3818