1
2/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3/*
4 *     Copyright 2010 Couchbase, Inc
5 *
6 *   Licensed under the Apache License, Version 2.0 (the "License");
7 *   you may not use this file except in compliance with the License.
8 *   You may obtain a copy of the License at
9 *
10 *       http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *   Unless required by applicable law or agreed to in writing, software
13 *   distributed under the License is distributed on an "AS IS" BASIS,
14 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *   See the License for the specific language governing permissions and
16 *   limitations under the License.
17 */
18
19#include "config.h"
20
21#include <fcntl.h>
22#include <memcached/engine.h>
23#include <memcached/protocol_binary.h>
24#include <memcached/util.h>
25#include <platform/platform.h>
26#include <stdarg.h>
27
28#include <cstdio>
29#include <cstring>
30#include <fstream>
31#include <iostream>
32#include <limits>
33#include <string>
34#include <vector>
35
36#include "backfill.h"
37#include "ep_engine.h"
38#include "failover-table.h"
39#include "flusher.h"
40#include "connmap.h"
41#include "htresizer.h"
42#include "memory_tracker.h"
43#include "stats-info.h"
44#define STATWRITER_NAMESPACE core_engine
45#include "statwriter.h"
46#undef STATWRITER_NAMESPACE
47#include "tapthrottle.h"
48#include "dcp-consumer.h"
49#include "dcp-producer.h"
50#include "warmup.h"
51
52#include <JSON_checker.h>
53
// File-local pointer to the allocator hooks API table. It is only declared
// here; NOTE(review): presumably assigned when the engine instance is
// created -- confirm against the rest of the file.
static ALLOCATOR_HOOKS_API *hooksApi;
// File-local pointer to the server logging API table. It is only declared
// here; NOTE(review): presumably assigned alongside hooksApi -- confirm.
static SERVER_LOG_API *loggerApi;
56
/**
 * Scale a size by a fractional factor.
 *
 * @param val the base value
 * @param percent the factor to multiply by, given as a fraction
 *                (callers in this file pass e.g. 0.75 and 0.85)
 * @return val multiplied by percent, converted back to size_t
 */
static size_t percentOf(size_t val, double percent) {
    const double scaled = percent * static_cast<double>(val);
    return static_cast<size_t>(scaled);
}
60
61/**
62 * Helper function to avoid typing in the long cast all over the place
63 * @param handle pointer to the engine
64 * @return the engine as a class
65 */
66static inline EventuallyPersistentEngine* getHandle(ENGINE_HANDLE* handle)
67{
68    EventuallyPersistentEngine* ret;
69    ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
70    ObjectRegistry::onSwitchThread(ret);
71    return ret;
72}
73
74static inline void releaseHandle(ENGINE_HANDLE* handle) {
75    (void) handle;
76    ObjectRegistry::onSwitchThread(NULL);
77}
78
79
80/**
81 * Call the response callback and return the appropriate value so that
82 * the core knows what to do..
83 */
84static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
85                                      uint16_t keylen,
86                                      const void *ext, uint8_t extlen,
87                                      const void *body, uint32_t bodylen,
88                                      uint8_t datatype, uint16_t status,
89                                      uint64_t cas, const void *cookie)
90{
91    ENGINE_ERROR_CODE rv = ENGINE_FAILED;
92    EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
93    if (response(key, keylen, ext, extlen, body, bodylen, datatype,
94                 status, cas, cookie)) {
95        rv = ENGINE_SUCCESS;
96    }
97    ObjectRegistry::onSwitchThread(e);
98    return rv;
99}
100
/**
 * Check that a value lies inside the closed interval [l, h].
 *
 * @param v the value to check
 * @param l the smallest accepted value
 * @param h the largest accepted value
 * @throws std::runtime_error ("Value out of range.") if v < l or v > h
 */
template <typename T>
static void validate(T v, T l, T h) {
    const bool belowMin = v < l;
    const bool aboveMax = v > h;
    if (!belowMin && !aboveMax) {
        return;
    }
    throw std::runtime_error("Value out of range.");
}
107
108
/**
 * Check that a string is a base-10 integer literal: an optional leading
 * '-' followed by one or more ASCII digits and nothing else.
 *
 * @param str NUL-terminated string to check (must not be NULL)
 * @throws std::runtime_error ("Value is not numeric") if the string is
 *         empty, is a lone "-", or contains any non-digit character
 */
static void checkNumeric(const char* str) {
    const char* p = str;
    if (*p == '-') {
        ++p;
    }
    // "" and "-" contain no digits at all, so they are not numbers.
    if (*p == '\0') {
        throw std::runtime_error("Value is not numeric");
    }
    for (; *p != '\0'; ++p) {
        // Compare against the digit range directly: handing a plain
        // (possibly negative) char to isdigit() is undefined behaviour.
        if (*p < '0' || *p > '9') {
            throw std::runtime_error("Value is not numeric");
        }
    }
}
121
122// The Engine API specifies C linkage for the functions..
123extern "C" {
124
125    static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle)
126    {
127        engine_info* info = getHandle(handle)->getInfo();
128        releaseHandle(handle);
129        return info;
130    }
131
132    static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
133                                           const char* config_str)
134    {
135        ENGINE_ERROR_CODE err_code = getHandle(handle)->initialize(config_str);
136        releaseHandle(handle);
137        return err_code;
138    }
139
140    static void EvpDestroy(ENGINE_HANDLE* handle, const bool force)
141    {
142        getHandle(handle)->destroy(force);
143        delete getHandle(handle);
144        releaseHandle(NULL);
145    }
146
147    static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
148                                             const void* cookie,
149                                             item **itm,
150                                             const void* key,
151                                             const size_t nkey,
152                                             const size_t nbytes,
153                                             const int flags,
154                                             const rel_time_t exptime,
155                                             uint8_t datatype)
156    {
157        if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
158            LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
159                    " (ItemAllocate)");
160            return ENGINE_EINVAL;
161        }
162        ENGINE_ERROR_CODE err_code = getHandle(handle)->itemAllocate(cookie,
163                                                                     itm, key,
164                                                                     nkey,
165                                                                     nbytes,
166                                                                     flags,
167                                                                     exptime,
168                                                                     datatype);
169        releaseHandle(handle);
170        return err_code;
171    }
172
173    static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
174                                           const void* cookie,
175                                           const void* key,
176                                           const size_t nkey,
177                                           uint64_t* cas,
178                                           uint16_t vbucket)
179    {
180        ENGINE_ERROR_CODE err_code = getHandle(handle)->itemDelete(cookie, key,
181                                                                   nkey, cas,
182                                                                   vbucket);
183        releaseHandle(handle);
184        return err_code;
185    }
186
187    static void EvpItemRelease(ENGINE_HANDLE* handle,
188                               const void *cookie,
189                               item* itm)
190    {
191        getHandle(handle)->itemRelease(cookie, itm);
192        releaseHandle(handle);
193    }
194
195    static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
196                                    const void* cookie,
197                                    item** itm,
198                                    const void* key,
199                                    const int nkey,
200                                    uint16_t vbucket)
201    {
202        ENGINE_ERROR_CODE err_code = getHandle(handle)->get(cookie, itm, key,
203                                                            nkey, vbucket, true);
204        releaseHandle(handle);
205        return err_code;
206    }
207
208    static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
209                                         const void* cookie,
210                                         const char* stat_key,
211                                         int nkey,
212                                         ADD_STAT add_stat)
213    {
214        ENGINE_ERROR_CODE err_code = getHandle(handle)->getStats(cookie,
215                                                                 stat_key,
216                                                                 nkey,
217                                                                 add_stat);
218        releaseHandle(handle);
219        return err_code;
220    }
221
222    static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
223                                      const void *cookie,
224                                      item* itm,
225                                      uint64_t *cas,
226                                      ENGINE_STORE_OPERATION operation,
227                                      uint16_t vbucket)
228    {
229        ENGINE_ERROR_CODE err_code = getHandle(handle)->store(cookie, itm, cas,
230                                                              operation,
231                                                              vbucket);
232        releaseHandle(handle);
233        return err_code;
234    }
235
236    static ENGINE_ERROR_CODE EvpArithmetic(ENGINE_HANDLE* handle,
237                                           const void* cookie,
238                                           const void* key,
239                                           const int nkey,
240                                           const bool increment,
241                                           const bool create,
242                                           const uint64_t delta,
243                                           const uint64_t initial,
244                                           const rel_time_t exptime,
245                                           uint64_t *cas,
246                                           uint8_t datatype,
247                                           uint64_t *result,
248                                           uint16_t vbucket)
249    {
250        if (datatype > PROTOCOL_BINARY_DATATYPE_JSON) {
251            if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
252                LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
253                        " (Arithmetic)");
254            } else {
255                LOG(EXTENSION_LOG_WARNING, "Cannnot perform arithmetic "
256                    "operations on compressed data!");
257            }
258            return ENGINE_EINVAL;
259        }
260        ENGINE_ERROR_CODE ecode = getHandle(handle)->arithmetic(cookie, key,
261                                                                nkey,
262                                                                increment,
263                                                                create, delta,
264                                                                initial,
265                                                                exptime, cas,
266                                                                datatype,
267                                                                result,
268                                                                vbucket);
269        releaseHandle(handle);
270        return ecode;
271    }
272
273    static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
274                                      const void* cookie, time_t when)
275    {
276        ENGINE_ERROR_CODE err_code = getHandle(handle)->flush(cookie, when);
277        releaseHandle(handle);
278        return err_code;
279    }
280
281    static void EvpResetStats(ENGINE_HANDLE* handle, const void *)
282    {
283        getHandle(handle)->resetStats();
284        releaseHandle(handle);
285    }
286
287    static protocol_binary_response_status stopFlusher(
288                                                 EventuallyPersistentEngine *e,
289                                                 const char **msg,
290                                                 size_t *msg_size) {
291        return e->stopFlusher(msg, msg_size);
292    }
293
294    static protocol_binary_response_status startFlusher(
295                                                 EventuallyPersistentEngine *e,
296                                                 const char **msg,
297                                                 size_t *msg_size) {
298        return e->startFlusher(msg, msg_size);
299    }
300
301    static protocol_binary_response_status setTapParam(
302                                                 EventuallyPersistentEngine *e,
303                                                 const char *keyz,
304                                                 const char *valz,
305                                                 const char **msg, size_t *) {
306        protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
307
308        try {
309            int v = atoi(valz);
310            if (strcmp(keyz, "tap_keepalive") == 0) {
311                checkNumeric(valz);
312                validate(v, 0, MAX_TAP_KEEP_ALIVE);
313                e->setTapKeepAlive(static_cast<uint32_t>(v));
314            } else if (strcmp(keyz, "tap_throttle_threshold") == 0) {
315                checkNumeric(valz);
316                e->getConfiguration().setTapThrottleThreshold(v);
317            } else if (strcmp(keyz, "tap_throttle_queue_cap") == 0) {
318                checkNumeric(valz);
319                e->getConfiguration().setTapThrottleQueueCap(v);
320            } else if (strcmp(keyz, "tap_throttle_cap_pcnt") == 0) {
321                checkNumeric(valz);
322                e->getConfiguration().setTapThrottleCapPcnt(v);
323            } else {
324                *msg = "Unknown config param";
325                rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
326            }
327        } catch(std::runtime_error &) {
328            *msg = "Value out of range.";
329            rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
330        }
331
332        return rv;
333    }
334
335    static protocol_binary_response_status setCheckpointParam(
336                                                 EventuallyPersistentEngine *e,
337                                                              const char *keyz,
338                                                              const char *valz,
339                                                              const char **msg,
340                                                              size_t *) {
341        protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
342
343        try {
344            int v = atoi(valz);
345            if (strcmp(keyz, "chk_max_items") == 0) {
346                checkNumeric(valz);
347                validate(v, MIN_CHECKPOINT_ITEMS, MAX_CHECKPOINT_ITEMS);
348                e->getConfiguration().setChkMaxItems(v);
349            } else if (strcmp(keyz, "chk_period") == 0) {
350                checkNumeric(valz);
351                validate(v, MIN_CHECKPOINT_PERIOD, MAX_CHECKPOINT_PERIOD);
352                e->getConfiguration().setChkPeriod(v);
353            } else if (strcmp(keyz, "max_checkpoints") == 0) {
354                checkNumeric(valz);
355                validate(v, DEFAULT_MAX_CHECKPOINTS,
356                         MAX_CHECKPOINTS_UPPER_BOUND);
357                e->getConfiguration().setMaxCheckpoints(v);
358            } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
359                if (strcmp(valz, "true") == 0) {
360                    e->getConfiguration().setItemNumBasedNewChk(true);
361                } else {
362                    e->getConfiguration().setItemNumBasedNewChk(false);
363                }
364            } else if (strcmp(keyz, "keep_closed_chks") == 0) {
365                if (strcmp(valz, "true") == 0) {
366                    e->getConfiguration().setKeepClosedChks(true);
367                } else {
368                    e->getConfiguration().setKeepClosedChks(false);
369                }
370            } else if (strcmp(keyz, "enable_chk_merge") == 0) {
371                if (strcmp(valz, "true") == 0) {
372                    e->getConfiguration().setEnableChkMerge(true);
373                } else {
374                    e->getConfiguration().setEnableChkMerge(false);
375                }
376            } else {
377                *msg = "Unknown config param";
378                rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
379            }
380        } catch(std::runtime_error &) {
381            *msg = "Value out of range.";
382            rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
383        }
384
385        return rv;
386    }
387
388    static protocol_binary_response_status setFlushParam(
389                                                 EventuallyPersistentEngine *e,
390                                                 const char *keyz,
391                                                 const char *valz,
392                                                 const char **msg,
393                                                 size_t *) {
394        protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
395
396        // Handle the actual mutation.
397        try {
398            int v = atoi(valz);
399            if (strcmp(keyz, "bg_fetch_delay") == 0) {
400                checkNumeric(valz);
401                e->getConfiguration().setBgFetchDelay(v);
402            } else if (strcmp(keyz, "flushall_enabled") == 0) {
403                if (strcmp(valz, "true") == 0) {
404                    e->getConfiguration().setFlushallEnabled(true);
405                } else if(strcmp(valz, "false") == 0) {
406                    e->getConfiguration().setFlushallEnabled(false);
407                } else {
408                    throw std::runtime_error("value out of range.");
409                }
410            } else if (strcmp(keyz, "max_size") == 0) {
411                char *ptr = NULL;
412                checkNumeric(valz);
413                uint64_t vsize = strtoull(valz, &ptr, 10);
414                validate(vsize, static_cast<uint64_t>(0),
415                         std::numeric_limits<uint64_t>::max());
416                e->getConfiguration().setMaxSize(vsize);
417                e->getConfiguration().setMemLowWat(percentOf(vsize, 0.75));
418                e->getConfiguration().setMemHighWat(percentOf(vsize, 0.85));
419            } else if (strcmp(keyz, "mem_low_wat") == 0) {
420                char *ptr = NULL;
421                checkNumeric(valz);
422                uint64_t vsize = strtoull(valz, &ptr, 10);
423                validate(vsize, static_cast<uint64_t>(0),
424                         std::numeric_limits<uint64_t>::max());
425                e->getConfiguration().setMemLowWat(vsize);
426            } else if (strcmp(keyz, "mem_high_wat") == 0) {
427                char *ptr = NULL;
428                checkNumeric(valz);
429                uint64_t vsize = strtoull(valz, &ptr, 10);
430                validate(vsize, static_cast<uint64_t>(0),
431                         std::numeric_limits<uint64_t>::max());
432                e->getConfiguration().setMemHighWat(vsize);
433            } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
434                checkNumeric(valz);
435                validate(v, 0, 100);
436                e->getConfiguration().setBackfillMemThreshold(v);
437            } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
438                checkNumeric(valz);
439                validate(v, 0, 100);
440                e->getConfiguration().setCompactionExpMemThreshold(v);
441            } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
442                checkNumeric(valz);
443                validate(v, 0, 100);
444                e->getConfiguration().setMutationMemThreshold(v);
445            } else if (strcmp(keyz, "timing_log") == 0) {
446                EPStats &stats = e->getEpStats();
447                std::ostream *old = stats.timingLog;
448                stats.timingLog = NULL;
449                delete old;
450                if (strcmp(valz, "off") == 0) {
451                    LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
452                } else {
453                    std::ofstream *tmp(new std::ofstream(valz));
454                    if (tmp->good()) {
455                        LOG(EXTENSION_LOG_INFO,
456                            "Logging detailed timings to ``%s''.", valz);
457                        stats.timingLog = tmp;
458                    } else {
459                        LOG(EXTENSION_LOG_WARNING,
460                            "Error setting detailed timing log to ``%s'':  %s",
461                            valz, strerror(errno));
462                        delete tmp;
463                    }
464                }
465            } else if (strcmp(keyz, "exp_pager_stime") == 0) {
466                char *ptr = NULL;
467                checkNumeric(valz);
468                uint64_t vsize = strtoull(valz, &ptr, 10);
469                validate(vsize, static_cast<uint64_t>(0),
470                         std::numeric_limits<uint64_t>::max());
471                e->getConfiguration().setExpPagerStime((size_t)vsize);
472            } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
473                if (strcmp(valz, "true") == 0) {
474                    e->getConfiguration().setAccessScannerEnabled(true);
475                } else if (strcmp(valz, "false") == 0) {
476                    e->getConfiguration().setAccessScannerEnabled(false);
477                } else {
478                    throw std::runtime_error("Value expected: true/false.");
479                }
480            } else if (strcmp(keyz, "alog_sleep_time") == 0) {
481                checkNumeric(valz);
482                e->getConfiguration().setAlogSleepTime(v);
483            } else if (strcmp(keyz, "alog_task_time") == 0) {
484                checkNumeric(valz);
485                e->getConfiguration().setAlogTaskTime(v);
486            } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
487                checkNumeric(valz);
488                e->getConfiguration().setPagerActiveVbPcnt(v);
489            } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
490                checkNumeric(valz);
491                validate(v, 0, std::numeric_limits<int>::max());
492                e->getConfiguration().setWarmupMinMemoryThreshold(v);
493            } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
494                checkNumeric(valz);
495                validate(v, 0, std::numeric_limits<int>::max());
496                e->getConfiguration().setWarmupMinItemsThreshold(v);
497            } else if (strcmp(keyz, "max_num_readers") == 0) {
498                checkNumeric(valz);
499                validate(v, 0, std::numeric_limits<int>::max());
500                e->getConfiguration().setMaxNumReaders(v);
501                ExecutorPool::get()->setMaxReaders(v);
502            } else if (strcmp(keyz, "max_num_writers") == 0) {
503                checkNumeric(valz);
504                validate(v, 0, std::numeric_limits<int>::max());
505                e->getConfiguration().setMaxNumWriters(v);
506                ExecutorPool::get()->setMaxWriters(v);
507            } else if (strcmp(keyz, "max_num_auxio") == 0) {
508                checkNumeric(valz);
509                validate(v, 0, std::numeric_limits<int>::max());
510                e->getConfiguration().setMaxNumAuxio(v);
511                ExecutorPool::get()->setMaxAuxIO(v);
512            } else if (strcmp(keyz, "max_num_nonio") == 0) {
513                checkNumeric(valz);
514                validate(v, 0, std::numeric_limits<int>::max());
515                e->getConfiguration().setMaxNumNonio(v);
516                ExecutorPool::get()->setMaxNonIO(v);
517            } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
518                checkNumeric(valz);
519                validate(v, 1, std::numeric_limits<int>::max());
520                e->getConfiguration().setCompactionWriteQueueCap(v);
521            } else {
522                *msg = "Unknown config param";
523                rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
524            }
525        } catch(std::runtime_error& ex) {
526            *msg = strdup(ex.what());
527            rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
528        }
529
530        return rv;
531    }
532
533    static protocol_binary_response_status evictKey(
534                                                 EventuallyPersistentEngine *e,
535                                                 protocol_binary_request_header
536                                                                      *request,
537                                                 const char **msg,
538                                                 size_t *msg_size) {
539        protocol_binary_request_no_extras *req =
540            (protocol_binary_request_no_extras*)request;
541
542        char keyz[256];
543
544        // Read the key.
545        int keylen = ntohs(req->message.header.request.keylen);
546        if (keylen >= (int)sizeof(keyz)) {
547            *msg = "Key is too large.";
548            return PROTOCOL_BINARY_RESPONSE_EINVAL;
549        }
550        memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
551        keyz[keylen] = 0x00;
552
553        uint16_t vbucket = ntohs(request->request.vbucket);
554
555        std::string key(keyz, keylen);
556
557        LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key %s\n",
558                keyz);
559
560        protocol_binary_response_status rv = e->evictKey(key, vbucket, msg,
561                                                         msg_size);
562        if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
563            rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
564            if (e->isDegradedMode()) {
565                return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
566            }
567        }
568        return rv;
569    }
570
571    static ENGINE_ERROR_CODE getLocked(EventuallyPersistentEngine *e,
572                                       protocol_binary_request_header *req,
573                                       const void *cookie,
574                                       Item **itm,
575                                       const char **msg,
576                                       size_t *,
577                                       protocol_binary_response_status *res) {
578
579        uint8_t extlen = req->request.extlen;
580        if (extlen != 0 && extlen != 4) {
581            *msg = "Invalid packet format (extlen may be 0 or 4)";
582            *res = PROTOCOL_BINARY_RESPONSE_EINVAL;
583            return ENGINE_EINVAL;
584        }
585
586        protocol_binary_request_getl *grequest =
587            (protocol_binary_request_getl*)req;
588        *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
589
590        const char *keyp = reinterpret_cast<const char*>(req->bytes);
591        keyp += sizeof(req->bytes) + extlen;
592        std::string key(keyp, ntohs(req->request.keylen));
593        uint16_t vbucket = ntohs(req->request.vbucket);
594
595        RememberingCallback<GetValue> getCb;
596        uint32_t max_timeout = (unsigned int)e->getGetlMaxTimeout();
597        uint32_t default_timeout = (unsigned int)e->getGetlDefaultTimeout();
598        uint32_t lockTimeout = default_timeout;
599        if (extlen == 4) {
600            lockTimeout = ntohl(grequest->message.body.expiration);
601        }
602
603        if (lockTimeout >  max_timeout || lockTimeout < 1) {
604            LOG(EXTENSION_LOG_WARNING,
605                "Illegal value for lock timeout specified"
606                " %u. Using default value: %u", lockTimeout, default_timeout);
607            lockTimeout = default_timeout;
608        }
609
610        bool gotLock = e->getLocked(key, vbucket, getCb,
611                                    ep_current_time(),
612                                    lockTimeout, cookie);
613
614        getCb.waitForValue();
615        ENGINE_ERROR_CODE rv = getCb.val.getStatus();
616
617        if (rv == ENGINE_SUCCESS) {
618            *itm = getCb.val.getValue();
619            ++(e->getEpStats().numOpsGet);
620        } else if (rv == ENGINE_EWOULDBLOCK) {
621
622            // need to wait for value
623            return rv;
624        } else if (rv == ENGINE_NOT_MY_VBUCKET) {
625            *msg = "That's not my bucket.";
626            *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
627            return ENGINE_NOT_MY_VBUCKET;
628        } else if (!gotLock){
629            *msg =  "LOCK_ERROR";
630            *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
631            return ENGINE_TMPFAIL;
632        } else {
633            if (e->isDegradedMode()) {
634                *msg = "LOCK_TMP_ERROR";
635                *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
636                return ENGINE_TMPFAIL;
637            }
638
639            *msg = "NOT_FOUND";
640            *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
641            return ENGINE_KEY_ENOENT;
642        }
643
644        return rv;
645    }
646
647    static protocol_binary_response_status unlockKey(
648                                                 EventuallyPersistentEngine *e,
649                                                 protocol_binary_request_header
650                                                                      *request,
651                                                 const char **msg,
652                                                 size_t *)
653    {
654        protocol_binary_request_no_extras *req =
655            (protocol_binary_request_no_extras*)request;
656
657        protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
658        char keyz[256];
659
660        // Read the key.
661        int keylen = ntohs(req->message.header.request.keylen);
662        if (keylen >= (int)sizeof(keyz)) {
663            *msg = "Key is too large.";
664            return PROTOCOL_BINARY_RESPONSE_EINVAL;
665        }
666
667        memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
668        keyz[keylen] = 0x00;
669
670        uint16_t vbucket = ntohs(request->request.vbucket);
671        std::string key(keyz, keylen);
672
673        LOG(EXTENSION_LOG_DEBUG, "Executing unl for key %s\n", keyz);
674
675        RememberingCallback<GetValue> getCb;
676        uint64_t cas = ntohll(request->request.cas);
677
678        ENGINE_ERROR_CODE rv = e->unlockKey(key, vbucket, cas,
679                                            ep_current_time());
680
681        if (rv == ENGINE_SUCCESS) {
682            *msg = "UNLOCKED";
683        } else if (rv == ENGINE_TMPFAIL){
684            *msg =  "UNLOCK_ERROR";
685            res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
686        } else {
687            if (e->isDegradedMode()) {
688                *msg = "LOCK_TMP_ERROR";
689                return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
690            }
691
692            RCPtr<VBucket> vb = e->getVBucket(vbucket);
693            if (!vb) {
694                *msg = "That's not my bucket.";
695                res =  PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
696            }
697            *msg = "NOT_FOUND";
698            res =  PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
699        }
700
701        return res;
702    }
703
704    static protocol_binary_response_status setParam(
705                                            EventuallyPersistentEngine *e,
706                                            protocol_binary_request_set_param
707                                                                     *req,
708                                            const char **msg,
709                                            size_t *msg_size) {
710
711        size_t keylen = ntohs(req->message.header.request.keylen);
712        uint8_t extlen = req->message.header.request.extlen;
713        size_t vallen = ntohl(req->message.header.request.bodylen);
714        protocol_binary_engine_param_t paramtype =
715            static_cast<protocol_binary_engine_param_t>(ntohl(req->message.body.param_type));
716
717        if (keylen == 0 || (vallen - keylen - extlen) == 0) {
718            return PROTOCOL_BINARY_RESPONSE_EINVAL;
719        }
720
721        const char *keyp = reinterpret_cast<const char*>(req->bytes)
722                           + sizeof(req->bytes);
723        const char *valuep = keyp + keylen;
724        vallen -= (keylen + extlen);
725
726        char keyz[32];
727        char valz[512];
728
729        // Read the key.
730        if (keylen >= sizeof(keyz)) {
731            *msg = "Key is too large.";
732            return PROTOCOL_BINARY_RESPONSE_EINVAL;
733        }
734        memcpy(keyz, keyp, keylen);
735        keyz[keylen] = 0x00;
736
737        // Read the value.
738        if (vallen >= sizeof(valz)) {
739            *msg = "Value is too large.";
740            return PROTOCOL_BINARY_RESPONSE_EINVAL;
741        }
742        memcpy(valz, valuep, vallen);
743        valz[vallen] = 0x00;
744
745        protocol_binary_response_status rv;
746
747        switch (paramtype) {
748        case protocol_binary_engine_param_flush:
749            rv = setFlushParam(e, keyz, valz, msg, msg_size);
750            break;
751        case protocol_binary_engine_param_tap:
752            rv = setTapParam(e, keyz, valz, msg, msg_size);
753            break;
754        case protocol_binary_engine_param_checkpoint:
755            rv = setCheckpointParam(e, keyz, valz, msg, msg_size);
756            break;
757        default:
758            rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
759        }
760
761        return rv;
762    }
763
764    static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine *e,
765                                       const void *cookie,
766                                       protocol_binary_request_header *request,
767                                       ADD_RESPONSE response) {
768        protocol_binary_request_get_vbucket *req =
769            reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
770        cb_assert(req);
771
772        uint16_t vbucket = ntohs(req->message.header.request.vbucket);
773        RCPtr<VBucket> vb = e->getVBucket(vbucket);
774        if (!vb) {
775            LockHolder lh(e->clusterConfig.lock);
776            return sendResponse(response, NULL, 0, NULL, 0,
777                                e->clusterConfig.config,
778                                e->clusterConfig.len,
779                                PROTOCOL_BINARY_RAW_BYTES,
780                                PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
781                                cookie);
782        } else {
783            vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
784            return sendResponse(response, NULL, 0, NULL, 0, &state,
785                                sizeof(state),
786                                PROTOCOL_BINARY_RAW_BYTES,
787                                PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
788        }
789    }
790
791    static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine *e,
792                                       const void *cookie,
793                                       protocol_binary_request_header *request,
794                                       ADD_RESPONSE response) {
795
796        protocol_binary_request_set_vbucket *req =
797            reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
798
799        uint64_t cas = ntohll(req->message.header.request.cas);
800
801        size_t bodylen = ntohl(req->message.header.request.bodylen)
802            - ntohs(req->message.header.request.keylen);
803        if (bodylen != sizeof(vbucket_state_t)) {
804            const std::string msg("Incorrect packet format");
805            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
806                                msg.length(), PROTOCOL_BINARY_RAW_BYTES,
807                                PROTOCOL_BINARY_RESPONSE_EINVAL,
808                                cas, cookie);
809        }
810
811        vbucket_state_t state;
812        memcpy(&state, &req->message.body.state, sizeof(state));
813        state = static_cast<vbucket_state_t>(ntohl(state));
814
815        if (!is_valid_vbucket_state_t(state)) {
816            const std::string msg("Invalid vbucket state");
817            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
818                                msg.length(), PROTOCOL_BINARY_RAW_BYTES,
819                                PROTOCOL_BINARY_RESPONSE_EINVAL,
820                                cas, cookie);
821        }
822
823        uint16_t vb = ntohs(req->message.header.request.vbucket);
824        if(e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
825            const std::string msg("VBucket number too big");
826            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
827                                msg.length(), PROTOCOL_BINARY_RAW_BYTES,
828                                PROTOCOL_BINARY_RESPONSE_ERANGE,
829                                cas, cookie);
830        }
831        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
832                            PROTOCOL_BINARY_RAW_BYTES,
833                            PROTOCOL_BINARY_RESPONSE_SUCCESS,
834                            cas, cookie);
835    }
836
837    static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine *e,
838                                        const void *cookie,
839                                        protocol_binary_request_header *req,
840                                        ADD_RESPONSE response) {
841
842        uint64_t cas = ntohll(req->request.cas);
843
844        protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
845        uint16_t vbucket = ntohs(req->request.vbucket);
846
847        std::string msg = "";
848        if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
849            msg = "Incorrect packet format.";
850            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
851                                msg.length(),
852                                PROTOCOL_BINARY_RAW_BYTES,
853                                PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
854        }
855
856        bool sync = false;
857        uint32_t bodylen = ntohl(req->request.bodylen);
858        if (bodylen > 0) {
859            const char* ptr = reinterpret_cast<const char*>(req->bytes) +
860                sizeof(req->bytes);
861            if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
862                sync = true;
863            }
864        }
865
866        ENGINE_ERROR_CODE err;
867        void* es = e->getEngineSpecific(cookie);
868        if (sync) {
869            if (es == NULL) {
870                err = e->deleteVBucket(vbucket, cookie);
871                e->storeEngineSpecific(cookie, e);
872            } else {
873                e->storeEngineSpecific(cookie, NULL);
874                LOG(EXTENSION_LOG_INFO,
875                    "Completed sync deletion of vbucket %u",
876                    (unsigned)vbucket);
877                err = ENGINE_SUCCESS;
878            }
879        } else {
880            err = e->deleteVBucket(vbucket);
881        }
882        switch (err) {
883        case ENGINE_SUCCESS:
884            LOG(EXTENSION_LOG_WARNING,
885                "Deletion of vbucket %d was completed.", vbucket);
886            break;
887        case ENGINE_NOT_MY_VBUCKET:
888            LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
889                "because the vbucket doesn't exist!!!", vbucket);
890            res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
891            break;
892        case ENGINE_EINVAL:
893            LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
894                "because the vbucket is not in a dead state\n", vbucket);
895            msg = "Failed to delete vbucket.  Must be in the dead state.";
896            res = PROTOCOL_BINARY_RESPONSE_EINVAL;
897            break;
898        case ENGINE_EWOULDBLOCK:
899            LOG(EXTENSION_LOG_WARNING, "Requst to vbucket %d deletion is in"
900                " EWOULDBLOCK until the database file is removed from disk",
901                vbucket);
902            e->storeEngineSpecific(cookie, req);
903            return ENGINE_EWOULDBLOCK;
904        default:
905            LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
906                "because of unknown reasons\n", vbucket);
907            msg = "Failed to delete vbucket.  Unknown reason.";
908            res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
909        }
910
911        if (err != ENGINE_NOT_MY_VBUCKET) {
912            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
913                                msg.length(), PROTOCOL_BINARY_RAW_BYTES,
914                                res, cas, cookie);
915        } else {
916            LockHolder lh(e->clusterConfig.lock);
917            return sendResponse(response, NULL, 0, NULL, 0,
918                                e->clusterConfig.config,
919                                e->clusterConfig.len,
920                                PROTOCOL_BINARY_RAW_BYTES,
921                                res, cas, cookie);
922        }
923
924    }
925
926    static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine *e,
927                                       protocol_binary_request_header *request,
928                                       const void *cookie,
929                                       Item **it,
930                                       const char **msg,
931                                       protocol_binary_response_status *res) {
932        EventuallyPersistentStore *eps = e->getEpStore();
933        protocol_binary_request_no_extras *req =
934            (protocol_binary_request_no_extras*)request;
935        int keylen = ntohs(req->message.header.request.keylen);
936        uint16_t vbucket = ntohs(req->message.header.request.vbucket);
937        ENGINE_ERROR_CODE error_code;
938        std::string keystr(((char *)request) + sizeof(req->message.header),
939                            keylen);
940
941        GetValue rv(eps->getReplica(keystr, vbucket, cookie, true));
942
943        if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
944            if (error_code == ENGINE_NOT_MY_VBUCKET) {
945                *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
946                return error_code;
947            } else if (error_code == ENGINE_TMPFAIL) {
948                *msg = "NOT_FOUND";
949                *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
950            } else {
951                return error_code;
952            }
953        } else {
954            *it = rv.getValue();
955            *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
956        }
957        ++(e->getEpStats().numOpsGet);
958        return ENGINE_SUCCESS;
959    }
960
961    static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine *e,
962                                       const void *cookie,
963                                       protocol_binary_request_compact_db *req,
964                                       ADD_RESPONSE response) {
965
966        std::string msg = "";
967        protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
968        compaction_ctx compactreq;
969        uint16_t vbucket = ntohs(req->message.header.request.vbucket);
970        uint64_t cas = ntohll(req->message.header.request.cas);
971
972        if (ntohs(req->message.header.request.keylen) > 0 ||
973             req->message.header.request.extlen != 24) {
974            LOG(EXTENSION_LOG_WARNING,
975                    "Compaction of vbucket %d received bad ext/key len %d/%d.",
976                    vbucket, req->message.header.request.extlen,
977                    ntohs(req->message.header.request.keylen));
978            msg = "Incorrect packet format.";
979            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
980                                msg.length(),
981                                PROTOCOL_BINARY_RAW_BYTES,
982                                PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
983        }
984        EPStats &stats = e->getEpStats();
985        compactreq.max_purged_seq = 0;
986        compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
987        compactreq.purge_before_seq =
988                                    ntohll(req->message.body.purge_before_seq);
989        compactreq.drop_deletes     = req->message.body.drop_deletes;
990
991        ENGINE_ERROR_CODE err;
992        void* es = e->getEngineSpecific(cookie);
993        if (es == NULL) {
994            ++stats.pendingCompactions;
995            e->storeEngineSpecific(cookie, e);
996            err = e->compactDB(vbucket, compactreq, cookie);
997        } else {
998            e->storeEngineSpecific(cookie, NULL);
999            err = ENGINE_SUCCESS;
1000        }
1001
1002        switch (err) {
1003            case ENGINE_SUCCESS:
1004                LOG(EXTENSION_LOG_INFO,
1005                    "Compaction of vbucket %d completed.", vbucket);
1006                break;
1007            case ENGINE_NOT_MY_VBUCKET:
1008                --stats.pendingCompactions;
1009                LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
1010                    "because the vbucket doesn't exist!!!", vbucket);
1011                res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1012                break;
1013            case ENGINE_EWOULDBLOCK:
1014                LOG(EXTENSION_LOG_INFO, "Request to compact vbucket %d is "
1015                        "in EWOULDBLOCK state until the database file is "
1016                        "compacted on disk",
1017                        vbucket);
1018                e->storeEngineSpecific(cookie, req);
1019                return ENGINE_EWOULDBLOCK;
1020            case ENGINE_TMPFAIL:
1021                LOG(EXTENSION_LOG_WARNING, "Request to compact vbucket %d hit"
1022                        " a temporary failure and may need to be retried",
1023                        vbucket);
1024                msg = "Temporary failure in compacting vbucket.";
1025                res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1026                break;
1027            default:
1028                --stats.pendingCompactions;
1029                LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
1030                    "because of unknown reasons\n", vbucket);
1031                msg = "Failed to compact vbucket.  Unknown reason.";
1032                res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1033                break;
1034        }
1035
1036        if (err != ENGINE_NOT_MY_VBUCKET) {
1037            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1038                                msg.length(), PROTOCOL_BINARY_RAW_BYTES,
1039                                res, cas, cookie);
1040        } else {
1041            LockHolder lh(e->clusterConfig.lock);
1042            return sendResponse(response, NULL, 0, NULL, 0,
1043                                e->clusterConfig.config,
1044                                e->clusterConfig.len,
1045                                PROTOCOL_BINARY_RAW_BYTES,
1046                                res, cas, cookie);
1047        }
1048    }
1049
1050    static ENGINE_ERROR_CODE processUnknownCommand(
1051                                       EventuallyPersistentEngine *h,
1052                                       const void* cookie,
1053                                       protocol_binary_request_header *request,
1054                                       ADD_RESPONSE response)
1055    {
1056        protocol_binary_response_status res =
1057                                      PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1058        const char *msg = NULL;
1059        size_t msg_size = 0;
1060        Item *itm = NULL;
1061
1062        EPStats &stats = h->getEpStats();
1063        ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
1064
1065        /**
1066         * Session validation
1067         * (For ns_server commands only)
1068         */
1069        switch (request->request.opcode) {
1070            case PROTOCOL_BINARY_CMD_SET_PARAM:
1071            case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1072            case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1073            case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
1074            case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
1075            case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
1076            case PROTOCOL_BINARY_CMD_COMPACT_DB:
1077            {
1078                if (h->getEngineSpecific(cookie) == NULL) {
1079                    uint64_t cas = ntohll(request->request.cas);
1080                    if (!h->validateSessionCas(cas)) {
1081                        const std::string message("Invalid session token");
1082                        return sendResponse(response, NULL, 0, NULL, 0,
1083                                            message.c_str(), message.length(),
1084                                            PROTOCOL_BINARY_RAW_BYTES,
1085                                            PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
1086                                            cas, cookie);
1087                    }
1088                }
1089                break;
1090            }
1091            default:
1092                break;
1093        }
1094
1095        switch (request->request.opcode) {
1096        case PROTOCOL_BINARY_CMD_GET_VBUCKET:
1097            {
1098                BlockTimer timer(&stats.getVbucketCmdHisto);
1099                rv = getVBucket(h, cookie, request, response);
1100                return rv;
1101            }
1102        case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1103            {
1104                BlockTimer timer(&stats.delVbucketCmdHisto);
1105                rv = delVBucket(h, cookie, request, response);
1106                if (rv != ENGINE_EWOULDBLOCK) {
1107                    h->decrementSessionCtr();
1108                    h->storeEngineSpecific(cookie, NULL);
1109                }
1110                return rv;
1111            }
1112        case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1113            {
1114                BlockTimer timer(&stats.setVbucketCmdHisto);
1115                rv = setVBucket(h, cookie, request, response);
1116                h->decrementSessionCtr();
1117                return rv;
1118            }
1119        case PROTOCOL_BINARY_CMD_TOUCH:
1120        case PROTOCOL_BINARY_CMD_GAT:
1121        case PROTOCOL_BINARY_CMD_GATQ:
1122            {
1123                rv = h->touch(cookie, request, response);
1124                return rv;
1125            }
1126        case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
1127            res = stopFlusher(h, &msg, &msg_size);
1128            break;
1129        case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
1130            res = startFlusher(h, &msg, &msg_size);
1131            break;
1132        case PROTOCOL_BINARY_CMD_SET_PARAM:
1133            res = setParam(h,
1134                  reinterpret_cast<protocol_binary_request_set_param*>(request),
1135                            &msg, &msg_size);
1136            h->decrementSessionCtr();
1137            break;
1138        case PROTOCOL_BINARY_CMD_EVICT_KEY:
1139            res = evictKey(h, request, &msg, &msg_size);
1140            break;
1141        case PROTOCOL_BINARY_CMD_GET_LOCKED:
1142            rv = getLocked(h, request, cookie, &itm, &msg, &msg_size, &res);
1143            if (rv == ENGINE_EWOULDBLOCK) {
1144                // we dont have the value for the item yet
1145                return rv;
1146            }
1147            break;
1148        case PROTOCOL_BINARY_CMD_UNLOCK_KEY:
1149            res = unlockKey(h, request, &msg, &msg_size);
1150            break;
1151        case PROTOCOL_BINARY_CMD_OBSERVE:
1152            return h->observe(cookie, request, response);
1153        case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
1154            {
1155                rv = h->deregisterTapClient(cookie, request, response);
1156                h->decrementSessionCtr();
1157                return rv;
1158            }
1159        case PROTOCOL_BINARY_CMD_RESET_REPLICATION_CHAIN:
1160            {
1161                rv = h->resetReplicationChain(cookie, request, response);
1162                return rv;
1163            }
1164        case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
1165            {
1166                rv = h->changeTapVBFilter(cookie, request, response);
1167                h->decrementSessionCtr();
1168                return rv;
1169            }
1170        case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
1171        case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
1172        case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE:
1173            {
1174                rv = h->handleCheckpointCmds(cookie, request, response);
1175                return rv;
1176            }
1177        case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE:
1178            {
1179                rv = h->handleSeqnoCmds(cookie, request, response);
1180                return rv;
1181            }
1182        case PROTOCOL_BINARY_CMD_GET_META:
1183        case PROTOCOL_BINARY_CMD_GETQ_META:
1184            {
1185                rv = h->getMeta(cookie,
1186                        reinterpret_cast<protocol_binary_request_get_meta*>
1187                                                          (request), response);
1188                return rv;
1189            }
1190        case PROTOCOL_BINARY_CMD_SET_WITH_META:
1191        case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
1192        case PROTOCOL_BINARY_CMD_ADD_WITH_META:
1193        case PROTOCOL_BINARY_CMD_ADDQ_WITH_META:
1194            {
1195                rv = h->setWithMeta(cookie,
1196                     reinterpret_cast<protocol_binary_request_set_with_meta*>
1197                                                          (request), response);
1198                return rv;
1199            }
1200        case PROTOCOL_BINARY_CMD_DEL_WITH_META:
1201        case PROTOCOL_BINARY_CMD_DELQ_WITH_META:
1202            {
1203                rv = h->deleteWithMeta(cookie,
1204                    reinterpret_cast<protocol_binary_request_delete_with_meta*>
1205                                                          (request), response);
1206                return rv;
1207            }
1208        case PROTOCOL_BINARY_CMD_RETURN_META:
1209            {
1210                return h->returnMeta(cookie,
1211                reinterpret_cast<protocol_binary_request_return_meta*>
1212                                                          (request), response);
1213            }
1214        case PROTOCOL_BINARY_CMD_GET_REPLICA:
1215            rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res);
1216            if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
1217                return rv;
1218            }
1219            break;
1220        case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
1221        case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC:
1222            {
1223                rv = h->handleTrafficControlCmd(cookie, request, response);
1224                return rv;
1225            }
1226        case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
1227            {
1228                rv = h->setClusterConfig(cookie,
1229                 reinterpret_cast<protocol_binary_request_set_cluster_config*>
1230                                                          (request), response);
1231                h->decrementSessionCtr();
1232                return rv;
1233            }
1234        case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
1235            return h->getClusterConfig(cookie,
1236               reinterpret_cast<protocol_binary_request_get_cluster_config*>
1237                                                          (request), response);
1238        case PROTOCOL_BINARY_CMD_COMPACT_DB:
1239            {
1240                rv = compactDB(h, cookie,
1241                               (protocol_binary_request_compact_db*)(request),
1242                               response);
1243                if (rv != ENGINE_EWOULDBLOCK) {
1244                    h->decrementSessionCtr();
1245                    h->storeEngineSpecific(cookie, NULL);
1246                }
1247                return rv;
1248            }
1249        case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY:
1250            if (request->request.extlen != 0 ||
1251                request->request.keylen != 0 ||
1252                request->request.bodylen != 0) {
1253                return ENGINE_EINVAL;
1254            }
1255
1256            return h->getRandomKey(cookie, response);
1257        case CMD_GET_KEYS:
1258            return h->getAllKeys(cookie,
1259               reinterpret_cast<protocol_binary_request_get_keys*>(request),
1260                                                                   response);
1261        }
1262
1263        // Send a special response for getl since we don't want to send the key
1264        if (itm && request->request.opcode == PROTOCOL_BINARY_CMD_GET_LOCKED) {
1265            uint32_t flags = itm->getFlags();
1266            rv = sendResponse(response, NULL, 0, (const void *)&flags,
1267                              sizeof(uint32_t),
1268                              static_cast<const void *>(itm->getData()),
1269                              itm->getNBytes(), itm->getDataType(),
1270                              static_cast<uint16_t>(res), itm->getCas(),
1271                              cookie);
1272            delete itm;
1273        } else if (itm) {
1274            const std::string &key  = itm->getKey();
1275            uint32_t flags = itm->getFlags();
1276            rv = sendResponse(response, static_cast<const void *>(key.data()),
1277                              itm->getNKey(),
1278                              (const void *)&flags, sizeof(uint32_t),
1279                              static_cast<const void *>(itm->getData()),
1280                              itm->getNBytes(), itm->getDataType(),
1281                              static_cast<uint16_t>(res), itm->getCas(),
1282                              cookie);
1283            delete itm;
1284        } else  if (rv == ENGINE_NOT_MY_VBUCKET) {
1285            LockHolder lh(h->clusterConfig.lock);
1286            return sendResponse(response, NULL, 0, NULL, 0,
1287                                h->clusterConfig.config,
1288                                h->clusterConfig.len,
1289                                PROTOCOL_BINARY_RAW_BYTES,
1290                                PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
1291                                cookie);
1292        } else {
1293            msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
1294            rv = sendResponse(response, NULL, 0, NULL, 0,
1295                              msg, static_cast<uint16_t>(msg_size),
1296                              PROTOCOL_BINARY_RAW_BYTES,
1297                              static_cast<uint16_t>(res), 0, cookie);
1298
1299        }
1300        return rv;
1301    }
1302
1303    static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
1304                                               const void* cookie,
1305                                               protocol_binary_request_header
1306                                                                      *request,
1307                                               ADD_RESPONSE response)
1308    {
1309        ENGINE_ERROR_CODE err_code = processUnknownCommand(getHandle(handle),
1310                                                           cookie,
1311                                                           request, response);
1312        releaseHandle(handle);
1313        return err_code;
1314    }
1315
1316    static void EvpItemSetCas(ENGINE_HANDLE* , const void *,
1317                              item *itm, uint64_t cas) {
1318        static_cast<Item*>(itm)->setCas(cas);
1319    }
1320
1321    static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
1322                                          const void *cookie,
1323                                          void *engine_specific,
1324                                          uint16_t nengine,
1325                                          uint8_t ttl,
1326                                          uint16_t tap_flags,
1327                                          tap_event_t tap_event,
1328                                          uint32_t tap_seqno,
1329                                          const void *key,
1330                                          size_t nkey,
1331                                          uint32_t flags,
1332                                          uint32_t exptime,
1333                                          uint64_t cas,
1334                                          uint8_t datatype,
1335                                          const void *data,
1336                                          size_t ndata,
1337                                          uint16_t vbucket)
1338    {
1339        if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1340            LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1341                    " (TapNotify)");
1342            return ENGINE_EINVAL;
1343        }
1344        ENGINE_ERROR_CODE err_code = getHandle(handle)->tapNotify(cookie,
1345                                                        engine_specific,
1346                                                        nengine, ttl,
1347                                                        tap_flags,
1348                                                        (uint16_t)tap_event,
1349                                                        tap_seqno,
1350                                                        key, nkey, flags,
1351                                                        exptime, cas,
1352                                                        datatype, data,
1353                                                        ndata, vbucket);
1354        releaseHandle(handle);
1355        return err_code;
1356    }
1357
1358    static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
1359                                      const void *cookie, item **itm,
1360                                      void **es, uint16_t *nes, uint8_t *ttl,
1361                                      uint16_t *flags, uint32_t *seqno,
1362                                      uint16_t *vbucket) {
1363        uint16_t tap_event = getHandle(handle)->walkTapQueue(cookie, itm, es,
1364                                                             nes, ttl,
1365                                                             flags, seqno,
1366                                                             vbucket);
1367        releaseHandle(handle);
1368        return static_cast<tap_event_t>(tap_event);
1369    }
1370
1371    static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
1372                                          const void* cookie,
1373                                          const void* client,
1374                                          size_t nclient,
1375                                          uint32_t flags,
1376                                          const void* userdata,
1377                                          size_t nuserdata)
1378    {
1379        EventuallyPersistentEngine *h = getHandle(handle);
1380        TAP_ITERATOR iterator = NULL;
1381        {
1382            std::string c(static_cast<const char*>(client), nclient);
1383            // Figure out what we want from the userdata before adding it to
1384            // the API to the handle
1385            if (h->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
1386                iterator = EvpTapIterator;
1387            }
1388        }
1389        releaseHandle(handle);
1390        return iterator;
1391    }
1392
1393
1394    static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
1395                                       const void* cookie,
1396                                       struct dcp_message_producers *producers)
1397    {
1398        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1399        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1400        if (conn) {
1401            errCode = conn->step(producers);
1402        }
1403        releaseHandle(handle);
1404        return errCode;
1405    }
1406
1407
1408    static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
1409                                        const void* cookie,
1410                                        uint32_t opaque,
1411                                        uint32_t seqno,
1412                                        uint32_t flags,
1413                                        void *name,
1414                                        uint16_t nname)
1415    {
1416        ENGINE_ERROR_CODE errCode;
1417        errCode = getHandle(handle)->dcpOpen(cookie, opaque, seqno, flags,
1418                                             name, nname);
1419        releaseHandle(handle);
1420        return errCode;
1421    }
1422
1423    static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
1424                                             const void* cookie,
1425                                             uint32_t opaque,
1426                                             uint16_t vbucket,
1427                                             uint32_t flags)
1428    {
1429        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1430        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1431        if (conn) {
1432            errCode = conn->addStream(opaque, vbucket, flags);
1433        }
1434        releaseHandle(handle);
1435        return errCode;
1436    }
1437
1438    static ENGINE_ERROR_CODE EvpDcpCloseStream(ENGINE_HANDLE* handle,
1439                                               const void* cookie,
1440                                               uint32_t opaque,
1441                                               uint16_t vbucket)
1442    {
1443        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1444        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1445        if (conn) {
1446            errCode = conn->closeStream(opaque, vbucket);
1447        }
1448        releaseHandle(handle);
1449        return errCode;
1450    }
1451
1452
1453    static ENGINE_ERROR_CODE EvpDcpStreamReq(ENGINE_HANDLE* handle,
1454                                             const void* cookie,
1455                                             uint32_t flags,
1456                                             uint32_t opaque,
1457                                             uint16_t vbucket,
1458                                             uint64_t startSeqno,
1459                                             uint64_t endSeqno,
1460                                             uint64_t vbucketUuid,
1461                                             uint64_t snapStartSeqno,
1462                                             uint64_t snapEndSeqno,
1463                                             uint64_t *rollbackSeqno,
1464                                             dcp_add_failover_log callback)
1465    {
1466        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1467        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1468        if (conn) {
1469            errCode = conn->streamRequest(flags, opaque, vbucket, startSeqno,
1470                                          endSeqno, vbucketUuid, snapStartSeqno,
1471                                          snapEndSeqno, rollbackSeqno, callback);
1472        }
1473        releaseHandle(handle);
1474        return errCode;
1475    }
1476
1477    static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(ENGINE_HANDLE* handle,
1478                                                 const void* cookie,
1479                                                 uint32_t opaque,
1480                                                 uint16_t vbucket,
1481                                                 dcp_add_failover_log callback)
1482    {
1483        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1484        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1485        if (conn) {
1486            errCode = conn->getFailoverLog(opaque, vbucket, callback);
1487        }
1488        releaseHandle(handle);
1489        return errCode;
1490    }
1491
1492
1493    static ENGINE_ERROR_CODE EvpDcpStreamEnd(ENGINE_HANDLE* handle,
1494                                             const void* cookie,
1495                                             uint32_t opaque,
1496                                             uint16_t vbucket,
1497                                             uint32_t flags)
1498    {
1499        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1500        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1501        if (conn) {
1502            errCode = conn->streamEnd(opaque, vbucket, flags);
1503        }
1504        releaseHandle(handle);
1505        return errCode;
1506    }
1507
1508
1509    static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(ENGINE_HANDLE* handle,
1510                                                  const void* cookie,
1511                                                  uint32_t opaque,
1512                                                  uint16_t vbucket,
1513                                                  uint64_t start_seqno,
1514                                                  uint64_t end_seqno,
1515                                                  uint32_t flags)
1516    {
1517        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1518        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1519        if (conn) {
1520            errCode = conn->snapshotMarker(opaque, vbucket, start_seqno,
1521                                           end_seqno, flags);
1522        }
1523        releaseHandle(handle);
1524        return errCode;
1525    }
1526
1527    static ENGINE_ERROR_CODE EvpDcpMutation(ENGINE_HANDLE* handle,
1528                                            const void* cookie,
1529                                            uint32_t opaque,
1530                                            const void *key,
1531                                            uint16_t nkey,
1532                                            const void *value,
1533                                            uint32_t nvalue,
1534                                            uint64_t cas,
1535                                            uint16_t vbucket,
1536                                            uint32_t flags,
1537                                            uint8_t datatype,
1538                                            uint64_t bySeqno,
1539                                            uint64_t revSeqno,
1540                                            uint32_t expiration,
1541                                            uint32_t lockTime,
1542                                            const void *meta,
1543                                            uint16_t nmeta,
1544                                            uint8_t nru)
1545    {
1546        if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1547            LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1548                    " (DCPMutation)");
1549            return ENGINE_EINVAL;
1550        }
1551        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1552        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1553        if (conn) {
1554            errCode = conn->mutation(opaque, key, nkey, value, nvalue, cas,
1555                                     vbucket, flags, datatype, lockTime,
1556                                     bySeqno, revSeqno, expiration,
1557                                     nru, meta, nmeta);
1558        }
1559        releaseHandle(handle);
1560        return errCode;
1561    }
1562
1563    static ENGINE_ERROR_CODE EvpDcpDeletion(ENGINE_HANDLE* handle,
1564                                            const void* cookie,
1565                                            uint32_t opaque,
1566                                            const void *key,
1567                                            uint16_t nkey,
1568                                            uint64_t cas,
1569                                            uint16_t vbucket,
1570                                            uint64_t bySeqno,
1571                                            uint64_t revSeqno,
1572                                            const void *meta,
1573                                            uint16_t nmeta)
1574    {
1575        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1576        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1577        if (conn) {
1578            errCode = conn->deletion(opaque, key, nkey, cas, vbucket, bySeqno,
1579                                     revSeqno, meta, nmeta);
1580        }
1581        releaseHandle(handle);
1582        return errCode;
1583    }
1584
1585    static ENGINE_ERROR_CODE EvpDcpExpiration(ENGINE_HANDLE* handle,
1586                                              const void* cookie,
1587                                              uint32_t opaque,
1588                                              const void *key,
1589                                              uint16_t nkey,
1590                                              uint64_t cas,
1591                                              uint16_t vbucket,
1592                                              uint64_t bySeqno,
1593                                              uint64_t revSeqno,
1594                                              const void *meta,
1595                                              uint16_t nmeta)
1596    {
1597        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1598        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1599        if (conn) {
1600            errCode = conn->expiration(opaque, key, nkey, cas, vbucket, bySeqno,
1601                                       revSeqno, meta, nmeta);
1602        }
1603        releaseHandle(handle);
1604        return errCode;
1605    }
1606
1607    static ENGINE_ERROR_CODE EvpDcpFlush(ENGINE_HANDLE* handle,
1608                                         const void* cookie,
1609                                         uint32_t opaque,
1610                                         uint16_t vbucket)
1611    {
1612        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1613        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1614        if (conn) {
1615            errCode = conn->flushall(opaque, vbucket);
1616        }
1617        releaseHandle(handle);
1618        return errCode;
1619    }
1620
1621    static ENGINE_ERROR_CODE EvpDcpSetVbucketState(ENGINE_HANDLE* handle,
1622                                                   const void* cookie,
1623                                                   uint32_t opaque,
1624                                                   uint16_t vbucket,
1625                                                   vbucket_state_t state)
1626    {
1627        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1628        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1629        if (conn) {
1630            errCode = conn->setVBucketState(opaque, vbucket, state);
1631        }
1632        releaseHandle(handle);
1633        return errCode;
1634    }
1635
1636    static ENGINE_ERROR_CODE EvpDcpNoop(ENGINE_HANDLE* handle,
1637                                        const void* cookie,
1638                                        uint32_t opaque) {
1639        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1640        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1641        if (conn) {
1642            errCode = conn->noop(opaque);
1643        }
1644        releaseHandle(handle);
1645        return errCode;
1646    }
1647
1648    static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(ENGINE_HANDLE* handle,
1649                                                         const void* cookie,
1650                                                         uint32_t opaque,
1651                                                         uint16_t vbucket,
1652                                                         uint32_t buffer_bytes) {
1653        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1654        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1655        if (conn) {
1656            errCode = conn->bufferAcknowledgement(opaque, vbucket,
1657                                                  buffer_bytes);
1658        }
1659        releaseHandle(handle);
1660        return errCode;
1661    }
1662
1663    static ENGINE_ERROR_CODE EvpDcpControl(ENGINE_HANDLE* handle,
1664                                           const void* cookie,
1665                                           uint32_t opaque,
1666                                           const void *key,
1667                                           uint16_t nkey,
1668                                           const void *value,
1669                                           uint32_t nvalue) {
1670        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1671        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1672        if (conn) {
1673            errCode = conn->control(opaque, key, nkey, value, nvalue);
1674        }
1675        releaseHandle(handle);
1676        return errCode;
1677    }
1678
1679    static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
1680                                     const void* cookie,
1681                                     protocol_binary_response_header *response)
1682    {
1683        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1684        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1685        if (conn) {
1686            errCode = conn->handleResponse(response);
1687        }
1688        releaseHandle(handle);
1689        return errCode;
1690    }
1691
1692    static void EvpHandleDisconnect(const void *cookie,
1693                                    ENGINE_EVENT_TYPE type,
1694                                    const void *event_data,
1695                                    const void *cb_data)
1696    {
1697        cb_assert(type == ON_DISCONNECT);
1698        cb_assert(event_data == NULL);
1699        void *c = const_cast<void*>(cb_data);
1700        getHandle(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1701        releaseHandle(static_cast<ENGINE_HANDLE*>(c));
1702    }
1703
1704
1705    /**
1706     * The only public interface to the eventually persistance engine.
1707     * Allocate a new instance and initialize it
1708     * @param interface the highest interface the server supports (we only
1709     *                  support interface 1)
1710     * @param get_server_api callback function to get the server exported API
1711     *                  functions
1712     * @param handle Where to return the new instance
1713     * @return ENGINE_SUCCESS on success
1714     */
1715    ENGINE_ERROR_CODE create_instance(uint64_t interface,
1716                                      GET_SERVER_API get_server_api,
1717                                      ENGINE_HANDLE **handle)
1718    {
1719        SERVER_HANDLE_V1 *api = get_server_api();
1720        if (interface != 1 || api == NULL) {
1721            return ENGINE_ENOTSUP;
1722        }
1723
1724        hooksApi = api->alloc_hooks;
1725        loggerApi = api->log;
1726        MemoryTracker::getInstance();
1727        ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1728
1729        AtomicValue<size_t>* inital_tracking = new AtomicValue<size_t>();
1730
1731        ObjectRegistry::setStats(inital_tracking);
1732        EventuallyPersistentEngine *engine;
1733        engine = new EventuallyPersistentEngine(get_server_api);
1734        ObjectRegistry::setStats(NULL);
1735
1736        if (engine == NULL) {
1737            return ENGINE_ENOMEM;
1738        }
1739
1740        if (MemoryTracker::trackingMemoryAllocations()) {
1741            engine->getEpStats().memoryTrackerEnabled.store(true);
1742            engine->getEpStats().totalMemory.store(inital_tracking->load());
1743        }
1744        delete inital_tracking;
1745
1746        ep_current_time = api->core->get_current_time;
1747        ep_abs_time = api->core->abstime;
1748        ep_reltime = api->core->realtime;
1749
1750        *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1751
1752        return ENGINE_SUCCESS;
1753    }
1754
1755    static bool EvpGetItemInfo(ENGINE_HANDLE *, const void *,
1756                               const item* itm, item_info *itm_info)
1757    {
1758        const Item *it = reinterpret_cast<const Item*>(itm);
1759        if (itm_info->nvalue < 1) {
1760            return false;
1761        }
1762        itm_info->cas = it->getCas();
1763        itm_info->exptime = it->getExptime();
1764        itm_info->nbytes = it->getNBytes();
1765        itm_info->datatype = it->getDataType();
1766        itm_info->flags = it->getFlags();
1767        itm_info->clsid = 0;
1768        itm_info->nkey = static_cast<uint16_t>(it->getNKey());
1769        itm_info->nvalue = 1;
1770        itm_info->key = it->getKey().c_str();
1771        itm_info->value[0].iov_base = const_cast<char*>(it->getData());
1772        itm_info->value[0].iov_len = it->getNBytes();
1773        return true;
1774    }
1775
1776    static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
1777                               item* itm, const item_info *itm_info)
1778    {
1779        Item *it = reinterpret_cast<Item*>(itm);
1780        if (!it) {
1781            return false;
1782        }
1783        it->setDataType(itm_info->datatype);
1784        return true;
1785    }
1786
1787    static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
1788                                                 const void* cookie,
1789                                                 engine_get_vb_map_cb callback)
1790    {
1791        EventuallyPersistentEngine *h = getHandle(handle);
1792        LockHolder lh(h->clusterConfig.lock);
1793        uint8_t *config = h->clusterConfig.config;
1794        uint32_t len = h->clusterConfig.len;
1795        releaseHandle(handle);
1796        return callback(cookie, config, len);
1797    }
1798
1799} // C linkage
1800
1801void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1802    char buffer[2048];
1803
1804    if (loggerApi != NULL) {
1805        EXTENSION_LOGGER_DESCRIPTOR* logger =
1806            (EXTENSION_LOGGER_DESCRIPTOR*)loggerApi->get_logger();
1807
1808        if (loggerApi->get_level() <= severity) {
1809            EventuallyPersistentEngine *engine = ObjectRegistry::onSwitchThread(NULL, true);
1810            va_list va;
1811            va_start(va, fmt);
1812            vsnprintf(buffer, sizeof(buffer) - 1, fmt, va);
1813            if (engine) {
1814                logger->log(severity, NULL, "(%s) %s", engine->getName(),
1815                            buffer);
1816            } else {
1817                logger->log(severity, NULL, "(No Engine) %s", buffer);
1818            }
1819            va_end(va);
1820            ObjectRegistry::onSwitchThread(engine);
1821        }
1822    }
1823}
1824
1825ALLOCATOR_HOOKS_API *getHooksApi(void) {
1826    return hooksApi;
1827}
1828
1829EventuallyPersistentEngine::EventuallyPersistentEngine(
1830                                    GET_SERVER_API get_server_api) :
1831    clusterConfig(), epstore(NULL), workload(NULL),
1832    workloadPriority(NO_BUCKET_PRIORITY),
1833    tapThrottle(NULL), getServerApiFunc(get_server_api),
1834    tapConnMap(NULL), tapConfig(NULL), checkpointConfig(NULL),
1835    trafficEnabled(false), flushAllEnabled(false), startupTime(0)
1836{
1837    interface.interface = 1;
1838    ENGINE_HANDLE_V1::get_info = EvpGetInfo;
1839    ENGINE_HANDLE_V1::initialize = EvpInitialize;
1840    ENGINE_HANDLE_V1::destroy = EvpDestroy;
1841    ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
1842    ENGINE_HANDLE_V1::remove = EvpItemDelete;
1843    ENGINE_HANDLE_V1::release = EvpItemRelease;
1844    ENGINE_HANDLE_V1::get = EvpGet;
1845    ENGINE_HANDLE_V1::get_stats = EvpGetStats;
1846    ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
1847    ENGINE_HANDLE_V1::store = EvpStore;
1848    ENGINE_HANDLE_V1::arithmetic = EvpArithmetic;
1849    ENGINE_HANDLE_V1::flush = EvpFlush;
1850    ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
1851    ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
1852    ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
1853    ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
1854    ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
1855    ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
1856    ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;
1857    ENGINE_HANDLE_V1::get_stats_struct = NULL;
1858    ENGINE_HANDLE_V1::errinfo = NULL;
1859    ENGINE_HANDLE_V1::aggregate_stats = NULL;
1860
1861
1862    ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
1863    ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
1864    ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
1865    ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
1866    ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
1867    ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
1868    ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
1869    ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
1870    ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
1871    ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
1872    ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
1873    ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
1874    ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
1875    ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
1876    ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
1877    ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
1878    ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
1879
1880    serverApi = getServerApiFunc();
1881    memset(&info, 0, sizeof(info));
1882    info.info.description = "EP engine v" VERSION;
1883    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
1884    info.info.features[info.info.num_features++].feature =
1885                                             ENGINE_FEATURE_PERSISTENT_STORAGE;
1886    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
1887    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
1888
1889}
1890
1891ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
1892{
1893    EventuallyPersistentEngine *epe =
1894                                    ObjectRegistry::onSwitchThread(NULL, true);
1895    ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
1896    ObjectRegistry::onSwitchThread(epe);
1897    return rv;
1898}
1899
1900ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
1901{
1902    EventuallyPersistentEngine *epe =
1903                                    ObjectRegistry::onSwitchThread(NULL, true);
1904    ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
1905    ObjectRegistry::onSwitchThread(epe);
1906    return rv;
1907}
1908
1909void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
1910                                                        EVENT_CALLBACK cb,
1911                                                        const void *cb_data) {
1912    EventuallyPersistentEngine *epe =
1913                                    ObjectRegistry::onSwitchThread(NULL, true);
1914    SERVER_CALLBACK_API *sapi = getServerApi()->callback;
1915    sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
1916                            type, cb, cb_data);
1917    ObjectRegistry::onSwitchThread(epe);
1918}
1919
1920/**
1921 * A configuration value changed listener that responds to ep-engine
1922 * parameter changes by invoking engine-specific methods on
1923 * configuration change events.
1924 */
1925class EpEngineValueChangeListener : public ValueChangedListener {
1926public:
1927    EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
1928        // EMPTY
1929    }
1930
1931    virtual void sizeValueChanged(const std::string &key, size_t value) {
1932        if (key.compare("getl_max_timeout") == 0) {
1933            engine.setGetlMaxTimeout(value);
1934        } else if (key.compare("getl_default_timeout") == 0) {
1935            engine.setGetlDefaultTimeout(value);
1936        } else if (key.compare("max_item_size") == 0) {
1937            engine.setMaxItemSize(value);
1938        }
1939    }
1940
1941    virtual void booleanValueChanged(const std::string &key, bool value) {
1942        if (key.compare("flushall_enabled") == 0) {
1943            engine.setFlushAll(value);
1944        }
1945    }
1946private:
1947    EventuallyPersistentEngine &engine;
1948};
1949
1950
1951
1952ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
1953    resetStats();
1954    if (config != NULL) {
1955        if (!configuration.parseConfiguration(config, serverApi)) {
1956            return ENGINE_FAILED;
1957        }
1958    }
1959
1960    name = configuration.getCouchBucket();
1961    maxFailoverEntries = configuration.getMaxFailoverEntries();
1962
1963    // Start updating the variables from the config!
1964    HashTable::setDefaultNumBuckets(configuration.getHtSize());
1965    HashTable::setDefaultNumLocks(configuration.getHtLocks());
1966    StoredValue::setMutationMemoryThreshold(
1967                                      configuration.getMutationMemThreshold());
1968
1969    if (configuration.getMaxSize() == 0) {
1970        configuration.setMaxSize(std::numeric_limits<size_t>::max());
1971    }
1972
1973    if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
1974        configuration.setMemLowWat(percentOf(
1975                                            configuration.getMaxSize(), 0.75));
1976    }
1977
1978    if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
1979        configuration.setMemHighWat(percentOf(
1980                                            configuration.getMaxSize(), 0.85));
1981    }
1982
1983    maxItemSize = configuration.getMaxItemSize();
1984    configuration.addValueChangedListener("max_item_size",
1985                                       new EpEngineValueChangeListener(*this));
1986
1987    getlDefaultTimeout = configuration.getGetlDefaultTimeout();
1988    configuration.addValueChangedListener("getl_default_timeout",
1989                                       new EpEngineValueChangeListener(*this));
1990    getlMaxTimeout = configuration.getGetlMaxTimeout();
1991    configuration.addValueChangedListener("getl_max_timeout",
1992                                       new EpEngineValueChangeListener(*this));
1993
1994    flushAllEnabled = configuration.isFlushallEnabled();
1995    configuration.addValueChangedListener("flushall_enabled",
1996                                       new EpEngineValueChangeListener(*this));
1997
1998    workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
1999                                  configuration.getMaxNumShards());
2000    if ((unsigned int)workload->getNumShards() >
2001                                              configuration.getMaxVbuckets()) {
2002        LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
2003            "equal or less than max number of vbuckets");
2004        return ENGINE_FAILED;
2005    }
2006
2007    dcpConnMap_ = new DcpConnMap(*this);
2008    tapConnMap = new TapConnMap(*this);
2009    tapConfig = new TapConfig(*this);
2010    tapThrottle = new TapThrottle(configuration, stats);
2011    TapConfig::addConfigChangeListener(*this);
2012
2013    checkpointConfig = new CheckpointConfig(*this);
2014    CheckpointConfig::addConfigChangeListener(*this);
2015
2016    epstore = new EventuallyPersistentStore(*this);
2017    if (epstore == NULL) {
2018        return ENGINE_ENOMEM;
2019    }
2020
2021    // Register the callback
2022    registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2023
2024    // Complete the initialization of the ep-store
2025    if (!epstore->initialize()) {
2026        return ENGINE_FAILED;
2027    }
2028
2029    if(configuration.isDataTrafficEnabled()) {
2030        enableTraffic(true);
2031    }
2032
2033    tapConnMap->initialize(TAP_CONN_NOTIFIER);
2034    dcpConnMap_->initialize(DCP_CONN_NOTIFIER);
2035
2036    // record engine initialization time
2037    startupTime = ep_real_time();
2038
2039    LOG(EXTENSION_LOG_DEBUG, "Engine init complete.\n");
2040
2041    return ENGINE_SUCCESS;
2042}
2043
2044void EventuallyPersistentEngine::destroy(bool force) {
2045    stats.forceShutdown = force;
2046    stats.isShutdown = true;
2047
2048    if (epstore) {
2049        epstore->snapshotStats();
2050    }
2051    if (tapConnMap) {
2052        tapConnMap->shutdownAllConnections();
2053    }
2054    if (dcpConnMap_) {
2055        dcpConnMap_->shutdownAllConnections();
2056    }
2057}
2058
2059class FlushAllTask : public GlobalTask {
2060public:
2061    FlushAllTask(EventuallyPersistentStore *st, TapConnMap &tcm, double when)
2062        : GlobalTask(&st->getEPEngine(), Priority::FlushAllPriority, when,
2063                     false), epstore(st), tapConnMap(tcm) { }
2064
2065    bool run(void) {
2066        epstore->reset();
2067        tapConnMap.addFlushEvent();
2068        return false;
2069    }
2070
2071    std::string getDescription() {
2072        return std::string("Performing flush.");
2073    }
2074
2075private:
2076    EventuallyPersistentStore *epstore;
2077    TapConnMap                &tapConnMap;
2078};
2079
2080ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *, time_t when){
2081    if (!flushAllEnabled) {
2082        return ENGINE_ENOTSUP;
2083    }
2084
2085    if (isDegradedMode()) {
2086        return ENGINE_TMPFAIL;
2087    }
2088
2089    if (when == 0) {
2090        epstore->reset();
2091        tapConnMap->addFlushEvent();
2092    } else {
2093        ExTask flushTask = new FlushAllTask(epstore, *tapConnMap,
2094                static_cast<double>(when));
2095        ExecutorPool::get()->schedule(flushTask, NONIO_TASK_IDX);
2096    }
2097
2098    return ENGINE_SUCCESS;
2099}
2100
2101ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
2102                                                    item* itm,
2103                                                    uint64_t *cas,
2104                                                    ENGINE_STORE_OPERATION
2105                                                                     operation,
2106                                                    uint16_t vbucket) {
2107    BlockTimer timer(&stats.storeCmdHisto);
2108    ENGINE_ERROR_CODE ret;
2109    Item *it = static_cast<Item*>(itm);
2110    item *i = NULL;
2111
2112    it->setVBucketId(vbucket);
2113
2114    switch (operation) {
2115    case OPERATION_CAS:
2116        if (it->getCas() == 0) {
2117            // Using a cas command with a cas wildcard doesn't make sense
2118            ret = ENGINE_NOT_STORED;
2119            break;
2120        }
2121        // FALLTHROUGH
2122    case OPERATION_SET:
2123        if (isDegradedMode()) {
2124            return ENGINE_TMPFAIL;
2125        }
2126        ret = epstore->set(*it, cookie);
2127        if (ret == ENGINE_SUCCESS) {
2128            *cas = it->getCas();
2129        }
2130
2131        break;
2132
2133    case OPERATION_ADD:
2134        if (isDegradedMode()) {
2135            return ENGINE_TMPFAIL;
2136        }
2137
2138        if (it->getCas() != 0) {
2139            // Adding an item with a cas value doesn't really make sense...
2140            return ENGINE_KEY_EEXISTS;
2141        }
2142
2143        ret = epstore->add(*it, cookie);
2144        if (ret == ENGINE_SUCCESS) {
2145            *cas = it->getCas();
2146        }
2147        break;
2148
2149    case OPERATION_REPLACE:
2150        ret = epstore->replace(*it, cookie);
2151        if (ret == ENGINE_SUCCESS) {
2152            *cas = it->getCas();
2153        }
2154        break;
2155
2156    case OPERATION_APPEND:
2157    case OPERATION_PREPEND:
2158        do {
2159            if ((ret = get(cookie, &i, it->getKey().c_str(),
2160                           it->getNKey(), vbucket)) == ENGINE_SUCCESS) {
2161                Item *old = reinterpret_cast<Item*>(i);
2162
2163                if (old->getCas() == (uint64_t) -1) {
2164                    // item is locked against updates
2165                    itemRelease(cookie, i);
2166                    return ENGINE_TMPFAIL;
2167                }
2168
2169                if (it->getCas() != 0 && old->getCas() != it->getCas()) {
2170                    itemRelease(cookie, i);
2171                    return ENGINE_KEY_EEXISTS;
2172                }
2173
2174                if (operation == OPERATION_APPEND) {
2175                    ret = old->append(*it, maxItemSize);
2176                } else {
2177                    ret = old->prepend(*it, maxItemSize);
2178                }
2179
2180                if (ret != ENGINE_SUCCESS) {
2181                    itemRelease(cookie, i);
2182                    if (ret == ENGINE_E2BIG) {
2183                        return ret;
2184                    } else {
2185                        return memoryCondition();
2186                    }
2187                } else {
2188                    if (old->getDataType() == PROTOCOL_BINARY_DATATYPE_JSON) {
2189                        // Set the datatype of the new document to BINARY (0),
2190                        // as appending/prepending anything to JSON breaks the
2191                        // json data structure.
2192                        old->setDataType(PROTOCOL_BINARY_RAW_BYTES);
2193                    } else if (old->getDataType() ==
2194                                    PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
2195                        // Set the datatype of the new document to
2196                        // COMPRESSED_BINARY, as appending/prepending anything
2197                        // to JSON breaks the json data structure.
2198                        old->setDataType(PROTOCOL_BINARY_DATATYPE_COMPRESSED);
2199                    }
2200                }
2201
2202                ret = store(cookie, old, cas, OPERATION_CAS, vbucket);
2203                itemRelease(cookie, i);
2204            }
2205        } while (ret == ENGINE_KEY_EEXISTS);
2206
2207        // Map the error code back to what memcacpable expects
2208        if (ret == ENGINE_KEY_ENOENT) {
2209            ret = ENGINE_NOT_STORED;
2210        }
2211        break;
2212
2213    default:
2214        ret = ENGINE_ENOTSUP;
2215    }
2216
2217    switch (ret) {
2218    case ENGINE_SUCCESS:
2219        ++stats.numOpsStore;
2220        break;
2221    case ENGINE_ENOMEM:
2222        ret = memoryCondition();
2223        break;
2224    case ENGINE_NOT_STORED:
2225    case ENGINE_NOT_MY_VBUCKET:
2226        if (isDegradedMode()) {
2227            return ENGINE_TMPFAIL;
2228        }
2229        break;
2230    default:
2231        break;
2232    }
2233
2234    return ret;
2235}
2236
2237inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
2238                                                           item **itm,
2239                                                           void **es,
2240                                                           uint16_t *nes,
2241                                                           uint8_t *ttl,
2242                                                           uint16_t *flags,
2243                                                           uint32_t *seqno,
2244                                                           uint16_t *vbucket,
2245                                                           TapProducer
2246                                                                   *connection,
2247                                                           bool &retry) {
2248    *es = NULL;
2249    *nes = 0;
2250    *ttl = (uint8_t)-1;
2251    *seqno = 0;
2252    *flags = 0;
2253    *vbucket = 0;
2254
2255    retry = false;
2256
2257    if (connection->shouldFlush()) {
2258        return TAP_FLUSH;
2259    }
2260
2261    if (connection->isTimeForNoop()) {
2262        LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
2263            connection->logHeader());
2264        return TAP_NOOP;
2265    }
2266
2267    if (connection->isSuspended() || connection->windowIsFull()) {
2268        LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
2269            " suspended state or its ack windows is full.\n",
2270            connection->logHeader());
2271        return TAP_PAUSE;
2272    }
2273
2274    uint16_t ret = TAP_PAUSE;
2275    VBucketEvent ev = connection->nextVBucketHighPriority();
2276    if (ev.event != TAP_PAUSE) {
2277        switch (ev.event) {
2278        case TAP_VBUCKET_SET:
2279            LOG(EXTENSION_LOG_WARNING,
2280               "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2281                connection->logHeader(), ev.vbucket,
2282                VBucket::toString(ev.state));
2283            connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
2284            break;
2285        case TAP_OPAQUE:
2286            LOG(EXTENSION_LOG_WARNING,
2287                "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
2288                connection->logHeader(),
2289                TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
2290                ev.vbucket);
2291            connection->opaqueCommandCode = (uint32_t) ev.state;
2292            *vbucket = ev.vbucket;
2293            *es = &connection->opaqueCommandCode;
2294            *nes = sizeof(connection->opaqueCommandCode);
2295            break;
2296        default:
2297            LOG(EXTENSION_LOG_WARNING,
2298                "%s Unknown VBucketEvent message type %d\n",
2299                connection->logHeader(), ev.event);
2300            abort();
2301        }
2302        return ev.event;
2303    }
2304
2305    if (connection->waitForOpaqueMsgAck()) {
2306        return TAP_PAUSE;
2307    }
2308
2309    VBucketFilter backFillVBFilter;
2310    if (connection->runBackfill(backFillVBFilter)) {
2311        queueBackfill(backFillVBFilter, connection);
2312    }
2313
2314    uint8_t nru = INITIAL_NRU_VALUE;
2315    Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
2316    switch (ret) {
2317    case TAP_CHECKPOINT_START:
2318    case TAP_CHECKPOINT_END:
2319    case TAP_MUTATION:
2320    case TAP_DELETION:
2321        *itm = it;
2322        if (ret == TAP_MUTATION) {
2323            *nes = TapEngineSpecific::packSpecificData(ret, connection,
2324                                                       it->getRevSeqno(), nru);
2325            *es = connection->specificData;
2326        } else if (ret == TAP_DELETION) {
2327            *nes = TapEngineSpecific::packSpecificData(ret, connection,
2328                                                       it->getRevSeqno());
2329            *es = connection->specificData;
2330        } else if (ret == TAP_CHECKPOINT_START) {
2331            // Send the current value of the max deleted seqno
2332            RCPtr<VBucket> vb = getVBucket(*vbucket);
2333            if (!vb) {
2334                retry = true;
2335                return TAP_NOOP;
2336            }
2337            *nes = TapEngineSpecific::packSpecificData(ret, connection,
2338                                               vb->ht.getMaxDeletedRevSeqno());
2339            *es = connection->specificData;
2340        }
2341        break;
2342    case TAP_NOOP:
2343        retry = true;
2344        break;
2345    default:
2346        break;
2347    }
2348
2349    if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
2350        VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
2351        if (vbev.event == TAP_VBUCKET_SET) {
2352            LOG(EXTENSION_LOG_WARNING,
2353               "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2354                connection->logHeader(), vbev.vbucket,
2355                VBucket::toString(vbev.state));
2356            connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
2357        }
2358        ret = vbev.event;
2359    }
2360
2361    return ret;
2362}
2363
2364uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2365                                                  item **itm,
2366                                                  void **es,
2367                                                  uint16_t *nes,
2368                                                  uint8_t *ttl,
2369                                                  uint16_t *flags,
2370                                                  uint32_t *seqno,
2371                                                  uint16_t *vbucket) {
2372    TapProducer *connection = getTapProducer(cookie);
2373    if (!connection) {
2374        LOG(EXTENSION_LOG_WARNING,
2375            "Failed to lookup TAP connection.. Disconnecting\n");
2376        return TAP_DISCONNECT;
2377    }
2378
2379    connection->setPaused(false);
2380
2381    bool retry = false;
2382    uint16_t ret;
2383
2384    connection->setLastWalkTime();
2385    do {
2386        ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2387                             seqno, vbucket, connection, retry);
2388    } while (retry);
2389
2390    if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2391        connection->lastMsgTime = ep_current_time();
2392        if (ret == TAP_NOOP) {
2393            *seqno = 0;
2394        } else {
2395            ++stats.numTapFetched;
2396            *seqno = connection->getSeqno();
2397            if (connection->requestAck(ret, *vbucket)) {
2398                *flags = TAP_FLAG_ACK;
2399                connection->seqnoAckRequested = *seqno;
2400            }
2401
2402            if (ret == TAP_MUTATION) {
2403                if (connection->haveFlagByteorderSupport()) {
2404                    *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
2405                }
2406            }
2407        }
2408    } else {
2409        connection->setPaused(true);
2410        connection->setNotifySent(false);
2411    }
2412
2413    return ret;
2414}
2415
2416bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2417                                                std::string &client,
2418                                                uint32_t flags,
2419                                                const void *userdata,
2420                                                size_t nuserdata) {
2421    if (reserveCookie(cookie) != ENGINE_SUCCESS) {
2422        return false;
2423    }
2424
2425    std::string tapName = "eq_tapq:";
2426    if (client.length() == 0) {
2427        tapName.assign(ConnHandler::getAnonName());
2428    } else {
2429        tapName.append(client);
2430    }
2431
2432    // Decoding the userdata section of the packet and update the filters
2433    const char *ptr = static_cast<const char*>(userdata);
2434    uint64_t backfillAge = 0;
2435    std::vector<uint16_t> vbuckets;
2436    std::map<uint16_t, uint64_t> lastCheckpointIds;
2437
2438    if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2439        if (nuserdata < sizeof(backfillAge)) {
2440            LOG(EXTENSION_LOG_WARNING,
2441                "Backfill age is missing. Reject connection request from %s\n",
2442                tapName.c_str());
2443            return false;
2444        }
2445        // use memcpy to avoid alignemt issues
2446        memcpy(&backfillAge, ptr, sizeof(backfillAge));
2447        backfillAge = ntohll(backfillAge);
2448        nuserdata -= sizeof(backfillAge);
2449        ptr += sizeof(backfillAge);
2450    }
2451
2452    if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2453        uint16_t nvbuckets;
2454        if (nuserdata < sizeof(nvbuckets)) {
2455            LOG(EXTENSION_LOG_WARNING,
2456            "Number of vbuckets is missing. Reject connection request from %s"
2457            "\n", tapName.c_str());
2458            return false;
2459        }
2460        memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2461        nuserdata -= sizeof(nvbuckets);
2462        ptr += sizeof(nvbuckets);
2463        nvbuckets = ntohs(nvbuckets);
2464        if (nvbuckets > 0) {
2465            if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2466                LOG(EXTENSION_LOG_WARNING,
2467                "# of vbuckets not matched. Reject connection request from %s"
2468                "\n", tapName.c_str());
2469                return false;
2470            }
2471            for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
2472                uint16_t val;
2473                memcpy(&val, ptr, sizeof(nvbuckets));
2474                ptr += sizeof(uint16_t);
2475                vbuckets.push_back(ntohs(val));
2476            }
2477            nuserdata -= (sizeof(uint16_t) * nvbuckets);
2478        }
2479    }
2480
2481    if (flags & TAP_CONNECT_CHECKPOINT) {
2482        uint16_t nCheckpoints = 0;
2483        if (nuserdata >= sizeof(nCheckpoints)) {
2484            memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2485            nuserdata -= sizeof(nCheckpoints);
2486            ptr += sizeof(nCheckpoints);
2487            nCheckpoints = ntohs(nCheckpoints);
2488        }
2489        if (nCheckpoints > 0) {
2490            if (nuserdata <
2491                ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2492                LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2493                    "Reject connection request from %s\n", tapName.c_str());
2494                return false;
2495            }
2496            for (uint16_t j = 0; j < nCheckpoints; ++j) {
2497                uint16_t vbid;
2498                uint64_t checkpointId;
2499                memcpy(&vbid, ptr, sizeof(vbid));
2500                ptr += sizeof(uint16_t);
2501                memcpy(&checkpointId, ptr, sizeof(checkpointId));
2502                ptr += sizeof(uint64_t);
2503                lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
2504            }
2505            nuserdata -=
2506                        ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints);
2507        }
2508    }
2509
2510    TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2511                                 backfillAge,
2512                                 static_cast<int>(
2513                                 configuration.getTapKeepalive()),
2514                                 vbuckets,
2515                                 lastCheckpointIds);
2516
2517    tapConnMap->notifyPausedConnection(tp, true);
2518    return true;
2519}
2520
2521ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2522                                                        void *engine_specific,
2523                                                        uint16_t nengine,
2524                                                        uint8_t ttl,
2525                                                        uint16_t tap_flags,
2526                                                        uint16_t tap_event,
2527                                                        uint32_t tap_seqno,
2528                                                        const void *key,
2529                                                        size_t nkey,
2530                                                        uint32_t flags,
2531                                                        uint32_t exptime,
2532                                                        uint64_t cas,
2533                                                        uint8_t datatype,
2534                                                        const void *data,
2535                                                        size_t ndata,
2536                                                        uint16_t vbucket)
2537{
2538    (void) ttl;
2539    void *specific = getEngineSpecific(cookie);
2540    ConnHandler *connection = NULL;
2541    if (specific == NULL) {
2542        if (tap_event == TAP_ACK) {
2543            LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %s does not "
2544                "exist. Force disconnect...\n", (char *) cookie);
2545            // tap producer is no longer connected..
2546            return ENGINE_DISCONNECT;
2547        } else {
2548            // Create a new tap consumer...
2549            connection = tapConnMap->newConsumer(cookie);
2550            if (connection == NULL) {
2551                LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2552                    " Force disconnect\n");
2553                return ENGINE_DISCONNECT;
2554            }
2555            storeEngineSpecific(cookie, connection);
2556        }
2557    } else {
2558        connection = reinterpret_cast<ConnHandler *>(specific);
2559    }
2560
2561    std::string k(static_cast<const char*>(key), nkey);
2562    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2563
2564    if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2565        if (!tapThrottle->shouldProcess()) {
2566            ++stats.tapThrottled;
2567            if (connection->supportsAck()) {
2568                ret = ENGINE_TMPFAIL;
2569            } else {
2570                ret = ENGINE_DISCONNECT;
2571                LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2572                    "ack support. Force disconnect...\n",
2573                    connection->logHeader());
2574            }
2575            return ret;
2576        }
2577    }
2578
2579    switch (tap_event) {
2580    case TAP_ACK:
2581        ret = processTapAck(cookie, tap_seqno, tap_flags, k);
2582        break;
2583    case TAP_FLUSH:
2584        ret = flush(cookie, 0);
2585        LOG(EXTENSION_LOG_WARNING, "%s Received flush.\n",
2586            connection->logHeader());
2587        break;
2588    case TAP_DELETION:
2589        {
2590            uint64_t revSeqno;
2591            TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2592                                                nengine, &revSeqno);
2593
2594            ret = connection->deletion(0, key, nkey, cas, vbucket, 0, revSeqno,
2595                                       NULL, 0);
2596        }
2597        break;
2598
2599    case TAP_CHECKPOINT_START:
2600    case TAP_CHECKPOINT_END:
2601        {
2602            TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2603            if (tc) {
2604                if (tap_event == TAP_CHECKPOINT_START &&
2605                    nengine == TapEngineSpecific::sizeRevSeqno) {
2606                    // Set the current value for the max deleted seqno
2607                    RCPtr<VBucket> vb = getVBucket(vbucket);
2608                    if (!vb) {
2609                        return ENGINE_TMPFAIL;
2610                    }
2611                    uint64_t seqnum;
2612                    TapEngineSpecific::readSpecificData(tap_event,
2613                                                        engine_specific,
2614                                                        nengine,
2615                                                        &seqnum);
2616                    vb->ht.setMaxDeletedRevSeqno(seqnum);
2617                }
2618
2619                if (data) {
2620                    uint64_t checkpointId;
2621                    memcpy(&checkpointId, data, sizeof(checkpointId));
2622                    checkpointId = ntohll(checkpointId);
2623                    ConnHandlerCheckPoint(tc, tap_event, vbucket,
2624                                          checkpointId);
2625                }
2626                else {
2627                    ret = ENGINE_DISCONNECT;
2628                    LOG(EXTENSION_LOG_WARNING,
2629                        "%s Checkpoint Id is missing in "
2630                        "CHECKPOINT messages. Force disconnect...\n",
2631                        connection->logHeader());
2632                }
2633            }
2634            else {
2635                ret = ENGINE_DISCONNECT;
2636                LOG(EXTENSION_LOG_WARNING,
2637                    "%s not a consumer! Force disconnect\n",
2638                    connection->logHeader());
2639            }
2640        }
2641
2642        break;
2643
2644    case TAP_MUTATION:
2645        {
2646            uint8_t nru = INITIAL_NRU_VALUE;
2647            uint64_t revSeqno = 0;
2648            TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2649                                                nengine, &revSeqno, &nru);
2650
2651            if (!isDatatypeSupported(cookie)) {
2652                datatype = PROTOCOL_BINARY_RAW_BYTES;
2653                const unsigned char *dat = (const unsigned char*)data;
2654                const int datlen = ndata;
2655                if (checkUTF8JSON(dat, datlen)) {
2656                    datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2657                }
2658            }
2659            ret = connection->mutation(0, key, nkey, data, ndata, cas, vbucket,
2660                                       flags, datatype, 0, 0, revSeqno, exptime,
2661                                       nru, NULL, 0);
2662        }
2663
2664        break;
2665
2666    case TAP_OPAQUE:
2667        if (nengine == sizeof(uint32_t)) {
2668            uint32_t cc;
2669            memcpy(&cc, engine_specific, sizeof(cc));
2670            cc = ntohl(cc);
2671
2672            switch (cc) {
2673            case TAP_OPAQUE_ENABLE_AUTO_NACK:
2674                // @todo: the memcached core will _ALWAYS_ send nack
2675                //        if it encounter an error. This should be
2676                // set as the default when we move to .next after 2.0
2677                // (currently we need to allow the message for
2678                // backwards compatibility)
2679                LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2680                    connection->logHeader());
2681                break;
2682            case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2683                connection->setSupportCheckpointSync(true);
2684                LOG(EXTENSION_LOG_INFO,
2685                    "%s Enable checkpoint synchronization\n",
2686                    connection->logHeader());
2687                break;
2688            case TAP_OPAQUE_OPEN_CHECKPOINT:
2689                /**
2690                 * This event is only received by the TAP client that wants to
2691                 * get mutations from closed checkpoints only. At this time,
2692                 * only incremental backup client receives this event so that
2693                 * it can close the connection and reconnect later.
2694                 */
2695                LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2696                    connection->logHeader());
2697                break;
2698            case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
2699                {
2700                    LOG(EXTENSION_LOG_INFO,
2701                        "%s Backfill started for vbucket %d.\n",
2702                        connection->logHeader(), vbucket);
2703                    BlockTimer timer(&stats.tapVbucketResetHisto);
2704                    ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2705                                                  ENGINE_DISCONNECT;
2706                    if (ret == ENGINE_DISCONNECT) {
2707                        LOG(EXTENSION_LOG_WARNING,
2708                         "%s Failed to reset a vbucket %d. Force disconnect\n",
2709                            connection->logHeader(), vbucket);
2710                    } else {
2711                        LOG(EXTENSION_LOG_WARNING,
2712                         "%s Reset vbucket %d was completed succecssfully.\n",
2713                            connection->logHeader(), vbucket);
2714                    }
2715
2716                    TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2717                    if (tc) {
2718                        tc->setBackfillPhase(true, vbucket);
2719                    } else {
2720                        ret = ENGINE_DISCONNECT;
2721                        LOG(EXTENSION_LOG_WARNING,
2722                            "TAP consumer doesn't exists. Force disconnect\n");
2723                    }
2724                }
2725                break;
2726            case TAP_OPAQUE_CLOSE_BACKFILL:
2727                {
2728                    LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2729                        connection->logHeader());
2730                    TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2731                    if (tc) {
2732                        tc->setBackfillPhase(false, vbucket);
2733                    } else {
2734                        ret = ENGINE_DISCONNECT;
2735                        LOG(EXTENSION_LOG_WARNING,
2736                            "%s not a consumer! Force disconnect\n",
2737                            connection->logHeader());
2738                    }
2739                }
2740                break;
2741            case TAP_OPAQUE_CLOSE_TAP_STREAM:
2742                /**
2743                 * This event is sent by the eVBucketMigrator to notify that
2744                 * the source node closes the tap replication stream and
2745                 * switches to TAKEOVER_VBUCKETS phase.
2746                 * This is just an informative message and doesn't require any
2747                 * action.
2748                 */
2749                LOG(EXTENSION_LOG_INFO,
2750                "%s Received close tap stream. Switching to takeover phase.\n",
2751                    connection->logHeader());
2752                break;
2753            case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2754                /**
2755                 * This opaque message is just for notifying that the source
2756                 * node receives change_vbucket_filter request and processes
2757                 * it successfully.
2758                 */
2759                LOG(EXTENSION_LOG_INFO,
2760                "%s Notified that the source node changed a vbucket filter.\n",
2761                    connection->logHeader());
2762                break;
2763            default:
2764                LOG(EXTENSION_LOG_WARNING,
2765                    "%s Received an unknown opaque command\n",
2766                    connection->logHeader());
2767            }
2768        } else {
2769            LOG(EXTENSION_LOG_WARNING,
2770                "%s Received tap opaque with unknown size %d\n",
2771                connection->logHeader(), nengine);
2772        }
2773        break;
2774
2775    case TAP_VBUCKET_SET:
2776        {
2777            BlockTimer timer(&stats.tapVbucketSetHisto);
2778
2779            if (nengine != sizeof(vbucket_state_t)) {
2780                // illegal datasize
2781                LOG(EXTENSION_LOG_WARNING,
2782                    "%s Received TAP_VBUCKET_SET with illegal size."
2783                    " Force disconnect\n", connection->logHeader());
2784                ret = ENGINE_DISCONNECT;
2785                break;
2786            }
2787
2788            vbucket_state_t state;
2789            memcpy(&state, engine_specific, nengine);
2790            state = (vbucket_state_t)ntohl(state);
2791
2792            ret = connection->setVBucketState(0, vbucket, state);
2793        }
2794        break;
2795
2796    default:
2797        // Unknown command
2798        LOG(EXTENSION_LOG_WARNING,
2799            "%s Recieved bad opcode, ignoring message\n",
2800            connection->logHeader());
2801    }
2802
2803    connection->processedEvent(tap_event, ret);
2804    return ret;
2805}
2806
2807ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2808                                                      TapConsumer *consumer,
2809                                                      uint8_t event,
2810                                                      uint16_t vbucket,
2811                                                      uint64_t checkpointId) {
2812    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2813
2814    if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2815        getEpStore()->wakeUpFlusher();
2816        ret = ENGINE_SUCCESS;
2817    }
2818    else {
2819        ret = ENGINE_DISCONNECT;
2820        LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2821            "checkpoint %llu. Force disconnect\n",
2822            consumer->logHeader(), checkpointId);
2823    }
2824
2825    return ret;
2826}
2827
2828TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2829    TapProducer *rv =
2830        reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2831    if (!(rv && rv->isConnected())) {
2832        LOG(EXTENSION_LOG_WARNING,
2833            "Walking a non-existent tap queue, disconnecting\n");
2834        return NULL;
2835    }
2836
2837    if (rv->doDisconnect()) {
2838        LOG(EXTENSION_LOG_WARNING,
2839            "%s Disconnecting pending connection\n", rv->logHeader());
2840        return NULL;
2841    }
2842    return rv;
2843}
2844
2845ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2846                                                            uint32_t seqno,
2847                                                            uint16_t status,
2848                                                            const std::string
2849                                                            &msg)
2850{
2851    TapProducer *connection = getTapProducer(cookie);
2852    if (!connection) {
2853        LOG(EXTENSION_LOG_WARNING,
2854            "Unable to process tap ack. No producer found\n");
2855        return ENGINE_DISCONNECT;
2856    }
2857
2858    return connection->processAck(seqno, status, msg);
2859}
2860
2861void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
2862                                                             &backfillVBFilter,
2863                                               Producer *tc)
2864{
2865    ExTask backfillTask = new BackfillTask(this, *tapConnMap, tc,
2866                                           backfillVBFilter);
2867    ExecutorPool::get()->schedule(backfillTask, NONIO_TASK_IDX);
2868}
2869
2870bool VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
2871    ++numVbucket;
2872    item_eviction_policy_t policy = engine.getEpStore()->
2873                                                       getItemEvictionPolicy();
2874    numItems += vb->getNumItems(policy);
2875    numTempItems += vb->getNumTempItems();
2876    nonResident += vb->getNumNonResidentItems(policy);
2877
2878    if (vb->getHighPriorityChkSize() > 0) {
2879        chkPersistRemaining++;
2880    }
2881
2882    fileSpaceUsed += vb->fileSpaceUsed;
2883    fileSize += vb->fileSize;
2884
2885    if (desired_state != vbucket_state_dead) {
2886        htMemory += vb->ht.memorySize();
2887        htItemMemory += vb->ht.getItemMemory();
2888        htCacheSize += vb->ht.cacheSize;
2889        numEjects += vb->ht.getNumEjects();
2890        numExpiredItems += vb->numExpiredItems;
2891        metaDataMemory += vb->ht.metaDataMemory;
2892        metaDataDisk += vb->metaDataDisk;
2893        opsCreate += vb->opsCreate;
2894        opsUpdate += vb->opsUpdate;
2895        opsDelete += vb->opsDelete;
2896        opsReject += vb->opsReject;
2897
2898        queueSize += vb->dirtyQueueSize;
2899        queueMemory += vb->dirtyQueueMem;
2900        queueFill += vb->dirtyQueueFill;
2901        queueDrain += vb->dirtyQueueDrain;
2902        queueAge += vb->getQueueAge();
2903        pendingWrites += vb->dirtyQueuePendingWrites;
2904    }
2905
2906    return false;
2907}
2908
2909/**
2910 * A container class holding VBucketCountVisitors to aggregate stats for
2911 * different vbucket states.
2912 */
2913class VBucketCountAggregator : public VBucketVisitor  {
2914public:
2915    bool visitBucket(RCPtr<VBucket> &vb)  {
2916        std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
2917        it = visitorMap.find(vb->getState());
2918        if ( it != visitorMap.end() ) {
2919            it->second->visitBucket(vb);
2920        }
2921
2922        return false;
2923    }
2924
2925    void addVisitor(VBucketCountVisitor* visitor)  {
2926        visitorMap[visitor->getVBucketState()] = visitor;
2927    }
2928private:
2929    std::map<vbucket_state_t, VBucketCountVisitor*> visitorMap;
2930};
2931
2932ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
2933                                                           ADD_STAT add_stat) {
2934    VBucketCountAggregator aggregator;
2935
2936    VBucketCountVisitor activeCountVisitor(*this, vbucket_state_active);
2937    aggregator.addVisitor(&activeCountVisitor);
2938
2939    VBucketCountVisitor replicaCountVisitor(*this, vbucket_state_replica);
2940    aggregator.addVisitor(&replicaCountVisitor);
2941
2942    VBucketCountVisitor pendingCountVisitor(*this, vbucket_state_pending);
2943    aggregator.addVisitor(&pendingCountVisitor);
2944
2945    VBucketCountVisitor deadCountVisitor(*this, vbucket_state_dead);
2946    aggregator.addVisitor(&deadCountVisitor);
2947
2948    epstore->visit(aggregator);
2949
2950    epstore->updateCachedResidentRatio(activeCountVisitor.getMemResidentPer(),
2951                                      replicaCountVisitor.getMemResidentPer());
2952    tapThrottle->adjustWriteQueueCap(activeCountVisitor.getNumItems() +
2953                                     replicaCountVisitor.getNumItems() +
2954                                     pendingCountVisitor.getNumItems());
2955
2956    configuration.addStats(add_stat, cookie);
2957
2958    EPStats &epstats = getEpStats();
2959    add_casted_stat("ep_version", VERSION, add_stat, cookie);
2960    add_casted_stat("ep_storage_age",
2961                    epstats.dirtyAge, add_stat, cookie);
2962    add_casted_stat("ep_storage_age_highwat",
2963                    epstats.dirtyAgeHighWat, add_stat, cookie);
2964    add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
2965                    add_stat, cookie);
2966
2967    if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
2968        add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
2969    } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
2970        add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
2971    }
2972
2973    add_casted_stat("ep_total_enqueued",
2974                    epstats.totalEnqueued, add_stat, cookie);
2975    add_casted_stat("ep_total_persisted",
2976                    epstats.totalPersisted, add_stat, cookie);
2977    add_casted_stat("ep_item_flush_failed",
2978                    epstats.flushFailed, add_stat, cookie);
2979    add_casted_stat("ep_item_commit_failed",
2980                    epstats.commitFailed, add_stat, cookie);
2981    add_casted_stat("ep_item_begin_failed",
2982                    epstats.beginFailed, add_stat, cookie);
2983    add_casted_stat("ep_expired_access", epstats.expired_access,
2984                    add_stat, cookie);
2985    add_casted_stat("ep_expired_pager", epstats.expired_pager,
2986                    add_stat, cookie);
2987    add_casted_stat("ep_item_flush_expired",
2988                    epstats.flushExpired, add_stat, cookie);
2989    add_casted_stat("ep_queue_size",
2990                    epstats.diskQueueSize, add_stat, cookie);
2991    add_casted_stat("ep_flusher_todo",
2992                    epstats.flusher_todo, add_stat, cookie);
2993    add_casted_stat("ep_uncommitted_items",
2994                    epstats.flusher_todo, add_stat, cookie);
2995    add_casted_stat("ep_diskqueue_items",
2996                    epstats.diskQueueSize, add_stat, cookie);
2997    add_casted_stat("ep_flusher_state",
2998                    epstore->getFlusher(0)->stateName(),
2999                    add_stat, cookie);
3000    add_casted_stat("ep_commit_num", epstats.flusherCommits,
3001                    add_stat, cookie);
3002    add_casted_stat("ep_commit_time",
3003                    epstats.commit_time, add_stat, cookie);
3004    add_casted_stat("ep_commit_time_total",
3005                    epstats.cumulativeCommitTime, add_stat, cookie);
3006    add_casted_stat("ep_vbucket_del",
3007                    epstats.vbucketDeletions, add_stat, cookie);
3008    add_casted_stat("ep_vbucket_del_fail",
3009                    epstats.vbucketDeletionFail, add_stat, cookie);
3010    add_casted_stat("ep_flush_duration_total",
3011                    epstats.cumulativeFlushTime, add_stat, cookie);
3012    add_casted_stat("ep_flush_all",
3013                    epstore->isFlushAllScheduled() ? "true" : "false",
3014                    add_stat, cookie);
3015    add_casted_stat("curr_items", activeCountVisitor.getNumItems(), add_stat,
3016                    cookie);
3017    add_casted_stat("curr_temp_items", activeCountVisitor.getNumTempItems(),
3018                    add_stat, cookie);
3019    add_casted_stat("curr_items_tot",
3020                    activeCountVisitor.getNumItems() +
3021                    replicaCountVisitor.getNumItems() +
3022                    pendingCountVisitor.getNumItems(),
3023                    add_stat, cookie);
3024    add_casted_stat("vb_active_num", activeCountVisitor.getVBucketNumber(),
3025                    add_stat, cookie);
3026    add_casted_stat("vb_active_curr_items", activeCountVisitor.getNumItems(),
3027                    add_stat, cookie);
3028    add_casted_stat("vb_active_num_non_resident",
3029                    activeCountVisitor.getNonResident(),
3030                    add_stat, cookie);
3031    add_casted_stat("vb_active_perc_mem_resident",
3032                    activeCountVisitor.getMemResidentPer(),
3033                    add_stat, cookie);
3034    add_casted_stat("vb_active_eject", activeCountVisitor.getEjects(),
3035                    add_stat, cookie);
3036    add_casted_stat("vb_active_expired", activeCountVisitor.getExpired(),
3037                    add_stat, cookie);
3038    add_casted_stat("vb_active_meta_data_memory",
3039                    activeCountVisitor.getMetaDataMemory(),
3040                    add_stat, cookie);
3041    add_casted_stat("vb_active_meta_data_disk",
3042                    activeCountVisitor.getMetaDataDisk(),
3043                    add_stat, cookie);
3044    add_casted_stat("vb_active_ht_memory",
3045                    activeCountVisitor.getHashtableMemory(),
3046                    add_stat, cookie);
3047    add_casted_stat("vb_active_itm_memory", activeCountVisitor.getItemMemory(),
3048                    add_stat, cookie);
3049    add_casted_stat("vb_active_ops_create", activeCountVisitor.getOpsCreate(),
3050                    add_stat, cookie);
3051    add_casted_stat("vb_active_ops_update", activeCountVisitor.getOpsUpdate(),
3052                    add_stat, cookie);
3053    add_casted_stat("vb_active_ops_delete", activeCountVisitor.getOpsDelete(),
3054                    add_stat, cookie);
3055    add_casted_stat("vb_active_ops_reject", activeCountVisitor.getOpsReject(),
3056                    add_stat, cookie);
3057    add_casted_stat("vb_active_queue_size", activeCountVisitor.getQueueSize(),
3058                    add_stat, cookie);
3059    add_casted_stat("vb_active_queue_memory",
3060                    activeCountVisitor.getQueueMemory(), add_stat, cookie);
3061    add_casted_stat("vb_active_queue_age", activeCountVisitor.getAge(),
3062                    add_stat, cookie);
3063    add_casted_stat("vb_active_queue_pending",
3064                    activeCountVisitor.getPendingWrites(), add_stat, cookie);
3065    add_casted_stat("vb_active_queue_fill", activeCountVisitor.getQueueFill(),
3066                    add_stat, cookie);
3067    add_casted_stat("vb_active_queue_drain",
3068                    activeCountVisitor.getQueueDrain(), add_stat, cookie);
3069
3070    add_casted_stat("vb_replica_num", replicaCountVisitor.getVBucketNumber(),
3071                    add_stat, cookie);
3072    add_casted_stat("vb_replica_curr_items", replicaCountVisitor.getNumItems(),
3073                    add_stat, cookie);
3074    add_casted_stat("vb_replica_num_non_resident",
3075                    replicaCountVisitor.getNonResident(), add_stat, cookie);
3076    add_casted_stat("vb_replica_perc_mem_resident",
3077                    replicaCountVisitor.getMemResidentPer(),
3078                    add_stat, cookie);
3079    add_casted_stat("vb_replica_eject", replicaCountVisitor.getEjects(),
3080                    add_stat, cookie);
3081    add_casted_stat("vb_replica_expired", replicaCountVisitor.getExpired(),
3082                    add_stat, cookie);
3083    add_casted_stat("vb_replica_meta_data_memory",
3084                    replicaCountVisitor.getMetaDataMemory(), add_stat, cookie);
3085    add_casted_stat("vb_replica_meta_data_disk",
3086                    replicaCountVisitor.getMetaDataDisk(), add_stat, cookie);
3087    add_casted_stat("vb_replica_ht_memory",
3088                    replicaCountVisitor.getHashtableMemory(),
3089                    add_stat, cookie);
3090    add_casted_stat("vb_replica_itm_memory",
3091                    replicaCountVisitor.getItemMemory(), add_stat, cookie);
3092    add_casted_stat("vb_replica_ops_create",
3093                    replicaCountVisitor.getOpsCreate(), add_stat, cookie);
3094    add_casted_stat("vb_replica_ops_update",
3095                    replicaCountVisitor.getOpsUpdate(), add_stat, cookie);
3096    add_casted_stat("vb_replica_ops_delete",
3097                    replicaCountVisitor.getOpsDelete(), add_stat, cookie);
3098    add_casted_stat("vb_replica_ops_reject",
3099                    replicaCountVisitor.getOpsReject(), add_stat, cookie);
3100    add_casted_stat("vb_replica_queue_size",
3101                    replicaCountVisitor.getQueueSize(), add_stat, cookie);
3102    add_casted_stat("vb_replica_queue_memory",
3103                    replicaCountVisitor.getQueueMemory(),
3104                    add_stat, cookie);
3105    add_casted_stat("vb_replica_queue_age",
3106                    replicaCountVisitor.getAge(), add_stat, cookie);
3107    add_casted_stat("vb_replica_queue_pending",
3108                    replicaCountVisitor.getPendingWrites(),
3109                    add_stat, cookie);
3110    add_casted_stat("vb_replica_queue_fill",
3111                    replicaCountVisitor.getQueueFill(), add_stat, cookie);
3112    add_casted_stat("vb_replica_queue_drain",
3113                    replicaCountVisitor.getQueueDrain(), add_stat, cookie);
3114
3115    add_casted_stat("vb_pending_num",
3116                    pendingCountVisitor.getVBucketNumber(), add_stat, cookie);
3117    add_casted_stat("vb_pending_curr_items",
3118                    pendingCountVisitor.getNumItems(), add_stat, cookie);
3119    add_casted_stat("vb_pending_num_non_resident",
3120                    pendingCountVisitor.getNonResident(),
3121                    add_stat, cookie);
3122    add_casted_stat("vb_pending_perc_mem_resident",
3123                    pendingCountVisitor.getMemResidentPer(), add_stat, cookie);
3124    add_casted_stat("vb_pending_eject", pendingCountVisitor.getEjects(),
3125                    add_stat, cookie);
3126    add_casted_stat("vb_pending_expired", pendingCountVisitor.getExpired(),
3127                    add_stat, cookie);
3128    add_casted_stat("vb_pending_meta_data_memory",
3129                    pendingCountVisitor.getMetaDataMemory(),
3130                    add_stat, cookie);
3131    add_casted_stat("vb_pending_meta_data_disk",
3132                    pendingCountVisitor.getMetaDataDisk(),
3133                    add_stat, cookie);
3134    add_casted_stat("vb_pending_ht_memory",
3135                    pendingCountVisitor.getHashtableMemory(),
3136                    add_stat, cookie);
3137    add_casted_stat("vb_pending_itm_memory",
3138                    pendingCountVisitor.getItemMemory(), add_stat, cookie);
3139    add_casted_stat("vb_pending_ops_create",
3140                    pendingCountVisitor.getOpsCreate(), add_stat, cookie);
3141    add_casted_stat("vb_pending_ops_update",
3142                    pendingCountVisitor.getOpsUpdate(), add_stat, cookie);
3143    add_casted_stat("vb_pending_ops_delete",
3144                    pendingCountVisitor.getOpsDelete(), add_stat, cookie);
3145    add_casted_stat("vb_pending_ops_reject",
3146                    pendingCountVisitor.getOpsReject(), add_stat, cookie);
3147    add_casted_stat("vb_pending_queue_size",
3148                    pendingCountVisitor.getQueueSize(), add_stat, cookie);
3149    add_casted_stat("vb_pending_queue_memory",
3150                    pendingCountVisitor.getQueueMemory(),
3151                    add_stat, cookie);
3152    add_casted_stat("vb_pending_queue_age", pendingCountVisitor.getAge(),
3153                    add_stat, cookie);
3154    add_casted_stat("vb_pending_queue_pending",
3155                    pendingCountVisitor.getPendingWrites(),
3156                    add_stat, cookie);
3157    add_casted_stat("vb_pending_queue_fill",
3158                    pendingCountVisitor.getQueueFill(), add_stat, cookie);
3159    add_casted_stat("vb_pending_queue_drain",
3160                    pendingCountVisitor.getQueueDrain(), add_stat, cookie);
3161
3162    add_casted_stat("vb_dead_num", deadCountVisitor.getVBucketNumber(),
3163                    add_stat, cookie);
3164
3165    add_casted_stat("ep_db_data_size",
3166                    activeCountVisitor.getFileSpaceUsed() +
3167                    replicaCountVisitor.getFileSpaceUsed() +
3168                    pendingCountVisitor.getFileSpaceUsed() +
3169                    deadCountVisitor.getFileSpaceUsed(),
3170                    add_stat, cookie);
3171    add_casted_stat("ep_db_file_size",
3172                    activeCountVisitor.getFileSize() +
3173                    replicaCountVisitor.getFileSize() +
3174                    pendingCountVisitor.getFileSize() +
3175                    deadCountVisitor.getFileSize(),
3176                    add_stat, cookie);
3177
3178    add_casted_stat("ep_vb_snapshot_total",
3179                    epstats.snapshotVbucketHisto.total(), add_stat, cookie);
3180
3181    add_casted_stat("ep_vb_total",
3182                    activeCountVisitor.getVBucketNumber() +
3183                    replicaCountVisitor.getVBucketNumber() +
3184                    pendingCountVisitor.getVBucketNumber() +
3185                    deadCountVisitor.getVBucketNumber(),
3186                    add_stat, cookie);
3187
3188    add_casted_stat("ep_total_new_items",
3189                    activeCountVisitor.getOpsCreate() +
3190                    replicaCountVisitor.getOpsCreate() +
3191                    pendingCountVisitor.getOpsCreate(),
3192                    add_stat, cookie);
3193    add_casted_stat("ep_total_del_items",
3194                    activeCountVisitor.getOpsDelete() +
3195                    replicaCountVisitor.getOpsDelete() +
3196                    pendingCountVisitor.getOpsDelete(),
3197                    add_stat, cookie);
3198    add_casted_stat("ep_diskqueue_memory",
3199                    activeCountVisitor.getQueueMemory() +
3200                    replicaCountVisitor.getQueueMemory() +
3201                    pendingCountVisitor.getQueueMemory(),
3202                    add_stat, cookie);
3203    add_casted_stat("ep_diskqueue_fill",
3204                    activeCountVisitor.getQueueFill() +
3205                    replicaCountVisitor.getQueueFill() +
3206                    pendingCountVisitor.getQueueFill(),
3207                    add_stat, cookie);
3208    add_casted_stat("ep_diskqueue_drain",
3209                    activeCountVisitor.getQueueDrain() +
3210                    replicaCountVisitor.getQueueDrain() +
3211                    pendingCountVisitor.getQueueDrain(),
3212                    add_stat, cookie);
3213    add_casted_stat("ep_diskqueue_pending",
3214                    activeCountVisitor.getPendingWrites() +
3215                    replicaCountVisitor.getPendingWrites() +
3216                    pendingCountVisitor.getPendingWrites(),
3217                    add_stat, cookie);
3218    add_casted_stat("ep_meta_data_memory",
3219                    activeCountVisitor.getMetaDataMemory() +
3220                    replicaCountVisitor.getMetaDataMemory() +
3221                    pendingCountVisitor.getMetaDataMemory(),
3222                    add_stat, cookie);
3223    add_casted_stat("ep_meta_data_disk",
3224                    activeCountVisitor.getMetaDataDisk() +
3225                    replicaCountVisitor.getMetaDataDisk() +
3226                    pendingCountVisitor.getMetaDataDisk(),
3227                    add_stat, cookie);
3228
3229    size_t memUsed =  stats.getTotalMemoryUsed();
3230    add_casted_stat("mem_used", memUsed, add_stat, cookie);
3231    add_casted_stat("bytes", memUsed, add_stat, cookie);
3232    add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3233    add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3234#if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3235    add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3236#else
3237    add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3238#endif
3239    add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3240    add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3241                    add_stat, cookie);
3242#if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3243    add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3244#else
3245    add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3246#endif
3247    add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3248    add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3249    add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3250    add_casted_stat("ep_total_cache_size",
3251                    activeCountVisitor.getCacheSize() +
3252                    replicaCountVisitor.getCacheSize() +
3253                    pendingCountVisitor.getCacheSize(),
3254                    add_stat, cookie);
3255    add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3256    add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3257                    add_stat, cookie);
3258    add_casted_stat("ep_mem_tracker_enabled",
3259                    stats.memoryTrackerEnabled ? "true" : "false",
3260                    add_stat, cookie);
3261    add_casted_stat("ep_bg_fetched", epstats.bg_fetched,
3262