1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <ctype.h>
6 #include <string.h>
7 #include <unistd.h>
8 #include <stddef.h>
9 #include <stdarg.h>
10 
11 #include <memcached/engine.h>
12 #include <platform/platform.h>
13 #include "genhash.h"
14 #include "topkeys.h"
15 #include "bucket_engine.h"
16 
17 static rel_time_t (*get_current_time)(void);
18 static EXTENSION_LOGGER_DESCRIPTOR *logger;
19 
20 #ifdef WIN32
21 
ATOMIC_ADD(volatile int *dest, int value)22 static int ATOMIC_ADD(volatile int *dest, int value) {
23     LONG old = InterlockedExchangeAdd((LPLONG)dest, (LONG)value);
24     return (int)(old + value);
25 }
26 
ATOMIC_INCR(volatile int *dest)27 static int ATOMIC_INCR(volatile int *dest) {
28     return (int)InterlockedIncrement((LPLONG)dest);
29 }
30 
ATOMIC_DECR(volatile int *dest)31 static int ATOMIC_DECR(volatile int *dest) {
32     return (int)InterlockedDecrement((LPLONG)dest);
33 }
34 
ATOMIC_CAS(volatile bucket_state_t *dest, int prev, int next)35 static int ATOMIC_CAS(volatile bucket_state_t *dest, int prev, int next) {
36     LONG old = InterlockedCompareExchange((LONG*)dest, (LONG)next, (LONG)prev);
37     return old == prev;
38 }
39 
40 #elif defined(HAVE_ATOMIC_H) && defined(__SUNPRO_C)
41 #include <atomic.h>
ATOMIC_ADD(volatile int *dest, int value)42 static inline int ATOMIC_ADD(volatile int *dest, int value) {
43     return atomic_add_int_nv((volatile unsigned int *)dest, value);
44 }
45 
ATOMIC_INCR(volatile int *dest)46 static inline int ATOMIC_INCR(volatile int *dest) {
47     return atomic_inc_32_nv((volatile unsigned int *)dest);
48 }
49 
ATOMIC_DECR(volatile int *dest)50 static inline int ATOMIC_DECR(volatile int *dest) {
51     return atomic_dec_32_nv((volatile unsigned int *)dest);
52 }
53 
ATOMIC_CAS(volatile bucket_state_t *dest, int prev, int next)54 static inline int ATOMIC_CAS(volatile bucket_state_t *dest, int prev, int next) {
55     return (prev == atomic_cas_uint((volatile uint_t*)dest, (uint_t)prev,
56                                     (uint_t)next));
57 }
58 #else
59 #define ATOMIC_ADD(i, by) __sync_add_and_fetch(i, by)
60 #define ATOMIC_INCR(i) ATOMIC_ADD(i, 1)
61 #define ATOMIC_DECR(i) ATOMIC_ADD(i, -1)
62 #define ATOMIC_CAS(ptr, oldval, newval) \
63             __sync_bool_compare_and_swap(ptr, oldval, newval)
64 #endif
65 
66 static ENGINE_ERROR_CODE (*upstream_reserve_cookie)(const void *cookie);
67 static ENGINE_ERROR_CODE (*upstream_release_cookie)(const void *cookie);
68 static ENGINE_ERROR_CODE bucket_engine_reserve_cookie(const void *cookie);
69 static ENGINE_ERROR_CODE bucket_engine_release_cookie(const void *cookie);
70 
71 struct bucket_list {
72     char *name;
73     size_t namelen;
74     proxied_engine_handle_t *peh;
75     struct bucket_list *next;
76 };
77 
78 MEMCACHED_PUBLIC_API
79 ENGINE_ERROR_CODE create_instance(uint64_t interface,
80                                   GET_SERVER_API gsapi,
81                                   ENGINE_HANDLE **handle);
82 
83 static const engine_info* bucket_get_info(ENGINE_HANDLE* handle);
84 
85 static ENGINE_ERROR_CODE bucket_initialize(ENGINE_HANDLE* handle,
86                                            const char* config_str);
87 static void bucket_destroy(ENGINE_HANDLE* handle,
88                            const bool force);
89 static ENGINE_ERROR_CODE bucket_item_allocate(ENGINE_HANDLE* handle,
90                                               const void* cookie,
91                                               item **item,
92                                               const void* key,
93                                               const size_t nkey,
94                                               const size_t nbytes,
95                                               const int flags,
96                                               const rel_time_t exptime,
97                                               uint8_t datatype);
98 static ENGINE_ERROR_CODE bucket_item_delete(ENGINE_HANDLE* handle,
99                                             const void* cookie,
100                                             const void* key,
101                                             const size_t nkey,
102                                             uint64_t* cas,
103                                             uint16_t vbucket);
104 static void bucket_item_release(ENGINE_HANDLE* handle,
105                                 const void *cookie,
106                                 item* item);
107 static ENGINE_ERROR_CODE bucket_get(ENGINE_HANDLE* handle,
108                                     const void* cookie,
109                                     item** item,
110                                     const void* key,
111                                     const int nkey,
112                                     uint16_t vbucket);
113 static ENGINE_ERROR_CODE bucket_get_stats(ENGINE_HANDLE* handle,
114                                           const void *cookie,
115                                           const char *stat_key,
116                                           int nkey,
117                                           ADD_STAT add_stat);
118 static void *bucket_get_stats_struct(ENGINE_HANDLE* handle,
119                                                     const void *cookie);
120 static ENGINE_ERROR_CODE bucket_aggregate_stats(ENGINE_HANDLE* handle,
121                                                 const void* cookie,
122                                                 void (*callback)(void*, void*),
123                                                 void *stats);
124 static void bucket_reset_stats(ENGINE_HANDLE* handle, const void *cookie);
125 static ENGINE_ERROR_CODE bucket_store(ENGINE_HANDLE* handle,
126                                       const void *cookie,
127                                       item* item,
128                                       uint64_t *cas,
129                                       ENGINE_STORE_OPERATION operation,
130                                       uint16_t vbucket);
131 static ENGINE_ERROR_CODE bucket_arithmetic(ENGINE_HANDLE* handle,
132                                            const void* cookie,
133                                            const void* key,
134                                            const int nkey,
135                                            const bool increment,
136                                            const bool create,
137                                            const uint64_t delta,
138                                            const uint64_t initial,
139                                            const rel_time_t exptime,
140                                            uint64_t *cas,
141                                            uint8_t datatype,
142                                            uint64_t *result,
143                                            uint16_t vbucket);
144 static ENGINE_ERROR_CODE bucket_flush(ENGINE_HANDLE* handle,
145                                       const void* cookie, time_t when);
146 static ENGINE_ERROR_CODE initialize_configuration(struct bucket_engine *me,
147                                                   const char *cfg_str);
148 static ENGINE_ERROR_CODE bucket_unknown_command(ENGINE_HANDLE* handle,
149                                                 const void* cookie,
150                                                 protocol_binary_request_header *request,
151                                                 ADD_RESPONSE response);
152 
153 static bool bucket_get_item_info(ENGINE_HANDLE *handle,
154                                  const void *cookie,
155                                  const item* item,
156                                  item_info *item_info);
157 
158 static bool bucket_set_item_info(ENGINE_HANDLE *handle,
159                                  const void *cookie,
160                                  item* item,
161                                  const item_info *itm_info);
162 
163 static void bucket_item_set_cas(ENGINE_HANDLE *handle, const void *cookie,
164                                 item *item, uint64_t cas);
165 
166 static ENGINE_ERROR_CODE bucket_tap_notify(ENGINE_HANDLE* handle,
167                                            const void *cookie,
168                                            void *engine_specific,
169                                            uint16_t nengine,
170                                            uint8_t ttl,
171                                            uint16_t tap_flags,
172                                            tap_event_t tap_event,
173                                            uint32_t tap_seqno,
174                                            const void *key,
175                                            size_t nkey,
176                                            uint32_t flags,
177                                            uint32_t exptime,
178                                            uint64_t cas,
179                                            uint8_t datatype,
180                                            const void *data,
181                                            size_t ndata,
182                                            uint16_t vbucket);
183 
184 static TAP_ITERATOR bucket_get_tap_iterator(ENGINE_HANDLE* handle, const void* cookie,
185                                             const void* client, size_t nclient,
186                                             uint32_t flags,
187                                             const void* userdata, size_t nuserdata);
188 
189 static ENGINE_ERROR_CODE dcp_step(ENGINE_HANDLE* handle, const void* cookie,
190                                   struct dcp_message_producers *producers);
191 
192 static ENGINE_ERROR_CODE dcp_open(ENGINE_HANDLE* handle,
193                                   const void* cookie,
194                                   uint32_t opaque,
195                                   uint32_t seqno,
196                                   uint32_t flags,
197                                   void *name,
198                                   uint16_t nname);
199 
200 static ENGINE_ERROR_CODE dcp_add_stream(ENGINE_HANDLE* handle,
201                                         const void* cookie,
202                                         uint32_t opaque,
203                                         uint16_t vbucket,
204                                         uint32_t flags);
205 
206 static ENGINE_ERROR_CODE dcp_close_stream(ENGINE_HANDLE* handle,
207                                           const void* cookie,
208                                           uint32_t opaque,
209                                           uint16_t vbucket);
210 
211 static ENGINE_ERROR_CODE dcp_stream_req(ENGINE_HANDLE* handle, const void* cookie,
212                                         uint32_t flags,
213                                         uint32_t opaque,
214                                         uint16_t vbucket,
215                                         uint64_t start_seqno,
216                                         uint64_t end_seqno,
217                                         uint64_t vbucket_uuid,
218                                         uint64_t snap_start_seqno,
219                                         uint64_t snap_end_seqno,
220                                         uint64_t *rollback_seqno,
221                                         dcp_add_failover_log callback);
222 
223 
224 static ENGINE_ERROR_CODE dcp_get_failover_log(ENGINE_HANDLE* handle, const void* cookie,
225                                               uint32_t opaque,
226                                               uint16_t vbucket,
227                                               ENGINE_ERROR_CODE (*failover_log)(vbucket_failover_t*,
228                                                                                 size_t nentries,
229                                                                                 const void *cookie));
230 
231 static ENGINE_ERROR_CODE dcp_stream_end(ENGINE_HANDLE* handle, const void* cookie,
232                                         uint32_t opaque,
233                                         uint16_t vbucket,
234                                         uint32_t flags);
235 
236 static ENGINE_ERROR_CODE dcp_snapshot_marker(ENGINE_HANDLE* handle,
237                                              const void* cookie,
238                                              uint32_t opaque,
239                                              uint16_t vbucket,
240                                              uint64_t start_seqno,
241                                              uint64_t end_seqno,
242                                              uint32_t flags);
243 
244 static ENGINE_ERROR_CODE dcp_mutation(ENGINE_HANDLE* handle, const void* cookie,
245                                       uint32_t opaque,
246                                       const void *key,
247                                       uint16_t nkey,
248                                       const void *value,
249                                       uint32_t nvalue,
250                                       uint64_t cas,
251                                       uint16_t vbucket,
252                                       uint32_t flags,
253                                       uint8_t datatype,
254                                       uint64_t by_seqno,
255                                       uint64_t rev_seqno,
256                                       uint32_t expiration,
257                                       uint32_t lock_time,
258                                       const void *meta,
259                                       uint16_t nmeta,
260                                       uint8_t nru);
261 
262 static ENGINE_ERROR_CODE dcp_deletion(ENGINE_HANDLE* handle, const void* cookie,
263                                       uint32_t opaque,
264                                       const void *key,
265                                       uint16_t nkey,
266                                       uint64_t cas,
267                                       uint16_t vbucket,
268                                       uint64_t by_seqno,
269                                       uint64_t rev_seqno,
270                                       const void *meta,
271                                       uint16_t nmeta);
272 
273 static ENGINE_ERROR_CODE dcp_expiration(ENGINE_HANDLE* handle, const void* cookie,
274                                         uint32_t opaque,
275                                         const void *key,
276                                         uint16_t nkey,
277                                         uint64_t cas,
278                                         uint16_t vbucket,
279                                         uint64_t by_seqno,
280                                         uint64_t rev_seqno,
281                                         const void *meta,
282                                         uint16_t nmeta);
283 
284 static  ENGINE_ERROR_CODE dcp_flush(ENGINE_HANDLE* handle, const void* cookie,
285                                    uint32_t opaque,
286                                    uint16_t vbucket);
287 
288 static ENGINE_ERROR_CODE dcp_set_vbucket_state(ENGINE_HANDLE* handle, const void* cookie,
289                                                uint32_t opaque,
290                                                uint16_t vbucket,
291                                                vbucket_state_t state);
292 
293 static ENGINE_ERROR_CODE dcp_noop(ENGINE_HANDLE* handle,
294                                   const void* cookie,
295                                   uint32_t opaque);
296 
297 static ENGINE_ERROR_CODE dcp_buffer_acknowledgement(ENGINE_HANDLE* handle,
298                                                     const void* cookie,
299                                                     uint32_t opaque,
300                                                     uint16_t vbucket,
301                                                     uint32_t bb);
302 
303 static ENGINE_ERROR_CODE dcp_control(ENGINE_HANDLE* handle,
304                                      const void* cookie,
305                                      uint32_t opaque,
306                                      const void *key,
307                                      uint16_t nkey,
308                                      const void *value,
309                                      uint32_t nvalue);
310 
311 static ENGINE_ERROR_CODE dcp_response_handler(ENGINE_HANDLE* handle,
312                                               const void* cookie,
313                                               protocol_binary_response_header *r);
314 
315 static size_t bucket_errinfo(ENGINE_HANDLE *handle, const void* cookie,
316                              char *buffer, size_t buffsz);
317 
318 static ENGINE_ERROR_CODE bucket_get_engine_vb_map(ENGINE_HANDLE* handle,
319                                                   const void * cookie,
320                                                   engine_get_vb_map_cb callback);
321 
322 static ENGINE_HANDLE *load_engine(cb_dlhandle_t *dlhandle, const char *soname);
323 
324 static bool is_authorized(ENGINE_HANDLE* handle, const void* cookie);
325 
326 static void free_engine_handle(proxied_engine_handle_t *);
327 
328 static bool list_buckets(struct bucket_engine *e, struct bucket_list **blist);
329 static void bucket_list_free(struct bucket_list *blist);
330 static void maybe_start_engine_shutdown(proxied_engine_handle_t *e);
331 
332 
333 /**
334  * This is the one and only instance of the bucket engine.
335  */
336 struct bucket_engine bucket_engine;
337 /**
338  * To help us detect if we're using free'd memory, let's write a
339  * pattern to the memory before releasing it. That makes it more easy
340  * to identify in a core file if we're operating on a freed memory area
341  */
release_memory(void *ptr, size_t size)342 static void release_memory(void *ptr, size_t size)
343 {
344     memset(ptr, 0xae, size);
345     free(ptr);
346 }
347 
348 
349 /**
350  * Access to the global list of engines is protected by a single lock.
351  * To make the code more readable we're using a separate function
352  * to acquire the lock
353  */
lock_engines(void)354 static void lock_engines(void)
355 {
356     cb_mutex_enter(&bucket_engine.engines_mutex);
357 }
358 
359 /**
360  * This is the corresponding function to release the lock for
361  * the list of engines.
362  */
unlock_engines(void)363 static void unlock_engines(void)
364 {
365     cb_mutex_exit(&bucket_engine.engines_mutex);
366 }
367 
368 /**
369  * Convert a bucket state (enum) t a textual string
370  */
bucket_state_name(bucket_state_t s)371 static const char * bucket_state_name(bucket_state_t s) {
372     const char * rv = NULL;
373     switch(s) {
374     case STATE_NULL: rv = "NULL"; break;
375     case STATE_RUNNING: rv = "running"; break;
376     case STATE_STOPPING: rv = "stopping"; break;
377     case STATE_STOPPED: rv = "stopped"; break;
378     }
379     cb_assert(rv);
380     return rv;
381 }
382 
383 /**
384  * Helper function to get a pointer to the server API
385  */
bucket_get_server_api(void)386 static SERVER_HANDLE_V1 *bucket_get_server_api(void) {
387     return &bucket_engine.server;
388 }
389 
390 /**
391  * Helper structure used by find_bucket_by_engine
392  */
393 struct bucket_find_by_handle_data {
394     /** The engine we're searching for */
395     ENGINE_HANDLE *needle;
396     /** The engine-handle for this engine */
397     proxied_engine_handle_t *peh;
398 };
399 
400 /**
401  * A callback function used by genhash_iter to locate the engine handle
402  * object for a given engine.
403  *
404  * Runs with engines lock held.
405  *
406  * @param key not used
407  * @param nkey not used
408  * @param val the engine handle stored at this position in the hash
409  * @param nval not used
410  * @param args pointer to a bucket_find_by_handle_data structure
411  *             used to pass the search cirtera into the function and
412  *             return the object (if found).
413  */
find_bucket_by_engine(const void* key, size_t nkey, const void *val, size_t nval, void *args)414 static void find_bucket_by_engine(const void* key, size_t nkey,
415                                   const void *val, size_t nval,
416                                   void *args) {
417     struct bucket_find_by_handle_data *find_data = args;
418     const proxied_engine_handle_t *peh;
419     (void)key;
420     (void)nkey;
421     (void)nval;
422 
423     cb_assert(find_data);
424     cb_assert(find_data->needle);
425 
426     peh = val;
427     if (find_data->needle == peh->pe.v0) {
428         find_data->peh = (proxied_engine_handle_t *)peh;
429     }
430 }
431 
432 /**
433  * bucket_engine intercepts the calls from the underlying engine to
434  * register callbacks. During startup bucket engine registers a callback
435  * for ON_DISCONNECT in memcached, so we should always be notified
436  * whenever a client disconnects. The underlying engine may however also
437  * want this notification, so we intercept their attemt to register
438  * callbacks and forward the callback to the correct engine.
439  *
440  * This function will _always_ be called while we're holding the global
441  * lock for the hash table (during the call to "initialize" in the
442  * underlying engine. It is therefore safe to try to traverse the
443  * engines list.
444  */
bucket_register_callback(ENGINE_HANDLE *eh, ENGINE_EVENT_TYPE type, EVENT_CALLBACK cb, const void *cb_data)445 static void bucket_register_callback(ENGINE_HANDLE *eh,
446                                      ENGINE_EVENT_TYPE type,
447                                      EVENT_CALLBACK cb, const void *cb_data) {
448 
449     struct bucket_find_by_handle_data find_data;
450 
451     /* For simplicity, we're not going to test every combination until
452        we need them. */
453     cb_assert(type == ON_DISCONNECT);
454 
455     /* Assume this always happens while holding the hash table lock. */
456     /* This is called from underlying engine 'initialize' handler
457      * which we invoke with engines_mutex held */
458     find_data.needle = eh;
459     find_data.peh = NULL;
460 
461     genhash_iter(bucket_engine.engines, find_bucket_by_engine, &find_data);
462 
463     if (find_data.peh) {
464         find_data.peh->cb = cb;
465         find_data.peh->cb_data = cb_data;
466         find_data.peh->wants_disconnects = true;
467     } else if (bucket_engine.has_default && eh == bucket_engine.default_engine.pe.v0){
468         bucket_engine.default_engine.cb = cb;
469         bucket_engine.default_engine.cb_data = cb_data;
470         bucket_engine.default_engine.wants_disconnects = true;
471     }
472 }
473 
474 /**
475  * The engine api allows the underlying engine to perform various callbacks
476  * This isn't implemented in bucket engine as of today.
477  */
bucket_perform_callbacks(ENGINE_EVENT_TYPE type, const void *data, const void *cookie)478 static void bucket_perform_callbacks(ENGINE_EVENT_TYPE type,
479                                      const void *data, const void *cookie) {
480     (void)type;
481     (void)data;
482     (void)cookie;
483     abort(); /* Not implemented */
484 }
485 
486 /**
487  * Store engine-specific data in the engine-specific section of this
488  * cookie's data stored in the memcached core. The "upstream" cookie
489  * should have been registered during the "ON_CONNECT" callback, so it
490  * would be a bug if it isn't here anymore
491  */
bucket_store_engine_specific(const void *cookie, void *engine_data)492 static void bucket_store_engine_specific(const void *cookie, void *engine_data) {
493     engine_specific_t *es;
494     es = bucket_engine.upstream_server->cookie->get_engine_specific(cookie);
495     cb_assert(es);
496     es->engine_specific = engine_data;
497 }
498 
499 /**
500  * Get the engine-specific data from the engine-specific section of
501  * this cookies data stored in the memcached core.
502  */
bucket_get_engine_specific(const void *cookie)503 static void* bucket_get_engine_specific(const void *cookie) {
504     engine_specific_t *es;
505     es = bucket_engine.upstream_server->cookie->get_engine_specific(cookie);
506     cb_assert(es);
507     return es->engine_specific;
508 }
509 
510 /**
511  * Get the session-token stored in the memcached core.
512  */
bucket_validate_session_cas(const uint64_t cas)513 static bool bucket_validate_session_cas(const uint64_t cas) {
514     return bucket_engine.upstream_server->cookie->validate_session_cas(cas);
515 }
516 
517 /**
518  * Decrement the session_cas's counter held in memcached core.
519  */
bucket_decrement_session_ctr(void)520 static void bucket_decrement_session_ctr(void) {
521     bucket_engine.upstream_server->cookie->decrement_session_ctr();
522 }
523 
524 /**
525  * We don't allow the underlying engines to register or remove extensions
526  */
bucket_register_extension(extension_type_t type, void *extension)527 static bool bucket_register_extension(extension_type_t type,
528                                       void *extension) {
529     (void)type;
530     (void)extension;
531     logger->log(EXTENSION_LOG_WARNING, NULL,
532                 "Extension support isn't implemented in this version "
533                 "of bucket_engine");
534     return false;
535 }
536 
537 /**
538  * Since you can't register an extension this function should _never_ be
539  * called...
540  */
bucket_unregister_extension(extension_type_t type, void *extension)541 static void bucket_unregister_extension(extension_type_t type, void *extension) {
542     (void)type;
543     (void)extension;
544     logger->log(EXTENSION_LOG_WARNING, NULL,
545                 "Extension support isn't implemented in this version "
546                 "of bucket_engine");
547     abort(); /* No extensions registered, none can unregister */
548 }
549 
550 /**
551  * Get a given extension type from the memcached core.
552  * @todo Why do we overload this when all we do is wrap it directly?
553  */
bucket_get_extension(extension_type_t type)554 static void* bucket_get_extension(extension_type_t type) {
555     return bucket_engine.upstream_server->extension->get_extension(type);
556 }
557 
558 /* Engine API functions */
559 
560 /**
561  * This is the public entry point for bucket_engine. It is called by
562  * the memcached core and is responsible for doing basic allocation and
563  * initialization of the one and only instance of the bucket_engine object.
564  *
565  * The "normal" initialization is performed in bucket_initialize which is
566  * called from the memcached core after a successful call to create_instance.
567  */
create_instance(uint64_t interface, GET_SERVER_API gsapi, ENGINE_HANDLE **handle)568 ENGINE_ERROR_CODE create_instance(uint64_t interface,
569                                   GET_SERVER_API gsapi,
570                                   ENGINE_HANDLE **handle) {
571     if (interface != 1) {
572         return ENGINE_ENOTSUP;
573     }
574 
575     memset(&bucket_engine, 0, sizeof(bucket_engine));
576     bucket_engine.engine.interface.interface = 1;
577     bucket_engine.engine.get_info = bucket_get_info;
578     bucket_engine.engine.initialize = bucket_initialize;
579     bucket_engine.engine.destroy = bucket_destroy;
580     bucket_engine.engine.allocate = bucket_item_allocate;
581     bucket_engine.engine.remove = bucket_item_delete;
582     bucket_engine.engine.release = bucket_item_release;
583     bucket_engine.engine.get = bucket_get;
584     bucket_engine.engine.store = bucket_store;
585     bucket_engine.engine.arithmetic = bucket_arithmetic;
586     bucket_engine.engine.flush = bucket_flush;
587     bucket_engine.engine.get_stats = bucket_get_stats;
588     bucket_engine.engine.reset_stats = bucket_reset_stats;
589     bucket_engine.engine.get_stats_struct = bucket_get_stats_struct;
590     bucket_engine.engine.aggregate_stats = bucket_aggregate_stats;
591     bucket_engine.engine.unknown_command = bucket_unknown_command;
592     bucket_engine.engine.tap_notify = bucket_tap_notify;
593     bucket_engine.engine.get_tap_iterator = bucket_get_tap_iterator;
594     bucket_engine.engine.item_set_cas = bucket_item_set_cas;
595     bucket_engine.engine.get_item_info = bucket_get_item_info;
596     bucket_engine.engine.set_item_info = bucket_set_item_info;
597     bucket_engine.engine.errinfo = bucket_errinfo;
598     bucket_engine.engine.get_engine_vb_map = bucket_get_engine_vb_map;
599     bucket_engine.engine.dcp.step = dcp_step;
600     bucket_engine.engine.dcp.open = dcp_open;
601     bucket_engine.engine.dcp.add_stream = dcp_add_stream;
602     bucket_engine.engine.dcp.close_stream = dcp_close_stream;
603     bucket_engine.engine.dcp.get_failover_log = dcp_get_failover_log;
604     bucket_engine.engine.dcp.stream_req = dcp_stream_req;
605     bucket_engine.engine.dcp.stream_end = dcp_stream_end;
606     bucket_engine.engine.dcp.snapshot_marker = dcp_snapshot_marker;
607     bucket_engine.engine.dcp.mutation = dcp_mutation;
608     bucket_engine.engine.dcp.deletion = dcp_deletion;
609     bucket_engine.engine.dcp.expiration = dcp_expiration;
610     bucket_engine.engine.dcp.flush = dcp_flush;
611     bucket_engine.engine.dcp.set_vbucket_state = dcp_set_vbucket_state;
612     bucket_engine.engine.dcp.noop = dcp_noop;
613     bucket_engine.engine.dcp.buffer_acknowledgement = dcp_buffer_acknowledgement;
614     bucket_engine.engine.dcp.control = dcp_control;
615     bucket_engine.engine.dcp.response_handler = dcp_response_handler;
616     bucket_engine.initialized = false;
617     bucket_engine.shutdown.in_progress = false;
618     bucket_engine.shutdown.bucket_counter = 0;
619     cb_mutex_initialize(&bucket_engine.shutdown.mutex);
620     cb_cond_initialize(&bucket_engine.shutdown.cond);
621     cb_cond_initialize(&bucket_engine.shutdown.refcount_cond);
622     bucket_engine.info.engine_info.description = "Bucket engine v0.2";
623     bucket_engine.info.engine_info.num_features = 1;
624     bucket_engine.info.engine_info.features[0].feature = ENGINE_FEATURE_MULTI_TENANCY;
625     bucket_engine.info.engine_info.features[0].description = "Multi tenancy";
626 
627     *handle = (ENGINE_HANDLE*)&bucket_engine;
628     bucket_engine.upstream_server = gsapi();
629     bucket_engine.server = *bucket_engine.upstream_server;
630     bucket_engine.get_server_api = bucket_get_server_api;
631 
632     /* Use our own callback API for inferior engines */
633     bucket_engine.callback_api.register_callback = bucket_register_callback;
634     bucket_engine.callback_api.perform_callbacks = bucket_perform_callbacks;
635     bucket_engine.server.callback = &bucket_engine.callback_api;
636 
637     /* Same for extensions */
638     bucket_engine.extension_api.register_extension = bucket_register_extension;
639     bucket_engine.extension_api.unregister_extension = bucket_unregister_extension;
640     bucket_engine.extension_api.get_extension = bucket_get_extension;
641     bucket_engine.server.extension = &bucket_engine.extension_api;
642 
643     /* Override engine specific */
644     bucket_engine.cookie_api = *bucket_engine.upstream_server->cookie;
645     bucket_engine.server.cookie = &bucket_engine.cookie_api;
646     bucket_engine.server.cookie->store_engine_specific = bucket_store_engine_specific;
647     bucket_engine.server.cookie->get_engine_specific = bucket_get_engine_specific;
648 
649     upstream_reserve_cookie = bucket_engine.server.cookie->reserve;
650     upstream_release_cookie = bucket_engine.server.cookie->release;
651 
652     bucket_engine.server.cookie->reserve = bucket_engine_reserve_cookie;
653     bucket_engine.server.cookie->release = bucket_engine_release_cookie;
654 
655     logger = bucket_engine.server.extension->get_extension(EXTENSION_LOGGER);
656     return ENGINE_SUCCESS;
657 }
658 
659 /**
660  * Grab the engine handle mutex and release the proxied engine handle.
661  * The function currently allows you to call it with a NULL pointer,
662  * but that should be replaced (we should have better control of if we
663  * have an engine handle or not....)
664  */
release_handle(proxied_engine_handle_t *peh)665 static void release_handle(proxied_engine_handle_t *peh) {
666     int count;
667     if (!peh) {
668         return;
669     }
670 
671     count = ATOMIC_DECR(&peh->refcount);
672     cb_assert(count >= 0);
673     if (count == 0) {
674         cb_mutex_enter(&bucket_engine.shutdown.mutex);
675         cb_cond_broadcast(&bucket_engine.shutdown.refcount_cond);
676         cb_mutex_exit(&bucket_engine.shutdown.mutex);
677     }
678 }
679 
680 /**
681  * Helper function to search for a named bucket in the list of engines
682  * You must wrap this call with (un)lock_engines() in order for it to
683  * be mt-safe
684  */
find_bucket_inner(const char *name)685 static proxied_engine_handle_t *find_bucket_inner(const char *name) {
686     return genhash_find(bucket_engine.engines, name, strlen(name));
687 }
688 
689 /**
690  * If the bucket is in a runnable state, increment its reference counter
691  * and return its handle. Otherwise a NIL pointer is returned.
692  * The caller is responsible for releasing the handle
693  * with release_handle.
694  */
retain_handle(proxied_engine_handle_t *peh)695 static proxied_engine_handle_t* retain_handle(proxied_engine_handle_t *peh) {
696     proxied_engine_handle_t *rv = NULL;
697     if (peh) {
698         if (peh->state == STATE_RUNNING) {
699             int count = ATOMIC_INCR(&peh->refcount);
700             cb_assert(count > 0);
701             rv = peh;
702         }
703     }
704     return rv;
705 }
706 
707 /**
708  * Search the list of buckets for a named bucket. If the bucket
709  * exists and is in a runnable state, it's reference count is
710  * incremented and returned. The caller is responsible for
711  * releasing the handle with release_handle.
712 */
find_bucket(const char *name)713 static proxied_engine_handle_t *find_bucket(const char *name) {
714     proxied_engine_handle_t *rv;
715     lock_engines();
716     rv = retain_handle(find_bucket_inner(name));
717     unlock_engines();
718     return rv;
719 }
720 
721 /**
722  * Validate that the bucket name only consists of legal characters
723  */
has_valid_bucket_name(const char *n)724 static bool has_valid_bucket_name(const char *n) {
725     bool rv = n[0] != 0;
726     for (; *n; n++) {
727         rv &= isalpha(*n) || isdigit(*n) || *n == '.' || *n == '%' || *n == '_' || *n == '-';
728     }
729     return rv;
730 }
731 
732 /**
733  * Initialize a proxied engine handle. (Assumes that it's zeroed already
734 */
init_engine_handle(proxied_engine_handle_t *peh, const char *name, const char *module)735 static ENGINE_ERROR_CODE init_engine_handle(proxied_engine_handle_t *peh,
736                                             const char *name,
737                                             const char *module) {
738     peh->stats = bucket_engine.upstream_server->stat->new_stats();
739     if (peh->stats == NULL) {
740         return ENGINE_ENOMEM;
741     }
742     if (bucket_engine.topkeys != 0) {
743         int i;
744         peh->topkeys = calloc(TK_SHARDS, sizeof(topkeys_t *));
745         for (i = 0; i < TK_SHARDS; i++) {
746             peh->topkeys[i] = topkeys_init(bucket_engine.topkeys);
747         }
748         if (peh->topkeys == NULL) {
749             bucket_engine.upstream_server->stat->release_stats(peh->stats);
750             peh->stats = NULL;
751             return ENGINE_ENOMEM;
752         }
753     }
754     peh->refcount = 1;
755     peh->name = strdup(name);
756     if (peh->name == NULL) {
757         return ENGINE_ENOMEM;
758     }
759     peh->name_len = strlen(peh->name);
760 
761     if (module && strstr(module, "default_engine") != 0) {
762         peh->tap_iterator_disabled = true;
763     }
764 
765     peh->state = STATE_RUNNING;
766     return ENGINE_SUCCESS;
767 }
768 
769 /**
770  * Release the allocated resources within a proxied engine handle.
771  * Use free_engine_handle if you like to release the memory for the
772  * proxied engine handle itself...
773  */
uninit_engine_handle(proxied_engine_handle_t *peh)774 static void uninit_engine_handle(proxied_engine_handle_t *peh) {
775     bucket_engine.upstream_server->stat->release_stats(peh->stats);
776     if (peh->topkeys != NULL) {
777         int i;
778         for (i = 0; i < TK_SHARDS; i++) {
779             topkeys_free(peh->topkeys[i]);
780         }
781         free(peh->topkeys);
782     }
783     release_memory((void*)peh->name, peh->name_len);
784     /* Note: looks like current engine API allows engine to keep some
785      * connections reserved past destroy call return. This implies
786      * that doing dlclose is raceful and thus we should not do it.
787      *
788      * Currently we also have issue with tcmalloc integration on
789      * windows where apparently unloading ep.so is causing some
790      * troubles in tcmalloc. */
791     /*
792      * if (peh->dlhandle) {
793      *     dlclose(peh->dlhandle);
794      * }
795      */
796 }
797 
798 /**
799  * Release all resources used by a proxied engine handle and
800  * invalidate the proxied engine handle itself.
801  */
free_engine_handle(proxied_engine_handle_t *peh)802 static void free_engine_handle(proxied_engine_handle_t *peh) {
803     uninit_engine_handle(peh);
804     release_memory(peh, sizeof(*peh));
805 }
806 
807 /**
808  * Creates bucket and places it's handle into *e_out. NOTE: that
809  * caller is responsible for calling release_handle on that handle
810  */
create_bucket_UNLOCKED(struct bucket_engine *e, const char *bucket_name, const char *path, const char *config, proxied_engine_handle_t **e_out, char *msg, size_t msglen)811 static ENGINE_ERROR_CODE create_bucket_UNLOCKED(struct bucket_engine *e,
812                                                 const char *bucket_name,
813                                                 const char *path,
814                                                 const char *config,
815                                                 proxied_engine_handle_t **e_out,
816                                                 char *msg, size_t msglen) {
817 
818     ENGINE_ERROR_CODE rv;
819     proxied_engine_handle_t *peh;
820     proxied_engine_handle_t *tmppeh;
821 
822     if (!has_valid_bucket_name(bucket_name)) {
823         return ENGINE_EINVAL;
824     }
825 
826     peh = calloc(sizeof(proxied_engine_handle_t), 1);
827     if (peh == NULL) {
828         return ENGINE_ENOMEM;
829     }
830     rv = init_engine_handle(peh, bucket_name, path);
831     if (rv != ENGINE_SUCCESS) {
832         release_memory(peh, sizeof(*peh));
833         return rv;
834     }
835 
836     rv = ENGINE_FAILED;
837 
838     peh->pe.v0 = load_engine(&peh->dlhandle, path);
839 
840     if (!peh->pe.v0) {
841         free_engine_handle(peh);
842         if (msg) {
843             snprintf(msg, msglen, "Failed to load engine.");
844         }
845         return rv;
846     }
847 
848     tmppeh = find_bucket_inner(bucket_name);
849     if (tmppeh == NULL) {
850         genhash_update(e->engines, bucket_name, strlen(bucket_name), peh, 0);
851 
852         /* This was already verified, but we'll check it anyway */
853         cb_assert(peh->pe.v0->interface == 1);
854 
855         rv = ENGINE_SUCCESS;
856 
857         if (peh->pe.v1->initialize(peh->pe.v0, config) != ENGINE_SUCCESS) {
858             peh->pe.v1->destroy(peh->pe.v0, false);
859             genhash_delete_all(e->engines, bucket_name, strlen(bucket_name));
860             if (msg) {
861                 snprintf(msg, msglen,
862                          "Failed to initialize instance. Error code: %d\n", rv);
863             }
864             rv = ENGINE_FAILED;
865         }
866     } else {
867         if (msg) {
868             snprintf(msg, msglen,
869                      "Bucket exists: %s", bucket_state_name(tmppeh->state));
870         }
871         peh->pe.v1->destroy(peh->pe.v0, true);
872         rv = ENGINE_KEY_EEXISTS;
873     }
874 
875     if (rv == ENGINE_SUCCESS) {
876         if (e_out) {
877             *e_out = peh;
878         } else {
879             release_handle(peh);
880         }
881     } else {
882         free_engine_handle(peh);
883     }
884 
885     return rv;
886 }
887 
888 /**
889  * The client returned from the call inside the engine. If this was the
890  * last client inside the engine, and the engine is scheduled for removal
891  * it should be safe to nuke the engine :)
892  *
893  * @param engine the proxied engine
894  */
release_engine_handle(proxied_engine_handle_t *engine)895 static void release_engine_handle(proxied_engine_handle_t *engine) {
896     int count;
897     cb_assert(engine->clients > 0);
898     count = ATOMIC_DECR(&engine->clients);
899     cb_assert(count >= 0);
900     if (count == 0 && engine->state == STATE_STOPPING) {
901         maybe_start_engine_shutdown(engine);
902     }
903 }
904 
905 /**
906  * Returns engine handle for this connection.
907  * All access to underlying engine must go through this function, because
908  * we keep a counter of how many cookies that are currently calling into
909  * the engine..
910  *
911  * NOTE: this cannot ever return engine handle that's in STATE_STOPPED
912  * and if returns non-null it also prevents STATE_STOPPED to be
913  * reached until release_engine_handle is called that'll decrement
914  * clients counter. Here's why:
915  *
916  * Assume it returned non-null but engine's state is
917  * STATE_STOPPED. But that means state was changed after it was
918  * observed to be STATE_RUNNING in this function. And because we never
919  * change from running to stopped it changed twice. Because STATE_RUNNING was seen after incrementing clients count here's sequence of inter-dependendent events:
920  *
921  * - we bump clients count
922  *
923  * - we observe STATE_RUNNING (and that also implies didn't
924      have STATE_STOPPED & STATE_STOPPING in past because we don't
925      change from STOPPING/STOPPED back to RUNNING)
926  *
927  * - some other thread changes STATE_RUNNING to STATE_STOPPING
928  *
929  * - somebody sets STATE_STOPPED (see
930      maybe_start_engine_shutdown). But that implies that somebody
931      first observed STATE_STOPPING and _then_ observed clients ==
932      0. Which assuming nobody decrements it without first incrementing
933      it cannot happen because our bumped clients count prevents that.
934  *
935  * Q.E.D.
936  */
get_engine_handle(ENGINE_HANDLE *h, const void *cookie)937 static proxied_engine_handle_t *get_engine_handle(ENGINE_HANDLE *h,
938                                                   const void *cookie) {
939     struct bucket_engine *e = (struct bucket_engine*)h;
940     engine_specific_t *es;
941     proxied_engine_handle_t *peh;
942     int count;
943 
944     es = e->upstream_server->cookie->get_engine_specific(cookie);
945     cb_assert(es);
946 
947     peh = es->peh;
948     if (!peh) {
949         if (e->default_engine.pe.v0) {
950             peh = &e->default_engine;
951         } else {
952             return NULL;
953         }
954     }
955 
956     count = ATOMIC_INCR(&peh->clients);
957     cb_assert(count > 0);
958 
959     if (peh->state != STATE_RUNNING) {
960         release_engine_handle(peh);
961         peh = NULL;
962     }
963 
964     return peh;
965 }
966 
967 /**
968  * Returns engine handle for this connection.
969  * All access to underlying engine must go through this function, because
970  * we keep a counter of how many cookies that are currently calling into
971  * the engine..
972  */
try_get_engine_handle(ENGINE_HANDLE *h, const void *cookie)973 static proxied_engine_handle_t *try_get_engine_handle(ENGINE_HANDLE *h,
974                                                       const void *cookie) {
975     struct bucket_engine *e = (struct bucket_engine*)h;
976     engine_specific_t *es;
977     proxied_engine_handle_t *peh;
978     proxied_engine_handle_t *ret;
979     int count;
980 
981     es = e->upstream_server->cookie->get_engine_specific(cookie);
982     if (es == NULL || es->peh == NULL) {
983         return NULL;
984     }
985     peh = es->peh;
986     ret = peh;
987 
988     count = ATOMIC_INCR(&peh->clients);
989     cb_assert(count > 0);
990     if (peh->state != STATE_RUNNING) {
991         release_engine_handle(peh);
992         ret = NULL;
993     }
994 
995     return ret;
996 }
997 
998 /**
999  * Create an engine specific section for the cookie
1000  */
create_engine_specific(struct bucket_engine *e, const void *cookie)1001 static void create_engine_specific(struct bucket_engine *e,
1002                                    const void *cookie) {
1003     engine_specific_t *es;
1004     es = e->upstream_server->cookie->get_engine_specific(cookie);
1005     cb_assert(es == NULL);
1006     es = calloc(1, sizeof(engine_specific_t));
1007     cb_assert(es);
1008     es->reserved = ES_CONNECTED_FLAG;
1009     e->upstream_server->cookie->store_engine_specific(cookie, es);
1010 }
1011 
1012 /**
1013  * Set the engine handle for a cookie (create if it doesn't exist)
1014  */
set_engine_handle(ENGINE_HANDLE *h, const void *cookie, proxied_engine_handle_t *peh)1015 static proxied_engine_handle_t* set_engine_handle(ENGINE_HANDLE *h,
1016                                                   const void *cookie,
1017                                                   proxied_engine_handle_t *peh) {
1018     engine_specific_t *es;
1019     proxied_engine_handle_t *old;
1020     (void)h;
1021 
1022     es = bucket_engine.upstream_server->cookie->get_engine_specific(cookie);
1023     cb_assert(es);
1024 
1025     /* we cannot switch bucket for connection that's reserved. With
1026      * current code at least. */
1027     cb_assert((es->reserved & ~ES_CONNECTED_FLAG) == 0);
1028 
1029     old = es->peh;
1030     /* In with the new */
1031     es->peh = retain_handle(peh);
1032 
1033     /* out with the old (this may be NULL if we did't have an associated */
1034     /* strucure... */
1035     release_handle(old);
1036     return es->peh;
1037 }
1038 
1039 /**
1040  * Helper function to convert an ENGINE_HANDLE* to a bucket engine pointer
1041  * without a cast
1042  */
get_handle(ENGINE_HANDLE* handle)1043 static struct bucket_engine* get_handle(ENGINE_HANDLE* handle) {
1044     return (struct bucket_engine*)handle;
1045 }
1046 
1047 /**
1048  * Implementation of the the get_info function in the engine interface
1049  */
bucket_get_info(ENGINE_HANDLE* handle)1050 static const engine_info* bucket_get_info(ENGINE_HANDLE* handle) {
1051     return &(get_handle(handle)->info.engine_info);
1052 }
1053 
1054 /***********************************************************
1055  **       Implementation of functions used by genhash     **
1056  **********************************************************/
1057 
1058 /**
1059  * Function used by genhash to check if two keys differ
1060  */
my_hash_eq(const void *k1, size_t nkey1, const void *k2, size_t nkey2)1061 static int my_hash_eq(const void *k1, size_t nkey1,
1062                       const void *k2, size_t nkey2) {
1063     return nkey1 == nkey2 && memcmp(k1, k2, nkey1) == 0;
1064 }
1065 
1066 /**
1067  * Function used by genhash to create a copy of a key
1068  */
hash_strdup(const void *k, size_t nkey)1069 static void* hash_strdup(const void *k, size_t nkey) {
1070     void *rv = calloc(nkey, 1);
1071     cb_assert(rv);
1072     memcpy(rv, k, nkey);
1073     return rv;
1074 }
1075 
1076 /**
1077  * Function used by genhash to create a copy of the value (this is
1078  * the proxied engine handle). We don't copy that value, instead
1079  * we increase the reference count.
1080  */
refcount_dup(const void* ob, size_t vlen)1081 static void* refcount_dup(const void* ob, size_t vlen) {
1082     int count;
1083     proxied_engine_handle_t *peh = (proxied_engine_handle_t *)ob;
1084 
1085     (void)vlen;
1086     cb_assert(peh);
1087     count = ATOMIC_INCR(&peh->refcount);
1088     cb_assert(count > 0);
1089     return (void*)ob;
1090 }
1091 
1092 /**
1093  * Function used by genhash to release an object.
1094  */
engine_hash_free(void* ob)1095 static void engine_hash_free(void* ob) {
1096     proxied_engine_handle_t *peh = (proxied_engine_handle_t *)ob;
1097     cb_assert(peh);
1098     release_handle(peh);
1099     peh->state = STATE_NULL;
1100 }
1101 
1102 /**
1103  * Try to load a shared object and create an engine.
1104  *
1105  * @param dlhandle The pointer to the loaded object (OUT). The caller is
1106  *                 responsible for calling dlcose() to release the resources
1107  *                 if the function succeeds.
1108  * @param soname The name of the shared object to load
1109  * @return A pointer to the created instance, or NULL if anything
1110  *         failed.
1111  */
load_engine(void **dlhandle, const char *soname)1112 static ENGINE_HANDLE *load_engine(void **dlhandle, const char *soname) {
1113     ENGINE_HANDLE *engine = NULL;
1114     /* Hack to remove the warning from C99 */
1115     union my_hack {
1116         CREATE_INSTANCE create;
1117         void* voidptr;
1118     } my_create;
1119     ENGINE_ERROR_CODE error;
1120     void *symbol;
1121     char *errmsg;
1122     cb_dlhandle_t handle = cb_dlopen(soname, &errmsg);
1123     if (handle == NULL) {
1124         logger->log(EXTENSION_LOG_WARNING, NULL,
1125                     "Failed to open library \"%s\": %s\n",
1126                     soname ? soname : "self", errmsg);
1127         free(errmsg);
1128         return NULL;
1129     }
1130 
1131     symbol = cb_dlsym(handle, "create_instance", &errmsg);
1132     if (symbol == NULL) {
1133         logger->log(EXTENSION_LOG_WARNING, NULL,
1134                 "Could not find symbol \"create_instance\" in %s: %s\n",
1135                 soname ? soname : "self",
1136                 errmsg);
1137         free(errmsg);
1138         return NULL;
1139     }
1140     my_create.voidptr = symbol;
1141 
1142     /* request a instance with protocol version 1 */
1143     error = (*my_create.create)(1, bucket_engine.get_server_api, &engine);
1144     if (error != ENGINE_SUCCESS || engine == NULL) {
1145         logger->log(EXTENSION_LOG_WARNING, NULL,
1146                     "Failed to create instance. Error code: %d\n", error);
1147         cb_dlclose(handle);
1148         return NULL;
1149     }
1150 
1151     *dlhandle = handle;
1152     return engine;
1153 }
1154 
1155 /***********************************************************
1156  **  Implementation of callbacks from the memcached core  **
1157  **********************************************************/
1158 
1159 /**
1160  * Handle the situation when a connection is disconnected
1161  * from the upstream. Propagate the command downstream and
1162  * release the allocated resources for the connection
1163  * unless it is reserved.
1164  *
1165  * @param cookie the cookie representing the connection that was closed
1166  * @param type The kind of event (should be ON_DISCONNECT)
1167  * @param event_data not used
1168  * @param cb_data The bucket instance in use
1169  */
handle_disconnect(const void *cookie, ENGINE_EVENT_TYPE type, const void *event_data, const void *cb_data)1170 static void handle_disconnect(const void *cookie,
1171                               ENGINE_EVENT_TYPE type,
1172                               const void *event_data,
1173                               const void *cb_data)
1174 {
1175     struct bucket_engine *e = (struct bucket_engine*)cb_data;
1176     engine_specific_t *es;
1177     proxied_engine_handle_t *peh;
1178     proxied_engine_handle_t *cb_peh;
1179     bool do_callback;
1180     int count;
1181 
1182     cb_assert(type == ON_DISCONNECT);
1183     logger->log(EXTENSION_LOG_DETAIL, cookie,
1184                 "Handle disconnect for: %p", cookie);
1185     es = e->upstream_server->cookie->get_engine_specific(cookie);
1186     if (es == NULL) {
1187         logger->log(EXTENSION_LOG_DETAIL, cookie,
1188                     "The connection is no longer known to bucket_engine: %p",
1189                     cookie);
1190         return;
1191     }
1192     cb_assert(es);
1193 
1194     peh = es->peh;
1195     if (peh == NULL) {
1196         logger->log(EXTENSION_LOG_DETAIL, cookie,
1197                     "The connection is not connected to an engine %p", cookie);
1198         /* Not attached to an engine! */
1199         /* Release the allocated memory, and clear the cookie data */
1200         /* upstream */
1201         cb_assert(es->reserved == ES_CONNECTED_FLAG);
1202         /**
1203          * Decrement session_cas's counter, if the connection closes
1204          * before a control command (that returned ENGINE_EWOULDBLOCK
1205          * the first time) makes another attempt.
1206          *
1207          * Commands to be considered: DELETE_BUCKET
1208          */
1209         if (es->engine_specific != NULL) {
1210             uint8_t opcode = e->upstream_server->cookie->
1211                                     get_opcode_if_ewouldblock_set(cookie);
1212             if (opcode == PROTOCOL_BINARY_CMD_DELETE_BUCKET) {
1213                 bucket_decrement_session_ctr();
1214             }
1215         }
1216         release_memory(es, sizeof(*es));
1217         e->upstream_server->cookie->store_engine_specific(cookie, NULL);
1218         return;
1219     }
1220 
1221     cb_peh = try_get_engine_handle((ENGINE_HANDLE *)e, cookie);
1222 
1223     do_callback = cb_peh != NULL && peh->wants_disconnects;
1224     if (do_callback) {
1225         logger->log(EXTENSION_LOG_DETAIL, NULL,
1226                     "Send disconnect call to engine %p cookie %p",
1227                     peh, cookie);
1228         peh->cb(cookie, type, event_data, peh->cb_data);
1229     }
1230 
1231     if (cb_peh != NULL) {
1232         release_engine_handle(cb_peh);
1233     }
1234 
1235     /*
1236      * We can't release the bucket engine yet, because the connection is
1237      * still reserved
1238      */
1239     if (es->reserved != ES_CONNECTED_FLAG) {
1240         logger->log(EXTENSION_LOG_DETAIL, cookie,
1241                     "We can't complete the shutdown due to reservations %p",
1242                     cookie);
1243         return;
1244     }
1245 
1246     logger->log(EXTENSION_LOG_DETAIL, cookie, "Complete the shutdown of %p",
1247                 cookie);
1248 
1249     /* We don't expect concurrent calls to reserve because of
1250      * restriction that reserve can be only called from upcall. And
1251      * memcached will not upcall this while doing upcall for something
1252      * else (e.g. tap_notify or tap_itertator). */
1253     /* NOTE: that concurrent release is ok */
1254     count = ATOMIC_ADD(&es->reserved, -ES_CONNECTED_FLAG);
1255     if (count == 0) {
1256         /* if we're last just clear this thing */
1257         /* Release all the memory and clear the cookie data upstream. */
1258         release_memory(es, sizeof(*es));
1259         e->upstream_server->cookie->store_engine_specific(cookie, NULL);
1260     }
1261     /* we now have one less connection holding reference to this peh.
1262      *
1263      * NOTE: we have es->peh still has this peh, and es->reserved now
1264      * guards peh 'alive'-dness so connection's engine-specific will
1265      * still not outlive peh. */
1266     release_handle(peh);
1267 }
1268 
1269 /**
1270  * Callback from the memcached core for a new connection. Associate
1271  * it with the default bucket (if it exists) and create an engine
1272  * specific structure.
1273  *
1274  * @param cookie the cookie representing the connection
1275  * @param type The kind of event (should be ON_CONNECT)
1276  * @param event_data not used
1277  * @param cb_data The bucket instance in use
1278  */
handle_connect(const void *cookie, ENGINE_EVENT_TYPE type, const void *event_data, const void *cb_data)1279 static void handle_connect(const void *cookie,
1280                            ENGINE_EVENT_TYPE type,
1281                            const void *event_data,
1282                            const void *cb_data) {
1283     struct bucket_engine *e = (struct bucket_engine*)cb_data;
1284     proxied_engine_handle_t *peh = NULL;
1285 
1286     cb_assert(type == ON_CONNECT);
1287     (void)event_data;
1288 
1289     if (e->default_bucket_name != NULL) {
1290         /* Assign a default named bucket (if there is one). */
1291         peh = find_bucket(e->default_bucket_name);
1292         if (!peh && e->auto_create) {
1293             lock_engines();
1294             create_bucket_UNLOCKED(e, e->default_bucket_name,
1295                                    e->default_engine_path,
1296                                    e->default_bucket_config, &peh, NULL, 0);
1297             unlock_engines();
1298         }
1299     } else {
1300         /* Assign the default bucket (if there is one). */
1301         peh = e->default_engine.pe.v0 ? &e->default_engine : NULL;
1302         if (peh != NULL) {
1303             /* increment refcount because final release_handle will
1304              * decrement it */
1305             proxied_engine_handle_t *t = retain_handle(peh);
1306             cb_assert(t == peh);
1307         }
1308     }
1309 
1310     create_engine_specific(e, cookie);
1311     set_engine_handle((ENGINE_HANDLE*)e, cookie, peh);
1312     release_handle(peh);
1313 }
1314 
1315 /**
1316  * Callback from the memcached core that a cookie succesfully
1317  * authenticated itself. Associate the cookie with the bucket it is
1318  * authenticated to.
1319  *
1320  * @param cookie the cookie representing the connection
1321  * @param type The kind of event (should be ON_AUTH)
1322  * @param event_data The authentication data
1323  * @param cb_data The bucket instance in use
1324  */
handle_auth(const void *cookie, ENGINE_EVENT_TYPE type, const void *event_data, const void *cb_data)1325 static void handle_auth(const void *cookie,
1326                         ENGINE_EVENT_TYPE type,
1327                         const void *event_data,
1328                         const void *cb_data) {
1329     struct bucket_engine *e = (struct bucket_engine*)cb_data;
1330     const auth_data_t *auth_data = (const auth_data_t*)event_data;
1331     proxied_engine_handle_t *peh = find_bucket(auth_data->username);
1332     cb_assert(type == ON_AUTH);
1333 
1334     if (!peh && e->auto_create) {
1335         lock_engines();
1336         create_bucket_UNLOCKED(e, auth_data->username, e->default_engine_path,
1337                                auth_data->config ? auth_data->config : "",
1338                                &peh, NULL, 0);
1339         unlock_engines();
1340     }
1341     set_engine_handle((ENGINE_HANDLE*)e, cookie, peh);
1342     release_handle(peh);
1343 
1344     /*
1345      * backward compatibility hack until ns_server tries to set this
1346      * through memcached.json
1347      */
1348     if (e->admin_user != NULL && auth_data->username != NULL) {
1349         if (strcmp(e->admin_user, auth_data->username) == 0) {
1350             e->upstream_server->cookie->set_admin(cookie);
1351         }
1352     }
1353 }
1354 
1355 /**
1356  * Initialize the default bucket.
1357  */
init_default_bucket(struct bucket_engine* se)1358 static ENGINE_ERROR_CODE init_default_bucket(struct bucket_engine* se)
1359 {
1360     ENGINE_ERROR_CODE ret;
1361     ENGINE_HANDLE_V1 *dv1;
1362 
1363     memset(&se->default_engine, 0, sizeof(se->default_engine));
1364     if ((ret = init_engine_handle(&se->default_engine, "",
1365                                   se->default_engine_path)) != ENGINE_SUCCESS) {
1366         return ret;
1367     }
1368     se->default_engine.pe.v0 = load_engine(&se->default_engine.dlhandle,
1369                                            se->default_engine_path);
1370     dv1 = (ENGINE_HANDLE_V1*)se->default_engine.pe.v0;
1371     if (!dv1) {
1372         return ENGINE_FAILED;
1373     }
1374 
1375     ret = dv1->initialize(se->default_engine.pe.v0, se->default_bucket_config);
1376     if (ret != ENGINE_SUCCESS) {
1377         dv1->destroy(se->default_engine.pe.v0, false);
1378     }
1379 
1380     return ret;
1381 }
1382 
1383 /**
1384  * This is the implementation of the "initialize" function in the engine
1385  * interface. It is called right after create_instance if memcached liked
1386  * the interface we returned. Perform all initialization and load the
1387  * default bucket (if specified in the config string).
1388  */
bucket_initialize(ENGINE_HANDLE* handle, const char* config_str)1389 static ENGINE_ERROR_CODE bucket_initialize(ENGINE_HANDLE* handle,
1390                                            const char* config_str) {
1391     static struct hash_ops my_hash_ops;
1392     struct bucket_engine* se = get_handle(handle);
1393     ENGINE_ERROR_CODE ret;
1394     char *tenv = getenv("MEMCACHED_TOP_KEYS");
1395     cb_assert(!se->initialized);
1396 
1397     if (tenv != NULL) {
1398         se->topkeys = atoi(tenv);
1399         if (se->topkeys < 0) {
1400             se->topkeys = 0;
1401         }
1402     }
1403 
1404     get_current_time = bucket_engine.upstream_server->core->get_current_time;
1405 
1406     cb_mutex_initialize(&se->engines_mutex);
1407 
1408     ret = initialize_configuration(se, config_str);
1409     if (ret != ENGINE_SUCCESS) {
1410         return ret;
1411     }
1412 
1413     my_hash_ops.hashfunc = genhash_string_hash;
1414     my_hash_ops.hasheq = my_hash_eq;
1415     my_hash_ops.dupKey = hash_strdup;
1416     my_hash_ops.dupValue = refcount_dup;
1417     my_hash_ops.freeKey = free;
1418     my_hash_ops.freeValue = engine_hash_free;
1419 
1420     se->engines = genhash_init(1, my_hash_ops);
1421     if (se->engines == NULL) {
1422         return ENGINE_ENOMEM;
1423     }
1424 
1425     se->upstream_server->callback->register_callback(handle, ON_CONNECT,
1426                                                      handle_connect, se);
1427     se->upstream_server->callback->register_callback(handle, ON_AUTH,
1428                                                      handle_auth, se);
1429     se->upstream_server->callback->register_callback(handle, ON_DISCONNECT,
1430                                                      handle_disconnect, se);
1431 
1432     /* Initialization is useful to know if we *can* start up an */
1433     /* engine, but we check flags here to see if we should have and */
1434     /* shut it down if not. */
1435     if (se->has_default) {
1436         if ((ret = init_default_bucket(se)) != ENGINE_SUCCESS) {
1437             genhash_free(se->engines);
1438             return ret;
1439         }
1440     }
1441 
1442     se->initialized = true;
1443     return ENGINE_SUCCESS;
1444 }
1445 
1446 /**
1447  * During normal shutdown we want to shut down all of the engines
1448  * cleanly. The bucket_shutdown_engine is an implementation of a
1449  * "genhash iterator", so it is called once for each engine
1450  * stored in the hash table.
1451  *
1452  * No client connections should be running during the invocation
1453  * of this function, so we don't have to check if there is any
1454  * threads currently calling into the engine.
1455  */
bucket_shutdown_engine(const void* key, size_t nkey, const void *val, size_t nval, void *args)1456 static void bucket_shutdown_engine(const void* key, size_t nkey,
1457                                    const void *val, size_t nval,
1458                                    void *args) {
1459     const proxied_engine_handle_t *peh = val;
1460     (void)key; (void)nkey; (void)nval; (void)args;
1461     if (peh->pe.v0) {
1462         logger->log(EXTENSION_LOG_INFO, NULL,
1463                     "Shutting down \"%s\"\n", peh->name);
1464         peh->pe.v1->destroy(peh->pe.v0, false);
1465         logger->log(EXTENSION_LOG_INFO, NULL,
1466                     "Completed shutdown of \"%s\"\n", peh->name);
1467     }
1468 }
1469 
1470 /**
1471  * This is the implementation of the "destroy" function in the engine
1472  * interface. It is called from memcached when memcached is shutting down,
1473  * and memcached will never again reference this object when the function
1474  * returns. Try to shut down all of the loaded engines cleanly.
1475  *
1476  * @todo we should probably pass the force variable down to the iterator.
1477  *       Right now the core will always specify false here, but that may
1478  *       change in the future...
1479  *
1480  */
bucket_destroy(ENGINE_HANDLE* handle, const bool force)1481 static void bucket_destroy(ENGINE_HANDLE* handle,
1482                            const bool force) {
1483     struct bucket_engine* se = get_handle(handle);
1484     (void)force;
1485 
1486     if (!se->initialized) {
1487         return;
1488     }
1489 
1490     cb_mutex_enter(&bucket_engine.shutdown.mutex);
1491     bucket_engine.shutdown.in_progress = true;
1492     /* kick bucket deletion threads in butt broadcasting in_progress = true condition */
1493     cb_cond_broadcast(&bucket_engine.shutdown.refcount_cond);
1494     /* Ensure that we don't race with another thread shutting down a bucket */
1495     while (bucket_engine.shutdown.bucket_counter) {
1496         cb_cond_wait(&bucket_engine.shutdown.cond,
1497                      &bucket_engine.shutdown.mutex);
1498     }
1499     cb_mutex_exit(&bucket_engine.shutdown.mutex);
1500 
1501     genhash_iter(se->engines, bucket_shutdown_engine, NULL);
1502 
1503     if (se->has_default) {
1504         uninit_engine_handle(&se->default_engine);
1505     }
1506 
1507     genhash_free(se->engines);
1508     se->engines = NULL;
1509     free(se->default_engine_path);
1510     se->default_engine_path = NULL;
1511     free(se->admin_user);
1512     se->admin_user = NULL;
1513     free(se->default_bucket_name);
1514     se->default_bucket_name = NULL;
1515     free(se->default_bucket_config);
1516     se->default_bucket_config = NULL;
1517     cb_mutex_destroy(&se->engines_mutex);
1518     se->initialized = false;
1519 }
1520 
1521 /**
1522  * The deletion (shutdown) of a bucket is performed by its own thread
1523  * for simplicity (since we can't block the worker threads while we're
1524  * waiting for all of the connections to leave the engine).
1525  *
1526  * The state for the proxied_engine_handle should be "STOPPING" before
1527  * the thread is started, so that no new connections are allowed access
1528  * into the engine. Since we don't have any connections calling functions
1529  * into the engine we can safely start shutdown of the engine, but we can't
1530  * delete the proxied engine handle until all of the connections has
1531  * released their reference to the proxied engine handle.
1532  */
engine_shutdown_thread(void *arg)1533 static void engine_shutdown_thread(void *arg) {
1534     bool skip;
1535     proxied_engine_handle_t *peh;
1536     int upd;
1537 
1538     /* XXX:  Move state from STOPPED -> NULL.  This is an unbucket. */
1539     cb_mutex_enter(&bucket_engine.shutdown.mutex);
1540     skip = bucket_engine.shutdown.in_progress;
1541     if (!skip) {
1542         ++bucket_engine.shutdown.bucket_counter;
1543     }
1544     cb_mutex_exit(&bucket_engine.shutdown.mutex);
1545 
1546     if (skip) {
1547         /* Skip shutdown because we're racing the global shutdown.. */
1548         return ;
1549     }
1550 
1551     peh = arg;
1552     logger->log(EXTENSION_LOG_INFO, NULL,
1553                 "Started thread to shut down \"%s\"\n", peh->name);
1554 
1555     /* Sanity check */
1556     cb_assert(peh->state == STATE_STOPPED);
1557     /*
1558      * Note we can check for peh->clients == 0 but that's not actually
1559      * right because get_engine_handle can temporarily increment it.
1560      */
1561 
1562     logger->log(EXTENSION_LOG_INFO, NULL,
1563                 "Destroy engine \"%s\"\n", peh->name);
1564     peh->pe.v1->destroy(peh->pe.v0, peh->force_shutdown);
1565     logger->log(EXTENSION_LOG_INFO, NULL,
1566                 "Engine \"%s\" destroyed\n", peh->name);
1567 
1568     peh->pe.v1 = NULL;
1569 
1570     /* Unlink it from the engine table so that others may create */
1571     /* it while we're waiting for the remaining clients to disconnect */
1572     logger->log(EXTENSION_LOG_INFO, NULL,
1573                 "Unlink \"%s\" from engine table\n", peh->name);
1574     lock_engines();
1575     upd = genhash_delete_all(bucket_engine.engines,
1576                              peh->name, peh->name_len);
1577     cb_assert(upd == 1);
1578     cb_assert(genhash_find(bucket_engine.engines,
1579                         peh->name, peh->name_len) == NULL);
1580     unlock_engines();
1581 
1582     if (peh->cookie != NULL) {
1583         logger->log(EXTENSION_LOG_INFO, NULL,
1584                     "Notify %p that \"%s\" is deleted", peh->cookie, peh->name);
1585         bucket_engine.upstream_server->cookie->notify_io_complete(peh->cookie,
1586                                                                   ENGINE_SUCCESS);
1587     }
1588 
1589     /* NOTE: that even though DECR in release_handle happens without
1590      * lock, engine_shutdown_thread cannot miss wakeup event. That's
1591      * because broadcast happens under lock. Here's why.
1592      *
1593      * Suppose engine_shutdown_thread went to cond_wait sleep with
1594      * refcount = 0 and was never awaken (we want to prove by
1595      * contradiction that this cannot happen). But we know it have
1596      * observed refcount > 0. This means concurrent release_handle
1597      * decremented it after we've observed refcount value. But we know
1598      * that if this happened, release_handle would go and broadcast
1599      * signal. But our assumtion tells us we've missed this
1600      * broadcast. But this cannot happen because nobody can do
1601      * broadcast between us observing refcount value and going to
1602      * sleep because we're holding mutex that broadcast takes.
1603      */
1604     cb_mutex_enter(&bucket_engine.shutdown.mutex);
1605     while (peh->refcount > 0 && !bucket_engine.shutdown.in_progress) {
1606         logger->log(EXTENSION_LOG_INFO, NULL,
1607                     "There are %d references to \"%s\".. waiting more\n",
1608                     peh->refcount, peh->name);
1609 
1610         cb_cond_wait(&bucket_engine.shutdown.refcount_cond,
1611                      &bucket_engine.shutdown.mutex);
1612     }
1613     cb_mutex_exit(&bucket_engine.shutdown.mutex);
1614 
1615     logger->log(EXTENSION_LOG_INFO, NULL,
1616                 "Release all resources for engine \"%s\"\n", peh->name);
1617 
1618     /* and free it */
1619     free_engine_handle(peh);
1620 
1621     cb_mutex_enter(&bucket_engine.shutdown.mutex);
1622     --bucket_engine.shutdown.bucket_counter;
1623     if (bucket_engine.shutdown.in_progress && bucket_engine.shutdown.bucket_counter == 0){
1624         cb_cond_signal(&bucket_engine.shutdown.cond);
1625     }
1626     cb_mutex_exit(&bucket_engine.shutdown.mutex);
1627 
1628     return ;
1629 }
1630 
1631 /**
1632  * Check to see if we should start shutdown of the specified engine. The
1633  * critera for starting shutdown is that no clients are currently calling
1634  * into the engine, and that someone requested shutdown of that engine.
1635  *
1636  * Note: we always call it with refcount protecting bucket from being
1637  * deleted under us.
1638  */
maybe_start_engine_shutdown(proxied_engine_handle_t *e)1639 static void maybe_start_engine_shutdown(proxied_engine_handle_t *e) {
1640     cb_assert(e->state == STATE_STOPPING || e->state == STATE_STOPPED || e->state == STATE_NULL);
1641     /* observing 'state' before clients == 0 is _crucial_. See
1642      * get_engine_handle. */
1643     if (e->state == STATE_STOPPING && e->clients == 0 && ATOMIC_CAS(&e->state, STATE_STOPPING, STATE_STOPPED)) {
1644         /* Spin off a new thread to shut down the engine.. */
1645         cb_thread_t tid;
1646         if (cb_create_thread(&tid, engine_shutdown_thread, e, 1) != 0) {
1647             logger->log(EXTENSION_LOG_WARNING, NULL,
1648                         "Failed to start shutdown of \"%s\"!", e->name);
1649             abort();
1650         }
1651     }
1652 }
1653 
1654 /**
1655  * Implementation of the "item_allocate" function in the engine
1656  * specification. Look up the correct engine and call into the
1657  * underlying engine if the underlying engine is "running". Disconnect
1658  * the caller if the engine isn't "running" anymore.
1659  */
bucket_item_allocate(ENGINE_HANDLE* handle, const void* cookie, item **itm, const void* key, const size_t nkey, const size_t nbytes, const int flags, const rel_time_t exptime, uint8_t datatype)1660 static ENGINE_ERROR_CODE bucket_item_allocate(ENGINE_HANDLE* handle,
1661                                               const void* cookie,
1662                                               item **itm,
1663                                               const void* key,
1664                                               const size_t nkey,
1665                                               const size_t nbytes,
1666                                               const int flags,
1667                                               const rel_time_t exptime,
1668                                               uint8_t datatype) {
1669 
1670     proxied_engine_handle_t *peh = get_engine_handle(handle, cookie);
1671     if (peh != NULL) {
1672         ENGINE_ERROR_CODE ret;
1673         ret = peh->pe.v1->allocate(peh->pe.v0, cookie, itm, key,
1674                                    nkey, nbytes, flags, exptime,
1675                                    datatype);
1676         release_engine_handle(peh);
1677         return ret;
1678     } else {
1679         return ENGINE_DISCONNECT;
1680     }
1681 }
1682 
1683 /**
1684  * Implementation of the "item_delete" function in the engine
1685  * specification. Look up the correct engine and call into the
1686  * underlying engine if the underlying engine is "running". Disconnect
1687  * the caller if the engine isn't "running" anymore.
1688  */
bucket_item_delete(ENGINE_HANDLE* handle, const void* cookie, const void* key, const size_t nkey, uint64_t* cas, uint16_t vbucket)1689 static ENGINE_ERROR_CODE bucket_item_delete(ENGINE_HANDLE* handle,
1690                                             const void* cookie,
1691                                             const void* key,
1692                                             const size_t nkey,
1693                                             uint64_t* cas,
1694                                             uint16_t vbucket) {
1695     proxied_engine_handle_t *peh = get_engine_handle(handle, cookie);
1696     if (peh) {
1697         ENGINE_ERROR_CODE ret;
1698         ret = peh->pe.v1->remove(peh->pe.v0, cookie, key, nkey, cas, vbucket);
1699         release_engine_handle(peh);
1700 
1701         if (ret == ENGINE_SUCCESS) {
1702             TK(peh->topkeys, delete_hits, key, nkey, get_current_time());
1703         } else if (ret == ENGINE_KEY_ENOENT) {
1704             TK(peh->topkeys, delete_misses, key, nkey, get_current_time());
1705         } else if (ret == ENGINE_KEY_EEXISTS) {
1706             TK(peh->topkeys, cas_badval, key, nkey, get_current_time());
1707         }
1708 
1709         return ret;
1710     } else {
1711         return ENGINE_DISCONNECT;
1712     }
1713 }
1714 
1715 /**
1716  * Implementation of the "item_release" function in the engine
1717  * specification. Look up the correct engine and call into the
1718  * underlying engine if the underlying engine is "running".
1719  */
bucket_item_release(ENGINE_HANDLE* handle, const void *cookie, item* itm)1720 static void bucket_item_release(ENGINE_HANDLE* handle,
1721                                 const void *cookie,
1722                                 item* itm) {
1723     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
1724     if (peh) {
1725         peh->pe.v1->release(peh->pe.v0, cookie, itm);
1726         release_engine_handle(peh);
1727     } else {
1728         logger->log(EXTENSION_LOG_DEBUG, NULL,
1729                     "Potential memory leak. Failed to get engine handle for %p",
1730                     cookie);
1731     }
1732 }
1733 
1734 /**
1735  * Implementation of the "get" function in the engine
1736  * specification. Look up the correct engine and call into the
1737  * underlying engine if the underlying engine is "running". Disconnect
1738  * the caller if the engine isn't "running" anymore.
1739  */
bucket_get(ENGINE_HANDLE* handle, const void* cookie, item** itm, const void* key, const int nkey, uint16_t vbucket)1740 static ENGINE_ERROR_CODE bucket_get(ENGINE_HANDLE* handle,
1741                                     const void* cookie,
1742                                     item** itm,
1743                                     const void* key,
1744                                     const int nkey,
1745                                     uint16_t vbucket) {
1746     proxied_engine_handle_t *peh = get_engine_handle(handle, cookie);
1747     if (peh) {
1748         ENGINE_ERROR_CODE ret;
1749         ret = peh->pe.v1->get(peh->pe.v0, cookie, itm, key, nkey, vbucket);
1750 
1751         if (ret == ENGINE_SUCCESS) {
1752             TK(peh->topkeys, get_hits, key, nkey, get_current_time());
1753         } else if (ret == ENGINE_KEY_ENOENT) {
1754             TK(peh->topkeys, get_misses, key, nkey, get_current_time());
1755         }
1756 
1757         release_engine_handle(peh);
1758         return ret;
1759     } else {
1760         return ENGINE_DISCONNECT;
1761     }
1762 }
1763 
add_engine(const void *key, size_t nkey, const void *val, size_t nval, void *arg)1764 static void add_engine(const void *key, size_t nkey,
1765                        const void *val, size_t nval,
1766                        void *arg) {
1767     struct bucket_list **blist_ptr = (struct bucket_list **)arg;
1768     struct bucket_list *n = calloc(sizeof(struct bucket_list), 1);
1769     (void)nval;
1770     n->name = (char*)key;
1771     n->namelen = nkey;
1772     n->peh = (proxied_engine_handle_t*) val;
1773     cb_assert(n->peh);
1774 
1775     /* we must not leak dead buckets outside of engines_mutex. Those
1776      * can be freed by bucket destructor at any time (when
1777      * engines_mutex is not held) */
1778     if (retain_handle(n->peh) == NULL) {
1779         free(n);
1780         return;
1781     }
1782 
1783     n->next = *blist_ptr;
1784     *blist_ptr = n;
1785 }
1786 
list_buckets(struct bucket_engine *e, struct bucket_list **blist)1787 static bool list_buckets(struct bucket_engine *e, struct bucket_list **blist) {
1788     lock_engines();
1789     genhash_iter(e->engines, add_engine, blist);
1790     unlock_engines();
1791     return true;
1792 }
1793 
bucket_list_free(struct bucket_list *blist)1794 static void bucket_list_free(struct bucket_list *blist) {
1795     struct bucket_list *p = blist;
1796     while (p) {
1797         struct bucket_list *tmp;
1798         release_handle(p->peh);
1799         tmp = p->next;
1800         free(p);
1801         p = tmp;
1802     }
1803 }
1804 
1805 /**
1806  * Implementation of the "aggregate_stats" function in the engine
1807  * specification. Look up the correct engine and call into the
1808  * underlying engine if the underlying engine is "running". Disconnect
1809  * the caller if the engine isn't "running" anymore.
1810  */
bucket_aggregate_stats(ENGINE_HANDLE* handle, const void* cookie, void (*callback)(void*, void*), void *stats)1811 static ENGINE_ERROR_CODE bucket_aggregate_stats(ENGINE_HANDLE* handle,
1812                                                 const void* cookie,
1813                                                 void (*callback)(void*, void*),
1814                                                 void *stats) {
1815     struct bucket_engine *e = (struct bucket_engine*)handle;
1816     struct bucket_list *blist = NULL;
1817     struct bucket_list *p;
1818     (void)cookie;
1819     if (! list_buckets(e, &blist)) {
1820         return ENGINE_FAILED;
1821     }
1822 
1823     p = blist;
1824     while (p) {
1825         callback(p->peh->stats, stats);
1826         p = p->next;
1827     }
1828 
1829     bucket_list_free(blist);
1830     return ENGINE_SUCCESS;
1831 }
1832 
1833 struct stat_context {
1834     ADD_STAT add_stat;
1835     const void *cookie;
1836 };
1837 
stat_ht_builder(const void *key, size_t nkey, const void *val, size_t nval, void *arg)1838 static void stat_ht_builder(const void *key, size_t nkey,
1839                             const void *val, size_t nval,
1840                             void *arg) {
1841     struct stat_context *ctx;
1842     proxied_engine_handle_t *bucket;
1843     const char *bucketState;
1844 
1845     (void)nval;
1846     cb_assert(arg);
1847     ctx = (struct stat_context*)arg;
1848     bucket = (proxied_engine_handle_t*)val;
1849     bucketState = bucket_state_name(bucket->state);
1850     ctx->add_stat(key, (uint16_t)nkey, bucketState,
1851                   (uint32_t)strlen(bucketState),
1852                   ctx->cookie);
1853 }
1854 
1855 /**
1856  * Get bucket-engine specific statistics
1857  */
get_bucket_stats(ENGINE_HANDLE* handle, const void *cookie, ADD_STAT add_stat)1858 static ENGINE_ERROR_CODE get_bucket_stats(ENGINE_HANDLE* handle,
1859                                           const void *cookie,
1860                                           ADD_STAT add_stat) {
1861 
1862     struct bucket_engine *e;
1863     struct stat_context sctx;
1864 
1865     if (!is_authorized(handle, cookie)) {
1866         return ENGINE_FAILED;
1867     }
1868 
1869     e = (struct bucket_engine*)handle;
1870     sctx.add_stat = add_stat;
1871     sctx.cookie = cookie;
1872 
1873     lock_engines();
1874     genhash_iter(e->engines, stat_ht_builder, &sctx);
1875     unlock_engines();
1876     return ENGINE_SUCCESS;
1877 }
1878 
1879 /**
1880  * Implementation of the "get_stats" function in the engine
1881  * specification. Look up the correct engine and call into the
1882  * underlying engine if the underlying engine is "running". Disconnect
1883  * the caller if the engine isn't "running" anymore.
1884  */
bucket_get_stats(ENGINE_HANDLE* handle, const void* cookie, const char* stat_key, int nkey, ADD_STAT add_stat)1885 static ENGINE_ERROR_CODE bucket_get_stats(ENGINE_HANDLE* handle,
1886                                           const void* cookie,
1887                                           const char* stat_key,
1888                                           int nkey,
1889                                           ADD_STAT add_stat) {
1890     ENGINE_ERROR_CODE rc;
1891     proxied_engine_handle_t *peh;
1892 
1893     /* Intercept bucket stats. */
1894     if (nkey == (sizeof("bucket") - 1) &&
1895         memcmp("bucket", stat_key, nkey) == 0) {
1896         return get_bucket_stats(handle, cookie, add_stat);
1897     }
1898 
1899     rc = ENGINE_DISCONNECT;
1900     peh = get_engine_handle(handle, cookie);
1901 
1902     if (peh) {
1903         if (nkey == (sizeof("topkeys") - 1) &&
1904             memcmp("topkeys", stat_key, nkey) == 0) {
1905             rc = topkeys_stats(peh->topkeys, TK_SHARDS, cookie, get_current_time(),
1906                                add_stat);
1907         } else {
1908             rc = peh->pe.v1->get_stats(peh->pe.v0, cookie, stat_key,
1909                                        nkey, add_stat);
1910             if (nkey == 0) {
1911                 char statval[20];
1912                 snprintf(statval, sizeof(statval), "%d", peh->refcount - 1);
1913                 add_stat("bucket_conns", sizeof("bucket_conns") - 1, statval,
1914                          (uint32_t)strlen(statval), cookie);
1915                 snprintf(statval, sizeof(statval), "%d", peh->clients);
1916                 add_stat("bucket_active_conns", sizeof("bucket_active_conns") -1,
1917                          statval, (uint32_t)strlen(statval), cookie);
1918             }
1919         }
1920         release_engine_handle(peh);
1921     }
1922     return rc;
1923 }
1924 
1925 /**
1926  * Implementation of the "get_stats_struct" function in the engine
1927  * specification. Look up the correct engine and and verify it's
1928  * state.
1929  */
bucket_get_stats_struct(ENGINE_HANDLE* handle, const void* cookie)1930 static void *bucket_get_stats_struct(ENGINE_HANDLE* handle,
1931                                      const void* cookie)
1932 {
1933     void *ret = NULL;
1934     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
1935     if (peh) {
1936         ret = peh->stats;
1937         release_engine_handle(peh);
1938     }
1939 
1940     return ret;
1941 }
1942 
1943 /**
1944  * Implementation of the "store" function in the engine
1945  * specification. Look up the correct engine and call into the
1946  * underlying engine if the underlying engine is "running". Disconnect
1947  * the caller if the engine isn't "running" anymore.
1948  */
bucket_store(ENGINE_HANDLE* handle, const void *cookie, item* itm, uint64_t *cas, ENGINE_STORE_OPERATION operation, uint16_t vbucket)1949 static ENGINE_ERROR_CODE bucket_store(ENGINE_HANDLE* handle,
1950                                       const void *cookie,
1951                                       item* itm,
1952                                       uint64_t *cas,
1953                                       ENGINE_STORE_OPERATION operation,
1954                                       uint16_t vbucket) {
1955     proxied_engine_handle_t *peh = get_engine_handle(handle, cookie);
1956     if (peh) {
1957         ENGINE_ERROR_CODE ret;
1958         ret = peh->pe.v1->store(peh->pe.v0, cookie, itm, cas, operation, vbucket);
1959         if (ret != ENGINE_EWOULDBLOCK && peh->topkeys) {
1960             item_info itm_info;
1961             itm_info.nvalue = 1;
1962             if (peh->pe.v1->get_item_info(peh->pe.v0, cookie, itm, &itm_info)) {
1963                 const void* key = itm_info.key;
1964                 const int nkey = itm_info.nkey;
1965 
1966                 if (operation != OPERATION_CAS) {
1967                     TK(peh->topkeys, cmd_set, key, nkey, get_current_time());
1968                 } else {
1969                     if (ret == ENGINE_SUCCESS) {
1970                         TK(peh->topkeys, cas_hits, key, nkey,
1971                            get_current_time());
1972                     } else if (ret == ENGINE_KEY_EEXISTS) {
1973                         TK(peh->topkeys, cas_badval, key, nkey,
1974                            get_current_time());
1975                     } else if (ret == ENGINE_KEY_ENOENT) {
1976                         TK(peh->topkeys, cas_misses, key, nkey,
1977                            get_current_time());
1978                     }
1979                 }
1980             }
1981         }
1982         release_engine_handle(peh);
1983         return ret;
1984     } else {
1985         return ENGINE_DISCONNECT;
1986     }
1987 }
1988 
1989 /**
1990  * Implementation of the "arithmetic" function in the engine
1991  * specification. Look up the correct engine and call into the
1992  * underlying engine if the underlying engine is "running". Disconnect
1993  * the caller if the engine isn't "running" anymore.
1994  */
bucket_arithmetic(ENGINE_HANDLE* handle, const void* cookie, const void* key, const int nkey, const bool increment, const bool create, const uint64_t delta, const uint64_t initial, const rel_time_t exptime, uint64_t *cas, uint8_t datatype, uint64_t *result, uint16_t vbucket)1995 static ENGINE_ERROR_CODE bucket_arithmetic(ENGINE_HANDLE* handle,
1996                                            const void* cookie,
1997                                            const void* key,
1998                                            const int nkey,
1999                                            const bool increment,
2000                                            const bool create,
2001                                            const uint64_t delta,
2002                                            const uint64_t initial,
2003                                            const rel_time_t exptime,
2004                                            uint64_t *cas,
2005                                            uint8_t datatype,
2006                                            uint64_t *result,
2007                                            uint16_t vbucket) {
2008     proxied_engine_handle_t *peh = get_engine_handle(handle, cookie);
2009     if (peh) {
2010         ENGINE_ERROR_CODE ret;
2011         ret = peh->pe.v1->arithmetic(peh->pe.v0, cookie, key, nkey,
2012                                 increment, create, delta, initial,
2013                                 exptime, cas, datatype, result, vbucket);
2014 
2015 
2016         if (ret == ENGINE_SUCCESS) {
2017             if (increment) {
2018                 TK(peh->topkeys, incr_hits, key, nkey, get_current_time());
2019             } else {
2020                 TK(peh->topkeys, decr_hits, key, nkey, get_current_time());
2021 
2022             }
2023         } else if (ret == ENGINE_KEY_ENOENT) {
2024             if (increment) {
2025                 TK(peh->topkeys, incr_misses, key, nkey, get_current_time());
2026             } else {
2027                 TK(peh->topkeys, decr_misses, key, nkey, get_current_time());
2028 
2029             }
2030         }
2031 
2032         release_engine_handle(peh);
2033         return ret;
2034     } else {
2035         return ENGINE_DISCONNECT;
2036     }
2037 }
2038 
2039 /**
2040  * Implementation of the "flush" function in the engine
2041  * specification. Look up the correct engine and call into the
2042  * underlying engine if the underlying engine is "running". Disconnect
2043  * the caller if the engine isn't "running" anymore.
2044  */
bucket_flush(ENGINE_HANDLE* handle, const void* cookie, time_t when)2045 static ENGINE_ERROR_CODE bucket_flush(ENGINE_HANDLE* handle,
2046                                       const void* cookie, time_t when) {
2047     proxied_engine_handle_t *peh = get_engine_handle(handle, cookie);
2048     if (peh) {
2049         ENGINE_ERROR_CODE ret;
2050         ret = peh->pe.v1->flush(peh->pe.v0, cookie, when);
2051         release_engine_handle(peh);
2052         return ret;
2053     } else {
2054         return ENGINE_DISCONNECT;
2055     }
2056 }
2057 
2058 /**
2059  * Implementation of the "reset_stats" function in the engine
2060  * specification. Look up the correct engine and call into the
2061  * underlying engine if the underlying engine is "running".
2062  */
bucket_reset_stats(ENGINE_HANDLE* handle, const void *cookie)2063 static void bucket_reset_stats(ENGINE_HANDLE* handle, const void *cookie) {
2064     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2065     if (peh) {
2066         peh->pe.v1->reset_stats(peh->pe.v0, cookie);
2067         release_engine_handle(peh);
2068     }
2069 }
2070 
2071 /**
2072  * Implementation of the "get_item_info" function in the engine
2073  * specification. Look up the correct engine and call into the
2074  * underlying engine if the underlying engine is "running".
2075  */
bucket_get_item_info(ENGINE_HANDLE *handle, const void *cookie, const item* itm, item_info *itm_info)2076 static bool bucket_get_item_info(ENGINE_HANDLE *handle,
2077                                  const void *cookie,
2078                                  const item* itm,
2079                                  item_info *itm_info) {
2080     bool ret = false;
2081     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2082     if (peh) {
2083         ret = peh->pe.v1->get_item_info(peh->pe.v0, cookie, itm, itm_info);
2084         release_engine_handle(peh);
2085     }
2086 
2087     return ret;
2088 }
2089 
2090 /**
2091  * Implementation of the "set_item_info" function in the engine
2092  * specification. Look up the correct engine and call into the
2093  * underlying engine if the underlying engine is "running".
2094  */
bucket_set_item_info(ENGINE_HANDLE *handle, const void *cookie, item* itm, const item_info *itm_info)2095 static bool bucket_set_item_info(ENGINE_HANDLE *handle,
2096                                  const void *cookie,
2097                                  item* itm,
2098                                  const item_info *itm_info) {
2099     bool ret = false;
2100     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2101     if (peh) {
2102         ret = peh->pe.v1->set_item_info(peh->pe.v0, cookie, itm, itm_info);
2103         release_engine_handle(peh);
2104     }
2105 
2106     return ret;
2107 }
2108 
2109 /**
2110  * Implementation of the "item_set_cas" function in the engine
2111  * specification. Look up the correct engine and call into the
2112  * underlying engine if the underlying engine is "running".
2113  */
bucket_item_set_cas(ENGINE_HANDLE *handle, const void *cookie, item *itm, uint64_t cas)2114 static void bucket_item_set_cas(ENGINE_HANDLE *handle, const void *cookie,
2115                                 item *itm, uint64_t cas) {
2116 
2117     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2118     if (peh) {
2119         peh->pe.v1->item_set_cas(peh->pe.v0, cookie, itm, cas);
2120         release_engine_handle(peh);
2121     } else {
2122         logger->log(EXTENSION_LOG_WARNING, NULL,
2123                     "The engine is no longer there... %p", cookie);
2124     }
2125 }
2126 
2127 /**
2128  * Implenentation of the tap notify in the bucket engine. Verify
2129  * that the bucket exists (and is in the correct state) before
2130  * wrapping into the engines implementationof tap notify.
2131  */
bucket_tap_notify(ENGINE_HANDLE* handle, const void *cookie, void *engine_specific, uint16_t nengine, uint8_t ttl, uint16_t tap_flags, tap_event_t tap_event, uint32_t tap_seqno, const void *key, size_t nkey, uint32_t flags, uint32_t exptime, uint64_t cas, uint8_t datatype, const void *data, size_t ndata, uint16_t vbucket)2132 static ENGINE_ERROR_CODE bucket_tap_notify(ENGINE_HANDLE* handle,
2133                                            const void *cookie,
2134                                            void *engine_specific,
2135                                            uint16_t nengine,
2136                                            uint8_t ttl,
2137                                            uint16_t tap_flags,
2138                                            tap_event_t tap_event,
2139                                            uint32_t tap_seqno,
2140                                            const void *key,
2141                                            size_t nkey,
2142                                            uint32_t flags,
2143                                            uint32_t exptime,
2144                                            uint64_t cas,
2145                                            uint8_t datatype,
2146                                            const void *data,
2147                                            size_t ndata,
2148                                            uint16_t vbucket) {
2149     proxied_engine_handle_t *peh = get_engine_handle(handle, cookie);
2150     if (peh) {
2151         ENGINE_ERROR_CODE ret;
2152         ret = peh->pe.v1->tap_notify(peh->pe.v0, cookie, engine_specific,
2153                                 nengine, ttl, tap_flags, tap_event, tap_seqno,
2154                                 key, nkey, flags, exptime, cas, datatype,
2155                                 data, ndata, vbucket);
2156         release_engine_handle(peh);
2157         return ret;
2158     } else {
2159         return ENGINE_DISCONNECT;
2160     }
2161 }
2162 
2163 /**
2164  * A specialized tap iterator that verifies that the bucket it is
2165  * connected to actually exists and is in the correct state before
2166  * calling into the engine.
2167  */
bucket_tap_iterator_shim(ENGINE_HANDLE* handle, const void *cookie, item **itm, void **engine_specific, uint16_t *nengine_specific, uint8_t *ttl, uint16_t *flags, uint32_t *seqno, uint16_t *vbucket)2168 static tap_event_t bucket_tap_iterator_shim(ENGINE_HANDLE* handle,
2169                                             const void *cookie,
2170                                             item **itm,
2171                                             void **engine_specific,
2172                                             uint16_t *nengine_specific,
2173                                             uint8_t *ttl,
2174                                             uint16_t *flags,
2175                                             uint32_t *seqno,
2176                                             uint16_t *vbucket) {
2177     proxied_engine_handle_t *e = get_engine_handle(handle, cookie);
2178     if (e && e->tap_iterator) {
2179         tap_event_t ret;
2180         cb_assert(e->pe.v0 != handle);
2181         ret = e->tap_iterator(e->pe.v0, cookie, itm,
2182                               engine_specific, nengine_specific,
2183                               ttl, flags, seqno, vbucket);
2184 
2185 
2186         release_engine_handle(e);
2187         return ret;
2188     } else {
2189         return TAP_DISCONNECT;
2190     }
2191 }
2192 
2193 /**
2194  * Implementation of the get_tap_iterator from the engine API.
2195  * If the cookie is associated with an engine who supports a tap
2196  * iterator we should return the internal shim iterator so that we
2197  * verify access every time we try to iterate.
2198  */
bucket_get_tap_iterator(ENGINE_HANDLE* handle, const void* cookie, const void* client, size_t nclient, uint32_t flags, const void* userdata, size_t nuserdata)2199 static TAP_ITERATOR bucket_get_tap_iterator(ENGINE_HANDLE* handle, const void* cookie,
2200                                             const void* client, size_t nclient,
2201                                             uint32_t flags,
2202                                             const void* userdata, size_t nuserdata) {
2203     TAP_ITERATOR ret = NULL;
2204     proxied_engine_handle_t *e = get_engine_handle(handle, cookie);
2205     if (e) {
2206         if (!e->tap_iterator_disabled) {
2207             e->tap_iterator = e->pe.v1->get_tap_iterator(e->pe.v0, cookie,
2208                                                          client, nclient,
2209                                                          flags, userdata, nuserdata);
2210             ret = e->tap_iterator ? bucket_tap_iterator_shim : NULL;
2211         }
2212         release_engine_handle(e);
2213     }
2214 
2215     return ret;
2216 }
2217 
dcp_step(ENGINE_HANDLE* handle, const void* cookie, struct dcp_message_producers *producers)2218 static ENGINE_ERROR_CODE dcp_step(ENGINE_HANDLE* handle, const void* cookie,
2219                                   struct dcp_message_producers *producers)
2220 {
2221     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2222     ENGINE_ERROR_CODE ret;
2223     if (peh) {
2224         if (peh->pe.v1->dcp.step) {
2225             ret = peh->pe.v1->dcp.step(peh->pe.v0, cookie, producers);
2226         } else {
2227             ret = ENGINE_DISCONNECT;
2228         }
2229         release_engine_handle(peh);
2230     } else {
2231         ret = ENGINE_DISCONNECT;
2232     }
2233 
2234     return ret;
2235 }
2236 
2237 
dcp_open(ENGINE_HANDLE* handle, const void* cookie, uint32_t opaque, uint32_t seqno, uint32_t flags, void *name, uint16_t nname)2238 static ENGINE_ERROR_CODE dcp_open(ENGINE_HANDLE* handle,
2239                                   const void* cookie,
2240                                   uint32_t opaque,
2241                                   uint32_t seqno,
2242                                   uint32_t flags,
2243                                   void *name,
2244                                   uint16_t nname)
2245 {
2246     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2247     ENGINE_ERROR_CODE ret;
2248     if (peh) {
2249         if (peh->pe.v1->dcp.open) {
2250             ret = peh->pe.v1->dcp.open(peh->pe.v0, cookie, opaque,
2251                                        seqno, flags, name, nname);
2252         } else {
2253             ret = ENGINE_DISCONNECT;
2254         }
2255         release_engine_handle(peh);
2256     } else {
2257         ret = ENGINE_DISCONNECT;
2258     }
2259 
2260     return ret;
2261 
2262 }
2263 
dcp_add_stream(ENGINE_HANDLE* handle, const void* cookie, uint32_t opaque, uint16_t vbucket, uint32_t flags)2264 static ENGINE_ERROR_CODE dcp_add_stream(ENGINE_HANDLE* handle,
2265                                         const void* cookie,
2266                                         uint32_t opaque,
2267                                         uint16_t vbucket,
2268                                         uint32_t flags)
2269 {
2270     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2271     ENGINE_ERROR_CODE ret;
2272     if (peh) {
2273         if (peh->pe.v1->dcp.add_stream) {
2274             ret = peh->pe.v1->dcp.add_stream(peh->pe.v0, cookie,
2275                                              opaque, vbucket, flags);
2276         } else {
2277             ret = ENGINE_DISCONNECT;
2278         }
2279         release_engine_handle(peh);
2280     } else {
2281         ret = ENGINE_DISCONNECT;
2282     }
2283 
2284     return ret;
2285 }
2286 
dcp_close_stream(ENGINE_HANDLE* handle, const void* cookie, uint32_t opaque, uint16_t vbucket)2287 static ENGINE_ERROR_CODE dcp_close_stream(ENGINE_HANDLE* handle,
2288                                           const void* cookie,
2289                                           uint32_t opaque,
2290                                           uint16_t vbucket)
2291 {
2292     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2293     ENGINE_ERROR_CODE ret;
2294     if (peh) {
2295         if (peh->pe.v1->dcp.close_stream) {
2296             ret = peh->pe.v1->dcp.close_stream(peh->pe.v0, cookie, opaque,
2297                                                vbucket);
2298         } else {
2299             ret = ENGINE_DISCONNECT;
2300         }
2301         release_engine_handle(peh);
2302     } else {
2303         ret = ENGINE_DISCONNECT;
2304     }
2305 
2306     return ret;
2307 }
2308 
dcp_stream_req(ENGINE_HANDLE* handle, const void* cookie, uint32_t flags, uint32_t opaque, uint16_t vbucket, uint64_t start_seqno, uint64_t end_seqno, uint64_t vbucket_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno, uint64_t *rollback_seqno, dcp_add_failover_log callback)2309 static ENGINE_ERROR_CODE dcp_stream_req(ENGINE_HANDLE* handle, const void* cookie,
2310                                         uint32_t flags,
2311                                         uint32_t opaque,
2312                                         uint16_t vbucket,
2313                                         uint64_t start_seqno,
2314                                         uint64_t end_seqno,
2315                                         uint64_t vbucket_uuid,
2316                                         uint64_t snap_start_seqno,
2317                                         uint64_t snap_end_seqno,
2318                                         uint64_t *rollback_seqno,
2319                                         dcp_add_failover_log callback)
2320 {
2321     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2322     ENGINE_ERROR_CODE ret;
2323     if (peh) {
2324         if (peh->pe.v1->dcp.stream_req) {
2325             ret = peh->pe.v1->dcp.stream_req(peh->pe.v0, cookie,
2326                                              flags, opaque, vbucket,
2327                                              start_seqno, end_seqno,
2328                                              vbucket_uuid, snap_start_seqno,
2329                                              snap_end_seqno, rollback_seqno,
2330                                              callback);
2331         } else {
2332             ret = ENGINE_DISCONNECT;
2333         }
2334         release_engine_handle(peh);
2335     } else {
2336         ret = ENGINE_DISCONNECT;
2337     }
2338 
2339     return ret;
2340 }
2341 
dcp_get_failover_log(ENGINE_HANDLE* handle, const void* cookie, uint32_t opaque, uint16_t vbucket, ENGINE_ERROR_CODE (*failover_log)(vbucket_failover_t*, size_t nentries, const void *cookie))2342 static ENGINE_ERROR_CODE dcp_get_failover_log(ENGINE_HANDLE* handle, const void* cookie,
2343                                               uint32_t opaque,
2344                                               uint16_t vbucket,
2345                                               ENGINE_ERROR_CODE (*failover_log)(vbucket_failover_t*,
2346                                                                                 size_t nentries,
2347                                                                                 const void *cookie))
2348 {
2349     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2350     ENGINE_ERROR_CODE ret;
2351     if (peh) {
2352         if (peh->pe.v1->dcp.get_failover_log) {
2353             ret = peh->pe.v1->dcp.get_failover_log(peh->pe.v0, cookie,
2354                                                    opaque, vbucket,
2355                                                    failover_log);
2356         } else {
2357             ret = ENGINE_DISCONNECT;
2358         }
2359         release_engine_handle(peh);
2360     } else {
2361         ret = ENGINE_DISCONNECT;
2362     }
2363 
2364     return ret;
2365 }
2366 
dcp_stream_end(ENGINE_HANDLE* handle, const void* cookie, uint32_t opaque, uint16_t vbucket, uint32_t flags)2367 static ENGINE_ERROR_CODE dcp_stream_end(ENGINE_HANDLE* handle, const void* cookie,
2368                                         uint32_t opaque,
2369                                         uint16_t vbucket,
2370                                         uint32_t flags)
2371 {
2372     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2373     ENGINE_ERROR_CODE ret;
2374     if (peh) {
2375         if (peh->pe.v1->dcp.stream_end) {
2376             ret = peh->pe.v1->dcp.stream_end(peh->pe.v0, cookie,
2377                                              opaque, vbucket, flags);
2378         } else {
2379             ret = ENGINE_DISCONNECT;
2380         }
2381         release_engine_handle(peh);
2382     } else {
2383         ret = ENGINE_DISCONNECT;
2384     }
2385 
2386     return ret;
2387 }
2388 
2389 
dcp_snapshot_marker(ENGINE_HANDLE* handle, const void* cookie, uint32_t opaque, uint16_t vbucket, uint64_t start_seqno, uint64_t end_seqno, uint32_t flags)2390 static ENGINE_ERROR_CODE dcp_snapshot_marker(ENGINE_HANDLE* handle,
2391                                              const void* cookie,
2392                                              uint32_t opaque,
2393                                              uint16_t vbucket,
2394                                              uint64_t start_seqno,
2395                                              uint64_t end_seqno,
2396                                              uint32_t flags)
2397 {
2398     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2399     ENGINE_ERROR_CODE ret;
2400     if (peh) {
2401         if (peh->pe.v1->dcp.snapshot_marker) {
2402             ret = peh->pe.v1->dcp.snapshot_marker(peh->pe.v0, cookie, opaque,
2403                                                   vbucket, start_seqno,
2404                                                   end_seqno, flags);
2405         } else {
2406             ret = ENGINE_DISCONNECT;
2407         }
2408         release_engine_handle(peh);
2409     } else {
2410         ret = ENGINE_DISCONNECT;
2411     }
2412 
2413     return ret;
2414 }
2415 
dcp_mutation(ENGINE_HANDLE* handle, const void* cookie, uint32_t opaque, const void *key, uint16_t nkey, const void *value, uint32_t nvalue, uint64_t cas, uint16_t vbucket, uint32_t flags, uint8_t datatype, uint64_t by_seqno, uint64_t rev_seqno, uint32_t expiration, uint32_t lock_time, const void *meta, uint16_t nmeta, uint8_t nru)2416 static ENGINE_ERROR_CODE dcp_mutation(ENGINE_HANDLE* handle, const void* cookie,
2417                                       uint32_t opaque,
2418                                       const void *key,
2419                                       uint16_t nkey,
2420                                       const void *value,
2421                                       uint32_t nvalue,
2422                                       uint64_t cas,
2423                                       uint16_t vbucket,
2424                                       uint32_t flags,
2425                                       uint8_t datatype,
2426                                       uint64_t by_seqno,
2427                                       uint64_t rev_seqno,
2428                                       uint32_t expiration,
2429                                       uint32_t lock_time,
2430                                       const void *meta,
2431                                       uint16_t nmeta,
2432                                       uint8_t nru)
2433 {
2434     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2435     ENGINE_ERROR_CODE ret;
2436     if (peh) {
2437         if (peh->pe.v1->dcp.mutation) {
2438             ret = peh->pe.v1->dcp.mutation(peh->pe.v0, cookie,
2439                                            opaque, key, nkey, value, nvalue,
2440                                            cas, vbucket, flags, datatype,
2441                                            by_seqno, rev_seqno, expiration,
2442                                            lock_time, meta, nmeta, nru);
2443         } else {
2444             ret = ENGINE_DISCONNECT;
2445         }
2446         release_engine_handle(peh);
2447     } else {
2448         ret = ENGINE_DISCONNECT;
2449     }
2450 
2451     return ret;
2452 }
2453 
2454 
dcp_deletion(ENGINE_HANDLE* handle, const void* cookie, uint32_t opaque, const void *key, uint16_t nkey, uint64_t cas, uint16_t vbucket, uint64_t by_seqno, uint64_t rev_seqno, const void *meta, uint16_t nmeta)2455 static ENGINE_ERROR_CODE dcp_deletion(ENGINE_HANDLE* handle, const void* cookie,
2456                                       uint32_t opaque,
2457                                       const void *key,
2458                                       uint16_t nkey,
2459                                       uint64_t cas,
2460                                       uint16_t vbucket,
2461                                       uint64_t by_seqno,
2462                                       uint64_t rev_seqno,
2463                                       const void *meta,
2464                                       uint16_t nmeta)
2465 {
2466     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2467     ENGINE_ERROR_CODE ret;
2468     if (peh) {
2469         if (peh->pe.v1->dcp.deletion) {
2470             ret = peh->pe.v1->dcp.deletion(peh->pe.v0, cookie, opaque, key,
2471                                            nkey, cas, vbucket, by_seqno,
2472                                            rev_seqno, meta, nmeta);
2473         } else {
2474             ret = ENGINE_DISCONNECT;
2475         }
2476         release_engine_handle(peh);
2477     } else {
2478         ret = ENGINE_DISCONNECT;
2479     }
2480 
2481     return ret;
2482 }
2483 
2484 
dcp_expiration(ENGINE_HANDLE* handle, const void* cookie, uint32_t opaque, const void *key, uint16_t nkey, uint64_t cas, uint16_t vbucket, uint64_t by_seqno, uint64_t rev_seqno, const void *meta, uint16_t nmeta)2485 static ENGINE_ERROR_CODE dcp_expiration(ENGINE_HANDLE* handle, const void* cookie,
2486                                         uint32_t opaque,
2487                                         const void *key,
2488                                         uint16_t nkey,
2489                                         uint64_t cas,
2490                                         uint16_t vbucket,
2491                                         uint64_t by_seqno,
2492                                         uint64_t rev_seqno,
2493                                         const void *meta,
2494                                         uint16_t nmeta)
2495 {
2496     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2497     ENGINE_ERROR_CODE ret;
2498     if (peh) {
2499         if (peh->pe.v1->dcp.expiration) {
2500             ret = peh->pe.v1->dcp.expiration(peh->pe.v0, cookie, opaque, key,
2501                                              nkey, cas, vbucket, by_seqno,
2502                                              rev_seqno, meta, nmeta);
2503         } else {
2504             ret = ENGINE_DISCONNECT;
2505         }
2506         release_engine_handle(peh);
2507     } else {
2508         ret = ENGINE_DISCONNECT;
2509     }
2510 
2511     return ret;
2512 }
2513 
2514 
dcp_flush(ENGINE_HANDLE* handle, const void* cookie, uint32_t opaque, uint16_t vbucket)2515 static  ENGINE_ERROR_CODE dcp_flush(ENGINE_HANDLE* handle, const void* cookie,
2516                                    uint32_t opaque,
2517                                    uint16_t vbucket)
2518 {
2519     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2520     ENGINE_ERROR_CODE ret;
2521     if (peh) {
2522         if (peh->pe.v1->dcp.flush) {
2523             ret = peh->pe.v1->dcp.flush(peh->pe.v0, cookie,
2524                                         opaque, vbucket);
2525         } else {
2526             ret = ENGINE_DISCONNECT;
2527         }
2528         release_engine_handle(peh);
2529     } else {
2530         ret = ENGINE_DISCONNECT;
2531     }
2532 
2533     return ret;
2534 }
2535 
2536 
dcp_set_vbucket_state(ENGINE_HANDLE* handle, const void* cookie, uint32_t opaque, uint16_t vbucket, vbucket_state_t state)2537 static ENGINE_ERROR_CODE dcp_set_vbucket_state(ENGINE_HANDLE* handle, const void* cookie,
2538                                                uint32_t opaque,
2539                                                uint16_t vbucket,
2540                                                vbucket_state_t state)
2541 {
2542     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2543     ENGINE_ERROR_CODE ret;
2544     if (peh) {
2545         if (peh->pe.v1->dcp.set_vbucket_state) {
2546             ret = peh->pe.v1->dcp.set_vbucket_state(peh->pe.v0, cookie,
2547                                                     opaque, vbucket,
2548                                                     state);
2549         } else {
2550             ret = ENGINE_DISCONNECT;
2551         }
2552         release_engine_handle(peh);
2553     } else {
2554         ret = ENGINE_DISCONNECT;
2555     }
2556 
2557     return ret;
2558 }
2559 
dcp_noop(ENGINE_HANDLE* handle, const void* cookie, uint32_t opaque)2560 static ENGINE_ERROR_CODE dcp_noop(ENGINE_HANDLE* handle,
2561                                   const void* cookie,
2562                                   uint32_t opaque)
2563 {
2564     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2565     ENGINE_ERROR_CODE ret;
2566     if (peh) {
2567         if (peh->pe.v1->dcp.noop) {
2568             ret = peh->pe.v1->dcp.noop(peh->pe.v0, cookie, opaque);
2569         } else {
2570             ret = ENGINE_DISCONNECT;
2571         }
2572         release_engine_handle(peh);
2573     } else {
2574         ret = ENGINE_DISCONNECT;
2575     }
2576 
2577     return ret;
2578 }
2579 
dcp_buffer_acknowledgement(ENGINE_HANDLE* handle, const void* cookie, uint32_t opaque, uint16_t vbucket, uint32_t bb)2580 static ENGINE_ERROR_CODE dcp_buffer_acknowledgement(ENGINE_HANDLE* handle,
2581                                                     const void* cookie,
2582                                                     uint32_t opaque,
2583                                                     uint16_t vbucket,
2584                                                     uint32_t bb)
2585 {
2586     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2587     ENGINE_ERROR_CODE ret;
2588     if (peh) {
2589         if (peh->pe.v1->dcp.buffer_acknowledgement) {
2590             ret = peh->pe.v1->dcp.buffer_acknowledgement(peh->pe.v0, cookie,
2591                                                          opaque, vbucket, bb);
2592         } else {
2593             ret = ENGINE_DISCONNECT;
2594         }
2595         release_engine_handle(peh);
2596     } else {
2597         ret = ENGINE_DISCONNECT;
2598     }
2599 
2600     return ret;
2601 }
2602 
dcp_control(ENGINE_HANDLE* handle, const void* cookie, uint32_t opaque, const void *key, uint16_t nkey, const void *value, uint32_t nvalue)2603 static ENGINE_ERROR_CODE dcp_control(ENGINE_HANDLE* handle,
2604                                      const void* cookie,
2605                                      uint32_t opaque,
2606                                      const void *key,
2607                                      uint16_t nkey,
2608                                      const void *value,
2609                                      uint32_t nvalue)
2610 {
2611     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2612     ENGINE_ERROR_CODE ret;
2613     if (peh) {
2614         if (peh->pe.v1->dcp.control) {
2615             ret = peh->pe.v1->dcp.control(peh->pe.v0, cookie, opaque,
2616                                           key, nkey, value, nvalue);
2617         } else {
2618             ret = ENGINE_DISCONNECT;
2619         }
2620         release_engine_handle(peh);
2621     } else {
2622         ret = ENGINE_DISCONNECT;
2623     }
2624 
2625     return ret;
2626 }
2627 
dcp_response_handler(ENGINE_HANDLE* handle, const void* cookie, protocol_binary_response_header *response)2628 static ENGINE_ERROR_CODE dcp_response_handler(ENGINE_HANDLE* handle,
2629                                               const void* cookie,
2630                                               protocol_binary_response_header *response)
2631 {
2632     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2633     ENGINE_ERROR_CODE ret;
2634     if (peh) {
2635         if (peh->pe.v1->dcp.response_handler) {
2636             ret = peh->pe.v1->dcp.response_handler(peh->pe.v0, cookie,
2637                                                    response);
2638         } else {
2639             ret = ENGINE_DISCONNECT;
2640         }
2641         release_engine_handle(peh);
2642     } else {
2643         ret = ENGINE_DISCONNECT;
2644     }
2645 
2646     return ret;
2647 }
2648 
bucket_get_engine_vb_map(ENGINE_HANDLE* handle, const void * cookie, engine_get_vb_map_cb callback)2649 static ENGINE_ERROR_CODE bucket_get_engine_vb_map(ENGINE_HANDLE* handle,
2650                                                   const void * cookie,
2651                                                   engine_get_vb_map_cb callback)
2652 {
2653     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2654     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2655 
2656     if (peh) {
2657         if (peh->pe.v1->get_engine_vb_map) {
2658             ret = peh->pe.v1->get_engine_vb_map(peh->pe.v0, cookie, callback);
2659         } else {
2660             ret = ENGINE_ENOTSUP;
2661         }
2662         release_engine_handle(peh);
2663     }
2664 
2665     return ret;
2666 }
2667 
2668 /**
2669  * Implementation of the errinfo function in the engine api.
2670  * If the cookie is connected to an engine should proxy the function down
2671  * into the engine
2672  */
bucket_errinfo(ENGINE_HANDLE *handle, const void* cookie, char *buffer, size_t buffsz)2673 static size_t bucket_errinfo(ENGINE_HANDLE *handle, const void* cookie,
2674                              char *buffer, size_t buffsz) {
2675     proxied_engine_handle_t *peh = try_get_engine_handle(handle, cookie);
2676     size_t ret = 0;
2677 
2678     if (peh) {
2679         if (peh->pe.v1->errinfo) {
2680             ret = peh->pe.v1->errinfo(peh->pe.v0, cookie, buffer, buffsz);
2681         }
2682         release_engine_handle(peh);
2683     }
2684 
2685     return ret;
2686 }
2687 
2688 /**
2689  * Initialize configuration is called during the initialization of
2690  * bucket_engine. It tries to parse the configuration string to pick
2691  * out the legal configuration options, and store them in the
2692  * one and only instance of bucket_engine.
2693  */
initialize_configuration(struct bucket_engine *me, const char *cfg_str)2694 static ENGINE_ERROR_CODE initialize_configuration(struct bucket_engine *me,
2695                                                   const char *cfg_str) {
2696     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2697 
2698     me->auto_create = true;
2699 
2700     if (cfg_str != NULL) {
2701         int r;
2702         int ii = 0;
2703 #define CONFIG_SIZE 8
2704         struct config_item items[CONFIG_SIZE];
2705         memset(&items, 0, sizeof(items));
2706 
2707         items[ii].key = "engine";
2708         items[ii].datatype = DT_STRING;
2709         items[ii].value.dt_string = &me->default_engine_path;
2710         ++ii;
2711 
2712         items[ii].key = "admin";
2713         items[ii].datatype = DT_STRING;
2714         items[ii].value.dt_string = &me->admin_user;
2715         ++ii;
2716 
2717         items[ii].key = "default";
2718         items[ii].datatype = DT_BOOL;
2719         items[ii].value.dt_bool = &me->has_default;
2720         ++ii;
2721 
2722         items[ii].key = "default_bucket_name";
2723         items[ii].datatype = DT_STRING;
2724         items[ii].value.dt_string = &me->default_bucket_name;
2725         ++ii;
2726 
2727         items[ii].key = "default_bucket_config";
2728         items[ii].datatype = DT_STRING;
2729         items[ii].value.dt_string = &me->default_bucket_config;
2730         ++ii;
2731 
2732         items[ii].key = "auto_create";
2733         items[ii].datatype = DT_BOOL;
2734         items[ii].value.dt_bool = &me->auto_create;
2735         ++ii;
2736 
2737         items[ii].key = "config_file";
2738         items[ii].datatype = DT_CONFIGFILE;
2739         ++ii;
2740 
2741         items[ii].key = NULL;
2742         ++ii;
2743         cb_assert(ii == CONFIG_SIZE);
2744 #undef CONFIG_SIZE
2745 
2746         r = me->upstream_server->core->parse_config(cfg_str, items, stderr);
2747         if (r == 0) {
2748             if (!items[0].found) {
2749                 me->default_engine_path = NULL;
2750             }
2751             if (!items[1].found) {
2752                 me->admin_user = NULL;
2753             }
2754             if (!items[3].found) {
2755                 me->default_bucket_name = NULL;
2756             }
2757             if (!items[4].found) {
2758                 me->default_bucket_config = strdup("");
2759             }
2760         } else {
2761             ret = ENGINE_FAILED;
2762         }
2763     }
2764 
2765     return ret;
2766 }
2767 
2768 /***********************************************************
2769  ** Implementation of the bucket-engine specific commands **
2770  **********************************************************/
2771 
extract_key(void *packet)2772 static char* extract_key(void *packet) {
2773     protocol_binary_request_no_extras *myptr = packet;
2774     char *out = malloc(ntohs(myptr->message.header.request.keylen) + 1);
2775     if (out == NULL) {
2776         return NULL;
2777     }
2778     memcpy(out, ((char*)packet) + sizeof(myptr->message.header) +
2779                 myptr->message.header.request.extlen,
2780            ntohs(myptr->message.header.request.keylen));
2781     out[ntohs(myptr->message.header.request.keylen)] = 0x00;
2782     return out;
2783 }
2784 
2785 
2786 /**
2787  * Implementation of the "CREATE" command.
2788  */
handle_create_bucket(ENGINE_HANDLE* handle, const void* cookie, protocol_binary_request_header *request, ADD_RESPONSE response)2789 static ENGINE_ERROR_CODE handle_create_bucket(ENGINE_HANDLE* handle,
2790                                               const void* cookie,
2791                                               protocol_binary_request_header *request,
2792                                               ADD_RESPONSE response) {
2793 
2794 #define MSGLEN 1024
2795     protocol_binary_response_status rc;
2796     ENGINE_ERROR_CODE ret;
2797     char msg[MSGLEN];
2798     struct bucket_engine *e = (void*)handle;
2799     protocol_binary_request_create_bucket *breq = (void*)request;
2800     size_t bodylen;
2801     char *config = "";
2802     char *spec;
2803     char *keyz = extract_key(breq);
2804     if (keyz == NULL) {
2805         return ENGINE_ENOMEM;
2806     }
2807 
2808     bodylen = ntohl(breq->message.header.request.bodylen)
2809         - ntohs(breq->message.header.request.keylen);
2810 
2811     if (bodylen >= (1 << 16)) { /* 64k ought to be enough for anybody */
2812         free(keyz);
2813         return ENGINE_DISCONNECT;
2814     }
2815 
2816     spec = malloc(bodylen + 1);
2817     if (spec == NULL) {
2818         free(keyz);
2819         return ENGINE_ENOMEM;
2820     }
2821 
2822     memcpy(spec, ((char*)request) + sizeof(breq->message.header)
2823            + ntohs(breq->message.header.request.keylen), bodylen);
2824     spec[bodylen] = 0x00;
2825 
2826     if (spec[0] == 0) {
2827         const char *msg = "Invalid request.";
2828         response(msg, (uint16_t)strlen(msg), "", 0, "", 0, 0,
2829                  PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
2830         free(keyz);
2831         free(spec);
2832         return ENGINE_SUCCESS;
2833     }
2834 
2835     if (strlen(spec) < bodylen) {
2836         config = spec + strlen(spec)+1;
2837     }
2838 
2839     msg[0] = 0;
2840     lock_engines();
2841     ret = create_bucket_UNLOCKED(e, keyz, spec, config, NULL, msg, MSGLEN);
2842     unlock_engines();
2843 
2844     switch(ret) {
2845     case ENGINE_SUCCESS:
2846         rc = PROTOCOL_BINARY_RESPONSE_SUCCESS;
2847         break;
2848     case ENGINE_KEY_EEXISTS:
2849         rc = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
2850         break;
2851     default:
2852         rc = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
2853     }
2854 
2855     response(NULL, 0, NULL, 0, msg, (uint32_t)strlen(msg), 0, rc, 0, cookie);
2856 
2857     free(keyz);
2858     free(spec);
2859 #undef MSGLEN
2860     return ENGINE_SUCCESS;
2861 }
2862 
2863 /**
2864  * Implementation of the "DELETE" command. The delete command shuts down
2865  * the engine and waits for it's termination before sending the response
2866  * back to the caller. The user may specify if we should run a gracefull
2867  * shutdown (let the engine persist everything etc), or if it should
2868  * just stop as fast as possible. Please note that bucket_engine can only
2869  * notify the engine about this, because we need to wait until the engine
2870  * reports that it is done (otherwise it may still have threads running
2871  * etc).
2872  *
2873  * We can't block the client thread while waiting for the engine to shut
2874  * down, so instead we store the pointer to the request in the user-specific
2875  * data section to preserve the information before we return EWOULDBLOCK
2876  * back to the client.
2877  */
handle_delete_bucket(ENGINE_HANDLE* handle, const void* cookie, protocol_binary_request_header *request, ADD_RESPONSE response)2878 static ENGINE_ERROR_CODE handle_delete_bucket(ENGINE_HANDLE* handle,
2879                                               const void* cookie,
2880                                               protocol_binary_request_header *request,
2881                                               ADD_RESPONSE response) {
2882 
2883     void *userdata = bucket_get_engine_specific(cookie);
2884     bool found;
2885     proxied_engine_handle_t *peh;
2886 
2887     (void)handle;
2888     if (userdata == NULL) {
2889         protocol_binary_request_delete_bucket *breq = (void*)request;
2890         char *keyz;
2891         size_t bodylen;
2892         char *config;
2893         bool force = false;
2894 
2895         keyz = extract_key(breq);
2896         if (keyz == NULL) {
2897             return ENGINE_ENOMEM;
2898         }
2899 
2900         bodylen = ntohl(breq->message.header.request.bodylen)
2901             - ntohs(breq->message.header.request.keylen);
2902         if (bodylen >= (1 << 16)) {
2903             free(keyz);
2904             return ENGINE_DISCONNECT;
2905         }
2906         config = malloc(bodylen + 1);
2907         if (config == NULL) {
2908             free(keyz);
2909             return ENGINE_ENOMEM;
2910         }
2911         memcpy(config, ((char*)request) + sizeof(breq->message.header)
2912                + ntohs(breq->message.header.request.keylen), bodylen);
2913         config[bodylen] = 0x00;
2914 
2915         if (config[0] != 0) {
2916             struct config_item items[2];
2917             memset(&items, 0, sizeof(items));
2918             items[0].key = "force";
2919             items[0].datatype = DT_BOOL;
2920             items[0].value.dt_bool = &force;
2921             items[1].key = NULL;
2922 
2923             if (bucket_get_server_api()->core->parse_config(config, items,
2924                                                             stderr) != 0) {
2925                 const char *msg = "Invalid config parameters";
2926                 response(msg, (uint16_t)strlen(msg), "", 0, "", 0, 0,
2927                          PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
2928                 free(keyz);
2929                 free(config);
2930                 return ENGINE_SUCCESS;
2931             }
2932         }
2933         free(config);
2934 
2935         found = false;
2936         peh = find_bucket(keyz);
2937         free(keyz);
2938 
2939         if (peh) {
2940             engine_specific_t *es;
2941             /* bumped clients count protects transition from
2942              * STATE_RUNNING to STATE_STOPPED while peh->cookie is not
2943              * yet set. */
2944             int count = ATOMIC_INCR(&peh->clients);
2945             cb_assert(count > 0);
2946             if (ATOMIC_CAS(&peh->state, STATE_RUNNING, STATE_STOPPING)) {
2947                 peh->cookie = cookie;
2948                 found = true;
2949                 peh->force_shutdown = force;
2950             }
2951             /* it'll decrement clients and also initiate bucket
2952              * shutdown when there are no active clients */
2953             release_engine_handle(peh);
2954 
2955             /* If we're deleting the bucket we're connected to we need */
2956             /* to disconnect from the bucket in order to avoid trying */
2957             /* to grab it after it is released (since we're dropping) */
2958             /* the reference */
2959             es = bucket_engine.upstream_server->cookie->get_engine_specific(cookie);
2960             cb_assert(es);
2961             if (es->peh == peh) {
2962                 set_engine_handle(handle, cookie, NULL);
2963             }
2964 
2965             /* and drop reference from find_bucket */
2966             release_handle(peh);
2967         }
2968 
2969         if (found) {
2970             bucket_store_engine_specific(cookie, breq);
2971             return ENGINE_EWOULDBLOCK;
2972         } else {
2973             const char *msg = "Not found.";
2974             response(NULL, 0, NULL, 0, msg, (uint32_t)strlen(msg),
2975                      0, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
2976                      0, cookie);
2977         }
2978     } else {
2979         bucket_store_engine_specific(cookie, NULL);
2980         response(NULL, 0, NULL, 0, NULL, 0, 0,
2981                  PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
2982     }
2983 
2984     return ENGINE_SUCCESS;
2985 }
2986 
2987 /**
2988  * Implementation of the "LIST" command. This command returns a single
2989  * packet with the names of all the buckets separated by the space
2990  * character.
2991  */
handle_list_buckets(ENGINE_HANDLE* handle, const void* cookie, protocol_binary_request_header *request, ADD_RESPONSE response)2992 static ENGINE_ERROR_CODE handle_list_buckets(ENGINE_HANDLE* handle,
2993                                              const void* cookie,
2994                                              protocol_binary_request_header *request,
2995                                              ADD_RESPONSE response) {
2996     size_t len = 0;
2997     int n = 0;
2998     struct bucket_list *p;
2999     struct bucket_engine *e = (struct bucket_engine*)handle;
3000     char *blist_txt;
3001 
3002     /* Accumulate the current bucket list. */
3003     struct bucket_list *blist = NULL;
3004     if (! list_buckets(e, &blist)) {
3005         return ENGINE_FAILED;
3006     }
3007 
3008     p = blist;
3009     while (p) {
3010         len += p->namelen;
3011         n++;
3012         p = p->next;
3013     }
3014 
3015     /* Now turn it into a space-separated list. */
3016     blist_txt = calloc(sizeof(char), n + len);
3017     cb_assert(blist_txt);
3018     p = blist;
3019     while (p) {
3020         strncat(blist_txt, p->name, p->namelen);
3021         if (p->next) {
3022             strcat(blist_txt, " ");
3023         }
3024         p = p->next;
3025     }
3026 
3027     bucket_list_free(blist);
3028 
3029     /* Response body will be "" in the case of an empty response. */
3030     /* Otherwise, it needs to account for the trailing space of the */
3031     /* above append code. */
3032     response(NULL, 0, NULL, 0, blist_txt,
3033              n == 0 ? 0 : (uint32_t)((sizeof(char) * n + len) - 1),
3034              0, PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
3035     free(blist_txt);
3036 
3037     return ENGINE_SUCCESS;
3038 }
3039 
3040 /**
3041  * Implementation of the "SELECT" command. The SELECT command associates
3042  * the cookie with the named bucket.
3043  */
handle_select_bucket(ENGINE_HANDLE* handle, const void* cookie, protocol_binary_request_header *request, ADD_RESPONSE response)3044 static ENGINE_ERROR_CODE handle_select_bucket(ENGINE_HANDLE* handle,
3045                                               const void* cookie,
3046                                               protocol_binary_request_header *request,
3047                                               ADD_RESPONSE response) {
3048     proxied_engine_handle_t *proxied;
3049     char *keyz = extract_key(request);
3050     if (keyz == NULL) {
3051         return ENGINE_ENOMEM;
3052     }
3053 
3054     proxied = find_bucket(keyz);
3055     set_engine_handle(handle, cookie, proxied);
3056     release_handle(proxied);
3057 
3058     if (proxied) {
3059         response(NULL, 0, NULL, 0, NULL, 0, 0,
3060                  PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
3061     } else {
3062         const char *msg = "Engine not found";
3063         response(NULL, 0, NULL, 0, msg, (uint32_t)strlen(msg), 0,
3064                  PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0, cookie);
3065     }
3066     free(keyz);
3067     return