1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 
3 #ifndef CPROXY_H
4 #define CPROXY_H
5 
6 #include "genhash.h"
7 #include "work.h"
8 #include "matcher.h"
9 #include "mcs.h"
10 #include "htgram.h"
11 
/* From libmemcached. */

/* Hashes the `length` bytes at `key` to a 32-bit value.  Only declared */
/* here; the definition is provided by libmemcached. */
uint32_t murmur_hash(const char *key, size_t length);
15 
16 /* ------------------------------- */
17 
/* Initializes the proxy from a configuration string and a behavior */
/* string, for `nthreads` worker threads, given the main thread's */
/* libevent base `main_base`. */
/* NOTE(review): the meaning of the int return (success/failure */
/* convention) lives in the .c file -- confirm before relying on it. */
int cproxy_init(char *cfg_str,
                char *behavior_str,
                int nthreads,
                struct event_base *main_base);

/* Sentinel meaning "no CAS id".  Presumably UINT64_MAX is never issued */
/* as a real CAS id -- confirm against the CAS generator. */
#define CPROXY_NOT_CAS UINT64_MAX

/* Current time in milliseconds, read by multiple threads. */
/* NOTE(review): `volatile` provides neither atomicity nor inter-thread */
/* ordering; a 64-bit read may tear on 32-bit targets. */
extern volatile uint64_t msec_current_time;

/* Current time in microseconds (snapshotted into downstream->usec_start). */
uint64_t usec_now(void);

extern char cproxy_hostname[300]; /* Immutable after init. */
30 
31 /* ------------------------------- */
32 
/* Special bucket name for the null bucket. */
/* The bracketed, space-containing form is presumably chosen so it */
/* cannot collide with a real configured bucket name -- confirm. */

#define NULL_BUCKET "[ <NULL_BUCKET> ]"

/* Special bucket name that signifies that */
/* upstream connections start on the first */
/* configured bucket. */

#define FIRST_BUCKET "[ <FIRST_BUCKET> ]"
42 
43 /* ------------------------------- */
44 
/* Callback table through which an mcache manipulates the items it */
/* stores, which it only sees as opaque `void *` pointers. */
typedef struct {
    /* Key accessors. */
    char *(*item_key)(void *it);
    int   (*item_key_len)(void *it);
    int   (*item_len)(void *it);
    /* Reference counting hooks (by name). */
    void  (*item_add_ref)(void *it);
    void  (*item_dec_ref)(void *it);
    /* Doubly-linked list links; presumably the LRU chain anchored at */
    /* mcache.lru_head / lru_tail -- confirm in the mcache implementation. */
    void *(*item_get_next)(void *it);
    void  (*item_set_next)(void *it, void *next);
    void *(*item_get_prev)(void *it);
    void  (*item_set_prev)(void *it, void *prev);
    /* Expiration time accessors. */
    uint64_t (*item_get_exptime)(void *it);
    void     (*item_set_exptime)(void *it, uint64_t exptime);
} mcache_funcs;

/* Callback tables for the two kinds of mcache contents: cached items */
/* and key_stats entries.  Defined in a .c file elsewhere. */
extern mcache_funcs mcache_item_funcs;
extern mcache_funcs mcache_key_stats_funcs;
61 
/* A bounded, LRU-ordered cache of opaque items, manipulated through */
/* the `funcs` callback table.  Used for the front cache (proxy) and */
/* for key-level stats (proxy_td). */
typedef struct {
    mcache_funcs *funcs;

    cb_mutex_t *lock; /* NULL-able, for non-multithreaded. */

    bool key_alloc;        /* True if mcache must alloc key memory. */

    genhash_t *map;        /* NULL-able, keyed by string, value is item. */

    uint32_t max;          /* Maximum number of items to keep. */

    void *lru_head;        /* Most recently used. */
    void *lru_tail;        /* Least recently used. */

    uint32_t oldest_live;  /* In millisecs, relative to msec_current_time. */

    /* Statistics. */

    uint64_t tot_get_hits;
    uint64_t tot_get_expires;
    uint64_t tot_get_misses;
    uint64_t tot_get_bytes;
    uint64_t tot_adds;
    uint64_t tot_add_skips;
    uint64_t tot_add_fails;
    uint64_t tot_add_bytes;
    uint64_t tot_deletes;
    uint64_t tot_evictions;
} mcache;
91 
/* Forward typedefs, so that the structs below can refer to each other */
/* by pointer regardless of definition order. */
typedef struct proxy               proxy;
typedef struct proxy_td            proxy_td;
typedef struct proxy_main          proxy_main;
typedef struct proxy_stats         proxy_stats;
typedef struct proxy_behavior      proxy_behavior;
typedef struct proxy_behavior_pool proxy_behavior_pool;
typedef struct downstream          downstream;
typedef struct key_stats           key_stats;
100 
/* Tunable settings that control how the proxy behaves.  This struct is */
/* copied by value (e.g. cproxy_parse_behavior() returns one), so its */
/* fixed-size char arrays travel with it.  Writers must bound copies */
/* into those arrays to sizeof(field). */
struct proxy_behavior {
    /* IL means startup, system initialization level behavior. */
    /* ML means proxy/pool manager-level behavior (proxy_main). */
    /* PL means proxy/pool-level behavior. */
    /* SL means server-level behavior, although we inherit from proxy level. */

    uint32_t       cycle;               /* IL: Clock resolution in millisecs. */
    uint32_t       downstream_max;      /* PL: Downstream concurrency. */
    uint32_t       downstream_conn_max; /* PL: Max # of conns per thread */
                                        /* and per host_ident. */
    uint32_t       downstream_weight;   /* SL: Server weight. */
    uint32_t       downstream_retry;    /* SL: How many times to retry a cmd. */
    enum protocol  downstream_protocol; /* SL: Favored downstream protocol. */
    struct timeval downstream_timeout;  /* SL: Fields of 0 mean no timeout. */
    struct timeval downstream_conn_queue_timeout; /* SL: Fields of 0 mean no timeout. */
    struct timeval wait_queue_timeout;  /* PL: Fields of 0 mean no timeout. */
    struct timeval connect_timeout;     /* PL: Fields of 0 mean no timeout. */
    struct timeval auth_timeout;        /* PL: Fields of 0 mean no timeout. */
    bool           time_stats;          /* IL: Capture timing stats. */
    char           mcs_opts[80];        /* PL: Extra options for mcs initialization. */

    uint32_t connect_max_errors;      /* IL: Pause when too many connect() errs. */
    uint32_t connect_retry_interval;  /* IL: Time in millisecs before retrying */
                                      /* when too many connect() errors, to not */
                                      /* overwhelm the downstream servers. */

    uint32_t front_cache_max;         /* PL: Max # of front cachable items. */
    uint32_t front_cache_lifespan;    /* PL: In millisecs. */
    char     front_cache_spec[300];   /* PL: Matcher prefixes for front caching. */
    char     front_cache_unspec[100]; /* PL: Don't front cache prefixes. */

    uint32_t key_stats_max;         /* PL: Max # of key stats entries. */
    uint32_t key_stats_lifespan;    /* PL: In millisecs. */
    char     key_stats_spec[300];   /* PL: Matcher prefixes for key-level stats. */
    char     key_stats_unspec[100]; /* PL: Don't key stat prefixes. */

    char optimize_set[400]; /* PL: Matcher prefixes for SET optimization. */

    /* Downstream server identity and credentials. */
    char usr[250];    /* SL. */
    char pwd[900];    /* SL. */
    char host[250];   /* SL. */
    int  port;        /* SL. */
    char bucket[250]; /* SL. */
    char nodeLocator[20]; /* Ex: ketama or vbucket. */

    /* ML: Port for proxy_main to listen on. */

    int port_listen;

    char default_bucket_name[250]; /* ML: The named bucket (proxy->name) */
                                   /* that upstream conn's should start on. */
                                   /* When empty (""), then only binary SASL */
                                   /* clients can actually do anything useful. */
};
155 
156 proxy_behavior behavior_default_g;
157 
/* A pool-level behavior plus one server-level behavior per downstream */
/* server.  Held by proxy (under proxy_lock) and snapshotted per thread */
/* in proxy_td. */
struct proxy_behavior_pool {
    proxy_behavior  base; /* Proxy pool-level (PL) behavior. */
    int             num;  /* Number of server-level (SL) behaviors. */
    proxy_behavior *arr;  /* Array, size is num. */
};
163 
/* How the proxy_main was configured.  It never changes after creation */
/* (see proxy_main.conf_type).  Enumerator values are spelled out so the */
/* numbering is explicit. */
typedef enum {
    PROXY_CONF_TYPE_STATIC  = 0,
    PROXY_CONF_TYPE_DYNAMIC = 1,
    PROXY_CONF_TYPE_last    = 2 /* Count of configuration types; keep last. */
} enum_proxy_conf_type;
169 
170 /* Quick map of struct hierarchy... */
171 
172 /* proxy_main */
173 /*  - has list of... */
174 /*    proxy */
175 /*     - has array of... */
176 /*       proxy_td (one proxy_td per worker thread) */
177 /*       - has list of... */
178 /*         downstream (in either reserved or released list) */
179 /*         - has mst/libmemcached struct */
180 /*         - has array of downstream conn's */
181 /*         - has non-NULL upstream conn, when reserved */
182 
/* Structure used and owned by main listener thread to
 * track all the outstanding proxy objects.
 */
struct proxy_main {
    proxy_behavior behavior; /* Default, main listener modifiable only. */

    enum_proxy_conf_type conf_type; /* Immutable proxy configuration type. */

    /* Any thread that accesses the proxy list must */
    /* first acquire the proxy_main_lock. */

    cb_mutex_t proxy_main_lock;

    /* Start of proxy list.  Covered by proxy_main_lock. */
    /* Only the main listener thread may modify the proxy list. */
    /* Other threads may read-only traverse the proxy list. */

    proxy *proxy_head;

    int nthreads; /* Immutable. */

    /* Updated by main listener thread only, */
    /* so no extra locking needed. */

    /* Configuration and proxy lifecycle counters. */
    uint64_t stat_configs;
    uint64_t stat_config_fails;
    uint64_t stat_proxy_starts;
    uint64_t stat_proxy_start_fails;
    uint64_t stat_proxy_existings;
    uint64_t stat_proxy_shutdowns;
};
214 
/* Owned by main listener thread.
 *
 * One proxy per listening port/bucket, linked from proxy_main.proxy_head.
 * Fields are either immutable, covered by proxy_lock, or touched only by
 * the main listener thread, as annotated per field.
 */
struct proxy {
    proxy_main *main; /* Immutable, points to parent proxy_main. */

    int   port;   /* Immutable. */
    char *name;   /* Mutable, covered by proxy_lock, for debugging, NULL-able. */
    char *config; /* Mutable, covered by proxy_lock, mem owned by proxy, */
                  /* might be NULL if the proxy is shutting down. */

    /* Mutable, covered by proxy_lock, incremented */
    /* whenever config changes. */

    uint32_t config_ver;

    /* Mutable, covered by proxy_lock. */

    proxy_behavior_pool behavior_pool;

    /* Any thread that accesses the mutable fields should */
    /* first acquire the proxy_lock. */

    cb_mutex_t proxy_lock;

    /* Number of listening conn's acting as a proxy, */
    /* where (((proxy *) conn->extra) == this). */
    /* Modified/accessed only by main listener thread. */

    uint64_t listening;
    uint64_t listening_failed; /* When server_socket() failed. */

    proxy *next; /* Modified/accessed only by main listener thread. */

    /* Front cache and the prefix matchers that select which keys it */
    /* caches (see front_cache_spec/unspec in proxy_behavior). */
    mcache  front_cache;
    matcher front_cache_matcher;
    matcher front_cache_unmatcher;

    matcher optimize_set_matcher;

    proxy_td *thread_data;     /* Immutable. */
    int       thread_data_num; /* Immutable. */
};
257 
/* Aggregate counters for one proxy on one worker thread (embedded in */
/* proxy_stats_td).  All fields are plain uint64_t; no locking is */
/* declared here. */
struct proxy_stats {
    /* Naming convention is that num_xxx's go up and down, */
    /* while tot_xxx's and err_xxx's only increase.  Only */
    /* the tot_xxx's and err_xxx's can be reset to 0. */

    uint64_t num_upstream; /* Current # of upstreams conns using this proxy. */
    uint64_t tot_upstream; /* Total # upstream conns that used this proxy. */

    uint64_t num_downstream_conn;
    uint64_t tot_downstream_conn;
    uint64_t tot_downstream_conn_acquired;
    uint64_t tot_downstream_conn_released;
    uint64_t tot_downstream_released;
    uint64_t tot_downstream_reserved;
    uint64_t tot_downstream_reserved_time;
    uint64_t max_downstream_reserved_time;
    uint64_t tot_downstream_freed;
    uint64_t tot_downstream_quit_server;
    uint64_t tot_downstream_max_reached;
    uint64_t tot_downstream_create_failed;

    /* When connections have stabilized... */
    /*   tot_downstream_connect_started == */
    /*     tot_downstream_connect + tot_downstream_connect_failed. */

    /* When a new connection is just created but not yet ready for use... */
    /*   tot_downstream_connect_started > */
    /*     tot_downstream_connect + tot_downstream_connect_failed. */

    uint64_t tot_downstream_connect_started;
    uint64_t tot_downstream_connect_wait;
    uint64_t tot_downstream_connect; /* Incremented when connect() + auth + */
                                     /* bucket_selection succeeds. */
    uint64_t tot_downstream_connect_failed;
    uint64_t tot_downstream_connect_timeout;
    uint64_t tot_downstream_connect_interval;
    uint64_t tot_downstream_connect_max_reached;

    uint64_t tot_downstream_waiting_errors;
    uint64_t tot_downstream_auth;
    uint64_t tot_downstream_auth_failed;
    uint64_t tot_downstream_bucket;
    uint64_t tot_downstream_bucket_failed;
    uint64_t tot_downstream_propagate_failed;
    uint64_t tot_downstream_close_on_upstream_close;
    uint64_t tot_downstream_conn_queue_timeout;
    uint64_t tot_downstream_conn_queue_add;
    uint64_t tot_downstream_conn_queue_remove;
    uint64_t tot_downstream_timeout;
    uint64_t tot_wait_queue_timeout;
    uint64_t tot_auth_timeout;
    uint64_t tot_assign_downstream;
    uint64_t tot_assign_upstream;
    uint64_t tot_assign_recursion;
    uint64_t tot_reset_upstream_avail;
    uint64_t tot_retry;
    uint64_t tot_retry_time;
    uint64_t max_retry_time;
    uint64_t tot_retry_vbucket;
    uint64_t tot_upstream_paused;
    uint64_t tot_upstream_unpaused;
    uint64_t tot_multiget_keys;
    uint64_t tot_multiget_keys_dedupe;
    uint64_t tot_multiget_bytes_dedupe;
    uint64_t tot_optimize_sets;
    uint64_t err_oom;
    uint64_t err_upstream_write_prep;
    uint64_t err_downstream_write_prep;
    /* Command timing: tot_*_time sums paired with tot_*_count totals. */
    uint64_t tot_cmd_time;
    uint64_t tot_cmd_count;
    uint64_t tot_local_cmd_time;
    uint64_t tot_local_cmd_count;
};
331 
/* Counters for one command category.  Stored in 2-D arrays indexed */
/* [enum_stats_cmd_type][enum_stats_cmd] (see proxy_stats_td, key_stats). */
typedef struct {
    uint64_t seen;        /* Number of times a command was seen. */
    uint64_t hits;        /* Number of hits or successes. */
    uint64_t misses;      /* Number of misses or failures. */
    uint64_t read_bytes;  /* Total bytes read, incoming into proxy. */
    uint64_t write_bytes; /* Total bytes written, outgoing from proxy. */
    uint64_t cas;         /* Number that had or required cas-id. */
} proxy_stats_cmd;
340 
/* Command categories used as the second index of the proxy_stats_cmd */
/* arrays.  Every enumerator carries an explicit value so each command's */
/* array slot is visible at a glance.  When adding a command, number it */
/* and bump STATS_CMD_last accordingly. */
typedef enum {
    STATS_CMD_GET         = 0,  /* For each "get" cmd, even if multikey get. */
    STATS_CMD_GET_KEY     = 1,  /* For each key in a "get". */
    STATS_CMD_SET         = 2,
    STATS_CMD_ADD         = 3,
    STATS_CMD_REPLACE     = 4,
    STATS_CMD_DELETE      = 5,
    STATS_CMD_APPEND      = 6,
    STATS_CMD_PREPEND     = 7,
    STATS_CMD_INCR        = 8,
    STATS_CMD_DECR        = 9,
    STATS_CMD_FLUSH_ALL   = 10,
    STATS_CMD_CAS         = 11,
    STATS_CMD_STATS       = 12,
    STATS_CMD_STATS_RESET = 13,
    STATS_CMD_VERSION     = 14,
    STATS_CMD_VERBOSITY   = 15,
    STATS_CMD_QUIT        = 16,
    STATS_CMD_GETL        = 17,
    STATS_CMD_UNL         = 18,
    STATS_CMD_ERROR       = 19,
    STATS_CMD_last        = 20  /* Array dimension; keep last. */
} enum_stats_cmd;
364 
/* Whether a command was issued in its regular or quiet form.  Used as */
/* the first index of the proxy_stats_cmd arrays. */
typedef enum {
    STATS_CMD_TYPE_REGULAR = 0,
    STATS_CMD_TYPE_QUIET   = 1,
    STATS_CMD_TYPE_last    = 2  /* Array dimension; keep last. */
} enum_stats_cmd_type;
370 
/* Per proxy, per worker-thread statistics (embedded in proxy_td). */
typedef struct {
    proxy_stats     stats;
    /* Indexed [enum_stats_cmd_type][enum_stats_cmd]. */
    proxy_stats_cmd stats_cmd[STATS_CMD_TYPE_last][STATS_CMD_last];

    /* Timing histograms, created via cproxy_create_timing_histogram(). */
    HTGRAM_HANDLE downstream_reserved_time_htgram;
    HTGRAM_HANDLE downstream_connect_time_htgram;
} proxy_stats_td;
378 
/* Per-key command counters.  Entries are kept in proxy_td.key_stats (an */
/* mcache); refcount, exptime and next/prev appear to back the */
/* mcache_key_stats_funcs callbacks -- confirm in the implementation. */
struct key_stats {
    char key[KEY_MAX_LENGTH + 1]; /* +1 for the terminating NUL. */
    int  refcount;
    uint64_t exptime;
    uint64_t added_at;
    key_stats *next;
    key_stats *prev;
    /* Indexed [enum_stats_cmd_type][enum_stats_cmd]. */
    proxy_stats_cmd stats_cmd[STATS_CMD_TYPE_last][STATS_CMD_last];
};
388 
/* We mirror memcached's threading model with a separate
 * proxy_td (td means "thread data") struct owned by each
 * worker thread.  The idea is to avoid extraneous locks.
 */
struct proxy_td { /* Per proxy, per worker-thread data struct. */
    proxy *proxy; /* Immutable parent pointer. */

    /* Snapshot of proxy-level configuration to avoid locks. */

    char    *config;
    uint32_t config_ver;

    proxy_behavior_pool behavior_pool;

    /* Upstream conns that are paused, waiting for */
    /* an available, released downstream. */

    conn *waiting_any_downstream_head;
    conn *waiting_any_downstream_tail;

    downstream *downstream_reserved; /* Downstreams assigned to upstream conns. */
    downstream *downstream_released; /* Downstreams not assigned to any upstream conn. */
    uint64_t    downstream_tot;      /* Total lifetime downstreams created. */
    int         downstream_num;      /* Number downstreams existing. */
    int         downstream_max;      /* Max downstream concurrency number. */
    uint64_t    downstream_assigns;  /* Track recursion. */

    /* A timeout for the wait_queue, so that we can emit error */
    /* on any upstream conn's that are waiting too long for */
    /* an available downstream. */

    /* Timeout is in use when timeout_tv fields are non-zero. */

    struct timeval timeout_tv;
    struct event   timeout_event;

    /* Per-thread key-level stats and the prefix matchers selecting keys */
    /* to track (see key_stats_spec/unspec in proxy_behavior). */
    mcache  key_stats;
    matcher key_stats_matcher;
    matcher key_stats_unmatcher;

    proxy_stats_td stats;
};
431 
/* A 'downstream' struct represents a set of downstream connections.
 * A possibly better name for it should have been "downstream_conn_set".
 *
 * Owned by worker thread.
 */
struct downstream {
    /* The following group of fields are immutable or read-only (RO), */
    /* except for config_ver, which gets updated if the downstream's */
    /* config/behaviors still matches the parent ptd's config/behaviors. */

    proxy_td       *ptd;           /* RO: Parent pointer. */
    char           *config;        /* RO: Mem owned by downstream. */
    uint32_t        config_ver;    /* RW: Mutable, copy of proxy->config_ver. */
    int             behaviors_num; /* RO: Snapshot of ptd->behavior_pool.num. */
    proxy_behavior *behaviors_arr; /* RO: Snapshot of ptd->behavior_pool.arr. */
    mcs_st          mst;           /* RW: From mcs. */

    downstream *next; /* To track reserved/released lists. */
                      /* See ptd->downstream_reserved/downstream_released. */

    downstream *next_waiting; /* To track lists when a downstream is reserved, */
                              /* but is waiting for a downstream connection, */
                              /* per zstored perf enhancement. */

    conn **downstream_conns;  /* Wraps the fd's of mst with conns. */
                              /* Entries may be NULL_CONN (see below). */
    int    downstream_used;   /* Number of in-use downstream conns, might */
                              /* be >1 during scatter-gather commands. */
    int    downstream_used_start;

    uint64_t usec_start; /* Snapshot of usec_now(). */

    conn  *upstream_conn;     /* Non-NULL when downstream is reserved. */
    char  *upstream_suffix;   /* Last bit to write when downstreams are done. */
    int    upstream_suffix_len; /* When > 0, overrides strlen(upstream_suffix), */
                                /* during binary protocol. */

    /* Used during an error when upstream is binary protocol. */
    protocol_binary_response_status upstream_status;

    int    upstream_retry;    /* Will be >0 if we should retry the entire */
                              /* command again when all downstreams are done. */
                              /* Used in not-my-vbucket error case.  During */
                              /* the retry, we'll reuse the same multiget */
                              /* de-duplication tracking table to avoid */
                              /* asking for successful keys again. */
    int    upstream_retries;  /* Count number of upstream_retry attempts. */

    /* Used when proxying a simple, single-key (non-broadcast) command. */
    char *target_host_ident;

    genhash_t *multiget; /* Keyed by string. */
    genhash_t *merger;   /* Keyed by string, for merging replies like STATS. */

    /* Timeout is in use when timeout_tv fields are non-zero. */

    struct timeval timeout_tv;
    struct event   timeout_event;
};
490 
/* Sentinel value for downstream->downstream_conns[] array entries, */
/* which usually signals that moxi wasn't able to create a connection */
/* to a downstream server. */
/* It is an implementation-defined pointer value distinct from NULL, */
/* so the two must be tested separately. */

#define NULL_CONN ((conn *) -1)
496 
/* Functions. */

/* Creates a proxy under `main` for `port` with the given config string, */
/* config version and behavior pool.  `nthreads` presumably sizes */
/* proxy->thread_data -- confirm in the .c file. */
proxy *cproxy_create(proxy_main *main,
                     char    *name,
                     int      port,
                     char    *config,
                     uint32_t config_ver,
                     proxy_behavior_pool *behavior_pool,
                     int nthreads);

/* Listener setup.  NOTE(review): int return conventions are defined */
/* in the .c file. */
int cproxy_listen(proxy *p);
int cproxy_listen_port(int port,
                       enum protocol protocol,
                       enum network_transport transport,
                       void       *conn_extra,
                       conn_funcs *conn_funcs);

/* Per-thread data lookup and conn lifecycle hooks. */
proxy_td *cproxy_find_thread_data(proxy *p, cb_thread_t thread_id);
bool      cproxy_init_upstream_conn(conn *c);
bool      cproxy_init_downstream_conn(conn *c);
void      cproxy_on_close_upstream_conn(conn *c);
void      cproxy_on_close_downstream_conn(conn *c);
void      cproxy_on_pause_downstream_conn(conn *c);

void cproxy_upstream_state_change(conn *c, enum conn_states next_state);

/* Downstream set management.  A proxy_td keeps downstreams on its */
/* reserved/released lists (see struct proxy_td). */
void cproxy_add_downstream(proxy_td *ptd);
void cproxy_free_downstream(downstream *d);

downstream *cproxy_create_downstream(char *config,
                                     uint32_t config_ver,
                                     proxy_behavior_pool *behavior_pool);

downstream *cproxy_reserve_downstream(proxy_td *ptd);
bool        cproxy_release_downstream(downstream *d, bool force);
void        cproxy_release_downstream_conn(downstream *d, conn *c);
bool        cproxy_check_downstream_config(downstream *d);

/* Establishing downstream connections. */
int   cproxy_connect_downstream(downstream *d,
                                LIBEVENT_THREAD *thread,
                                int server_index);
conn *cproxy_connect_downstream_conn(downstream *d,
                                     LIBEVENT_THREAD *thread,
                                     mcs_server_st *msst,
                                     proxy_behavior *behavior);

/* Queueing upstream conns until a downstream is available. */
void  cproxy_wait_any_downstream(proxy_td *ptd, conn *c);
void  cproxy_assign_downstream(proxy_td *ptd);

proxy *cproxy_find_proxy_by_auth(proxy_main *m,
                                 const char *usr,
                                 const char *pwd);

/* Authentication and bucket selection on a downstream socket. */
int cproxy_auth_downstream(mcs_server_st *server,
                           proxy_behavior *behavior, SOCKET fd);
int cproxy_bucket_downstream(mcs_server_st *server,
                             proxy_behavior *behavior, SOCKET fd);

/* Key -> downstream routing.  Pointer parameters `local` and `vbucket` */
/* look like out-parameters -- confirm in the .c file. */
void  cproxy_pause_upstream_for_downstream(proxy_td *ptd, conn *upstream);
conn *cproxy_find_downstream_conn(downstream *d, char *key, int key_length,
                                  bool *local);
conn *cproxy_find_downstream_conn_ex(downstream *d, char *key, int key_length,
                                     bool *local, int *vbucket);
int   cproxy_server_index(downstream *d, char *key, size_t key_length, int *vbucket);
bool  cproxy_prep_conn_for_write(conn *c);
bool  cproxy_dettach_if_noreply(downstream *d, conn *uc);

void cproxy_reset_upstream(conn *uc);

bool cproxy_update_event_write(downstream *d, conn *c);

bool cproxy_forward(downstream *d);

/* Upstream error reporting and retry. */
void upstream_error_msg(conn *uc, char *ascii_msg,
                        protocol_binary_response_status binary_status);
void upstream_retry(void *data0, void *data1);

int downstream_conn_index(downstream *d, conn *c);

void cproxy_dump_header(SOCKET prefix, char *bb);

int cproxy_max_retries(downstream *d);
579 
580 /* --------------------------------------------------------------- */
581 
/* Protocol entry points: ascii handlers take the current command/response */
/* `line`; the *_nread variants take only the conn. */
void cproxy_process_upstream_ascii(conn *c, char *line);
void cproxy_process_upstream_ascii_nread(conn *c);

void cproxy_process_downstream_ascii(conn *c, char *line);
void cproxy_process_downstream_ascii_nread(conn *c);

void cproxy_process_upstream_binary(conn *c);
void cproxy_process_upstream_binary_nread(conn *c);

void cproxy_process_downstream_binary(conn *c);
void cproxy_process_downstream_binary_nread(conn *c);
593 
594 /* --------------------------------------------------------------- */
/* a2a means ascii upstream, ascii downstream. */
/* Init, downstream response handling, and forwarding variants for */
/* multiget, simple, item (storage) and broadcast commands. */

void cproxy_init_a2a(void);
void cproxy_process_a2a_downstream(conn *c, char *line);
void cproxy_process_a2a_downstream_nread(conn *c);

bool cproxy_forward_a2a_downstream(downstream *d);

bool cproxy_forward_a2a_multiget_downstream(downstream *d, conn *uc);
bool cproxy_forward_a2a_simple_downstream(downstream *d, char *command,
                                          conn *uc);
bool cproxy_forward_a2a_item_downstream(downstream *d, short cmd,
                                        item *it, conn *uc);
bool cproxy_broadcast_a2a_downstream(downstream *d, char *command,
                                     conn *uc, char *suffix);
610 
611 /* --------------------------------------------------------------- */
/* a2b means ascii upstream, binary downstream. */
/* Same shape as the a2a set, except that broadcasts take a prebuilt */
/* binary request header plus key and extras lengths. */

void cproxy_init_a2b(void);
void cproxy_process_a2b_downstream(conn *c);
void cproxy_process_a2b_downstream_nread(conn *c);

bool cproxy_forward_a2b_downstream(downstream *d);
bool cproxy_forward_a2b_multiget_downstream(downstream *d, conn *uc);
bool cproxy_forward_a2b_simple_downstream(downstream *d, char *command,
                                          conn *uc);
bool cproxy_forward_a2b_item_downstream(downstream *d, short cmd,
                                        item *it, conn *uc);
bool cproxy_broadcast_a2b_downstream(downstream *d,
                                     protocol_binary_request_header *req,
                                     int req_size,
                                     uint8_t *key,
                                     uint16_t keylen,
                                     uint8_t  extlen,
                                     conn *uc, char *suffix);
631 
632 /* --------------------------------------------------------------- */
/* b2b means binary upstream, binary downstream. */
/* The command to forward comes from the upstream conn `uc`, so these */
/* variants take no separate command argument. */

void cproxy_init_b2b(void);
void cproxy_process_b2b_downstream(conn *c);
void cproxy_process_b2b_downstream_nread(conn *c);

bool cproxy_forward_b2b_downstream(downstream *d);
bool cproxy_forward_b2b_multiget_downstream(downstream *d, conn *uc);
bool cproxy_forward_b2b_simple_downstream(downstream *d, conn *uc);

bool cproxy_broadcast_b2b_downstream(downstream *d, conn *uc);
644 
645 /* --------------------------------------------------------------- */
646 
/* Forward an item-carrying binary request.  The _vbucket variant takes */
/* an explicit downstream conn `c` and a vbucket id. */
bool b2b_forward_item(conn *uc, downstream *d, item *it);

bool b2b_forward_item_vbucket(conn *uc, downstream *d, item *it,
                              conn *c, int vbucket);
651 
652 /* --------------------------------------------------------------- */
653 
/* Magic opaque value that tells us to eat a binary quiet command */
/* response.  That is, do not send the response up to the ascii client */
/* which originally made its request with noreply. */

#define OPAQUE_IGNORE_REPLY 0x0411F00D

/* Returns whether the response in `header` should be swallowed rather */
/* than relayed upstream (presumably keyed off OPAQUE_IGNORE_REPLY -- */
/* confirm in the .c file). */
bool cproxy_binary_ignore_reply(conn *c, protocol_binary_response_header *header, item *it);
661 
662 /* --------------------------------------------------------------- */
663 
/* Builds the proxy_main, using `behavior` as its default behavior. */
proxy_main *cproxy_gen_proxy_main(proxy_behavior behavior,
                                  int nthreads, enum_proxy_conf_type conf_type);

/* Behavior parsing.  cproxy_parse_behavior() returns a new behavior by */
/* value, starting from `behavior_default`; the key/val variants write */
/* into `*behavior`. */
proxy_behavior cproxy_parse_behavior(char          *behavior_str,
                                     proxy_behavior behavior_default);

void cproxy_parse_behavior_key_val_str(char *key_val,
                                       proxy_behavior *behavior);

void cproxy_parse_behavior_key_val(char *key,
                                   char *val,
                                   proxy_behavior *behavior);

/* Copying and comparison of behavior arrays/values. */
proxy_behavior *cproxy_copy_behaviors(int arr_size, proxy_behavior *arr);

bool cproxy_equal_behaviors(int x_size, proxy_behavior *x,
                            int y_size, proxy_behavior *y);
bool cproxy_equal_behavior(proxy_behavior *x,
                           proxy_behavior *y);

/* Dumping behaviors: the _ex variant emits through a caller-supplied */
/* callback; cproxy_dump_behavior_stderr() is one such callback. */
void cproxy_dump_behavior(proxy_behavior *b, char *prefix, int level);
void cproxy_dump_behavior_ex(proxy_behavior *b, char *prefix, int level,
                             void (*dump)(const void *dump_opaque,
                                          const char *prefix,
                                          const char *key,
                                          const char *buf),
                             const void *dump_opaque);
void cproxy_dump_behavior_stderr(const void *dump_opaque,
                                 const char *prefix,
                                 const char *key,
                                 const char *val);
695 
696 /* --------------------------------------------------------------- */
697 
/* Miscellaneous command, response, timeout and stats helpers. */
bool cproxy_is_broadcast_cmd(int cmd);

void cproxy_ascii_broadcast_suffix(downstream *d);

void cproxy_upstream_ascii_item_response(item *it, conn *uc,
                                         int cas_emit);

/* Timeouts: zeroed timeval fields mean "no timeout" (see proxy_behavior). */
bool cproxy_clear_timeout(downstream *d);

struct timeval cproxy_get_downstream_timeout(downstream *d, conn *c);

bool cproxy_start_downstream_timeout(downstream *d, conn *c);
bool cproxy_start_downstream_timeout_ex(downstream *d, conn *c,
                                        struct timeval dt);
bool cproxy_start_wait_queue_timeout(proxy_td *ptd, conn *uc);

rel_time_t cproxy_realtime(const time_t exptime);

void cproxy_close_conn(conn *c);

void cproxy_reset_stats_td(proxy_stats_td *pstd);
void cproxy_reset_stats(proxy_stats *ps);
void cproxy_reset_stats_cmd(proxy_stats_cmd *sc);

bool cproxy_binary_cork_cmd(conn *uc);
void cproxy_binary_uncork_cmds(downstream *d, conn *uc);

/* Locates the key within `line`; results go to `*key` and `*key_len`. */
bool ascii_scan_key(char *line, char **key, int *key_len);
726 
/* Multiget key de-duplication. */

typedef struct multiget_entry multiget_entry;

/* One upstream requester of a de-duplicated multiget key.  Entries */
/* chain through `next` when several upstreams asked for the same key. */
struct multiget_entry {
    conn           *upstream_conn;
    uint32_t        opaque; /* For binary protocol. */
    uint64_t        hits;
    multiget_entry *next;
};

/* Fans an ascii multiget out to downstreams through the emit_* */
/* callbacks, optionally consulting `front_cache`. */
bool multiget_ascii_downstream(
    downstream *d, conn *uc,
    int (*emit_start)(conn *c, char *cmd, int cmd_len),
    int (*emit_skey)(conn *c, char *skey, int skey_len, int vbucket, int key_index),
    int (*emit_end)(conn *c),
    mcache *front_cache);

void multiget_ascii_downstream_response(downstream *d, item *it);

/* genhash foreach callbacks over the multiget table. */
void multiget_foreach_free(const void *key,
                           const void *value,
                           void *user_data);

void multiget_remove_upstream(const void *key,
                              const void *value,
                              void *user_data);
754 
/* Space or null terminated key funcs. */
/* Lets keys be hashed and compared in place inside a command line, */
/* without copying them out first. */

size_t skey_len(const char *key);
int    skey_hash(const void *v);
int    skey_equal(const void *v1, const void *v2);

/* genhash operation tables: strhash_ops for ordinary strings, */
/* skeyhash_ops presumably built on the skey_* functions above -- confirm. */
extern struct hash_ops strhash_ops;
extern struct hash_ops skeyhash_ops;

void noop_free(void *v);
765 
/* Stats handling. */
/* Merges STATS reply lines from several downstreams into one `merger` */
/* table (see downstream->merger), then writes or frees its entries. */

bool protocol_stats_merge_line(genhash_t *merger, char *line);

bool protocol_stats_merge_name_val(genhash_t *merger,
                                   char *prefix,
                                   int   prefix_len,
                                   char *name,
                                   int   name_len,
                                   char *val,
                                   int   val_len);

void protocol_stats_foreach_free(const void *key,
                                 const void *value,
                                 void *user_data);

void protocol_stats_foreach_write(const void *key,
                                  const void *value,
                                  void *user_data);

bool cproxy_optimize_set_ascii(downstream *d, conn *uc,
                               char *key, int key_len);

/* Front cache invalidation and lookup helpers. */
void cproxy_del_front_cache_key_ascii(downstream *d,
                                      char *command);

void cproxy_del_front_cache_key_ascii_response(downstream *d,
                                               char *response,
                                               char *command);

void cproxy_front_cache_delete(proxy_td *ptd, char *key, int key_len);

bool cproxy_front_cache_key(proxy_td *ptd, char *key, int key_len);

HTGRAM_HANDLE cproxy_create_timing_histogram(void);
801 
/* Visitor callback for mcache_foreach(). */
typedef void (*mcache_traversal_func)(const void *it, void *userdata);

/* Functions for the front cache. */
/* Lifecycle: mcache_init() configures (a lock only when multithreaded), */
/* mcache_start() enables it with capacity `max`, mcache_stop() disables. */

void  mcache_init(mcache *m, bool multithreaded,
                  mcache_funcs *funcs, bool key_alloc);
void  mcache_start(mcache *m, uint32_t max);
bool  mcache_started(mcache *m);
void  mcache_stop(mcache *m);
void  mcache_reset_stats(mcache *m);
void *mcache_get(mcache *m, char *key, int key_len,
                 uint64_t curr_time);
void  mcache_set(mcache *m, void *it,
                 uint64_t exptime,
                 bool add_only,
                 bool mod_exptime_if_exists);
void  mcache_delete(mcache *m, char *key, int key_len);
void  mcache_flush_all(mcache *m, uint32_t msec_exp);
void  mcache_foreach(mcache *m, mcache_traversal_func f, void *userdata);
821 
822 /* Functions for key stats. */
823 
824 key_stats *find_key_stats(proxy_td *ptd, char *key, int key_len,
825                           uint64_t msec_time);
826 
827 void touch_key_stats(proxy_td *ptd, char *key, int key_len,
828                      uint64_t msec_current_time,
829                      enum_stats_cmd_type cmd_type,
830                      enum_stats_cmd cmd,
831                      int delta_seen,
832                      int delta_hits,
833                      int delta_misses,
834                      int delta_read_bytes,
835                      int delta_write_bytes);
836 
837 void key_stats_add_ref(void *it);
838 void key_stats_dec_ref(void *it);
839 
/* TODO: The following generic items should be broken out into util file. */

/* Helpers for queuing response data on a conn. */
bool  add_conn_item(conn *c, item *it);
char *add_conn_suffix(conn *c);

/* Binary protocol header/error construction for conn `c`. */
void *cproxy_make_bin_header(conn *c, uint8_t magic);

protocol_binary_response_header *cproxy_make_bin_error(conn *c,
                                                       uint16_t status);

/* Tokenizes `command` into at most `max_tokens` entries of `tokens`. */
size_t scan_tokens(char *command, token_t *tokens, const size_t max_tokens,
                   int *command_len);

char *nread_text(short x);

/* String helpers.  NOTE(review): whether they modify `s` in place or */
/* allocate (trimstrdup, by name) is defined in the .c file -- confirm. */
char *skipspace(char *s);
char *trailspace(char *s);
char *trimstr(char *s);
char *trimstrdup(char *s);
bool  wordeq(char *s, char *word);
860 
861 #endif /* CPROXY_H */
862