xref: /3.0.3-GA/ep-engine/src/ep_engine.cc (revision f991533e)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <fcntl.h>
21#include <memcached/engine.h>
22#include <memcached/protocol_binary.h>
23#include <memcached/util.h>
24#include <platform/platform.h>
25#include <stdarg.h>
26
27#include <cstdio>
28#include <cstring>
29#include <fstream>
30#include <iostream>
31#include <limits>
32#include <string>
33#include <vector>
34
35#include "backfill.h"
36#include "ep_engine.h"
37#include "failover-table.h"
38#include "flusher.h"
39#include "tapconnmap.h"
40#include "htresizer.h"
41#include "memory_tracker.h"
42#include "stats-info.h"
43#define STATWRITER_NAMESPACE core_engine
44#include "statwriter.h"
45#undef STATWRITER_NAMESPACE
46#include "tapthrottle.h"
47#include "upr-consumer.h"
48#include "upr-producer.h"
49#include "warmup.h"
50
51#include <JSON_checker.h>
52
53static ALLOCATOR_HOOKS_API *hooksApi;
54static SERVER_LOG_API *loggerApi;
55
/**
 * Scale a size by a fractional factor.
 *
 * @param val the base value
 * @param percent the factor to multiply by (e.g. 0.75)
 * @return val * percent, converted back to size_t (fraction discarded)
 */
static size_t percentOf(size_t val, double percent) {
    const double scaled = static_cast<double>(val) * percent;
    return static_cast<size_t>(scaled);
}
59
60/**
61 * Helper function to avoid typing in the long cast all over the place
62 * @param handle pointer to the engine
63 * @return the engine as a class
64 */
65static inline EventuallyPersistentEngine* getHandle(ENGINE_HANDLE* handle)
66{
67    EventuallyPersistentEngine* ret;
68    ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
69    ObjectRegistry::onSwitchThread(ret);
70    return ret;
71}
72
73static inline void releaseHandle(ENGINE_HANDLE* handle) {
74    (void) handle;
75    ObjectRegistry::onSwitchThread(NULL);
76}
77
78
79/**
80 * Call the response callback and return the appropriate value so that
81 * the core knows what to do..
82 */
83static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
84                                      uint16_t keylen,
85                                      const void *ext, uint8_t extlen,
86                                      const void *body, uint32_t bodylen,
87                                      uint8_t datatype, uint16_t status,
88                                      uint64_t cas, const void *cookie)
89{
90    ENGINE_ERROR_CODE rv = ENGINE_FAILED;
91    EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
92    if (response(key, keylen, ext, extlen, body, bodylen, datatype,
93                 status, cas, cookie)) {
94        rv = ENGINE_SUCCESS;
95    }
96    ObjectRegistry::onSwitchThread(e);
97    return rv;
98}
99
/**
 * Check that a value lies within the inclusive range [l, h].
 *
 * @param v the value to check
 * @param l the lowest accepted value
 * @param h the highest accepted value
 * @throws std::runtime_error if v is below l or above h
 */
template <typename T>
static void validate(T v, T l, T h) {
    if (!(v < l) && !(v > h)) {
        return;
    }
    throw std::runtime_error("Value out of range.");
}
106
107
/**
 * Verify that a string is a base-10 integer literal: an optional leading
 * '-' followed by one or more decimal digits and nothing else.
 *
 * @param str the NUL-terminated string to check
 * @throws std::runtime_error if str is NULL, contains no digits, or
 *         contains any character that is not a decimal digit
 */
static void checkNumeric(const char* str) {
    using namespace std;

    if (str == NULL) {
        throw std::runtime_error("Value is not numeric");
    }

    const char *p = str;
    if (*p == '-') {
        ++p;
    }

    // An empty string or a lone "-" carries no digits and is not a number.
    if (*p == '\0') {
        throw std::runtime_error("Value is not numeric");
    }

    for (; *p != '\0'; ++p) {
        // isdigit() is only defined for values representable as unsigned
        // char (or EOF); a plain char may be negative.
        if (!isdigit(static_cast<unsigned char>(*p))) {
            throw std::runtime_error("Value is not numeric");
        }
    }
}
120
// The Engine API specifies C linkage for these functions.
122extern "C" {
123
124    static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle)
125    {
126        engine_info* info = getHandle(handle)->getInfo();
127        releaseHandle(handle);
128        return info;
129    }
130
131    static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
132                                           const char* config_str)
133    {
134        ENGINE_ERROR_CODE err_code = getHandle(handle)->initialize(config_str);
135        releaseHandle(handle);
136        return err_code;
137    }
138
139    static void EvpDestroy(ENGINE_HANDLE* handle, const bool force)
140    {
141        getHandle(handle)->destroy(force);
142        delete getHandle(handle);
143        releaseHandle(NULL);
144    }
145
146    static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
147                                             const void* cookie,
148                                             item **itm,
149                                             const void* key,
150                                             const size_t nkey,
151                                             const size_t nbytes,
152                                             const int flags,
153                                             const rel_time_t exptime,
154                                             uint8_t datatype)
155    {
156        if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
157            LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
158                    " (ItemAllocate)");
159            return ENGINE_EINVAL;
160        }
161        ENGINE_ERROR_CODE err_code = getHandle(handle)->itemAllocate(cookie,
162                                                                     itm, key,
163                                                                     nkey,
164                                                                     nbytes,
165                                                                     flags,
166                                                                     exptime,
167                                                                     datatype);
168        releaseHandle(handle);
169        return err_code;
170    }
171
172    static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
173                                           const void* cookie,
174                                           const void* key,
175                                           const size_t nkey,
176                                           uint64_t* cas,
177                                           uint16_t vbucket)
178    {
179        ENGINE_ERROR_CODE err_code = getHandle(handle)->itemDelete(cookie, key,
180                                                                   nkey, cas,
181                                                                   vbucket);
182        releaseHandle(handle);
183        return err_code;
184    }
185
186    static void EvpItemRelease(ENGINE_HANDLE* handle,
187                               const void *cookie,
188                               item* itm)
189    {
190        getHandle(handle)->itemRelease(cookie, itm);
191        releaseHandle(handle);
192    }
193
194    static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
195                                    const void* cookie,
196                                    item** itm,
197                                    const void* key,
198                                    const int nkey,
199                                    uint16_t vbucket)
200    {
201        ENGINE_ERROR_CODE err_code = getHandle(handle)->get(cookie, itm, key,
202                                                            nkey, vbucket);
203        releaseHandle(handle);
204        return err_code;
205    }
206
207    static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
208                                         const void* cookie,
209                                         const char* stat_key,
210                                         int nkey,
211                                         ADD_STAT add_stat)
212    {
213        ENGINE_ERROR_CODE err_code = getHandle(handle)->getStats(cookie,
214                                                                 stat_key,
215                                                                 nkey,
216                                                                 add_stat);
217        releaseHandle(handle);
218        return err_code;
219    }
220
221    static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
222                                      const void *cookie,
223                                      item* itm,
224                                      uint64_t *cas,
225                                      ENGINE_STORE_OPERATION operation,
226                                      uint16_t vbucket)
227    {
228        ENGINE_ERROR_CODE err_code = getHandle(handle)->store(cookie, itm, cas,
229                                                              operation,
230                                                              vbucket);
231        releaseHandle(handle);
232        return err_code;
233    }
234
235    static ENGINE_ERROR_CODE EvpArithmetic(ENGINE_HANDLE* handle,
236                                           const void* cookie,
237                                           const void* key,
238                                           const int nkey,
239                                           const bool increment,
240                                           const bool create,
241                                           const uint64_t delta,
242                                           const uint64_t initial,
243                                           const rel_time_t exptime,
244                                           uint64_t *cas,
245                                           uint8_t datatype,
246                                           uint64_t *result,
247                                           uint16_t vbucket)
248    {
249        if (datatype > PROTOCOL_BINARY_DATATYPE_JSON) {
250            if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
251                LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
252                        " (Arithmetic)");
253            } else {
254                LOG(EXTENSION_LOG_WARNING, "Cannnot perform arithmetic "
255                    "operations on compressed data!");
256            }
257            return ENGINE_EINVAL;
258        }
259        ENGINE_ERROR_CODE ecode = getHandle(handle)->arithmetic(cookie, key,
260                                                                nkey,
261                                                                increment,
262                                                                create, delta,
263                                                                initial,
264                                                                exptime, cas,
265                                                                datatype,
266                                                                result,
267                                                                vbucket);
268        releaseHandle(handle);
269        return ecode;
270    }
271
272    static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
273                                      const void* cookie, time_t when)
274    {
275        ENGINE_ERROR_CODE err_code = getHandle(handle)->flush(cookie, when);
276        releaseHandle(handle);
277        return err_code;
278    }
279
280    static void EvpResetStats(ENGINE_HANDLE* handle, const void *)
281    {
282        getHandle(handle)->resetStats();
283        releaseHandle(handle);
284    }
285
286    static protocol_binary_response_status stopFlusher(
287                                                 EventuallyPersistentEngine *e,
288                                                 const char **msg,
289                                                 size_t *msg_size) {
290        return e->stopFlusher(msg, msg_size);
291    }
292
293    static protocol_binary_response_status startFlusher(
294                                                 EventuallyPersistentEngine *e,
295                                                 const char **msg,
296                                                 size_t *msg_size) {
297        return e->startFlusher(msg, msg_size);
298    }
299
300    static protocol_binary_response_status setTapParam(
301                                                 EventuallyPersistentEngine *e,
302                                                 const char *keyz,
303                                                 const char *valz,
304                                                 const char **msg, size_t *) {
305        protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
306
307        try {
308            int v = atoi(valz);
309            if (strcmp(keyz, "tap_keepalive") == 0) {
310                checkNumeric(valz);
311                validate(v, 0, MAX_TAP_KEEP_ALIVE);
312                e->setTapKeepAlive(static_cast<uint32_t>(v));
313            } else if (strcmp(keyz, "tap_throttle_threshold") == 0) {
314                checkNumeric(valz);
315                e->getConfiguration().setTapThrottleThreshold(v);
316            } else if (strcmp(keyz, "tap_throttle_queue_cap") == 0) {
317                checkNumeric(valz);
318                e->getConfiguration().setTapThrottleQueueCap(v);
319            } else if (strcmp(keyz, "tap_throttle_cap_pcnt") == 0) {
320                checkNumeric(valz);
321                e->getConfiguration().setTapThrottleCapPcnt(v);
322            } else {
323                *msg = "Unknown config param";
324                rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
325            }
326        } catch(std::runtime_error &) {
327            *msg = "Value out of range.";
328            rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
329        }
330
331        return rv;
332    }
333
334    static protocol_binary_response_status setCheckpointParam(
335                                                 EventuallyPersistentEngine *e,
336                                                              const char *keyz,
337                                                              const char *valz,
338                                                              const char **msg,
339                                                              size_t *) {
340        protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
341
342        try {
343            int v = atoi(valz);
344            if (strcmp(keyz, "chk_max_items") == 0) {
345                checkNumeric(valz);
346                validate(v, MIN_CHECKPOINT_ITEMS, MAX_CHECKPOINT_ITEMS);
347                e->getConfiguration().setChkMaxItems(v);
348            } else if (strcmp(keyz, "chk_period") == 0) {
349                checkNumeric(valz);
350                validate(v, MIN_CHECKPOINT_PERIOD, MAX_CHECKPOINT_PERIOD);
351                e->getConfiguration().setChkPeriod(v);
352            } else if (strcmp(keyz, "max_checkpoints") == 0) {
353                checkNumeric(valz);
354                validate(v, DEFAULT_MAX_CHECKPOINTS,
355                         MAX_CHECKPOINTS_UPPER_BOUND);
356                e->getConfiguration().setMaxCheckpoints(v);
357            } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
358                if (strcmp(valz, "true") == 0) {
359                    e->getConfiguration().setItemNumBasedNewChk(true);
360                } else {
361                    e->getConfiguration().setItemNumBasedNewChk(false);
362                }
363            } else if (strcmp(keyz, "keep_closed_chks") == 0) {
364                if (strcmp(valz, "true") == 0) {
365                    e->getConfiguration().setKeepClosedChks(true);
366                } else {
367                    e->getConfiguration().setKeepClosedChks(false);
368                }
369            } else {
370                *msg = "Unknown config param";
371                rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
372            }
373        } catch(std::runtime_error &) {
374            *msg = "Value out of range.";
375            rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
376        }
377
378        return rv;
379    }
380
381    static protocol_binary_response_status setFlushParam(
382                                                 EventuallyPersistentEngine *e,
383                                                 const char *keyz,
384                                                 const char *valz,
385                                                 const char **msg,
386                                                 size_t *) {
387        protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
388
389        // Handle the actual mutation.
390        try {
391            int v = atoi(valz);
392            if (strcmp(keyz, "bg_fetch_delay") == 0) {
393                checkNumeric(valz);
394                e->getConfiguration().setBgFetchDelay(v);
395            } else if (strcmp(keyz, "flushall_enabled") == 0) {
396                if (strcmp(valz, "true") == 0) {
397                    e->getConfiguration().setFlushallEnabled(true);
398                } else if(strcmp(valz, "false") == 0) {
399                    e->getConfiguration().setFlushallEnabled(false);
400                } else {
401                    throw std::runtime_error("value out of range.");
402                }
403            } else if (strcmp(keyz, "max_size") == 0) {
404                char *ptr = NULL;
405                checkNumeric(valz);
406                uint64_t vsize = strtoull(valz, &ptr, 10);
407                validate(vsize, static_cast<uint64_t>(0),
408                         std::numeric_limits<uint64_t>::max());
409                e->getConfiguration().setMaxSize(vsize);
410                e->getConfiguration().setMemLowWat(percentOf(vsize, 0.75));
411                e->getConfiguration().setMemHighWat(percentOf(vsize, 0.85));
412            } else if (strcmp(keyz, "mem_low_wat") == 0) {
413                char *ptr = NULL;
414                checkNumeric(valz);
415                uint64_t vsize = strtoull(valz, &ptr, 10);
416                validate(vsize, static_cast<uint64_t>(0),
417                         std::numeric_limits<uint64_t>::max());
418                e->getConfiguration().setMemLowWat(vsize);
419            } else if (strcmp(keyz, "mem_high_wat") == 0) {
420                char *ptr = NULL;
421                checkNumeric(valz);
422                uint64_t vsize = strtoull(valz, &ptr, 10);
423                validate(vsize, static_cast<uint64_t>(0),
424                         std::numeric_limits<uint64_t>::max());
425                e->getConfiguration().setMemHighWat(vsize);
426            } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
427                checkNumeric(valz);
428                validate(v, 0, 100);
429                e->getConfiguration().setMutationMemThreshold(v);
430            } else if (strcmp(keyz, "timing_log") == 0) {
431                EPStats &stats = e->getEpStats();
432                std::ostream *old = stats.timingLog;
433                stats.timingLog = NULL;
434                delete old;
435                if (strcmp(valz, "off") == 0) {
436                    LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
437                } else {
438                    std::ofstream *tmp(new std::ofstream(valz));
439                    if (tmp->good()) {
440                        LOG(EXTENSION_LOG_INFO,
441                            "Logging detailed timings to ``%s''.", valz);
442                        stats.timingLog = tmp;
443                    } else {
444                        LOG(EXTENSION_LOG_WARNING,
445                            "Error setting detailed timing log to ``%s'':  %s",
446                            valz, strerror(errno));
447                        delete tmp;
448                    }
449                }
450            } else if (strcmp(keyz, "exp_pager_stime") == 0) {
451                char *ptr = NULL;
452                checkNumeric(valz);
453                uint64_t vsize = strtoull(valz, &ptr, 10);
454                validate(vsize, static_cast<uint64_t>(0),
455                         std::numeric_limits<uint64_t>::max());
456                e->getConfiguration().setExpPagerStime((size_t)vsize);
457            } else if (strcmp(keyz, "couch_response_timeout") == 0) {
458                checkNumeric(valz);
459                e->getConfiguration().setCouchResponseTimeout(v);
460            } else if (strcmp(keyz, "alog_sleep_time") == 0) {
461                checkNumeric(valz);
462                e->getConfiguration().setAlogSleepTime(v);
463            } else if (strcmp(keyz, "alog_task_time") == 0) {
464                checkNumeric(valz);
465                e->getConfiguration().setAlogTaskTime(v);
466            } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
467                checkNumeric(valz);
468                e->getConfiguration().setPagerActiveVbPcnt(v);
469            } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
470                checkNumeric(valz);
471                validate(v, 0, std::numeric_limits<int>::max());
472                e->getConfiguration().setWarmupMinMemoryThreshold(v);
473            } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
474                checkNumeric(valz);
475                validate(v, 0, std::numeric_limits<int>::max());
476                e->getConfiguration().setWarmupMinItemsThreshold(v);
477            } else {
478                *msg = "Unknown config param";
479                rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
480            }
481        } catch(std::runtime_error& ex) {
482            *msg = strdup(ex.what());
483            rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
484        }
485
486        return rv;
487    }
488
489    static protocol_binary_response_status evictKey(
490                                                 EventuallyPersistentEngine *e,
491                                                 protocol_binary_request_header
492                                                                      *request,
493                                                 const char **msg,
494                                                 size_t *msg_size) {
495        protocol_binary_request_no_extras *req =
496            (protocol_binary_request_no_extras*)request;
497
498        char keyz[256];
499
500        // Read the key.
501        int keylen = ntohs(req->message.header.request.keylen);
502        if (keylen >= (int)sizeof(keyz)) {
503            *msg = "Key is too large.";
504            return PROTOCOL_BINARY_RESPONSE_EINVAL;
505        }
506        memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
507        keyz[keylen] = 0x00;
508
509        uint16_t vbucket = ntohs(request->request.vbucket);
510
511        std::string key(keyz, keylen);
512
513        LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key %s\n",
514                keyz);
515
516        protocol_binary_response_status rv = e->evictKey(key, vbucket, msg,
517                                                         msg_size);
518        if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
519            rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
520            if (e->isDegradedMode()) {
521                return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
522            }
523        }
524        return rv;
525    }
526
527    static ENGINE_ERROR_CODE getLocked(EventuallyPersistentEngine *e,
528                                       protocol_binary_request_header *req,
529                                       const void *cookie,
530                                       Item **itm,
531                                       const char **msg,
532                                       size_t *,
533                                       protocol_binary_response_status *res) {
534
535        uint8_t extlen = req->request.extlen;
536        if (extlen != 0 && extlen != 4) {
537            *msg = "Invalid packet format (extlen may be 0 or 4)";
538            *res = PROTOCOL_BINARY_RESPONSE_EINVAL;
539            return ENGINE_EINVAL;
540        }
541
542        protocol_binary_request_getl *grequest =
543            (protocol_binary_request_getl*)req;
544        *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
545
546        const char *keyp = reinterpret_cast<const char*>(req->bytes);
547        keyp += sizeof(req->bytes) + extlen;
548        std::string key(keyp, ntohs(req->request.keylen));
549        uint16_t vbucket = ntohs(req->request.vbucket);
550
551        RememberingCallback<GetValue> getCb;
552        uint32_t max_timeout = (unsigned int)e->getGetlMaxTimeout();
553        uint32_t default_timeout = (unsigned int)e->getGetlDefaultTimeout();
554        uint32_t lockTimeout = default_timeout;
555        if (extlen == 4) {
556            lockTimeout = ntohl(grequest->message.body.expiration);
557        }
558
559        if (lockTimeout >  max_timeout || lockTimeout < 1) {
560            LOG(EXTENSION_LOG_DEBUG, "Illegal value for lock timeout specified"
561               " %u. Using default value: %u\n", lockTimeout, default_timeout);
562            lockTimeout = default_timeout;
563        }
564
565        bool gotLock = e->getLocked(key, vbucket, getCb,
566                                    ep_current_time(),
567                                    lockTimeout, cookie);
568
569        getCb.waitForValue();
570        ENGINE_ERROR_CODE rv = getCb.val.getStatus();
571
572        if (rv == ENGINE_SUCCESS) {
573            *itm = getCb.val.getValue();
574
575        } else if (rv == ENGINE_EWOULDBLOCK) {
576
577            // need to wait for value
578            return rv;
579        } else if (!gotLock){
580
581            *msg =  "LOCK_ERROR";
582            *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
583            return ENGINE_TMPFAIL;
584        } else {
585            if (e->isDegradedMode()) {
586                *msg = "LOCK_TMP_ERROR";
587                *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
588                return ENGINE_TMPFAIL;
589            }
590
591            RCPtr<VBucket> vb = e->getVBucket(vbucket);
592            if (!vb) {
593                *msg = "That's not my bucket.";
594                *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
595                return ENGINE_NOT_MY_VBUCKET;
596            }
597            *msg = "NOT_FOUND";
598            *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
599            return ENGINE_KEY_ENOENT;
600        }
601
602        return rv;
603    }
604
605    static protocol_binary_response_status unlockKey(
606                                                 EventuallyPersistentEngine *e,
607                                                 protocol_binary_request_header
608                                                                      *request,
609                                                 const char **msg,
610                                                 size_t *)
611    {
612        protocol_binary_request_no_extras *req =
613            (protocol_binary_request_no_extras*)request;
614
615        protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
616        char keyz[256];
617
618        // Read the key.
619        int keylen = ntohs(req->message.header.request.keylen);
620        if (keylen >= (int)sizeof(keyz)) {
621            *msg = "Key is too large.";
622            return PROTOCOL_BINARY_RESPONSE_EINVAL;
623        }
624
625        memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
626        keyz[keylen] = 0x00;
627
628        uint16_t vbucket = ntohs(request->request.vbucket);
629        std::string key(keyz, keylen);
630
631        LOG(EXTENSION_LOG_DEBUG, "Executing unl for key %s\n", keyz);
632
633        RememberingCallback<GetValue> getCb;
634        uint64_t cas = ntohll(request->request.cas);
635
636        ENGINE_ERROR_CODE rv = e->unlockKey(key, vbucket, cas,
637                                            ep_current_time());
638
639        if (rv == ENGINE_SUCCESS) {
640            *msg = "UNLOCKED";
641        } else if (rv == ENGINE_TMPFAIL){
642            *msg =  "UNLOCK_ERROR";
643            res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
644        } else {
645            if (e->isDegradedMode()) {
646                *msg = "LOCK_TMP_ERROR";
647                return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
648            }
649
650            RCPtr<VBucket> vb = e->getVBucket(vbucket);
651            if (!vb) {
652                *msg = "That's not my bucket.";
653                res =  PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
654            }
655            *msg = "NOT_FOUND";
656            res =  PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
657        }
658
659        return res;
660    }
661
662    static protocol_binary_response_status setParam(
663                                            EventuallyPersistentEngine *e,
664                                            protocol_binary_request_set_param
665                                                                     *req,
666                                            const char **msg,
667                                            size_t *msg_size) {
668
669        size_t keylen = ntohs(req->message.header.request.keylen);
670        uint8_t extlen = req->message.header.request.extlen;
671        size_t vallen = ntohl(req->message.header.request.bodylen);
672        engine_param_t paramtype =
673            static_cast<engine_param_t>(ntohl(req->message.body.param_type));
674
675        if (keylen == 0 || (vallen - keylen - extlen) == 0) {
676            return PROTOCOL_BINARY_RESPONSE_EINVAL;
677        }
678
679        const char *keyp = reinterpret_cast<const char*>(req->bytes)
680                           + sizeof(req->bytes);
681        const char *valuep = keyp + keylen;
682        vallen -= (keylen + extlen);
683
684        char keyz[32];
685        char valz[512];
686
687        // Read the key.
688        if (keylen >= sizeof(keyz)) {
689            *msg = "Key is too large.";
690            return PROTOCOL_BINARY_RESPONSE_EINVAL;
691        }
692        memcpy(keyz, keyp, keylen);
693        keyz[keylen] = 0x00;
694
695        // Read the value.
696        if (vallen >= sizeof(valz)) {
697            *msg = "Value is too large.";
698            return PROTOCOL_BINARY_RESPONSE_EINVAL;
699        }
700        memcpy(valz, valuep, vallen);
701        valz[vallen] = 0x00;
702
703        protocol_binary_response_status rv;
704
705        switch (paramtype) {
706        case engine_param_flush:
707            rv = setFlushParam(e, keyz, valz, msg, msg_size);
708            break;
709        case engine_param_tap:
710            rv = setTapParam(e, keyz, valz, msg, msg_size);
711            break;
712        case engine_param_checkpoint:
713            rv = setCheckpointParam(e, keyz, valz, msg, msg_size);
714            break;
715        default:
716            rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
717        }
718
719        return rv;
720    }
721
722    static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine *e,
723                                       const void *cookie,
724                                       protocol_binary_request_header *request,
725                                       ADD_RESPONSE response) {
726        protocol_binary_request_get_vbucket *req =
727            reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
728        cb_assert(req);
729
730        uint16_t vbucket = ntohs(req->message.header.request.vbucket);
731        RCPtr<VBucket> vb = e->getVBucket(vbucket);
732        if (!vb) {
733            LockHolder lh(e->clusterConfig.lock);
734            return sendResponse(response, NULL, 0, NULL, 0,
735                                e->clusterConfig.config,
736                                e->clusterConfig.len,
737                                PROTOCOL_BINARY_RAW_BYTES,
738                                PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
739                                cookie);
740        } else {
741            vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
742            return sendResponse(response, NULL, 0, NULL, 0, &state,
743                                sizeof(state),
744                                PROTOCOL_BINARY_RAW_BYTES,
745                                PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
746        }
747    }
748
749    static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine *e,
750                                       const void *cookie,
751                                       protocol_binary_request_header *request,
752                                       ADD_RESPONSE response) {
753
754        protocol_binary_request_set_vbucket *req =
755            reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
756
757        uint64_t cas = ntohll(req->message.header.request.cas);
758
759        size_t bodylen = ntohl(req->message.header.request.bodylen)
760            - ntohs(req->message.header.request.keylen);
761        if (bodylen != sizeof(vbucket_state_t)) {
762            const std::string msg("Incorrect packet format");
763            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
764                                msg.length(), PROTOCOL_BINARY_RAW_BYTES,
765                                PROTOCOL_BINARY_RESPONSE_EINVAL,
766                                cas, cookie);
767        }
768
769        vbucket_state_t state;
770        memcpy(&state, &req->message.body.state, sizeof(state));
771        state = static_cast<vbucket_state_t>(ntohl(state));
772
773        if (!is_valid_vbucket_state_t(state)) {
774            const std::string msg("Invalid vbucket state");
775            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
776                                msg.length(), PROTOCOL_BINARY_RAW_BYTES,
777                                PROTOCOL_BINARY_RESPONSE_EINVAL,
778                                cas, cookie);
779        }
780
781        uint16_t vb = ntohs(req->message.header.request.vbucket);
782        if(e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
783            const std::string msg("VBucket number too big");
784            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
785                                msg.length(), PROTOCOL_BINARY_RAW_BYTES,
786                                PROTOCOL_BINARY_RESPONSE_ERANGE,
787                                cas, cookie);
788        }
789        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
790                            PROTOCOL_BINARY_RAW_BYTES,
791                            PROTOCOL_BINARY_RESPONSE_SUCCESS,
792                            cas, cookie);
793    }
794
795    static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine *e,
796                                        const void *cookie,
797                                        protocol_binary_request_header *req,
798                                        ADD_RESPONSE response) {
799
800        uint64_t cas = ntohll(req->request.cas);
801
802        protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
803        uint16_t vbucket = ntohs(req->request.vbucket);
804
805        std::string msg = "";
806        if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
807            msg = "Incorrect packet format.";
808            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
809                                msg.length(),
810                                PROTOCOL_BINARY_RAW_BYTES,
811                                PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
812        }
813
814        bool sync = false;
815        uint32_t bodylen = ntohl(req->request.bodylen);
816        if (bodylen > 0) {
817            const char* ptr = reinterpret_cast<const char*>(req->bytes) +
818                sizeof(req->bytes);
819            if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
820                sync = true;
821            }
822        }
823
824        ENGINE_ERROR_CODE err;
825        void* es = e->getEngineSpecific(cookie);
826        if (sync) {
827            if (es == NULL) {
828                err = e->deleteVBucket(vbucket, cookie);
829                e->storeEngineSpecific(cookie, e);
830            } else {
831                e->storeEngineSpecific(cookie, NULL);
832                LOG(EXTENSION_LOG_INFO,
833                    "Completed sync deletion of vbucket %u",
834                    (unsigned)vbucket);
835                err = ENGINE_SUCCESS;
836            }
837        } else {
838            err = e->deleteVBucket(vbucket);
839        }
840        switch (err) {
841        case ENGINE_SUCCESS:
842            LOG(EXTENSION_LOG_WARNING,
843                "Deletion of vbucket %d was completed.", vbucket);
844            break;
845        case ENGINE_NOT_MY_VBUCKET:
846            LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
847                "because the vbucket doesn't exist!!!", vbucket);
848            res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
849            break;
850        case ENGINE_EINVAL:
851            LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
852                "because the vbucket is not in a dead state\n", vbucket);
853            msg = "Failed to delete vbucket.  Must be in the dead state.";
854            res = PROTOCOL_BINARY_RESPONSE_EINVAL;
855            break;
856        case ENGINE_EWOULDBLOCK:
857            LOG(EXTENSION_LOG_WARNING, "Requst to vbucket %d deletion is in"
858                " EWOULDBLOCK until the database file is removed from disk",
859                vbucket);
860            return ENGINE_EWOULDBLOCK;
861        default:
862            LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
863                "because of unknown reasons\n", vbucket);
864            msg = "Failed to delete vbucket.  Unknown reason.";
865            res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
866        }
867
868        if (err != ENGINE_NOT_MY_VBUCKET) {
869                return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
870                                    msg.length(), PROTOCOL_BINARY_RAW_BYTES,
871                                    res, cas,
872                                    cookie);
873        } else {
874                LockHolder lh(e->clusterConfig.lock);
875                return sendResponse(response, NULL, 0, NULL, 0,
876                                    e->clusterConfig.config,
877                                    e->clusterConfig.len,
878                                    PROTOCOL_BINARY_RAW_BYTES, res, cas,
879                                    cookie);
880        }
881
882    }
883
884    static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine *e,
885                                       protocol_binary_request_header *request,
886                                       const void *cookie,
887                                       Item **it,
888                                       const char **msg,
889                                       protocol_binary_response_status *res) {
890        EventuallyPersistentStore *eps = e->getEpStore();
891        protocol_binary_request_no_extras *req =
892            (protocol_binary_request_no_extras*)request;
893        int keylen = ntohs(req->message.header.request.keylen);
894        uint16_t vbucket = ntohs(req->message.header.request.vbucket);
895        ENGINE_ERROR_CODE error_code;
896        std::string keystr(((char *)request) + sizeof(req->message.header),
897                            keylen);
898
899        GetValue rv(eps->getReplica(keystr, vbucket, cookie, true));
900
901        if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
902            if (error_code == ENGINE_NOT_MY_VBUCKET) {
903                *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
904                return error_code;
905            } else if (error_code == ENGINE_TMPFAIL) {
906                *msg = "NOT_FOUND";
907                *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
908            } else {
909                return error_code;
910            }
911        } else {
912            *it = rv.getValue();
913            *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
914        }
915        return ENGINE_SUCCESS;
916    }
917
918    static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine *e,
919                                       const void *cookie,
920                                       protocol_binary_request_compact_db *req,
921                                       ADD_RESPONSE response) {
922
923        std::string msg = "";
924        protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
925        compaction_ctx compactreq;
926        uint16_t vbucket = ntohs(req->message.header.request.vbucket);
927        uint64_t cas = ntohll(req->message.header.request.cas);
928
929        if (ntohs(req->message.header.request.keylen) > 0 ||
930             req->message.header.request.extlen != 24) {
931            LOG(EXTENSION_LOG_WARNING,
932                    "Compaction of vbucket %d received bad ext/key len %d/%d.",
933                    vbucket, req->message.header.request.extlen,
934                    ntohs(req->message.header.request.keylen));
935            msg = "Incorrect packet format.";
936            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
937                                msg.length(),
938                                PROTOCOL_BINARY_RAW_BYTES,
939                                PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
940        }
941        EPStats &stats = e->getEpStats();
942        compactreq.max_purged_seq = 0;
943        compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
944        compactreq.purge_before_seq =
945                                    ntohll(req->message.body.purge_before_seq);
946        compactreq.drop_deletes     = req->message.body.drop_deletes;
947
948        ENGINE_ERROR_CODE err;
949        void* es = e->getEngineSpecific(cookie);
950        if (es == NULL) {
951            ++stats.pendingCompactions;
952            e->storeEngineSpecific(cookie, e);
953            err = e->compactDB(vbucket, compactreq, cookie);
954        } else {
955            e->storeEngineSpecific(cookie, NULL);
956            err = ENGINE_SUCCESS;
957        }
958
959        switch (err) {
960            case ENGINE_SUCCESS:
961                LOG(EXTENSION_LOG_INFO,
962                    "Compaction of vbucket %d completed.", vbucket);
963                break;
964            case ENGINE_NOT_MY_VBUCKET:
965                --stats.pendingCompactions;
966                LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
967                    "because the vbucket doesn't exist!!!", vbucket);
968                msg = "Failed to compact vbucket.  Bucket not found.";
969                res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
970                break;
971            case ENGINE_EWOULDBLOCK:
972                LOG(EXTENSION_LOG_INFO, "Request to compact vbucket %d is "
973                        "in EWOULDBLOCK state until the database file is "
974                        "compacted on disk",
975                        vbucket);
976                return ENGINE_EWOULDBLOCK;
977            case ENGINE_TMPFAIL:
978                LOG(EXTENSION_LOG_WARNING, "Request to compact vbucket %d hit"
979                        " a temporary failure and may need to be retried",
980                        vbucket);
981                msg = "Temporary failure in compacting vbucket.";
982                res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
983            default:
984                --stats.pendingCompactions;
985                LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
986                    "because of unknown reasons\n", vbucket);
987                msg = "Failed to compact vbucket.  Unknown reason.";
988                res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
989        }
990
991        return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
992                            msg.length(), PROTOCOL_BINARY_RAW_BYTES,
993                            res, cas, cookie);
994    }
995
996    static ENGINE_ERROR_CODE processUnknownCommand(
997                                       EventuallyPersistentEngine *h,
998                                       const void* cookie,
999                                       protocol_binary_request_header *request,
1000                                       ADD_RESPONSE response)
1001    {
1002        protocol_binary_response_status res =
1003                                      PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1004        const char *msg = NULL;
1005        size_t msg_size = 0;
1006        Item *itm = NULL;
1007
1008        EPStats &stats = h->getEpStats();
1009        ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
1010
1011        /**
1012         * Session validation
1013         * (For ns_server commands only)
1014         */
1015        switch (request->request.opcode) {
1016            case CMD_SET_PARAM:
1017            case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1018            case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1019            case CMD_DEREGISTER_TAP_CLIENT:
1020            case CMD_CHANGE_VB_FILTER:
1021            case CMD_SET_CLUSTER_CONFIG:
1022            case CMD_COMPACT_DB:
1023            {
1024                uint64_t cas = ntohll(request->request.cas);
1025                if (!h->validateSessionCas(cas)) {
1026                    const std::string message("Invalid session token");
1027                    return sendResponse(response, NULL, 0, NULL, 0,
1028                                        message.c_str(), message.length(),
1029                                        PROTOCOL_BINARY_RAW_BYTES,
1030                                        PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
1031                                        cas, cookie);
1032                }
1033                break;
1034            }
1035            default:
1036                break;
1037        }
1038
1039        switch (request->request.opcode) {
1040        case PROTOCOL_BINARY_CMD_GET_VBUCKET:
1041            {
1042                BlockTimer timer(&stats.getVbucketCmdHisto);
1043                rv = getVBucket(h, cookie, request, response);
1044                return rv;
1045            }
1046        case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1047            {
1048                BlockTimer timer(&stats.delVbucketCmdHisto);
1049                rv = delVBucket(h, cookie, request, response);
1050                return rv;
1051            }
1052        case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1053            {
1054                BlockTimer timer(&stats.setVbucketCmdHisto);
1055                rv = setVBucket(h, cookie, request, response);
1056                return rv;
1057            }
1058        case PROTOCOL_BINARY_CMD_TOUCH:
1059        case PROTOCOL_BINARY_CMD_GAT:
1060        case PROTOCOL_BINARY_CMD_GATQ:
1061            {
1062                rv = h->touch(cookie, request, response);
1063                return rv;
1064            }
1065        case CMD_STOP_PERSISTENCE:
1066            res = stopFlusher(h, &msg, &msg_size);
1067            break;
1068        case CMD_START_PERSISTENCE:
1069            res = startFlusher(h, &msg, &msg_size);
1070            break;
1071        case CMD_SET_PARAM:
1072            res = setParam(h,
1073                  reinterpret_cast<protocol_binary_request_set_param*>(request),
1074                            &msg, &msg_size);
1075            break;
1076        case CMD_EVICT_KEY:
1077            res = evictKey(h, request, &msg, &msg_size);
1078            break;
1079        case CMD_GET_LOCKED:
1080            rv = getLocked(h, request, cookie, &itm, &msg, &msg_size, &res);
1081            if (rv == ENGINE_EWOULDBLOCK) {
1082                // we dont have the value for the item yet
1083                return rv;
1084            }
1085            break;
1086        case CMD_UNLOCK_KEY:
1087            res = unlockKey(h, request, &msg, &msg_size);
1088            break;
1089        case CMD_OBSERVE:
1090            return h->observe(cookie, request, response);
1091        case CMD_DEREGISTER_TAP_CLIENT:
1092            {
1093                rv = h->deregisterTapClient(cookie, request, response);
1094                return rv;
1095            }
1096        case CMD_RESET_REPLICATION_CHAIN:
1097            {
1098                rv = h->resetReplicationChain(cookie, request, response);
1099                return rv;
1100            }
1101        case CMD_CHANGE_VB_FILTER:
1102            {
1103                rv = h->changeTapVBFilter(cookie, request, response);
1104                return rv;
1105            }
1106        case CMD_LAST_CLOSED_CHECKPOINT:
1107        case CMD_CREATE_CHECKPOINT:
1108        case CMD_CHECKPOINT_PERSISTENCE:
1109            {
1110                rv = h->handleCheckpointCmds(cookie, request, response);
1111                return rv;
1112            }
1113        case CMD_SEQNO_PERSISTENCE:
1114            {
1115                rv = h->handleSeqnoCmds(cookie, request, response);
1116                return rv;
1117            }
1118        case CMD_GET_META:
1119        case CMD_GETQ_META:
1120            {
1121                rv = h->getMeta(cookie,
1122                        reinterpret_cast<protocol_binary_request_get_meta*>
1123                                                          (request), response);
1124                return rv;
1125            }
1126        case CMD_SET_WITH_META:
1127        case CMD_SETQ_WITH_META:
1128        case CMD_ADD_WITH_META:
1129        case CMD_ADDQ_WITH_META:
1130            {
1131                rv = h->setWithMeta(cookie,
1132                     reinterpret_cast<protocol_binary_request_set_with_meta*>
1133                                                          (request), response);
1134                return rv;
1135            }
1136        case CMD_DEL_WITH_META:
1137        case CMD_DELQ_WITH_META:
1138            {
1139                rv = h->deleteWithMeta(cookie,
1140                    reinterpret_cast<protocol_binary_request_delete_with_meta*>
1141                                                          (request), response);
1142                return rv;
1143            }
1144        case CMD_RETURN_META:
1145            {
1146                return h->returnMeta(cookie,
1147                reinterpret_cast<protocol_binary_request_return_meta*>
1148                                                          (request), response);
1149            }
1150        case CMD_GET_REPLICA:
1151            rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res);
1152            if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
1153                return rv;
1154            }
1155            break;
1156        case CMD_ENABLE_TRAFFIC:
1157        case CMD_DISABLE_TRAFFIC:
1158            {
1159                rv = h->handleTrafficControlCmd(cookie, request, response);
1160                return rv;
1161            }
1162        case CMD_SET_CLUSTER_CONFIG:
1163            return h->setClusterConfig(cookie,
1164                 reinterpret_cast<protocol_binary_request_set_cluster_config*>
1165                                                          (request), response);
1166        case CMD_GET_CLUSTER_CONFIG:
1167            return h->getClusterConfig(cookie,
1168               reinterpret_cast<protocol_binary_request_get_cluster_config*>
1169                                                          (request), response);
1170        case CMD_COMPACT_DB:
1171            return compactDB(h, cookie,
1172                            (protocol_binary_request_compact_db*)(request),
1173                            response);
1174            break;
1175        case CMD_GET_RANDOM_KEY:
1176            if (request->request.extlen != 0 ||
1177                request->request.keylen != 0 ||
1178                request->request.bodylen != 0) {
1179                return ENGINE_EINVAL;
1180            }
1181
1182            return h->getRandomKey(cookie, response);
1183        case CMD_GET_KEYS:
1184            return h->getAllKeys(cookie,
1185               reinterpret_cast<protocol_binary_request_get_keys*>(request),
1186                                                                   response);
1187        }
1188
1189        // Send a special response for getl since we don't want to send the key
1190        if (itm && request->request.opcode == CMD_GET_LOCKED) {
1191            uint32_t flags = itm->getFlags();
1192            rv = sendResponse(response, NULL, 0, (const void *)&flags,
1193                              sizeof(uint32_t),
1194                              static_cast<const void *>(itm->getData()),
1195                              itm->getNBytes(),
1196                              PROTOCOL_BINARY_RAW_BYTES,
1197                              static_cast<uint16_t>(res), itm->getCas(),
1198                              cookie);
1199            delete itm;
1200        } else if (itm) {
1201            const std::string &key  = itm->getKey();
1202            uint32_t flags = itm->getFlags();
1203            rv = sendResponse(response, static_cast<const void *>(key.data()),
1204                              itm->getNKey(),
1205                              (const void *)&flags, sizeof(uint32_t),
1206                              static_cast<const void *>(itm->getData()),
1207                              itm->getNBytes(),
1208                              PROTOCOL_BINARY_RAW_BYTES,
1209                              static_cast<uint16_t>(res), itm->getCas(),
1210                              cookie);
1211            delete itm;
1212        } else  if (rv == ENGINE_NOT_MY_VBUCKET) {
1213            LockHolder lh(h->clusterConfig.lock);
1214            return sendResponse(response, NULL, 0, NULL, 0,
1215                                h->clusterConfig.config,
1216                                h->clusterConfig.len,
1217                                PROTOCOL_BINARY_RAW_BYTES,
1218                                PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
1219                                cookie);
1220        } else {
1221            msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
1222            rv = sendResponse(response, NULL, 0, NULL, 0,
1223                              msg, static_cast<uint16_t>(msg_size),
1224                              PROTOCOL_BINARY_RAW_BYTES,
1225                              static_cast<uint16_t>(res), 0, cookie);
1226
1227        }
1228        return rv;
1229    }
1230
1231    static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
1232                                               const void* cookie,
1233                                               protocol_binary_request_header
1234                                                                      *request,
1235                                               ADD_RESPONSE response)
1236    {
1237        ENGINE_ERROR_CODE err_code = processUnknownCommand(getHandle(handle),
1238                                                           cookie,
1239                                                           request, response);
1240        releaseHandle(handle);
1241        return err_code;
1242    }
1243
1244    static void EvpItemSetCas(ENGINE_HANDLE* , const void *,
1245                              item *itm, uint64_t cas) {
1246        static_cast<Item*>(itm)->setCas(cas);
1247    }
1248
1249    static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
1250                                          const void *cookie,
1251                                          void *engine_specific,
1252                                          uint16_t nengine,
1253                                          uint8_t ttl,
1254                                          uint16_t tap_flags,
1255                                          tap_event_t tap_event,
1256                                          uint32_t tap_seqno,
1257                                          const void *key,
1258                                          size_t nkey,
1259                                          uint32_t flags,
1260                                          uint32_t exptime,
1261                                          uint64_t cas,
1262                                          uint8_t datatype,
1263                                          const void *data,
1264                                          size_t ndata,
1265                                          uint16_t vbucket)
1266    {
1267        if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1268            LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1269                    " (TapNotify)");
1270            return ENGINE_EINVAL;
1271        }
1272        ENGINE_ERROR_CODE err_code = getHandle(handle)->tapNotify(cookie,
1273                                                        engine_specific,
1274                                                        nengine, ttl,
1275                                                        tap_flags,
1276                                                        (uint16_t)tap_event,
1277                                                        tap_seqno,
1278                                                        key, nkey, flags,
1279                                                        exptime, cas,
1280                                                        datatype, data,
1281                                                        ndata, vbucket);
1282        releaseHandle(handle);
1283        return err_code;
1284    }
1285
1286    static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
1287                                      const void *cookie, item **itm,
1288                                      void **es, uint16_t *nes, uint8_t *ttl,
1289                                      uint16_t *flags, uint32_t *seqno,
1290                                      uint16_t *vbucket) {
1291        uint16_t tap_event = getHandle(handle)->walkTapQueue(cookie, itm, es,
1292                                                             nes, ttl,
1293                                                             flags, seqno,
1294                                                             vbucket);
1295        releaseHandle(handle);
1296        return static_cast<tap_event_t>(tap_event);
1297    }
1298
1299    static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
1300                                          const void* cookie,
1301                                          const void* client,
1302                                          size_t nclient,
1303                                          uint32_t flags,
1304                                          const void* userdata,
1305                                          size_t nuserdata)
1306    {
1307        EventuallyPersistentEngine *h = getHandle(handle);
1308        TAP_ITERATOR iterator = NULL;
1309        {
1310            std::string c(static_cast<const char*>(client), nclient);
1311            // Figure out what we want from the userdata before adding it to
1312            // the API to the handle
1313            if (h->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
1314                iterator = EvpTapIterator;
1315            }
1316        }
1317        releaseHandle(handle);
1318        return iterator;
1319    }
1320
1321
1322    static ENGINE_ERROR_CODE EvpUprStep(ENGINE_HANDLE* handle,
1323                                       const void* cookie,
1324                                       struct upr_message_producers *producers)
1325    {
1326        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1327        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1328        if (conn) {
1329            errCode = conn->step(producers);
1330        }
1331        releaseHandle(handle);
1332        return errCode;
1333    }
1334
1335
1336    static ENGINE_ERROR_CODE EvpUprOpen(ENGINE_HANDLE* handle,
1337                                        const void* cookie,
1338                                        uint32_t opaque,
1339                                        uint32_t seqno,
1340                                        uint32_t flags,
1341                                        void *name,
1342                                        uint16_t nname)
1343    {
1344        ENGINE_ERROR_CODE errCode;
1345        errCode = getHandle(handle)->uprOpen(cookie, opaque, seqno, flags,
1346                                             name, nname);
1347        releaseHandle(handle);
1348        return errCode;
1349    }
1350
1351    static ENGINE_ERROR_CODE EvpUprAddStream(ENGINE_HANDLE* handle,
1352                                             const void* cookie,
1353                                             uint32_t opaque,
1354                                             uint16_t vbucket,
1355                                             uint32_t flags)
1356    {
1357        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1358        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1359        if (conn) {
1360            errCode = conn->addStream(opaque, vbucket, flags);
1361        }
1362        releaseHandle(handle);
1363        return errCode;
1364    }
1365
1366    static ENGINE_ERROR_CODE EvpUprCloseStream(ENGINE_HANDLE* handle,
1367                                               const void* cookie,
1368                                               uint32_t opaque,
1369                                               uint16_t vbucket)
1370    {
1371        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1372        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1373        if (conn) {
1374            errCode = conn->closeStream(opaque, vbucket);
1375        }
1376        releaseHandle(handle);
1377        return errCode;
1378    }
1379
1380
1381    static ENGINE_ERROR_CODE EvpUprStreamReq(ENGINE_HANDLE* handle,
1382                                             const void* cookie,
1383                                             uint32_t flags,
1384                                             uint32_t opaque,
1385                                             uint16_t vbucket,
1386                                             uint64_t startSeqno,
1387                                             uint64_t endSeqno,
1388                                             uint64_t vbucketUuid,
1389                                             uint64_t snapStartSeqno,
1390                                             uint64_t snapEndSeqno,
1391                                             uint64_t *rollbackSeqno,
1392                                             upr_add_failover_log callback)
1393    {
1394        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1395        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1396        if (conn) {
1397            errCode = conn->streamRequest(flags, opaque, vbucket, startSeqno,
1398                                          endSeqno, vbucketUuid, snapStartSeqno,
1399                                          snapEndSeqno, rollbackSeqno, callback);
1400        }
1401        releaseHandle(handle);
1402        return errCode;
1403    }
1404
1405    static ENGINE_ERROR_CODE EvpUprGetFailoverLog(ENGINE_HANDLE* handle,
1406                                                 const void* cookie,
1407                                                 uint32_t opaque,
1408                                                 uint16_t vbucket,
1409                                                 upr_add_failover_log callback)
1410    {
1411        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1412        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1413        if (conn) {
1414            errCode = conn->getFailoverLog(opaque, vbucket, callback);
1415        }
1416        releaseHandle(handle);
1417        return errCode;
1418    }
1419
1420
1421    static ENGINE_ERROR_CODE EvpUprStreamEnd(ENGINE_HANDLE* handle,
1422                                             const void* cookie,
1423                                             uint32_t opaque,
1424                                             uint16_t vbucket,
1425                                             uint32_t flags)
1426    {
1427        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1428        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1429        if (conn) {
1430            errCode = conn->streamEnd(opaque, vbucket, flags);
1431        }
1432        releaseHandle(handle);
1433        return errCode;
1434    }
1435
1436
1437    static ENGINE_ERROR_CODE EvpUprSnapshotMarker(ENGINE_HANDLE* handle,
1438                                                  const void* cookie,
1439                                                  uint32_t opaque,
1440                                                  uint16_t vbucket,
1441                                                  uint64_t start_seqno,
1442                                                  uint64_t end_seqno,
1443                                                  uint32_t flags)
1444    {
1445        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1446        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1447        if (conn) {
1448            errCode = conn->snapshotMarker(opaque, vbucket, start_seqno,
1449                                           end_seqno, flags);
1450        }
1451        releaseHandle(handle);
1452        return errCode;
1453    }
1454
1455    static ENGINE_ERROR_CODE EvpUprMutation(ENGINE_HANDLE* handle,
1456                                            const void* cookie,
1457                                            uint32_t opaque,
1458                                            const void *key,
1459                                            uint16_t nkey,
1460                                            const void *value,
1461                                            uint32_t nvalue,
1462                                            uint64_t cas,
1463                                            uint16_t vbucket,
1464                                            uint32_t flags,
1465                                            uint8_t datatype,
1466                                            uint64_t bySeqno,
1467                                            uint64_t revSeqno,
1468                                            uint32_t expiration,
1469                                            uint32_t lockTime,
1470                                            const void *meta,
1471                                            uint16_t nmeta,
1472                                            uint8_t nru)
1473    {
1474        if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1475            LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1476                    " (UprMutation)");
1477            return ENGINE_EINVAL;
1478        }
1479        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1480        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1481        if (conn) {
1482            errCode = conn->mutation(opaque, key, nkey, value, nvalue, cas,
1483                                     vbucket, flags, datatype, lockTime,
1484                                     bySeqno, revSeqno, expiration,
1485                                     nru, meta, nmeta);
1486        }
1487        releaseHandle(handle);
1488        return errCode;
1489    }
1490
1491    static ENGINE_ERROR_CODE EvpUprDeletion(ENGINE_HANDLE* handle,
1492                                            const void* cookie,
1493                                            uint32_t opaque,
1494                                            const void *key,
1495                                            uint16_t nkey,
1496                                            uint64_t cas,
1497                                            uint16_t vbucket,
1498                                            uint64_t bySeqno,
1499                                            uint64_t revSeqno,
1500                                            const void *meta,
1501                                            uint16_t nmeta)
1502    {
1503        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1504        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1505        if (conn) {
1506            errCode = conn->deletion(opaque, key, nkey, cas, vbucket, bySeqno,
1507                                     revSeqno, meta, nmeta);
1508        }
1509        releaseHandle(handle);
1510        return errCode;
1511    }
1512
1513    static ENGINE_ERROR_CODE EvpUprExpiration(ENGINE_HANDLE* handle,
1514                                              const void* cookie,
1515                                              uint32_t opaque,
1516                                              const void *key,
1517                                              uint16_t nkey,
1518                                              uint64_t cas,
1519                                              uint16_t vbucket,
1520                                              uint64_t bySeqno,
1521                                              uint64_t revSeqno,
1522                                              const void *meta,
1523                                              uint16_t nmeta)
1524    {
1525        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1526        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1527        if (conn) {
1528            errCode = conn->expiration(opaque, key, nkey, cas, vbucket, bySeqno,
1529                                       revSeqno, meta, nmeta);
1530        }
1531        releaseHandle(handle);
1532        return errCode;
1533    }
1534
1535    static ENGINE_ERROR_CODE EvpUprFlush(ENGINE_HANDLE* handle,
1536                                         const void* cookie,
1537                                         uint32_t opaque,
1538                                         uint16_t vbucket)
1539    {
1540        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1541        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1542        if (conn) {
1543            errCode = conn->flushall(opaque, vbucket);
1544        }
1545        releaseHandle(handle);
1546        return errCode;
1547    }
1548
1549    static ENGINE_ERROR_CODE EvpUprSetVbucketState(ENGINE_HANDLE* handle,
1550                                                   const void* cookie,
1551                                                   uint32_t opaque,
1552                                                   uint16_t vbucket,
1553                                                   vbucket_state_t state)
1554    {
1555        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1556        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1557        if (conn) {
1558            errCode = conn->setVBucketState(opaque, vbucket, state);
1559        }
1560        releaseHandle(handle);
1561        return errCode;
1562    }
1563
1564    static ENGINE_ERROR_CODE EvpUprNoop(ENGINE_HANDLE* handle,
1565                                        const void* cookie,
1566                                        uint32_t opaque) {
1567        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1568        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1569        if (conn) {
1570            errCode = conn->noop(opaque);
1571        }
1572        releaseHandle(handle);
1573        return errCode;
1574    }
1575
1576    static ENGINE_ERROR_CODE EvpUprBufferAcknowledgement(ENGINE_HANDLE* handle,
1577                                                         const void* cookie,
1578                                                         uint32_t opaque,
1579                                                         uint16_t vbucket,
1580                                                         uint32_t buffer_bytes) {
1581        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1582        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1583        if (conn) {
1584            errCode = conn->bufferAcknowledgement(opaque, vbucket,
1585                                                  buffer_bytes);
1586        }
1587        releaseHandle(handle);
1588        return errCode;
1589    }
1590
1591    static ENGINE_ERROR_CODE EvpUprControl(ENGINE_HANDLE* handle,
1592                                           const void* cookie,
1593                                           uint32_t opaque,
1594                                           const void *key,
1595                                           uint16_t nkey,
1596                                           const void *value,
1597                                           uint32_t nvalue) {
1598        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1599        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1600        if (conn) {
1601            errCode = conn->control(opaque, key, nkey, value, nvalue);
1602        }
1603        releaseHandle(handle);
1604        return errCode;
1605    }
1606
1607    static ENGINE_ERROR_CODE EvpUprResponseHandler(ENGINE_HANDLE* handle,
1608                                     const void* cookie,
1609                                     protocol_binary_response_header *response)
1610    {
1611        ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1612        ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1613        if (conn) {
1614            errCode = conn->handleResponse(response);
1615        }
1616        releaseHandle(handle);
1617        return errCode;
1618    }
1619
1620    static void EvpHandleDisconnect(const void *cookie,
1621                                    ENGINE_EVENT_TYPE type,
1622                                    const void *event_data,
1623                                    const void *cb_data)
1624    {
1625        cb_assert(type == ON_DISCONNECT);
1626        cb_assert(event_data == NULL);
1627        void *c = const_cast<void*>(cb_data);
1628        getHandle(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1629        releaseHandle(static_cast<ENGINE_HANDLE*>(c));
1630    }
1631
1632
1633    /**
1634     * The only public interface to the eventually persistance engine.
1635     * Allocate a new instance and initialize it
1636     * @param interface the highest interface the server supports (we only
1637     *                  support interface 1)
1638     * @param get_server_api callback function to get the server exported API
1639     *                  functions
1640     * @param handle Where to return the new instance
1641     * @return ENGINE_SUCCESS on success
1642     */
1643    ENGINE_ERROR_CODE create_instance(uint64_t interface,
1644                                      GET_SERVER_API get_server_api,
1645                                      ENGINE_HANDLE **handle)
1646    {
1647        SERVER_HANDLE_V1 *api = get_server_api();
1648        if (interface != 1 || api == NULL) {
1649            return ENGINE_ENOTSUP;
1650        }
1651
1652        hooksApi = api->alloc_hooks;
1653        loggerApi = api->log;
1654        MemoryTracker::getInstance();
1655
1656        AtomicValue<size_t>* inital_tracking = new AtomicValue<size_t>();
1657
1658        ObjectRegistry::setStats(inital_tracking);
1659        EventuallyPersistentEngine *engine;
1660        engine = new EventuallyPersistentEngine(get_server_api);
1661        ObjectRegistry::setStats(NULL);
1662
1663        if (engine == NULL) {
1664            return ENGINE_ENOMEM;
1665        }
1666
1667        if (MemoryTracker::trackingMemoryAllocations()) {
1668            engine->getEpStats().memoryTrackerEnabled.store(true);
1669            engine->getEpStats().totalMemory.store(inital_tracking->load());
1670        }
1671        delete inital_tracking;
1672
1673        ep_current_time = api->core->get_current_time;
1674        ep_abs_time = api->core->abstime;
1675        ep_reltime = api->core->realtime;
1676
1677        *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1678
1679        return ENGINE_SUCCESS;
1680    }
1681
1682    static bool EvpGetItemInfo(ENGINE_HANDLE *, const void *,
1683                               const item* itm, item_info *itm_info)
1684    {
1685        const Item *it = reinterpret_cast<const Item*>(itm);
1686        if (itm_info->nvalue < 1) {
1687            return false;
1688        }
1689        itm_info->cas = it->getCas();
1690        itm_info->exptime = it->getExptime();
1691        itm_info->nbytes = it->getNBytes();
1692        itm_info->datatype = it->getDataType();
1693        itm_info->flags = it->getFlags();
1694        itm_info->clsid = 0;
1695        itm_info->nkey = static_cast<uint16_t>(it->getNKey());
1696        itm_info->nvalue = 1;
1697        itm_info->key = it->getKey().c_str();
1698        itm_info->value[0].iov_base = const_cast<char*>(it->getData());
1699        itm_info->value[0].iov_len = it->getNBytes();
1700        return true;
1701    }
1702
1703    static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
1704                               item* itm, const item_info *itm_info)
1705    {
1706        Item *it = reinterpret_cast<Item*>(itm);
1707        if (!it) {
1708            return false;
1709        }
1710        it->setDataType(itm_info->datatype);
1711        return true;
1712    }
1713
1714    static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
1715                                                 const void* cookie,
1716                                                 engine_get_vb_map_cb callback)
1717    {
1718        EventuallyPersistentEngine *h = getHandle(handle);
1719        LockHolder lh(h->clusterConfig.lock);
1720        uint8_t *config = h->clusterConfig.config;
1721        uint32_t len = h->clusterConfig.len;
1722        releaseHandle(handle);
1723        return callback(cookie, config, len);
1724    }
1725
1726} // C linkage
1727
1728void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1729    char buffer[2048];
1730
1731    if (loggerApi != NULL) {
1732        EXTENSION_LOGGER_DESCRIPTOR* logger =
1733            (EXTENSION_LOGGER_DESCRIPTOR*)loggerApi->get_logger();
1734        EventuallyPersistentEngine* engine =
1735                                            ObjectRegistry::getCurrentEngine();
1736
1737        if (loggerApi->get_level() <= severity) {
1738            va_list va;
1739            va_start(va, fmt);
1740            vsnprintf(buffer, sizeof(buffer) - 1, fmt, va);
1741            if (engine) {
1742                logger->log(severity, NULL, "(%s) %s", engine->getName(),
1743                            buffer);
1744            } else {
1745                logger->log(severity, NULL, "(No Engine) %s", buffer);
1746            }
1747            va_end(va);
1748        }
1749    }
1750}
1751
1752ALLOCATOR_HOOKS_API *getHooksApi(void) {
1753    return hooksApi;
1754}
1755
1756EventuallyPersistentEngine::EventuallyPersistentEngine(
1757                                    GET_SERVER_API get_server_api) :
1758    clusterConfig(), epstore(NULL), workload(NULL),
1759    workloadPriority(NO_BUCKET_PRIORITY),
1760    tapThrottle(NULL), getServerApiFunc(get_server_api),
1761    tapConnMap(NULL), tapConfig(NULL), checkpointConfig(NULL),
1762    trafficEnabled(false), flushAllEnabled(false), startupTime(0)
1763{
1764    interface.interface = 1;
1765    ENGINE_HANDLE_V1::get_info = EvpGetInfo;
1766    ENGINE_HANDLE_V1::initialize = EvpInitialize;
1767    ENGINE_HANDLE_V1::destroy = EvpDestroy;
1768    ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
1769    ENGINE_HANDLE_V1::remove = EvpItemDelete;
1770    ENGINE_HANDLE_V1::release = EvpItemRelease;
1771    ENGINE_HANDLE_V1::get = EvpGet;
1772    ENGINE_HANDLE_V1::get_stats = EvpGetStats;
1773    ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
1774    ENGINE_HANDLE_V1::store = EvpStore;
1775    ENGINE_HANDLE_V1::arithmetic = EvpArithmetic;
1776    ENGINE_HANDLE_V1::flush = EvpFlush;
1777    ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
1778    ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
1779    ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
1780    ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
1781    ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
1782    ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
1783    ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;
1784    ENGINE_HANDLE_V1::get_stats_struct = NULL;
1785    ENGINE_HANDLE_V1::errinfo = NULL;
1786    ENGINE_HANDLE_V1::aggregate_stats = NULL;
1787
1788
1789    ENGINE_HANDLE_V1::upr.step = EvpUprStep;
1790    ENGINE_HANDLE_V1::upr.open = EvpUprOpen;
1791    ENGINE_HANDLE_V1::upr.add_stream = EvpUprAddStream;
1792    ENGINE_HANDLE_V1::upr.close_stream = EvpUprCloseStream;
1793    ENGINE_HANDLE_V1::upr.get_failover_log = EvpUprGetFailoverLog;
1794    ENGINE_HANDLE_V1::upr.stream_req = EvpUprStreamReq;
1795    ENGINE_HANDLE_V1::upr.stream_end = EvpUprStreamEnd;
1796    ENGINE_HANDLE_V1::upr.snapshot_marker = EvpUprSnapshotMarker;
1797    ENGINE_HANDLE_V1::upr.mutation = EvpUprMutation;
1798    ENGINE_HANDLE_V1::upr.deletion = EvpUprDeletion;
1799    ENGINE_HANDLE_V1::upr.expiration = EvpUprExpiration;
1800    ENGINE_HANDLE_V1::upr.flush = EvpUprFlush;
1801    ENGINE_HANDLE_V1::upr.set_vbucket_state = EvpUprSetVbucketState;
1802    ENGINE_HANDLE_V1::upr.noop = EvpUprNoop;
1803    ENGINE_HANDLE_V1::upr.buffer_acknowledgement = EvpUprBufferAcknowledgement;
1804    ENGINE_HANDLE_V1::upr.control = EvpUprControl;
1805    ENGINE_HANDLE_V1::upr.response_handler = EvpUprResponseHandler;
1806
1807    serverApi = getServerApiFunc();
1808    memset(&info, 0, sizeof(info));
1809    info.info.description = "EP engine v" VERSION;
1810    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
1811    info.info.features[info.info.num_features++].feature =
1812                                             ENGINE_FEATURE_PERSISTENT_STORAGE;
1813    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
1814    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
1815
1816}
1817
1818ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
1819{
1820    EventuallyPersistentEngine *epe =
1821                                    ObjectRegistry::onSwitchThread(NULL, true);
1822    ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
1823    ObjectRegistry::onSwitchThread(epe);
1824    return rv;
1825}
1826
1827ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
1828{
1829    EventuallyPersistentEngine *epe =
1830                                    ObjectRegistry::onSwitchThread(NULL, true);
1831    ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
1832    ObjectRegistry::onSwitchThread(epe);
1833    return rv;
1834}
1835
1836void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
1837                                                        EVENT_CALLBACK cb,
1838                                                        const void *cb_data) {
1839    EventuallyPersistentEngine *epe =
1840                                    ObjectRegistry::onSwitchThread(NULL, true);
1841    SERVER_CALLBACK_API *sapi = getServerApi()->callback;
1842    sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
1843                            type, cb, cb_data);
1844    ObjectRegistry::onSwitchThread(epe);
1845}
1846
1847/**
1848 * A configuration value changed listener that responds to ep-engine
1849 * parameter changes by invoking engine-specific methods on
1850 * configuration change events.
1851 */
1852class EpEngineValueChangeListener : public ValueChangedListener {
1853public:
1854    EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
1855        // EMPTY
1856    }
1857
1858    virtual void sizeValueChanged(const std::string &key, size_t value) {
1859        if (key.compare("getl_max_timeout") == 0) {
1860            engine.setGetlMaxTimeout(value);
1861        } else if (key.compare("getl_default_timeout") == 0) {
1862            engine.setGetlDefaultTimeout(value);
1863        } else if (key.compare("max_item_size") == 0) {
1864            engine.setMaxItemSize(value);
1865        }
1866    }
1867
1868    virtual void booleanValueChanged(const std::string &key, bool value) {
1869        if (key.compare("flushall_enabled") == 0) {
1870            engine.setFlushAll(value);
1871        }
1872    }
1873private:
1874    EventuallyPersistentEngine &engine;
1875};
1876
1877
1878
1879ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
1880    resetStats();
1881    if (config != NULL) {
1882        if (!configuration.parseConfiguration(config, serverApi)) {
1883            return ENGINE_FAILED;
1884        }
1885    }
1886
1887    name = configuration.getCouchBucket();
1888    maxFailoverEntries = configuration.getMaxFailoverEntries();
1889
1890    // Start updating the variables from the config!
1891    HashTable::setDefaultNumBuckets(configuration.getHtSize());
1892    HashTable::setDefaultNumLocks(configuration.getHtLocks());
1893    StoredValue::setMutationMemoryThreshold(
1894                                      configuration.getMutationMemThreshold());
1895
1896    if (configuration.getMaxSize() == 0) {
1897        configuration.setMaxSize(std::numeric_limits<size_t>::max());
1898    }
1899
1900    if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
1901        configuration.setMemLowWat(percentOf(
1902                                            configuration.getMaxSize(), 0.75));
1903    }
1904
1905    if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
1906        configuration.setMemHighWat(percentOf(
1907                                            configuration.getMaxSize(), 0.85));
1908    }
1909
1910    maxItemSize = configuration.getMaxItemSize();
1911    configuration.addValueChangedListener("max_item_size",
1912                                       new EpEngineValueChangeListener(*this));
1913
1914    getlDefaultTimeout = configuration.getGetlDefaultTimeout();
1915    configuration.addValueChangedListener("getl_default_timeout",
1916                                       new EpEngineValueChangeListener(*this));
1917    getlMaxTimeout = configuration.getGetlMaxTimeout();
1918    configuration.addValueChangedListener("getl_max_timeout",
1919                                       new EpEngineValueChangeListener(*this));
1920
1921    flushAllEnabled = configuration.isFlushallEnabled();
1922    configuration.addValueChangedListener("flushall_enabled",
1923                                       new EpEngineValueChangeListener(*this));
1924
1925    workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
1926                                  configuration.getMaxNumShards());
1927    if ((unsigned int)workload->getNumShards() >
1928                                              configuration.getMaxVbuckets()) {
1929        LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
1930            "equal or less than max number of vbuckets");
1931        return ENGINE_FAILED;
1932    }
1933
1934    uprConnMap_ = new UprConnMap(*this);
1935    tapConnMap = new TapConnMap(*this);
1936    tapConfig = new TapConfig(*this);
1937    tapThrottle = new TapThrottle(configuration, stats);
1938    TapConfig::addConfigChangeListener(*this);
1939
1940    checkpointConfig = new CheckpointConfig(*this);
1941    CheckpointConfig::addConfigChangeListener(*this);
1942
1943    epstore = new EventuallyPersistentStore(*this);
1944    if (epstore == NULL) {
1945        return ENGINE_ENOMEM;
1946    }
1947
1948    // Register the callback
1949    registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
1950
1951    // Complete the initialization of the ep-store
1952    if (!epstore->initialize()) {
1953        return ENGINE_FAILED;
1954    }
1955
1956    if(configuration.isDataTrafficEnabled()) {
1957        enableTraffic(true);
1958    }
1959
1960    tapConnMap->initialize();
1961    uprConnMap_->initialize();
1962
1963    // record engine initialization time
1964    startupTime = ep_real_time();
1965
1966    LOG(EXTENSION_LOG_DEBUG, "Engine init complete.\n");
1967
1968    return ENGINE_SUCCESS;
1969}
1970
1971void EventuallyPersistentEngine::destroy(bool force) {
1972    stats.forceShutdown = force;
1973    stats.isShutdown = true;
1974
1975    if (epstore) {
1976        epstore->snapshotStats();
1977    }
1978    if (tapConnMap) {
1979        tapConnMap->shutdownAllConnections();
1980    }
1981    if (uprConnMap_) {
1982        uprConnMap_->shutdownAllConnections();
1983    }
1984}
1985
1986class FlushAllTask : public GlobalTask {
1987public:
1988    FlushAllTask(EventuallyPersistentStore *st, TapConnMap &tcm, double when)
1989        : GlobalTask(&st->getEPEngine(), Priority::FlushAllPriority, when,
1990                     false), epstore(st), tapConnMap(tcm) { }
1991
1992    bool run(void) {
1993        epstore->reset();
1994        tapConnMap.addFlushEvent();
1995        return false;
1996    }
1997
1998    std::string getDescription() {
1999        return std::string("Performing flush.");
2000    }
2001
2002private:
2003    EventuallyPersistentStore *epstore;
2004    TapConnMap                &tapConnMap;
2005};
2006
2007ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *, time_t when){
2008    if (!flushAllEnabled) {
2009        return ENGINE_ENOTSUP;
2010    }
2011
2012    if (isDegradedMode()) {
2013        return ENGINE_TMPFAIL;
2014    }
2015
2016    if (when == 0) {
2017        epstore->reset();
2018        tapConnMap->addFlushEvent();
2019    } else {
2020        ExTask flushTask = new FlushAllTask(epstore, *tapConnMap,
2021                static_cast<double>(when));
2022        ExecutorPool::get()->schedule(flushTask, NONIO_TASK_IDX);
2023    }
2024
2025    return ENGINE_SUCCESS;
2026}
2027
/**
 * Store an item according to the requested store operation.
 *
 * Supported operations are CAS, SET, ADD, REPLACE, APPEND and PREPEND; any
 * other operation yields ENGINE_ENOTSUP. On a successful set/add the item's
 * resulting CAS is written to *cas.
 *
 * @param cookie    the connection issuing the request
 * @param itm       the item to store (an Item in disguise)
 * @param cas       output: CAS value of the stored item on success
 * @param operation the kind of store to perform
 * @param vbucket   the vbucket the item belongs to
 * @return the result of the store; ENGINE_ENOMEM is translated through
 *         memoryCondition(), and ENGINE_NOT_STORED / ENGINE_NOT_MY_VBUCKET
 *         become ENGINE_TMPFAIL while the engine is in degraded mode
 */
ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
                                                    item* itm,
                                                    uint64_t *cas,
                                                    ENGINE_STORE_OPERATION
                                                                     operation,
                                                    uint16_t vbucket) {
    // Records into the store-command histogram (stats.storeCmdHisto).
    BlockTimer timer(&stats.storeCmdHisto);
    ENGINE_ERROR_CODE ret;
    Item *it = static_cast<Item*>(itm);
    item *i = NULL;

    it->setVBucketId(vbucket);

    switch (operation) {
    case OPERATION_CAS:
        if (it->getCas() == 0) {
            // Using a cas command with a cas wildcard doesn't make sense
            ret = ENGINE_NOT_STORED;
            break;
        }
        // FALLTHROUGH
    case OPERATION_SET:
        if (isDegradedMode()) {
            return ENGINE_TMPFAIL;
        }
        ret = epstore->set(*it, cookie);
        if (ret == ENGINE_SUCCESS) {
            *cas = it->getCas();
        }

        break;

    case OPERATION_ADD:
        if (isDegradedMode()) {
            return ENGINE_TMPFAIL;
        }

        if (it->getCas() != 0) {
            // Adding an item with a cas value doesn't really make sense...
            return ENGINE_KEY_EEXISTS;
        }

        ret = epstore->add(*it, cookie);
        if (ret == ENGINE_SUCCESS) {
            *cas = it->getCas();
        }
        break;

    case OPERATION_REPLACE:
        // @todo this isn't atomic!
        // Implemented as get-then-set: the key must exist for the set to
        // be attempted.
        ret = get(cookie, &i, it->getKey().c_str(),
                  it->getNKey(), vbucket);
        switch (ret) {
        case ENGINE_SUCCESS:
            // The fetched copy is only an existence check; release it
            // before storing the new value.
            itemRelease(cookie, i);
            ret = epstore->set(*it, cookie);
            if (ret == ENGINE_SUCCESS) {
                *cas = it->getCas();
            }
            break;
        case ENGINE_KEY_ENOENT:
            ret = ENGINE_NOT_STORED;
            break;
        default:
            // Just return the error we got.
            break;
        }
        break;
    case OPERATION_APPEND:
    case OPERATION_PREPEND:
        // Read-modify-write loop: fetch the current item, append/prepend
        // the new data to it, then store it back with OPERATION_CAS. The
        // loop retries for as long as the store reports
        // ENGINE_KEY_EEXISTS.
        do {
            if ((ret = get(cookie, &i, it->getKey().c_str(),
                           it->getNKey(), vbucket)) == ENGINE_SUCCESS) {
                Item *old = reinterpret_cast<Item*>(i);

                if (old->getCas() == (uint64_t) -1) {
                    // item is locked against updates
                    itemRelease(cookie, i);
                    return ENGINE_TMPFAIL;
                }

                // A caller-supplied CAS must match the stored item's CAS.
                if (it->getCas() != 0 && old->getCas() != it->getCas()) {
                    itemRelease(cookie, i);
                    return ENGINE_KEY_EEXISTS;
                }

                if (operation == OPERATION_APPEND) {
                    ret = old->append(*it, maxItemSize);
                } else {
                    ret = old->prepend(*it, maxItemSize);
                }

                if (ret != ENGINE_SUCCESS) {
                    itemRelease(cookie, i);
                    if (ret == ENGINE_E2BIG) {
                        return ret;
                    } else {
                        return memoryCondition();
                    }
                }

                // Recursive call: store the combined item as a CAS
                // operation.
                ret = store(cookie, old, cas, OPERATION_CAS, vbucket);
                itemRelease(cookie, i);
            }
        } while (ret == ENGINE_KEY_EEXISTS);

        break;

    default:
        ret = ENGINE_ENOTSUP;
    }

    // Post-process the result: translate out-of-memory through
    // memoryCondition(), and report a temporary failure instead of
    // NOT_STORED / NOT_MY_VBUCKET while in degraded mode.
    if (ret == ENGINE_ENOMEM) {
        ret = memoryCondition();
    } else if (ret == ENGINE_NOT_STORED || ret == ENGINE_NOT_MY_VBUCKET) {
        if (isDegradedMode()) {
            return ENGINE_TMPFAIL;
        }
    }

    return ret;
}
2150
2151inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
2152                                                           item **itm,
2153                                                           void **es,
2154                                                           uint16_t *nes,
2155                                                           uint8_t *ttl,
2156                                                           uint16_t *flags,
2157                                                           uint32_t *seqno,
2158                                                           uint16_t *vbucket,
2159                                                           TapProducer
2160                                                                   *connection,
2161                                                           bool &retry) {
2162    *es = NULL;
2163    *nes = 0;
2164    *ttl = (uint8_t)-1;
2165    *seqno = 0;
2166    *flags = 0;
2167    *vbucket = 0;
2168
2169    retry = false;
2170
2171    if (connection->shouldFlush()) {
2172        return TAP_FLUSH;
2173    }
2174
2175    if (connection->isTimeForNoop()) {
2176        LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
2177            connection->logHeader());
2178        return TAP_NOOP;
2179    }
2180
2181    if (connection->isSuspended() || connection->windowIsFull()) {
2182        LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
2183            " suspended state or its ack windows is full.\n",
2184            connection->logHeader());
2185        return TAP_PAUSE;
2186    }
2187
2188    uint16_t ret = TAP_PAUSE;
2189    VBucketEvent ev = connection->nextVBucketHighPriority();
2190    if (ev.event != TAP_PAUSE) {
2191        switch (ev.event) {
2192        case TAP_VBUCKET_SET:
2193            LOG(EXTENSION_LOG_WARNING,
2194               "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2195                connection->logHeader(), ev.vbucket,
2196                VBucket::toString(ev.state));
2197            connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
2198            break;
2199        case TAP_OPAQUE:
2200            LOG(EXTENSION_LOG_WARNING,
2201                "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
2202                connection->logHeader(),
2203                TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
2204                ev.vbucket);
2205            connection->opaqueCommandCode = (uint32_t) ev.state;
2206            *vbucket = ev.vbucket;
2207            *es = &connection->opaqueCommandCode;
2208            *nes = sizeof(connection->opaqueCommandCode);
2209            break;
2210        default:
2211            LOG(EXTENSION_LOG_WARNING,
2212                "%s Unknown VBucketEvent message type %d\n",
2213                connection->logHeader(), ev.event);
2214            abort();
2215        }
2216        return ev.event;
2217    }
2218
2219    if (connection->waitForOpaqueMsgAck()) {
2220        return TAP_PAUSE;
2221    }
2222
2223    VBucketFilter backFillVBFilter;
2224    if (connection->runBackfill(backFillVBFilter)) {
2225        queueBackfill(backFillVBFilter, connection);
2226    }
2227
2228    uint8_t nru = INITIAL_NRU_VALUE;
2229    Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
2230    switch (ret) {
2231    case TAP_CHECKPOINT_START:
2232    case TAP_CHECKPOINT_END:
2233    case TAP_MUTATION:
2234    case TAP_DELETION:
2235        *itm = it;
2236        if (ret == TAP_MUTATION) {
2237            *nes = TapEngineSpecific::packSpecificData(ret, connection,
2238                                                       it->getRevSeqno(), nru);
2239            *es = connection->specificData;
2240        } else if (ret == TAP_DELETION) {
2241            *nes = TapEngineSpecific::packSpecificData(ret, connection,
2242                                                       it->getRevSeqno());
2243            *es = connection->specificData;
2244        } else if (ret == TAP_CHECKPOINT_START) {
2245            // Send the current value of the max deleted seqno
2246            RCPtr<VBucket> vb = getVBucket(*vbucket);
2247            if (!vb) {
2248                retry = true;
2249                return TAP_NOOP;
2250            }
2251            *nes = TapEngineSpecific::packSpecificData(ret, connection,
2252                                               vb->ht.getMaxDeletedRevSeqno());
2253            *es = connection->specificData;
2254        }
2255        break;
2256    case TAP_NOOP:
2257        retry = true;
2258        break;
2259    default:
2260        break;
2261    }
2262
2263    if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
2264        VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
2265        if (vbev.event == TAP_VBUCKET_SET) {
2266            LOG(EXTENSION_LOG_WARNING,
2267               "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2268                connection->logHeader(), vbev.vbucket,
2269                VBucket::toString(vbev.state));
2270            connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
2271        }
2272        ret = vbev.event;
2273    }
2274
2275    return ret;
2276}
2277
2278uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2279                                                  item **itm,
2280                                                  void **es,
2281                                                  uint16_t *nes,
2282                                                  uint8_t *ttl,
2283                                                  uint16_t *flags,
2284                                                  uint32_t *seqno,
2285                                                  uint16_t *vbucket) {
2286    TapProducer *connection = getTapProducer(cookie);
2287    if (!connection) {
2288        LOG(EXTENSION_LOG_WARNING,
2289            "Failed to lookup TAP connection.. Disconnecting\n");
2290        return TAP_DISCONNECT;
2291    }
2292
2293    connection->setPaused(false);
2294
2295    bool retry = false;
2296    uint16_t ret;
2297
2298    connection->setLastWalkTime();
2299    do {
2300        ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2301                             seqno, vbucket, connection, retry);
2302    } while (retry);
2303
2304    if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2305        connection->lastMsgTime = ep_current_time();
2306        if (ret == TAP_NOOP) {
2307            *seqno = 0;
2308        } else {
2309            ++stats.numTapFetched;
2310            *seqno = connection->getSeqno();
2311            if (connection->requestAck(ret, *vbucket)) {
2312                *flags = TAP_FLAG_ACK;
2313                connection->seqnoAckRequested = *seqno;
2314            }
2315
2316            if (ret == TAP_MUTATION) {
2317                if (connection->haveFlagByteorderSupport()) {
2318                    *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
2319                }
2320            }
2321        }
2322    } else {
2323        connection->setPaused(true);
2324        connection->setNotifySent(false);
2325    }
2326
2327    return ret;
2328}
2329
2330bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2331                                                std::string &client,
2332                                                uint32_t flags,
2333                                                const void *userdata,
2334                                                size_t nuserdata) {
2335    if (reserveCookie(cookie) != ENGINE_SUCCESS) {
2336        return false;
2337    }
2338
2339    std::string tapName = "eq_tapq:";
2340    if (client.length() == 0) {
2341        tapName.assign(ConnHandler::getAnonName());
2342    } else {
2343        tapName.append(client);
2344    }
2345
2346    // Decoding the userdata section of the packet and update the filters
2347    const char *ptr = static_cast<const char*>(userdata);
2348    uint64_t backfillAge = 0;
2349    std::vector<uint16_t> vbuckets;
2350    std::map<uint16_t, uint64_t> lastCheckpointIds;
2351
2352    if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2353        if (nuserdata < sizeof(backfillAge)) {
2354            LOG(EXTENSION_LOG_WARNING,
2355                "Backfill age is missing. Reject connection request from %s\n",
2356                tapName.c_str());
2357            return false;
2358        }
2359        // use memcpy to avoid alignemt issues
2360        memcpy(&backfillAge, ptr, sizeof(backfillAge));
2361        backfillAge = ntohll(backfillAge);
2362        nuserdata -= sizeof(backfillAge);
2363        ptr += sizeof(backfillAge);
2364    }
2365
2366    if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2367        uint16_t nvbuckets;
2368        if (nuserdata < sizeof(nvbuckets)) {
2369            LOG(EXTENSION_LOG_WARNING,
2370            "Number of vbuckets is missing. Reject connection request from %s"
2371            "\n", tapName.c_str());
2372            return false;
2373        }
2374        memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2375        nuserdata -= sizeof(nvbuckets);
2376        ptr += sizeof(nvbuckets);
2377        nvbuckets = ntohs(nvbuckets);
2378        if (nvbuckets > 0) {
2379            if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2380                LOG(EXTENSION_LOG_WARNING,
2381                "# of vbuckets not matched. Reject connection request from %s"
2382                "\n", tapName.c_str());
2383                return false;
2384            }
2385            for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
2386                uint16_t val;
2387                memcpy(&val, ptr, sizeof(nvbuckets));
2388                ptr += sizeof(uint16_t);
2389                vbuckets.push_back(ntohs(val));
2390            }
2391            nuserdata -= (sizeof(uint16_t) * nvbuckets);
2392        }
2393    }
2394
2395    if (flags & TAP_CONNECT_CHECKPOINT) {
2396        uint16_t nCheckpoints = 0;
2397        if (nuserdata >= sizeof(nCheckpoints)) {
2398            memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2399            nuserdata -= sizeof(nCheckpoints);
2400            ptr += sizeof(nCheckpoints);
2401            nCheckpoints = ntohs(nCheckpoints);
2402        }
2403        if (nCheckpoints > 0) {
2404            if (nuserdata <
2405                ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2406                LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2407                    "Reject connection request from %s\n", tapName.c_str());
2408                return false;
2409            }
2410            for (uint16_t j = 0; j < nCheckpoints; ++j) {
2411                uint16_t vbid;
2412                uint64_t checkpointId;
2413                memcpy(&vbid, ptr, sizeof(vbid));
2414                ptr += sizeof(uint16_t);
2415                memcpy(&checkpointId, ptr, sizeof(checkpointId));
2416                ptr += sizeof(uint64_t);
2417                lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
2418            }
2419            nuserdata -=
2420                        ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints);
2421        }
2422    }
2423
2424    TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2425                                 backfillAge,
2426                                 static_cast<int>(
2427                                 configuration.getTapKeepalive()),
2428                                 vbuckets,
2429                                 lastCheckpointIds);
2430
2431    tapConnMap->notifyPausedConnection(tp, true);
2432    return true;
2433}
2434
2435ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2436                                                        void *engine_specific,
2437                                                        uint16_t nengine,
2438                                                        uint8_t ttl,
2439                                                        uint16_t tap_flags,
2440                                                        uint16_t tap_event,
2441                                                        uint32_t tap_seqno,
2442                                                        const void *key,
2443                                                        size_t nkey,
2444                                                        uint32_t flags,
2445                                                        uint32_t exptime,
2446                                                        uint64_t cas,
2447                                                        uint8_t datatype,
2448                                                        const void *data,
2449                                                        size_t ndata,
2450                                                        uint16_t vbucket)
2451{
2452    (void) ttl;
2453    void *specific = getEngineSpecific(cookie);
2454    ConnHandler *connection = NULL;
2455    if (specific == NULL) {
2456        if (tap_event == TAP_ACK) {
2457            LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %s does not "
2458                "exist. Force disconnect...\n", (char *) cookie);
2459            // tap producer is no longer connected..
2460            return ENGINE_DISCONNECT;
2461        } else {
2462            // Create a new tap consumer...
2463            connection = tapConnMap->newConsumer(cookie);
2464            if (connection == NULL) {
2465                LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2466                    " Force disconnect\n");
2467                return ENGINE_DISCONNECT;
2468            }
2469            storeEngineSpecific(cookie, connection);
2470        }
2471    } else {
2472        connection = reinterpret_cast<ConnHandler *>(specific);
2473    }
2474
2475    std::string k(static_cast<const char*>(key), nkey);
2476    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2477
2478    if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2479        if (!tapThrottle->shouldProcess()) {
2480            ++stats.tapThrottled;
2481            if (connection->supportsAck()) {
2482                ret = ENGINE_TMPFAIL;
2483            } else {
2484                ret = ENGINE_DISCONNECT;
2485                LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2486                    "ack support. Force disconnect...\n",
2487                    connection->logHeader());
2488            }
2489            return ret;
2490        }
2491    }
2492
2493    switch (tap_event) {
2494    case TAP_ACK:
2495        ret = processTapAck(cookie, tap_seqno, tap_flags, k);
2496        break;
2497    case TAP_FLUSH:
2498        ret = flush(cookie, 0);
2499        LOG(EXTENSION_LOG_WARNING, "%s Received flush.\n",
2500            connection->logHeader());
2501        break;
2502    case TAP_DELETION:
2503        {
2504            uint64_t revSeqno;
2505            TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2506                                                nengine, &revSeqno);
2507
2508            ret = connection->deletion(0, key, nkey, cas, vbucket, 0, revSeqno,
2509                                       NULL, 0);
2510        }
2511        break;
2512
2513    case TAP_CHECKPOINT_START:
2514    case TAP_CHECKPOINT_END:
2515        {
2516            TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2517            if (tc) {
2518                if (tap_event == TAP_CHECKPOINT_START &&
2519                    nengine == TapEngineSpecific::sizeRevSeqno) {
2520                    // Set the current value for the max deleted seqno
2521                    RCPtr<VBucket> vb = getVBucket(vbucket);
2522                    if (!vb) {
2523                        return ENGINE_TMPFAIL;
2524                    }
2525                    uint64_t seqnum;
2526                    TapEngineSpecific::readSpecificData(tap_event,
2527                                                        engine_specific,
2528                                                        nengine,
2529                                                        &seqnum);
2530                    vb->ht.setMaxDeletedRevSeqno(seqnum);
2531                }
2532
2533                if (data) {
2534                    uint64_t checkpointId;
2535                    memcpy(&checkpointId, data, sizeof(checkpointId));
2536                    checkpointId = ntohll(checkpointId);
2537                    ConnHandlerCheckPoint(tc, tap_event, vbucket,
2538                                          checkpointId);
2539                }
2540                else {
2541                    ret = ENGINE_DISCONNECT;
2542                    LOG(EXTENSION_LOG_WARNING,
2543                        "%s Checkpoint Id is missing in "
2544                        "CHECKPOINT messages. Force disconnect...\n",
2545                        connection->logHeader());
2546                }
2547            }
2548            else {
2549                ret = ENGINE_DISCONNECT;
2550                LOG(EXTENSION_LOG_WARNING,
2551                    "%s not a consumer! Force disconnect\n",
2552                    connection->logHeader());
2553            }
2554        }
2555
2556        break;
2557
2558    case TAP_MUTATION:
2559        {
2560            uint8_t nru = INITIAL_NRU_VALUE;
2561            uint64_t revSeqno = 0;
2562            TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2563                                                nengine, &revSeqno, &nru);
2564
2565            ret = connection->mutation(0, key, nkey, data, ndata, cas, vbucket,
2566                                       flags, 0, 0, 0, revSeqno, exptime, nru,
2567                                       NULL, 0);
2568        }
2569
2570        break;
2571
2572    case TAP_OPAQUE:
2573        if (nengine == sizeof(uint32_t)) {
2574            uint32_t cc;
2575            memcpy(&cc, engine_specific, sizeof(cc));
2576            cc = ntohl(cc);
2577
2578            switch (cc) {
2579            case TAP_OPAQUE_ENABLE_AUTO_NACK:
2580                // @todo: the memcached core will _ALWAYS_ send nack
2581                //        if it encounter an error. This should be
2582                // set as the default when we move to .next after 2.0
2583                // (currently we need to allow the message for
2584                // backwards compatibility)
2585                LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2586                    connection->logHeader());
2587                break;
2588            case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2589                connection->setSupportCheckpointSync(true);
2590                LOG(EXTENSION_LOG_INFO,
2591                    "%s Enable checkpoint synchronization\n",
2592                    connection->logHeader());
2593                break;
2594            case TAP_OPAQUE_OPEN_CHECKPOINT:
2595                /**
2596                 * This event is only received by the TAP client that wants to
2597                 * get mutations from closed checkpoints only. At this time,
2598                 * only incremental backup client receives this event so that
2599                 * it can close the connection and reconnect later.
2600                 */
2601                LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2602                    connection->logHeader());
2603                break;
2604            case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
2605                {
2606                    LOG(EXTENSION_LOG_INFO,
2607                        "%s Backfill started for vbucket %d.\n",
2608                        connection->logHeader(), vbucket);
2609                    BlockTimer timer(&stats.tapVbucketResetHisto);
2610                    ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2611                                                  ENGINE_DISCONNECT;
2612                    if (ret == ENGINE_DISCONNECT) {
2613                        LOG(EXTENSION_LOG_WARNING,
2614                         "%s Failed to reset a vbucket %d. Force disconnect\n",
2615                            connection->logHeader(), vbucket);
2616                    } else {
2617                        LOG(EXTENSION_LOG_WARNING,
2618                         "%s Reset vbucket %d was completed succecssfully.\n",
2619                            connection->logHeader(), vbucket);
2620                    }
2621
2622                    TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2623                    if (tc) {
2624                        tc->setBackfillPhase(true, vbucket);
2625                    } else {
2626                        ret = ENGINE_DISCONNECT;
2627                        LOG(EXTENSION_LOG_WARNING,
2628                            "TAP consumer doesn't exists. Force disconnect\n");
2629                    }
2630                }
2631                break;
2632            case TAP_OPAQUE_CLOSE_BACKFILL:
2633                {
2634                    LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2635                        connection->logHeader());
2636                    TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2637                    if (tc) {
2638                        tc->setBackfillPhase(false, vbucket);
2639                    } else {
2640                        ret = ENGINE_DISCONNECT;
2641                        LOG(EXTENSION_LOG_WARNING,
2642                            "%s not a consumer! Force disconnect\n",
2643                            connection->logHeader());
2644                    }
2645                }
2646                break;
2647            case TAP_OPAQUE_CLOSE_TAP_STREAM:
2648                /**
2649                 * This event is sent by the eVBucketMigrator to notify that
2650                 * the source node closes the tap replication stream and
2651                 * switches to TAKEOVER_VBUCKETS phase.
2652                 * This is just an informative message and doesn't require any
2653                 * action.
2654                 */
2655                LOG(EXTENSION_LOG_INFO,
2656                "%s Received close tap stream. Switching to takeover phase.\n",
2657                    connection->logHeader());
2658                break;
2659            case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2660                /**
2661                 * This opaque message is just for notifying that the source
2662                 * node receives change_vbucket_filter request and processes
2663                 * it successfully.
2664                 */
2665                LOG(EXTENSION_LOG_INFO,
2666                "%s Notified that the source node changed a vbucket filter.\n",
2667                    connection->logHeader());
2668                break;
2669            default:
2670                LOG(EXTENSION_LOG_WARNING,
2671                    "%s Received an unknown opaque command\n",
2672                    connection->logHeader());
2673            }
2674        } else {
2675            LOG(EXTENSION_LOG_WARNING,
2676                "%s Received tap opaque with unknown size %d\n",
2677                connection->logHeader(), nengine);
2678        }
2679        break;
2680
2681    case TAP_VBUCKET_SET:
2682        {
2683            BlockTimer timer(&stats.tapVbucketSetHisto);
2684
2685            if (nengine != sizeof(vbucket_state_t)) {
2686                // illegal datasize
2687                LOG(EXTENSION_LOG_WARNING,
2688                    "%s Received TAP_VBUCKET_SET with illegal size."
2689                    " Force disconnect\n", connection->logHeader());
2690                ret = ENGINE_DISCONNECT;
2691                break;
2692            }
2693
2694            vbucket_state_t state;
2695            memcpy(&state, engine_specific, nengine);
2696            state = (vbucket_state_t)ntohl(state);
2697
2698            ret = connection->setVBucketState(0, vbucket, state);
2699        }
2700        break;
2701
2702    default:
2703        // Unknown command
2704        LOG(EXTENSION_LOG_WARNING,
2705            "%s Recieved bad opcode, ignoring message\n",
2706            connection->logHeader());
2707    }
2708
2709    connection->processedEvent(tap_event, ret);
2710    return ret;
2711}
2712
2713ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2714                                                      TapConsumer *consumer,
2715                                                      uint8_t event,
2716                                                      uint16_t vbucket,
2717                                                      uint64_t checkpointId) {
2718    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2719
2720    if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2721        getEpStore()->wakeUpFlusher();
2722        ret = ENGINE_SUCCESS;
2723    }
2724    else {
2725        ret = ENGINE_DISCONNECT;
2726        LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2727            "checkpoint %llu. Force disconnect\n",
2728            consumer->logHeader(), checkpointId);
2729    }
2730
2731    return ret;
2732}
2733
2734TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2735    TapProducer *rv =
2736        reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2737    if (!(rv && rv->isConnected())) {
2738        LOG(EXTENSION_LOG_WARNING,
2739            "Walking a non-existent tap queue, disconnecting\n");
2740        return NULL;
2741    }
2742
2743    if (rv->doDisconnect()) {
2744        LOG(EXTENSION_LOG_WARNING,
2745            "%s Disconnecting pending connection\n", rv->logHeader());
2746        return NULL;
2747    }
2748    return rv;
2749}
2750
2751ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2752                                                            uint32_t seqno,
2753                                                            uint16_t status,
2754                                                            const std::string
2755                                                            &msg)
2756{
2757    TapProducer *connection = getTapProducer(cookie);
2758    if (!connection) {
2759        LOG(EXTENSION_LOG_WARNING,
2760            "Unable to process tap ack. No producer found\n");
2761        return ENGINE_DISCONNECT;
2762    }
2763
2764    return connection->processAck(seqno, status, msg);
2765}
2766
2767void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
2768                                                             &backfillVBFilter,
2769                                               Producer *tc)
2770{
2771    ExTask backfillTask = new BackfillTask(this, *tapConnMap, tc,
2772                                           backfillVBFilter);
2773    ExecutorPool::get()->schedule(backfillTask, NONIO_TASK_IDX);
2774}
2775
2776bool VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
2777    ++numVbucket;
2778    item_eviction_policy_t policy = engine.getEpStore()->
2779                                                       getItemEvictionPolicy();
2780    numItems += vb->getNumItems(policy);
2781    numTempItems += vb->getNumTempItems();
2782    nonResident += vb->getNumNonResidentItems(policy);
2783
2784    if (vb->getHighPriorityChkSize() > 0) {
2785        chkPersistRemaining++;
2786    }
2787
2788    fileSpaceUsed += vb->fileSpaceUsed;
2789    fileSize += vb->fileSize;
2790
2791    if (desired_state != vbucket_state_dead) {
2792        htMemory += vb->ht.memorySize();
2793        htItemMemory += vb->ht.getItemMemory();
2794        htCacheSize += vb->ht.cacheSize;
2795        numEjects += vb->ht.getNumEjects();
2796        numExpiredItems += vb->numExpiredItems;
2797        metaDataMemory += vb->ht.metaDataMemory;
2798        opsCreate += vb->opsCreate;
2799        opsUpdate += vb->opsUpdate;
2800        opsDelete += vb->opsDelete;
2801        opsReject += vb->opsReject;
2802
2803        queueSize += vb->dirtyQueueSize;
2804        queueMemory += vb->dirtyQueueMem;
2805        queueFill += vb->dirtyQueueFill;
2806        queueDrain += vb->dirtyQueueDrain;
2807        queueAge += vb->getQueueAge();
2808        pendingWrites += vb->dirtyQueuePendingWrites;
2809    }
2810
2811    return false;
2812}
2813
2814/**
2815 * A container class holding VBucketCountVisitors to aggregate stats for
2816 * different vbucket states.
2817 */
2818class VBucketCountAggregator : public VBucketVisitor  {
2819public:
2820    bool visitBucket(RCPtr<VBucket> &vb)  {
2821        std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
2822        it = visitorMap.find(vb->getState());
2823        if ( it != visitorMap.end() ) {
2824            it->second->visitBucket(vb);
2825        }
2826
2827        return false;
2828    }
2829
2830    void addVisitor(VBucketCountVisitor* visitor)  {
2831        visitorMap[visitor->getVBucketState()] = visitor;
2832    }
2833private:
2834    std::map<vbucket_state_t, VBucketCountVisitor*> visitorMap;
2835};
2836
2837ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
2838                                                           ADD_STAT add_stat) {
2839    VBucketCountAggregator aggregator;
2840
2841    VBucketCountVisitor activeCountVisitor(*this, vbucket_state_active);
2842    aggregator.addVisitor(&activeCountVisitor);
2843
2844    VBucketCountVisitor replicaCountVisitor(*this, vbucket_state_replica);
2845    aggregator.addVisitor(&replicaCountVisitor);
2846
2847    VBucketCountVisitor pendingCountVisitor(*this, vbucket_state_pending);
2848    aggregator.addVisitor(&pendingCountVisitor);
2849
2850    VBucketCountVisitor deadCountVisitor(*this, vbucket_state_dead);
2851    aggregator.addVisitor(&deadCountVisitor);
2852
2853    epstore->visit(aggregator);
2854
2855    epstore->updateCachedResidentRatio(activeCountVisitor.getMemResidentPer(),
2856                                      replicaCountVisitor.getMemResidentPer());
2857    tapThrottle->adjustWriteQueueCap(activeCountVisitor.getNumItems() +
2858                                     replicaCountVisitor.getNumItems() +
2859                                     pendingCountVisitor.getNumItems());
2860
2861    configuration.addStats(add_stat, cookie);
2862
2863    EPStats &epstats = getEpStats();
2864    add_casted_stat("ep_version", VERSION, add_stat, cookie);
2865    add_casted_stat("ep_storage_age",
2866                    epstats.dirtyAge, add_stat, cookie);
2867    add_casted_stat("ep_storage_age_highwat",
2868                    epstats.dirtyAgeHighWat, add_stat, cookie);
2869    add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
2870                    add_stat, cookie);
2871
2872    if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
2873        add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
2874    } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
2875        add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
2876    }
2877
2878    add_casted_stat("ep_total_enqueued",
2879                    epstats.totalEnqueued, add_stat, cookie);
2880    add_casted_stat("ep_total_persisted",
2881                    epstats.totalPersisted, add_stat, cookie);
2882    add_casted_stat("ep_item_flush_failed",
2883                    epstats.flushFailed, add_stat, cookie);
2884    add_casted_stat("ep_item_commit_failed",
2885                    epstats.commitFailed, add_stat, cookie);
2886    add_casted_stat("ep_item_begin_failed",
2887                    epstats.beginFailed, add_stat, cookie);
2888    add_casted_stat("ep_expired_access", epstats.expired_access,
2889                    add_stat, cookie);
2890    add_casted_stat("ep_expired_pager", epstats.expired_pager,
2891                    add_stat, cookie);
2892    add_casted_stat("ep_item_flush_expired",
2893                    epstats.flushExpired, add_stat, cookie);
2894    add_casted_stat("ep_queue_size",
2895                    epstats.diskQueueSize, add_stat, cookie);
2896    add_casted_stat("ep_flusher_todo",
2897                    epstats.flusher_todo, add_stat, cookie);
2898    add_casted_stat("ep_uncommitted_items",
2899                    epstats.flusher_todo, add_stat, cookie);
2900    add_casted_stat("ep_diskqueue_items",
2901                    epstats.diskQueueSize, add_stat, cookie);
2902    add_casted_stat("ep_flusher_state",
2903                    epstore->getFlusher(0)->stateName(),
2904                    add_stat, cookie);
2905    add_casted_stat("ep_commit_num", epstats.flusherCommits,
2906                    add_stat, cookie);
2907    add_casted_stat("ep_commit_time",
2908                    epstats.commit_time, add_stat, cookie);
2909    add_casted_stat("ep_commit_time_total",
2910                    epstats.cumulativeCommitTime, add_stat, cookie);
2911    add_casted_stat("ep_vbucket_del",
2912                    epstats.vbucketDeletions, add_stat, cookie);
2913    add_casted_stat("ep_vbucket_del_fail",
2914                    epstats.vbucketDeletionFail, add_stat, cookie);
2915    add_casted_stat("ep_flush_duration_total",
2916                    epstats.cumulativeFlushTime, add_stat, cookie);
2917    add_casted_stat("ep_flush_all",
2918                    epstore->isFlushAllScheduled() ? "true" : "false",
2919                    add_stat, cookie);
2920    add_casted_stat("curr_items", activeCountVisitor.getNumItems(), add_stat,
2921                    cookie);
2922    add_casted_stat("curr_temp_items", activeCountVisitor.getNumTempItems(),
2923                    add_stat, cookie);
2924    add_casted_stat("curr_items_tot",
2925                    activeCountVisitor.getNumItems() +
2926                    replicaCountVisitor.getNumItems() +
2927                    pendingCountVisitor.getNumItems(),
2928                    add_stat, cookie);
2929    add_casted_stat("vb_active_num", activeCountVisitor.getVBucketNumber(),
2930                    add_stat, cookie);
2931    add_casted_stat("vb_active_curr_items", activeCountVisitor.getNumItems(),
2932                    add_stat, cookie);
2933    add_casted_stat("vb_active_num_non_resident",
2934                    activeCountVisitor.getNonResident(),
2935                    add_stat, cookie);
2936    add_casted_stat("vb_active_perc_mem_resident",
2937                    activeCountVisitor.getMemResidentPer(),
2938                    add_stat, cookie);
2939    add_casted_stat("vb_active_eject", activeCountVisitor.getEjects(),
2940                    add_stat, cookie);
2941    add_casted_stat("vb_active_expired", activeCountVisitor.getExpired(),
2942                    add_stat, cookie);
2943    add_casted_stat("vb_active_meta_data_memory",
2944                    activeCountVisitor.getMetaDataMemory(),
2945                    add_stat, cookie);
2946    add_casted_stat("vb_active_ht_memory",
2947                    activeCountVisitor.getHashtableMemory(),
2948                    add_stat, cookie);
2949    add_casted_stat("vb_active_itm_memory", activeCountVisitor.getItemMemory(),
2950                    add_stat, cookie);
2951    add_casted_stat("vb_active_ops_create", activeCountVisitor.getOpsCreate(),
2952                    add_stat, cookie);
2953    add_casted_stat("vb_active_ops_update", activeCountVisitor.getOpsUpdate(),
2954                    add_stat, cookie);
2955    add_casted_stat("vb_active_ops_delete", activeCountVisitor.getOpsDelete(),
2956                    add_stat, cookie);
2957    add_casted_stat("vb_active_ops_reject", activeCountVisitor.getOpsReject(),
2958                    add_stat, cookie);
2959    add_casted_stat("vb_active_queue_size", activeCountVisitor.getQueueSize(),
2960                    add_stat, cookie);
2961    add_casted_stat("vb_active_queue_memory",
2962                    activeCountVisitor.getQueueMemory(), add_stat, cookie);
2963    add_casted_stat("vb_active_queue_age", activeCountVisitor.getAge(),
2964                    add_stat, cookie);
2965    add_casted_stat("vb_active_queue_pending",
2966                    activeCountVisitor.getPendingWrites(), add_stat, cookie);
2967    add_casted_stat("vb_active_queue_fill", activeCountVisitor.getQueueFill(),
2968                    add_stat, cookie);
2969    add_casted_stat("vb_active_queue_drain",
2970                    activeCountVisitor.getQueueDrain(), add_stat, cookie);
2971
2972    add_casted_stat("vb_replica_num", replicaCountVisitor.getVBucketNumber(),
2973                    add_stat, cookie);
2974    add_casted_stat("vb_replica_curr_items", replicaCountVisitor.getNumItems(),
2975                    add_stat, cookie);
2976    add_casted_stat("vb_replica_num_non_resident",
2977                    replicaCountVisitor.getNonResident(), add_stat, cookie);
2978    add_casted_stat("vb_replica_perc_mem_resident",
2979                    replicaCountVisitor.getMemResidentPer(),
2980                    add_stat, cookie);
2981    add_casted_stat("vb_replica_eject", replicaCountVisitor.getEjects(),
2982                    add_stat, cookie);
2983    add_casted_stat("vb_replica_expired", replicaCountVisitor.getExpired(),
2984                    add_stat, cookie);
2985    add_casted_stat("vb_replica_meta_data_memory",
2986                    replicaCountVisitor.getMetaDataMemory(), add_stat, cookie);
2987    add_casted_stat("vb_replica_ht_memory",
2988                    replicaCountVisitor.getHashtableMemory(),
2989                    add_stat, cookie);
2990    add_casted_stat("vb_replica_itm_memory",
2991                    replicaCountVisitor.getItemMemory(), add_stat, cookie);
2992    add_casted_stat("vb_replica_ops_create",
2993                    replicaCountVisitor.getOpsCreate(), add_stat, cookie);
2994    add_casted_stat("vb_replica_ops_update",
2995                    replicaCountVisitor.getOpsUpdate(), add_stat, cookie);
2996    add_casted_stat("vb_replica_ops_delete",
2997                    replicaCountVisitor.getOpsDelete(), add_stat, cookie);
2998    add_casted_stat("vb_replica_ops_reject",
2999                    replicaCountVisitor.getOpsReject(), add_stat, cookie);
3000    add_casted_stat("vb_replica_queue_size",
3001                    replicaCountVisitor.getQueueSize(), add_stat, cookie);
3002    add_casted_stat("vb_replica_queue_memory",
3003                    replicaCountVisitor.getQueueMemory(),
3004                    add_stat, cookie);
3005    add_casted_stat("vb_replica_queue_age",
3006                    replicaCountVisitor.getAge(), add_stat, cookie);
3007    add_casted_stat("vb_replica_queue_pending",
3008                    replicaCountVisitor.getPendingWrites(),
3009                    add_stat, cookie);
3010    add_casted_stat("vb_replica_queue_fill",
3011                    replicaCountVisitor.getQueueFill(), add_stat, cookie);
3012    add_casted_stat("vb_replica_queue_drain",
3013                    replicaCountVisitor.getQueueDrain(), add_stat, cookie);
3014
3015    add_casted_stat("vb_pending_num",
3016                    pendingCountVisitor.getVBucketNumber(), add_stat, cookie);
3017    add_casted_stat("vb_pending_curr_items",
3018                    pendingCountVisitor.getNumItems(), add_stat, cookie);
3019    add_casted_stat("vb_pending_num_non_resident",
3020                    pendingCountVisitor.getNonResident(),
3021                    add_stat, cookie);
3022    add_casted_stat("vb_pending_perc_mem_resident",
3023                    pendingCountVisitor.getMemResidentPer(), add_stat, cookie);
3024    add_casted_stat("vb_pending_eject", pendingCountVisitor.getEjects(),
3025                    add_stat, cookie);
3026    add_casted_stat("vb_pending_expired", pendingCountVisitor.getExpired(),
3027                    add_stat, cookie);
3028    add_casted_stat("vb_pending_meta_data_memory",
3029                    pendingCountVisitor.getMetaDataMemory(),
3030                    add_stat, cookie);
3031    add_casted_stat("vb_pending_ht_memory",
3032                    pendingCountVisitor.getHashtableMemory(),
3033                    add_stat, cookie);
3034    add_casted_stat("vb_pending_itm_memory",
3035                    pendingCountVisitor.getItemMemory(), add_stat, cookie);
3036    add_casted_stat("vb_pending_ops_create",
3037                    pendingCountVisitor.getOpsCreate(), add_stat, cookie);
3038    add_casted_stat("vb_pending_ops_update",
3039                    pendingCountVisitor.getOpsUpdate(), add_stat, cookie);
3040    add_casted_stat("vb_pending_ops_delete",
3041                    pendingCountVisitor.getOpsDelete(), add_stat, cookie);
3042    add_casted_stat("vb_pending_ops_reject",
3043                    pendingCountVisitor.getOpsReject(), add_stat, cookie);
3044    add_casted_stat("vb_pending_queue_size",
3045                    pendingCountVisitor.getQueueSize(), add_stat, cookie);
3046    add_casted_stat("vb_pending_queue_memory",
3047                    pendingCountVisitor.getQueueMemory(),
3048                    add_stat, cookie);
3049    add_casted_stat("vb_pending_queue_age", pendingCountVisitor.getAge(),
3050                    add_stat, cookie);
3051    add_casted_stat("vb_pending_queue_pending",
3052                    pendingCountVisitor.getPendingWrites(),
3053                    add_stat, cookie);
3054    add_casted_stat("vb_pending_queue_fill",
3055                    pendingCountVisitor.getQueueFill(), add_stat, cookie);
3056    add_casted_stat("vb_pending_queue_drain",
3057                    pendingCountVisitor.getQueueDrain(), add_stat, cookie);
3058
3059    add_casted_stat("vb_dead_num", deadCountVisitor.getVBucketNumber(),
3060                    add_stat, cookie);
3061
3062    add_casted_stat("ep_db_data_size",
3063                    activeCountVisitor.getFileSpaceUsed() +
3064                    replicaCountVisitor.getFileSpaceUsed() +
3065                    pendingCountVisitor.getFileSpaceUsed() +
3066                    deadCountVisitor.getFileSpaceUsed(),
3067                    add_stat, cookie);
3068    add_casted_stat("ep_db_file_size",
3069                    activeCountVisitor.getFileSize() +
3070                    replicaCountVisitor.getFileSize() +
3071                    pendingCountVisitor.getFileSize() +
3072                    deadCountVisitor.getFileSize(),
3073                    add_stat, cookie);
3074
3075    add_casted_stat("ep_vb_snapshot_total",
3076                    epstats.snapshotVbucketHisto.total(), add_stat, cookie);
3077
3078    add_casted_stat("ep_vb_total",
3079                    activeCountVisitor.getVBucketNumber() +
3080                    replicaCountVisitor.getVBucketNumber() +
3081                    pendingCountVisitor.getVBucketNumber() +
3082                    deadCountVisitor.getVBucketNumber(),
3083                    add_stat, cookie);
3084
3085    add_casted_stat("ep_total_new_items",
3086                    activeCountVisitor.getOpsCreate() +
3087                    replicaCountVisitor.getOpsCreate() +
3088                    pendingCountVisitor.getOpsCreate(),
3089                    add_stat, cookie);
3090    add_casted_stat("ep_total_del_items",
3091                    activeCountVisitor.getOpsDelete() +
3092                    replicaCountVisitor.getOpsDelete() +
3093                    pendingCountVisitor.getOpsDelete(),
3094                    add_stat, cookie);
3095    add_casted_stat("ep_diskqueue_memory",
3096                    activeCountVisitor.getQueueMemory() +
3097                    replicaCountVisitor.getQueueMemory() +
3098                    pendingCountVisitor.getQueueMemory(),
3099                    add_stat, cookie);
3100    add_casted_stat("ep_diskqueue_fill",
3101                    activeCountVisitor.getQueueFill() +
3102                    replicaCountVisitor.getQueueFill() +
3103                    pendingCountVisitor.getQueueFill(),
3104                    add_stat, cookie);
3105    add_casted_stat("ep_diskqueue_drain",
3106                    activeCountVisitor.getQueueDrain() +
3107                    replicaCountVisitor.getQueueDrain() +
3108                    pendingCountVisitor.getQueueDrain(),
3109                    add_stat, cookie);
3110    add_casted_stat("ep_diskqueue_pending",
3111                    activeCountVisitor.getPendingWrites() +
3112                    replicaCountVisitor.getPendingWrites() +
3113                    pendingCountVisitor.getPendingWrites(),
3114                    add_stat, cookie);
3115    add_casted_stat("ep_meta_data_memory",
3116                    activeCountVisitor.getMetaDataMemory() +
3117                    replicaCountVisitor.getMetaDataMemory() +
3118                    pendingCountVisitor.getMetaDataMemory(),
3119                    add_stat, cookie);
3120
3121    size_t memUsed =  stats.getTotalMemoryUsed();
3122    add_casted_stat("mem_used", memUsed, add_stat, cookie);
3123    add_casted_stat("bytes", memUsed, add_stat, cookie);
3124    add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3125    add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3126    add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3127    add_casted_stat("ep_total_cache_size",
3128                    activeCountVisitor.getCacheSize() +
3129                    replicaCountVisitor.getCacheSize() +
3130                    pendingCountVisitor.getCacheSize(),
3131                    add_stat, cookie);
3132    add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3133    add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3134                    add_stat, cookie);
3135    add_casted_stat("ep_mem_tracker_enabled",
3136                    stats.memoryTrackerEnabled ? "true" : "false",
3137                    add_stat, cookie);
3138    add_casted_stat("ep_bg_fetched", epstats.bg_fetched,
3139                    add_stat, cookie);
3140    add_casted_stat("ep_bg_meta_fetched", epstats.bg_meta_fetched,
3141                    add_stat, cookie);
3142    add_casted_stat("ep_bg_remaining_jobs", epstats.numRemainingBgJobs,
3143                    add_stat, cookie);
3144    add_casted_stat("ep_max_bg_remaining_jobs", epstats.maxRemainingBgJobs,
3145                    add_stat, cookie);
3146    add_casted_stat("ep_tap_bg_fetched", stats.numTapBGFetched,
3147                    add_stat, cookie);
3148    add_casted_stat("ep_tap_bg_fetch_requeued", stats.numTapBGFetchRequeued,
3149                    add_stat, cookie);
3150    add_casted_stat("ep_num_pager_runs", epstats.pagerRuns,
3151                    add_stat, cookie);
3152    add_casted_stat("ep_num_expiry_pager_runs", epstats.expiryPagerRuns,
3153                    add_stat, cookie);
3154    add_casted_stat("ep_items_rm_from_checkpoints",
3155                    epstats.itemsRemovedFromCheckpoints,
3156                    add_stat, cookie);
3157    add_casted_stat("ep_num_value_ejects", epstats.numValueEjects,
3158                    add_stat, cookie);
3159    add_casted_stat("ep_num_eject_failures", epstats.numFailedEjects,
3160                    add_stat, cookie);
3161    add_casted_stat("ep_num_not_my_vbuckets", epstats.numNotMyVBuckets,
3162                    add_stat, cookie);
3163
3164    add_casted_stat("ep_io_num_read", epstats.io_num_read,
3165                    add_stat, cookie);
3166    add_casted_stat("ep_io_num_write", epstats.io_num_write, add_stat, cookie);
3167    add_casted_stat("ep_io_read_bytes", epstats.io_read_bytes,
3168                    add_stat, cookie);
3169    add_casted_stat("ep_io_write_bytes", epstats.io_write_bytes,
3170                     add_stat, cookie);
3171
3172    add_casted_stat("ep_pending_ops", epstats.pendingOps, add_stat, cookie);
3173    add_casted_stat("ep_pending_ops_total", epstats.pendingOpsTotal,
3174                    add_stat, cookie);
3175    add_casted_stat("ep_pending_ops_max", epstats.pendingOpsMax,
3176                    add_stat, cookie);
3177    add_casted_stat("ep_pending_ops_max_duration",
3178                    epstats.pendingOpsMaxDuration,
3179                    add_stat, cookie);
3180
3181    add_casted_stat("ep_pending_compactions", epstats.pendingCompactions,
3182                    add_stat, cookie);
3183    add_casted_stat("ep_rollback_count", epstats.rollbackCount,
3184                    add_stat, cookie);
3185
3186    size_t vbDeletions = epstats.vbucketDeletions.load();
3187    if (vbDeletions > 0) {
3188        add_casted_stat("ep_vbucket_del_max_walltime",
3189                        epstats.vbucketDelMaxWalltime,
3190                        add_stat, cookie);
3191        add_casted_stat("ep_vbucket_del_avg_walltime",
3192                        epstats.vbucketDelTotWalltime / vbDeletions,
3193                        add_stat, cookie);
3194    }
3195
3196    size_t numBgOps = epstats.bgNumOperations.load();
3197    if (numBgOps > 0) {
3198        add_casted_stat("ep_bg_num_samples", epstats.bgNumOperations,
3199                        add_stat, cookie);
3200        add_casted_stat("ep_bg_min_wait",
3201                        epstats.bgMinWait,
3202                        add_stat, cookie);
3203        add_casted_stat("ep_bg_max_wait",
3204                        epstats.bgMaxWait,
3205                        add_stat, cookie);
3206        add_casted_stat("ep_bg_wait_avg",
3207                        epstats.bgWait / numBgOps,
3208                        add_stat, cookie);
3209        add_casted_stat("ep_bg_min_load",
3210                        epstats.bgMinLoad,
3211                        add_stat, cookie);
3212        add_casted_stat("ep_bg_max_load",
3213                        epstats.bgMaxLoad,
3214                        add_stat, cookie);
3215        add_casted_stat("ep_bg_load_avg",
3216                        epstats.bgLoad / numBgOps,
3217                        add_stat, cookie);
3218        add_casted_stat("ep_bg_wait",
3219                        epstats.bgWait,
3220                        add_stat, cookie);
3221        add_casted_stat("ep_bg_load",
3222                        epstats.bgLoad,
3223                        add_stat, cookie);
3224    }
3225
3226    add_casted_stat("ep_num_non_resident",
3227                    activeCountVisitor.getNonResident() +
3228                    pendingCountVisitor.getNonResident() +
3229                    replicaCountVisitor.getNonResident(),
3230                    add_stat, cookie);
3231
3232    add_casted_stat("ep_degraded_mode", isDegradedMode(), add_stat, cookie);
3233    add_casted_stat("ep_exp_pager_stime", epstore->getExpiryPagerSleeptime(),
3234                    add_stat, cookie);
3235
3236    add_casted_stat("ep_mlog_compactor_runs", epstats.mlogCompactorRuns,
3237                    add_stat, cookie);
3238    add_casted_stat("ep_num_access_scanner_runs", epstats.alogRuns,
3239                    add_stat, cookie);
3240    add_casted_stat("ep_access_scanner_last_runtime",