xref: /5.5.2/kv_engine/engines/ep/src/ep_engine.cc (revision bb60376f)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2017 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include "ep_engine.h"
21#include "kv_bucket.h"
22
23#include "checkpoint.h"
24#include "collections/manager.h"
25#include "common.h"
26#include "connmap.h"
27#include "dcp/consumer.h"
28#include "dcp/dcpconnmap.h"
29#include "dcp/flow-control-manager.h"
30#include "dcp/producer.h"
31#include "ep_bucket.h"
32#include "ep_vb.h"
33#include "ephemeral_bucket.h"
34#include "failover-table.h"
35#include "flusher.h"
36#include "htresizer.h"
37#include "logger.h"
38#include "memory_tracker.h"
39#include "replicationthrottle.h"
40#include "stats-info.h"
41#include "statwriter.h"
42#include "string_utils.h"
43#include "vb_count_visitor.h"
44#include "warmup.h"
45
46#include <JSON_checker.h>
47#include <cJSON_utils.h>
48#include <memcached/engine.h>
49#include <memcached/extension.h>
50#include <memcached/protocol_binary.h>
51#include <memcached/server_api.h>
52#include <memcached/util.h>
53#include <platform/cb_malloc.h>
54#include <platform/checked_snprintf.h>
55#include <platform/compress.h>
56#include <platform/make_unique.h>
57#include <platform/platform.h>
58#include <platform/processclock.h>
59#include <platform/scope_timer.h>
60#include <tracing/trace_helpers.h>
61#include <utilities/logtags.h>
62#include <xattr/utils.h>
63
64#include <cstdio>
65#include <cstring>
66#include <fcntl.h>
67#include <fstream>
68#include <iostream>
69#include <limits>
70#include <mutex>
71#include <stdarg.h>
72#include <string>
73#include <vector>
74
75using cb::tracing::TraceCode;
76
/**
 * Scale @p val by the fraction @p percent (e.g. 0.75 for 75%), truncating the
 * result towards zero when converting back to size_t.
 */
static size_t percentOf(size_t val, double percent) {
    const double scaled = static_cast<double>(val) * percent;
    return static_cast<size_t>(scaled);
}
80
81struct EPHandleReleaser {
82    void operator()(EventuallyPersistentEngine*) {
83        ObjectRegistry::onSwitchThread(nullptr);
84    }
85};
86
87using EPHandle = std::unique_ptr<EventuallyPersistentEngine, EPHandleReleaser>;
88
89/**
90 * Helper function to acquire a handle to the engine which allows access to
91 * the engine while the handle is in scope.
92 * @param handle pointer to the engine
93 * @return EPHandle which is a unique_ptr to an EventuallyPersistentEngine
94 * with a custom deleter (EPHandleReleaser) which performs the required
95 * ObjectRegistry release.
96 */
97
98static inline EPHandle acquireEngine(ENGINE_HANDLE* handle) {
99    auto ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
100    ObjectRegistry::onSwitchThread(ret);
101
102    return EPHandle(ret);
103}
104
105/**
106 * Call the response callback and return the appropriate value so that
107 * the core knows what to do..
108 */
109static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
110                                      uint16_t keylen,
111                                      const void *ext, uint8_t extlen,
112                                      const void *body, uint32_t bodylen,
113                                      uint8_t datatype, uint16_t status,
114                                      uint64_t cas, const void *cookie)
115{
116    ENGINE_ERROR_CODE rv = ENGINE_FAILED;
117    EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
118    if (response(key, keylen, ext, extlen, body, bodylen, datatype,
119                 status, cas, cookie)) {
120        rv = ENGINE_SUCCESS;
121    }
122    ObjectRegistry::onSwitchThread(e);
123    return rv;
124}
125
/**
 * Check that @p v lies within the inclusive range [@p l, @p h].
 *
 * @throws std::runtime_error("Value out of range.") when v < l or v > h.
 */
template <typename T>
static void validate(T v, T l, T h) {
    const bool belowLower = v < l;
    const bool aboveUpper = h < v;
    if (belowLower || aboveUpper) {
        throw std::runtime_error("Value out of range.");
    }
}
132
133
/**
 * Verify that @p str is a decimal integer: an optional leading '-' followed by
 * one or more ASCII digits '0'-'9'.
 *
 * The empty string, a lone "-" and a null pointer are all rejected. Previously
 * these were accepted, so callers then parsed them as 0.
 *
 * @throws std::runtime_error("Value is not numeric") if @p str is not numeric.
 */
static void checkNumeric(const char* str) {
    if (str == nullptr) {
        throw std::runtime_error("Value is not numeric");
    }
    const char* digits = (str[0] == '-') ? str + 1 : str;
    if (*digits == '\0') {
        throw std::runtime_error("Value is not numeric");
    }
    for (const char* p = digits; *p != '\0'; ++p) {
        // Compare against the ASCII digit range directly. isdigit() has
        // undefined behaviour when handed a negative (high-bit) char.
        if (*p < '0' || *p > '9') {
            throw std::runtime_error("Value is not numeric");
        }
    }
}
146
147static ENGINE_ERROR_CODE EvpInitialize(gsl::not_null<ENGINE_HANDLE*> handle,
148                                       const char* config_str) {
149    return acquireEngine(handle)->initialize(config_str);
150}
151
152static void EvpDestroy(gsl::not_null<ENGINE_HANDLE*> handle, const bool force) {
153    auto eng = acquireEngine(handle);
154    eng->destroy(force);
155    delete eng.get();
156}
157
158static cb::EngineErrorItemPair EvpItemAllocate(
159        gsl::not_null<ENGINE_HANDLE*> handle,
160        gsl::not_null<const void*> cookie,
161        const DocKey& key,
162        const size_t nbytes,
163        const int flags,
164        const rel_time_t exptime,
165        uint8_t datatype,
166        uint16_t vbucket) {
167    if (!mcbp::datatype::is_valid(datatype)) {
168        LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
169            " (ItemAllocate)");
170        return cb::makeEngineErrorItemPair(cb::engine_errc::invalid_arguments);
171    }
172
173    item* itm = nullptr;
174    auto ret = acquireEngine(handle)->itemAllocate(&itm,
175                                                   key,
176                                                   nbytes,
177                                                   0, // No privileged bytes
178                                                   flags,
179                                                   exptime,
180                                                   datatype,
181                                                   vbucket);
182    return cb::makeEngineErrorItemPair(cb::engine_errc(ret), itm, handle);
183}
184
185static bool EvpGetItemInfo(gsl::not_null<ENGINE_HANDLE*> handle,
186                           gsl::not_null<const item*> itm,
187                           gsl::not_null<item_info*> itm_info);
188static void EvpItemRelease(gsl::not_null<ENGINE_HANDLE*> handle,
189                           gsl::not_null<item*> itm);
190
191static std::pair<cb::unique_item_ptr, item_info> EvpItemAllocateEx(
192        gsl::not_null<ENGINE_HANDLE*> handle,
193        gsl::not_null<const void*> cookie,
194        const DocKey& key,
195        size_t nbytes,
196        size_t priv_nbytes,
197        int flags,
198        rel_time_t exptime,
199        uint8_t datatype,
200        uint16_t vbucket) {
201    item* it = nullptr;
202    auto err = acquireEngine(handle)->itemAllocate(
203            &it, key, nbytes, priv_nbytes, flags, exptime, datatype, vbucket);
204
205    if (err != ENGINE_SUCCESS) {
206        throw cb::engine_error(cb::engine_errc(err),
207                               "EvpItemAllocateEx: failed to allocate memory");
208    }
209
210    item_info info;
211    if (!EvpGetItemInfo(handle, it, &info)) {
212        EvpItemRelease(handle, it);
213        throw cb::engine_error(cb::engine_errc::failed,
214                               "EvpItemAllocateEx: EvpGetItemInfo failed");
215    }
216
217    return std::make_pair(cb::unique_item_ptr{it, cb::ItemDeleter{handle}},
218                          info);
219}
220
221static ENGINE_ERROR_CODE EvpItemDelete(gsl::not_null<ENGINE_HANDLE*> handle,
222                                       gsl::not_null<const void*> cookie,
223                                       const DocKey& key,
224                                       uint64_t& cas,
225                                       uint16_t vbucket,
226                                       mutation_descr_t& mut_info) {
227    return acquireEngine(handle)->itemDelete(
228            cookie, key, cas, vbucket, nullptr, mut_info);
229}
230
231static void EvpItemRelease(gsl::not_null<ENGINE_HANDLE*> handle,
232                           gsl::not_null<item*> itm) {
233    acquireEngine(handle)->itemRelease(itm);
234}
235
236static cb::EngineErrorItemPair EvpGet(gsl::not_null<ENGINE_HANDLE*> handle,
237                                      gsl::not_null<const void*> cookie,
238                                      const DocKey& key,
239                                      uint16_t vbucket,
240                                      DocStateFilter documentStateFilter) {
241    get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
242                                                       HONOR_STATES |
243                                                       TRACK_REFERENCE |
244                                                       DELETE_TEMP |
245                                                       HIDE_LOCKED_CAS |
246                                                       TRACK_STATISTICS);
247
248    switch (documentStateFilter) {
249    case DocStateFilter::Alive:
250        break;
251    case DocStateFilter::Deleted:
252        // MB-23640 was caused by this bug as the frontend asked for
253        // Alive and Deleted documents. The internals don't have a
254        // way of requesting just deleted documents, and luckily for
255        // us no part of our code is using this yet. Return an error
256        // if anyone start using it
257        return std::make_pair(
258                cb::engine_errc::not_supported,
259                cb::unique_item_ptr{nullptr, cb::ItemDeleter{handle}});
260    case DocStateFilter::AliveOrDeleted:
261        options = static_cast<get_options_t>(options | GET_DELETED_VALUE);
262        break;
263    }
264
265    item* itm = nullptr;
266    ENGINE_ERROR_CODE ret =
267            acquireEngine(handle)->get(cookie, &itm, key, vbucket, options);
268    return cb::makeEngineErrorItemPair(cb::engine_errc(ret), itm, handle);
269}
270
271static cb::EngineErrorItemPair EvpGetIf(
272        gsl::not_null<ENGINE_HANDLE*> handle,
273        gsl::not_null<const void*> cookie,
274        const DocKey& key,
275        uint16_t vbucket,
276        std::function<bool(const item_info&)> filter) {
277    return acquireEngine(handle)->get_if(cookie, key, vbucket, filter);
278}
279
280static cb::EngineErrorItemPair EvpGetAndTouch(
281        gsl::not_null<ENGINE_HANDLE*> handle,
282        gsl::not_null<const void*> cookie,
283        const DocKey& key,
284        uint16_t vbucket,
285        uint32_t expiry_time) {
286    return acquireEngine(handle)->get_and_touch(cookie, key, vbucket,
287                                                expiry_time);
288}
289
290static cb::EngineErrorItemPair EvpGetLocked(
291        gsl::not_null<ENGINE_HANDLE*> handle,
292        gsl::not_null<const void*> cookie,
293        const DocKey& key,
294        uint16_t vbucket,
295        uint32_t lock_timeout) {
296    item* itm = nullptr;
297    auto ret = acquireEngine(handle)->get_locked(
298            cookie, &itm, key, vbucket, lock_timeout);
299    return cb::makeEngineErrorItemPair(cb::engine_errc(ret), itm, handle);
300}
301
302static size_t EvpGetMaxItemSize(
303        gsl::not_null<ENGINE_HANDLE*> handle) {
304    return acquireEngine(handle)->getMaxItemSize();
305}
306
307static ENGINE_ERROR_CODE EvpUnlock(gsl::not_null<ENGINE_HANDLE*> handle,
308                                   gsl::not_null<const void*> cookie,
309                                   const DocKey& key,
310                                   uint16_t vbucket,
311                                   uint64_t cas) {
312    return acquireEngine(handle)->unlock(cookie, key, vbucket, cas);
313}
314
315static ENGINE_ERROR_CODE EvpGetStats(gsl::not_null<ENGINE_HANDLE*> handle,
316                                     gsl::not_null<const void*> cookie,
317                                     cb::const_char_buffer key,
318                                     ADD_STAT add_stat) {
319    return acquireEngine(handle)->getStats(
320            cookie, key.data(), gsl::narrow_cast<int>(key.size()), add_stat);
321}
322
323static ENGINE_ERROR_CODE EvpStore(gsl::not_null<ENGINE_HANDLE*> handle,
324                                  gsl::not_null<const void*> cookie,
325                                  gsl::not_null<item*> itm,
326                                  uint64_t& cas,
327                                  ENGINE_STORE_OPERATION operation,
328                                  DocumentState document_state) {
329    auto engine = acquireEngine(handle);
330
331    if (document_state == DocumentState::Deleted) {
332        Item* item = static_cast<Item*>(itm.get());
333        item->setDeleted();
334    }
335
336    return engine->store(cookie, itm, cas, operation);
337}
338
339static cb::EngineErrorCasPair EvpStoreIf(gsl::not_null<ENGINE_HANDLE*> handle,
340                                         gsl::not_null<const void*> cookie,
341                                         gsl::not_null<item*> itm,
342                                         uint64_t cas,
343                                         ENGINE_STORE_OPERATION operation,
344                                         cb::StoreIfPredicate predicate,
345                                         DocumentState document_state) {
346    auto engine = acquireEngine(handle);
347
348    Item& item = static_cast<Item&>(*static_cast<Item*>(itm.get()));
349
350    if (document_state == DocumentState::Deleted) {
351        item.setDeleted();
352    }
353    return engine->store_if(cookie, item, cas, operation, predicate);
354}
355
356static ENGINE_ERROR_CODE EvpFlush(gsl::not_null<ENGINE_HANDLE*> handle,
357                                  gsl::not_null<const void*> cookie) {
358    return acquireEngine(handle)->flush(cookie);
359}
360
361static void EvpResetStats(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie) {
362    acquireEngine(handle)->resetStats();
363}
364
365protocol_binary_response_status EventuallyPersistentEngine::setReplicationParam(
366        const char* keyz, const char* valz, std::string& msg) {
367    protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
368
369    try {
370        if (strcmp(keyz, "replication_throttle_threshold") == 0) {
371            getConfiguration().setReplicationThrottleThreshold(
372                    std::stoull(valz));
373        } else if (strcmp(keyz, "replication_throttle_queue_cap") == 0) {
374            getConfiguration().setReplicationThrottleQueueCap(std::stoll(valz));
375        } else if (strcmp(keyz, "replication_throttle_cap_pcnt") == 0) {
376            getConfiguration().setReplicationThrottleCapPcnt(std::stoull(valz));
377        } else {
378            msg = "Unknown config param";
379            rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
380        }
381        // Handles exceptions thrown by the standard
382        // library stoi/stoul style functions when not numeric
383    } catch (std::invalid_argument&) {
384        msg = "Argument was not numeric";
385        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
386
387        // Handles exceptions thrown by the standard library stoi/stoul
388        // style functions when the conversion does not fit in the datatype
389    } catch (std::out_of_range&) {
390        msg = "Argument was out of range";
391        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
392
393        // Handles any miscellaenous exceptions in addition to the range_error
394        // exceptions thrown by the configuration::set<param>() methods
395    } catch (std::exception& error) {
396        msg = error.what();
397        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
398    }
399
400    return rv;
401}
402
403protocol_binary_response_status EventuallyPersistentEngine::setCheckpointParam(
404        const char* keyz, const char* valz, std::string& msg) {
405    protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
406
407    try {
408        if (strcmp(keyz, "chk_max_items") == 0) {
409            size_t v = std::stoull(valz);
410            validate(v, size_t(MIN_CHECKPOINT_ITEMS),
411                     size_t(MAX_CHECKPOINT_ITEMS));
412            getConfiguration().setChkMaxItems(v);
413        } else if (strcmp(keyz, "chk_period") == 0) {
414            size_t v = std::stoull(valz);
415            validate(v, size_t(MIN_CHECKPOINT_PERIOD),
416                     size_t(MAX_CHECKPOINT_PERIOD));
417            getConfiguration().setChkPeriod(v);
418        } else if (strcmp(keyz, "max_checkpoints") == 0) {
419            size_t v = std::stoull(valz);
420            validate(v, size_t(DEFAULT_MAX_CHECKPOINTS),
421                     size_t(MAX_CHECKPOINTS_UPPER_BOUND));
422            getConfiguration().setMaxCheckpoints(v);
423        } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
424            getConfiguration().setItemNumBasedNewChk(cb_stob(valz));
425        } else if (strcmp(keyz, "keep_closed_chks") == 0) {
426            getConfiguration().setKeepClosedChks(cb_stob(valz));
427        } else if (strcmp(keyz, "enable_chk_merge") == 0) {
428            getConfiguration().setEnableChkMerge(cb_stob(valz));
429        } else if (strcmp(keyz, "cursor_dropping_checkpoint_mem_upper_mark") ==
430                   0) {
431            size_t v = std::stoull(valz);
432            validate(v,
433                     getConfiguration().getCursorDroppingCheckpointMemLowerMark(),
434                     size_t(100));
435            getConfiguration().setCursorDroppingCheckpointMemUpperMark(v);
436        } else if (strcmp(keyz, "cursor_dropping_checkpoint_mem_lower_mark") ==
437                   0) {
438            size_t v = std::stoull(valz);
439            validate(
440                    v, size_t(0), getConfiguration().getCursorDroppingCheckpointMemUpperMark());
441            getConfiguration().setCursorDroppingCheckpointMemLowerMark(v);
442        } else {
443            msg = "Unknown config param";
444            rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
445        }
446
447        // Handles exceptions thrown by the cb_stob function
448    } catch (invalid_argument_bool& error) {
449        msg = error.what();
450        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
451
452        // Handles exceptions thrown by the standard
453        // library stoi/stoul style functions when not numeric
454    } catch (std::invalid_argument&) {
455        msg = "Argument was not numeric";
456        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
457
458        // Handles exceptions thrown by the standard library stoi/stoul
459        // style functions when the conversion does not fit in the datatype
460    } catch (std::out_of_range&) {
461        msg = "Argument was out of range";
462        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
463
464        // Handles any miscellaenous exceptions in addition to the range_error
465        // exceptions thrown by the configuration::set<param>() methods
466    } catch (std::exception& error) {
467        msg = error.what();
468        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
469    }
470
471    return rv;
472}
473
474protocol_binary_response_status EventuallyPersistentEngine::setFlushParam(
475        const char* keyz, const char* valz, std::string& msg) {
476    protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
477
478    // Handle the actual mutation.
479    try {
480        if (strcmp(keyz, "bg_fetch_delay") == 0) {
481            getConfiguration().setBgFetchDelay(std::stoull(valz));
482        } else if (strcmp(keyz, "max_size") == 0) {
483            size_t vsize = std::stoull(valz);
484
485            getConfiguration().setMaxSize(vsize);
486            EPStats& st = getEpStats();
487            getConfiguration().setMemLowWat(
488                    percentOf(vsize, st.mem_low_wat_percent));
489            getConfiguration().setMemHighWat(
490                    percentOf(vsize, st.mem_high_wat_percent));
491        } else if (strcmp(keyz, "mem_low_wat") == 0) {
492            getConfiguration().setMemLowWat(std::stoull(valz));
493        } else if (strcmp(keyz, "mem_high_wat") == 0) {
494            getConfiguration().setMemHighWat(std::stoull(valz));
495        } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
496            getConfiguration().setBackfillMemThreshold(std::stoull(valz));
497        } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
498            getConfiguration().setCompactionExpMemThreshold(std::stoull(valz));
499        } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
500            getConfiguration().setMutationMemThreshold(std::stoull(valz));
501        } else if (strcmp(keyz, "timing_log") == 0) {
502            EPStats& stats = getEpStats();
503            std::ostream* old = stats.timingLog;
504            stats.timingLog = NULL;
505            delete old;
506            if (strcmp(valz, "off") == 0) {
507                LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
508            } else {
509                std::ofstream* tmp(new std::ofstream(valz));
510                if (tmp->good()) {
511                    LOG(EXTENSION_LOG_INFO,
512                        "Logging detailed timings to ``%s''.", valz);
513                    stats.timingLog = tmp;
514                } else {
515                    LOG(EXTENSION_LOG_WARNING,
516                        "Error setting detailed timing log to ``%s'':  %s",
517                        valz, strerror(errno));
518                    delete tmp;
519                }
520            }
521        } else if (strcmp(keyz, "exp_pager_enabled") == 0) {
522            getConfiguration().setExpPagerEnabled(cb_stob(valz));
523        } else if (strcmp(keyz, "exp_pager_stime") == 0) {
524            getConfiguration().setExpPagerStime(std::stoull(valz));
525        } else if (strcmp(keyz, "exp_pager_initial_run_time") == 0) {
526            getConfiguration().setExpPagerInitialRunTime(std::stoll(valz));
527        } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
528            getConfiguration().requirementsMetOrThrow("access_scanner_enabled");
529            getConfiguration().setAccessScannerEnabled(cb_stob(valz));
530        } else if (strcmp(keyz, "alog_sleep_time") == 0) {
531            getConfiguration().requirementsMetOrThrow("alog_sleep_time");
532            getConfiguration().setAlogSleepTime(std::stoull(valz));
533        } else if (strcmp(keyz, "alog_task_time") == 0) {
534            getConfiguration().requirementsMetOrThrow("alog_task_time");
535            getConfiguration().setAlogTaskTime(std::stoull(valz));
536            /* Start of ItemPager parameters */
537        } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
538            getConfiguration().setPagerActiveVbPcnt(std::stoull(valz));
539        } else if (strcmp(keyz, "pager_sleep_time_ms") == 0) {
540            getConfiguration().setPagerSleepTimeMs(std::stoull(valz));
541        } else if (strcmp(keyz, "ht_eviction_policy") == 0) {
542            getConfiguration().setHtEvictionPolicy(valz);
543        } else if (strcmp(keyz, "item_eviction_age_percentage") == 0) {
544            getConfiguration().setItemEvictionAgePercentage(std::stoull(valz));
545        } else if (strcmp(keyz, "item_eviction_freq_counter_age_threshold") ==
546                   0) {
547            getConfiguration().setItemEvictionFreqCounterAgeThreshold(
548                    std::stoull(valz));
549        } else if (strcmp(keyz, "item_freq_decayer_chunk_duration") == 0) {
550            getConfiguration().setItemFreqDecayerChunkDuration(
551                    std::stoull(valz));
552        } else if (strcmp(keyz, "item_freq_decayer_percent") == 0) {
553            getConfiguration().setItemFreqDecayerPercent(std::stoull(valz));
554            /* End of ItemPager parameters */
555        } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
556            getConfiguration().setWarmupMinMemoryThreshold(std::stoull(valz));
557        } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
558            getConfiguration().setWarmupMinItemsThreshold(std::stoull(valz));
559        } else if (strcmp(keyz, "num_reader_threads") == 0) {
560            size_t value = std::stoull(valz);
561            getConfiguration().setNumReaderThreads(value);
562            ExecutorPool::get()->setNumReaders(value);
563        } else if (strcmp(keyz, "num_writer_threads") == 0) {
564            size_t value = std::stoull(valz);
565            getConfiguration().setNumWriterThreads(value);
566            ExecutorPool::get()->setNumWriters(value);
567        } else if (strcmp(keyz, "num_auxio_threads") == 0) {
568            size_t value = std::stoull(valz);
569            getConfiguration().setNumAuxioThreads(value);
570            ExecutorPool::get()->setNumAuxIO(value);
571        } else if (strcmp(keyz, "num_nonio_threads") == 0) {
572            size_t value = std::stoull(valz);
573            getConfiguration().setNumNonioThreads(value);
574            ExecutorPool::get()->setNumNonIO(value);
575        } else if (strcmp(keyz, "bfilter_enabled") == 0) {
576            getConfiguration().setBfilterEnabled(cb_stob(valz));
577        } else if (strcmp(keyz, "bfilter_residency_threshold") == 0) {
578            getConfiguration().setBfilterResidencyThreshold(std::stof(valz));
579        } else if (strcmp(keyz, "defragmenter_enabled") == 0) {
580            getConfiguration().setDefragmenterEnabled(cb_stob(valz));
581        } else if (strcmp(keyz, "defragmenter_interval") == 0) {
582            auto v = std::stod(valz);
583            getConfiguration().setDefragmenterInterval(v);
584        } else if (strcmp(keyz, "item_compressor_interval") == 0) {
585            size_t v = std::stoull(valz);
586            // Adding separate validation as external limit is minimum 1
587            // to prevent setting item compressor to constantly run
588            validate(v, size_t(1), std::numeric_limits<size_t>::max());
589            getConfiguration().setItemCompressorInterval(v);
590        } else if (strcmp(keyz, "item_compressor_chunk_duration") == 0) {
591            getConfiguration().setItemCompressorChunkDuration(
592                    std::stoull(valz));
593        } else if (strcmp(keyz, "defragmenter_age_threshold") == 0) {
594            getConfiguration().setDefragmenterAgeThreshold(std::stoull(valz));
595        } else if (strcmp(keyz, "defragmenter_chunk_duration") == 0) {
596            getConfiguration().setDefragmenterChunkDuration(std::stoull(valz));
597        } else if (strcmp(keyz, "defragmenter_run") == 0) {
598            runDefragmenterTask();
599        } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
600            getConfiguration().setCompactionWriteQueueCap(std::stoull(valz));
601        } else if (strcmp(keyz, "dcp_min_compression_ratio") == 0) {
602            getConfiguration().setDcpMinCompressionRatio(std::stof(valz));
603        } else if (strcmp(keyz, "dcp_noop_mandatory_for_v5_features") == 0) {
604            getConfiguration().setDcpNoopMandatoryForV5Features(cb_stob(valz));
605        } else if (strcmp(keyz, "access_scanner_run") == 0) {
606            if (!(runAccessScannerTask())) {
607                rv = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
608            }
609        } else if (strcmp(keyz, "vb_state_persist_run") == 0) {
610            runVbStatePersistTask(std::stoi(valz));
611        } else if (strcmp(keyz, "ephemeral_full_policy") == 0) {
612            getConfiguration().requirementsMetOrThrow("ephemeral_full_policy");
613            getConfiguration().setEphemeralFullPolicy(valz);
614        } else if (strcmp(keyz, "ephemeral_metadata_purge_age") == 0) {
615            getConfiguration().requirementsMetOrThrow(
616                    "ephemeral_metadata_purge_age");
617            getConfiguration().setEphemeralMetadataPurgeAge(std::stoull(valz));
618        } else if (strcmp(keyz, "ephemeral_metadata_purge_interval") == 0) {
619            getConfiguration().requirementsMetOrThrow("ephemeral_metadata_purge_interval");
620            getConfiguration().setEphemeralMetadataPurgeInterval(
621                    std::stoull(valz));
622        } else if (strcmp(keyz, "fsync_after_every_n_bytes_written") == 0) {
623            getConfiguration().setFsyncAfterEveryNBytesWritten(
624                    std::stoull(valz));
625        } else if (strcmp(keyz, "xattr_enabled") == 0) {
626            getConfiguration().setXattrEnabled(cb_stob(valz));
627        } else if (strcmp(keyz, "compression_mode") == 0) {
628            getConfiguration().setCompressionMode(valz);
629        } else if (strcmp(keyz, "min_compression_ratio") == 0) {
630            float min_comp_ratio;
631            if (safe_strtof(valz, min_comp_ratio)) {
632                getConfiguration().setMinCompressionRatio(min_comp_ratio);
633            } else {
634                rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
635            }
636        } else if (strcmp(keyz, "max_ttl") == 0) {
637            getConfiguration().setMaxTtl(std::stoull(valz));
638        } else if (strcmp(keyz, "mem_used_merge_threshold_percent") == 0) {
639            getConfiguration().setMemUsedMergeThresholdPercent(std::stof(valz));
640        } else {
641            msg = "Unknown config param";
642            rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
643        }
644        // Handles exceptions thrown by the cb_stob function
645    } catch (invalid_argument_bool& error) {
646        msg = error.what();
647        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
648
649        // Handles exceptions thrown by the standard
650        // library stoi/stoul style functions when not numeric
651    } catch (std::invalid_argument&) {
652        msg = "Argument was not numeric";
653        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
654
655        // Handles exceptions thrown by the standard library stoi/stoul
656        // style functions when the conversion does not fit in the datatype
657    } catch (std::out_of_range&) {
658        msg = "Argument was out of range";
659        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
660
661        // Handles any miscellaneous exceptions in addition to the range_error
662        // exceptions thrown by the configuration::set<param>() methods
663    } catch (std::exception& error) {
664        msg = error.what();
665        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
666    }
667
668    return rv;
669}
670
671protocol_binary_response_status EventuallyPersistentEngine::setDcpParam(
672        const char* keyz, const char* valz, std::string& msg) {
673    protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
674    try {
675
676        if (strcmp(keyz,
677                   "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
678            size_t v = atoi(valz);
679            checkNumeric(valz);
680            validate(v, size_t(1), std::numeric_limits<size_t>::max());
681            getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(
682                    v);
683        } else if (
684            strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") ==
685            0) {
686            size_t v = atoi(valz);
687            checkNumeric(valz);
688            validate(v, size_t(1), std::numeric_limits<size_t>::max());
689            getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(
690                    v);
691        } else {
692            msg = "Unknown config param";
693            rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
694        }
695    } catch (std::runtime_error&) {
696        msg = "Value out of range.";
697        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
698    }
699
700    return rv;
701}
702
703protocol_binary_response_status EventuallyPersistentEngine::setVbucketParam(
704        uint16_t vbucket,
705        const char* keyz,
706        const char* valz,
707        std::string& msg) {
708    protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
709    try {
710        if (strcmp(keyz, "hlc_drift_ahead_threshold_us") == 0) {
711            uint64_t v = std::strtoull(valz, nullptr, 10);
712            checkNumeric(valz);
713            getConfiguration().setHlcDriftAheadThresholdUs(v);
714        } else if (strcmp(keyz, "hlc_drift_behind_threshold_us") == 0) {
715            uint64_t v = std::strtoull(valz, nullptr, 10);
716            checkNumeric(valz);
717            getConfiguration().setHlcDriftBehindThresholdUs(v);
718        } else if (strcmp(keyz, "max_cas") == 0) {
719            uint64_t v = std::strtoull(valz, nullptr, 10);
720            checkNumeric(valz);
721            LOG(EXTENSION_LOG_WARNING,
722                "setVbucketParam: max_cas:%" PRIu64
723                " "
724                "vb:%" PRIu16,
725                v,
726                vbucket);
727            if (getKVBucket()->forceMaxCas(vbucket, v) != ENGINE_SUCCESS) {
728                rv = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
729                msg = "Not my vbucket";
730            }
731        } else {
732            msg = "Unknown config param";
733            rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
734        }
735    } catch (std::runtime_error&) {
736        msg = "Value out of range.";
737        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
738    }
739    return rv;
740}
741
742static protocol_binary_response_status evictKey(
743    EventuallyPersistentEngine* e,
744    protocol_binary_request_header
745    * request,
746    const char** msg,
747    size_t* msg_size,
748    DocNamespace docNamespace) {
749    protocol_binary_request_no_extras* req =
750        (protocol_binary_request_no_extras*)request;
751
752    const uint8_t* keyPtr = reinterpret_cast<const uint8_t*>(request) +
753                            sizeof(*request);
754    size_t keylen = ntohs(req->message.header.request.keylen);
755    uint16_t vbucket = ntohs(request->request.vbucket);
756
757    LOG(EXTENSION_LOG_DEBUG,
758        "Manually evicting object with key %s",
759        cb::logtags::tagUserData(std::string{(const char*)keyPtr, keylen})
760                .c_str());
761    msg_size = 0;
762    auto rv = e->evictKey(DocKey(keyPtr, keylen, docNamespace), vbucket, msg);
763    if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
764        rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
765        if (e->isDegradedMode()) {
766            return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
767        }
768    }
769    return rv;
770}
771
772protocol_binary_response_status EventuallyPersistentEngine::setParam(
773        protocol_binary_request_set_param* req, std::string& msg) {
774    size_t keylen = ntohs(req->message.header.request.keylen);
775    uint8_t extlen = req->message.header.request.extlen;
776    size_t vallen = ntohl(req->message.header.request.bodylen);
777    uint16_t vbucket = ntohs(req->message.header.request.vbucket);
778    protocol_binary_engine_param_t paramtype =
779        static_cast<protocol_binary_engine_param_t>(ntohl(
780            req->message.body.param_type));
781
782    if (keylen == 0 || (vallen - keylen - extlen) == 0) {
783        return PROTOCOL_BINARY_RESPONSE_EINVAL;
784    }
785
786    const char* keyp = reinterpret_cast<const char*>(req->bytes)
787                       + sizeof(req->bytes);
788    const char* valuep = keyp + keylen;
789    vallen -= (keylen + extlen);
790
791    char keyz[128];
792    char valz[512];
793
794    // Read the key.
795    if (keylen >= sizeof(keyz)) {
796        msg = "Key is too large.";
797        return PROTOCOL_BINARY_RESPONSE_EINVAL;
798    }
799    memcpy(keyz, keyp, keylen);
800    keyz[keylen] = 0x00;
801
802    // Read the value.
803    if (vallen >= sizeof(valz)) {
804        msg = "Value is too large.";
805        return PROTOCOL_BINARY_RESPONSE_EINVAL;
806    }
807    memcpy(valz, valuep, vallen);
808    valz[vallen] = 0x00;
809
810    protocol_binary_response_status rv;
811
812    switch (paramtype) {
813    case protocol_binary_engine_param_flush:
814        rv = setFlushParam(keyz, valz, msg);
815        break;
816    case protocol_binary_engine_param_replication:
817        rv = setReplicationParam(keyz, valz, msg);
818        break;
819    case protocol_binary_engine_param_checkpoint:
820        rv = setCheckpointParam(keyz, valz, msg);
821        break;
822    case protocol_binary_engine_param_dcp:
823        rv = setDcpParam(keyz, valz, msg);
824        break;
825    case protocol_binary_engine_param_vbucket:
826        rv = setVbucketParam(vbucket, keyz, valz, msg);
827        break;
828    default:
829        rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
830    }
831
832    return rv;
833}
834
835static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine* e,
836                                    const void* cookie,
837                                    protocol_binary_request_header* request,
838                                    ADD_RESPONSE response) {
839    protocol_binary_request_get_vbucket* req =
840        reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
841    if (req == nullptr) {
842        throw std::invalid_argument("getVBucket: Unable to convert req"
843                                        " to protocol_binary_request_get_vbucket");
844    }
845
846    uint16_t vbucket = ntohs(req->message.header.request.vbucket);
847    VBucketPtr vb = e->getVBucket(vbucket);
848    if (!vb) {
849        return ENGINE_NOT_MY_VBUCKET;
850    } else {
851        vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
852        return sendResponse(response, NULL, 0, NULL, 0, &state,
853                            sizeof(state),
854                            PROTOCOL_BINARY_RAW_BYTES,
855                            PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
856    }
857}
858
859static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine* e,
860                                    const void* cookie,
861                                    protocol_binary_request_header* request,
862                                    ADD_RESPONSE response) {
863
864    protocol_binary_request_set_vbucket* req =
865        reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
866
867    uint64_t cas = ntohll(req->message.header.request.cas);
868
869    size_t bodylen = ntohl(req->message.header.request.bodylen)
870                     - ntohs(req->message.header.request.keylen);
871    if (bodylen != sizeof(vbucket_state_t)) {
872        e->setErrorContext(cookie, "Body too short");
873        return sendResponse(response, NULL, 0, NULL, 0, NULL,
874                            0, PROTOCOL_BINARY_RAW_BYTES,
875                            PROTOCOL_BINARY_RESPONSE_EINVAL,
876                            cas, cookie);
877    }
878
879    vbucket_state_t state;
880    memcpy(&state, &req->message.body.state, sizeof(state));
881    state = static_cast<vbucket_state_t>(ntohl(state));
882
883    if (!is_valid_vbucket_state_t(state)) {
884        e->setErrorContext(cookie, "Invalid vbucket state");
885        return sendResponse(response, NULL, 0, NULL, 0, NULL,
886                            0, PROTOCOL_BINARY_RAW_BYTES,
887                            PROTOCOL_BINARY_RESPONSE_EINVAL,
888                            cas, cookie);
889    }
890
891    uint16_t vb = ntohs(req->message.header.request.vbucket);
892    return e->setVBucketState(cookie, response, vb, state, false, cas);
893}
894
895static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine* e,
896                                    const void* cookie,
897                                    protocol_binary_request_header* req,
898                                    ADD_RESPONSE response) {
899
900    uint64_t cas = ntohll(req->request.cas);
901
902    protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
903    uint16_t vbucket = ntohs(req->request.vbucket);
904
905    if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
906        e->setErrorContext(cookie, "Key and extras required");
907        return sendResponse(response, NULL, 0, NULL, 0, NULL,
908                            0,
909                            PROTOCOL_BINARY_RAW_BYTES,
910                            PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
911    }
912
913    bool sync = false;
914    uint32_t bodylen = ntohl(req->request.bodylen);
915    if (bodylen > 0) {
916        const char* ptr = reinterpret_cast<const char*>(req->bytes) +
917                          sizeof(req->bytes);
918        if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
919            sync = true;
920        }
921    }
922
923    ENGINE_ERROR_CODE err;
924    void* es = e->getEngineSpecific(cookie);
925    if (sync) {
926        if (es == NULL) {
927            err = e->deleteVBucket(vbucket, cookie);
928            e->storeEngineSpecific(cookie, e);
929        } else {
930            e->storeEngineSpecific(cookie, NULL);
931            LOG(EXTENSION_LOG_INFO,
932                "Completed sync deletion of vbucket %u",
933                (unsigned)vbucket);
934            err = ENGINE_SUCCESS;
935        }
936    } else {
937        err = e->deleteVBucket(vbucket);
938    }
939    switch (err) {
940    case ENGINE_SUCCESS:
941        LOG(EXTENSION_LOG_NOTICE,
942            "Deletion of vbucket %d was completed.", vbucket);
943        break;
944    case ENGINE_NOT_MY_VBUCKET:
945        LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
946            "because the vbucket doesn't exist!!!", vbucket);
947        res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
948        break;
949    case ENGINE_EINVAL:
950        LOG(EXTENSION_LOG_WARNING,
951            "Deletion of vbucket %d failed "
952            "because the vbucket is not in a dead state",
953            vbucket);
954        e->setErrorContext(
955                cookie,
956                "Failed to delete vbucket.  Must be in the dead state.");
957        res = PROTOCOL_BINARY_RESPONSE_EINVAL;
958        break;
959    case ENGINE_EWOULDBLOCK:
960        LOG(EXTENSION_LOG_NOTICE, "Request for vbucket %d deletion is in"
961                " EWOULDBLOCK until the database file is removed from disk",
962            vbucket);
963        e->storeEngineSpecific(cookie, req);
964        return ENGINE_EWOULDBLOCK;
965    default:
966        LOG(EXTENSION_LOG_WARNING,
967            "Deletion of vbucket %d failed "
968            "because of unknown reasons",
969            vbucket);
970        e->setErrorContext(cookie, "Failed to delete vbucket.  Unknown reason.");
971        res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
972    }
973
974    if (err == ENGINE_NOT_MY_VBUCKET) {
975        return err;
976    }
977
978    return sendResponse(response,
979                        NULL,
980                        0,
981                        NULL,
982                        0,
983                        NULL,
984                        0,
985                        PROTOCOL_BINARY_RAW_BYTES,
986                        res,
987                        cas,
988                        cookie);
989}
990
991static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine* e,
992                                       protocol_binary_request_header* request,
993                                       const void* cookie,
994                                       Item** it,
995                                       const char** msg,
996                                       protocol_binary_response_status* res,
997                                       DocNamespace docNamespace) {
998    KVBucketIface* kvb = e->getKVBucket();
999    protocol_binary_request_no_extras* req =
1000        (protocol_binary_request_no_extras*)request;
1001    int keylen = ntohs(req->message.header.request.keylen);
1002    uint16_t vbucket = ntohs(req->message.header.request.vbucket);
1003    ENGINE_ERROR_CODE error_code;
1004    DocKey key(reinterpret_cast<const uint8_t*>(request) + sizeof(*request),
1005               keylen, docNamespace);
1006
1007    GetValue rv(kvb->getReplica(key, vbucket, cookie));
1008
1009    if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
1010        if (error_code == ENGINE_NOT_MY_VBUCKET) {
1011            *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1012            return error_code;
1013        } else if (error_code == ENGINE_TMPFAIL) {
1014            *msg = "NOT_FOUND";
1015            *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1016        } else {
1017            return error_code;
1018        }
1019    } else {
1020        *it = rv.item.release();
1021        *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1022    }
1023    ++(e->getEpStats().numOpsGet);
1024    return ENGINE_SUCCESS;
1025}
1026
1027static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine* e,
1028                                   const void* cookie,
1029                                   protocol_binary_request_compact_db* req,
1030                                   ADD_RESPONSE response) {
1031
1032    protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1033    compaction_ctx compactreq;
1034    uint64_t cas = ntohll(req->message.header.request.cas);
1035
1036    if (ntohs(req->message.header.request.keylen) > 0 ||
1037        req->message.header.request.extlen != 24) {
1038        LOG(EXTENSION_LOG_WARNING,
1039            "Compaction received bad ext/key len %d/%d.",
1040            req->message.header.request.extlen,
1041            ntohs(req->message.header.request.keylen));
1042        e->setErrorContext(cookie, "Key and correct extras required");
1043        return sendResponse(response, NULL, 0, NULL, 0, NULL,
1044                            0,
1045                            PROTOCOL_BINARY_RAW_BYTES,
1046                            PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
1047    }
1048    EPStats& stats = e->getEpStats();
1049    compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
1050    compactreq.purge_before_seq =
1051        ntohll(req->message.body.purge_before_seq);
1052    compactreq.drop_deletes = req->message.body.drop_deletes;
1053    compactreq.db_file_id = e->getKVBucket()->getDBFileId(*req);
1054    uint16_t vbid = ntohs(req->message.header.request.vbucket);
1055
1056    ENGINE_ERROR_CODE err;
1057    void* es = e->getEngineSpecific(cookie);
1058    if (es == NULL) {
1059        ++stats.pendingCompactions;
1060        e->storeEngineSpecific(cookie, e);
1061        err = e->compactDB(vbid, compactreq, cookie);
1062    } else {
1063        e->storeEngineSpecific(cookie, NULL);
1064        err = ENGINE_SUCCESS;
1065    }
1066
1067    switch (err) {
1068    case ENGINE_SUCCESS:
1069        break;
1070    case ENGINE_NOT_MY_VBUCKET:
1071        --stats.pendingCompactions;
1072        LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1073            "because the db file doesn't exist!!!", compactreq.db_file_id);
1074        res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1075        break;
1076    case ENGINE_EINVAL:
1077        --stats.pendingCompactions;
1078        LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1079            "because of an invalid argument", compactreq.db_file_id);
1080        res = PROTOCOL_BINARY_RESPONSE_EINVAL;
1081        break;
1082    case ENGINE_EWOULDBLOCK:
1083        LOG(EXTENSION_LOG_NOTICE,
1084            "Compaction of db file id: %d scheduled "
1085                "(awaiting completion).", compactreq.db_file_id);
1086        e->storeEngineSpecific(cookie, req);
1087        return ENGINE_EWOULDBLOCK;
1088    case ENGINE_TMPFAIL:
1089        LOG(EXTENSION_LOG_WARNING, "Request to compact db file id: %d hit"
1090                " a temporary failure and may need to be retried",
1091            compactreq.db_file_id);
1092        e->setErrorContext(cookie, "Temporary failure in compacting db file.");
1093        res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1094        break;
1095    default:
1096        --stats.pendingCompactions;
1097        LOG(EXTENSION_LOG_WARNING,
1098            "Compaction of db file id: %d failed "
1099            "because of unknown reasons",
1100            compactreq.db_file_id);
1101        e->setErrorContext(cookie, "Failed to compact db file.  Unknown reason.");
1102        res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1103        break;
1104    }
1105
1106    if (err == ENGINE_NOT_MY_VBUCKET) {
1107        return err;
1108    }
1109
1110    return sendResponse(response,
1111                        NULL,
1112                        0,
1113                        NULL,
1114                        0,
1115                        NULL,
1116                        0,
1117                        PROTOCOL_BINARY_RAW_BYTES,
1118                        res,
1119                        cas,
1120                        cookie);
1121}
1122
/**
 * Dispatch an engine-specific binary-protocol command.
 *
 * Most handlers send their own response, and their status is returned
 * directly. The remaining commands fall through to the common reply at the
 * end of the function: STOP/START_PERSISTENCE, SET_PARAM, EVICT_KEY,
 * GET_REPLICA, and any opcode not matched below. That reply sends either the
 * item fetched by GET_REPLICA, or the status `res` with an optional text
 * `msg`. An unmatched opcode is answered with
 * PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND (the initial value of `res`).
 *
 * @param h            The engine.
 * @param cookie       Connection cookie.
 * @param request      The request (header fields in network byte order).
 * @param response     Callback used to send the response.
 * @param docNamespace Namespace for keys carried by the request.
 * @return The handler's status for commands that respond themselves.
 *         ENGINE_NOT_MY_VBUCKET (no response sent) for GET_REPLICA on a
 *         vbucket we don't own. Otherwise the result of sendResponse().
 */
static ENGINE_ERROR_CODE processUnknownCommand(
    EventuallyPersistentEngine* h,
    const void* cookie,
    protocol_binary_request_header* request,
    ADD_RESPONSE response,
    DocNamespace docNamespace) {
    // Status and message used by the common reply at the end.
    protocol_binary_response_status res =
        PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
    // Backing storage for `msg` when a handler builds a dynamic message.
    std::string dynamic_msg;
    const char* msg = NULL;
    size_t msg_size = 0;
    // Set (and owned here) only by GET_REPLICA.
    Item* itm = NULL;

    EPStats& stats = h->getEpStats();
    ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;

    /**
     * Session validation
     * (For ns_server commands only)
     *
     * On the first invocation (no engine-specific data on the cookie), the
     * request's CAS must pass validateSessionCas(); otherwise the request is
     * rejected with KEY_EEXISTS. Re-invocations, e.g. after EWOULDBLOCK,
     * skip the check.
     * NOTE(review): decrementSessionCtr() below presumably releases a session
     * reference taken by validateSessionCas() -- confirm.
     */
    switch (request->request.opcode) {
    case PROTOCOL_BINARY_CMD_SET_PARAM:
    case PROTOCOL_BINARY_CMD_SET_VBUCKET:
    case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
    case PROTOCOL_BINARY_CMD_COMPACT_DB: {
        if (h->getEngineSpecific(cookie) == NULL) {
            uint64_t cas = ntohll(request->request.cas);
            if (!h->validateSessionCas(cas)) {
                h->setErrorContext(cookie, "Invalid session token");
                return sendResponse(response, NULL, 0, NULL, 0,
                                    NULL, 0,
                                    PROTOCOL_BINARY_RAW_BYTES,
                                    PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
                                    cas, cookie);
            }
        }
        break;
    }
    default:
        break;
    }

    switch (request->request.opcode) {
    case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
        return h->getAllVBucketSequenceNumbers(cookie, request, response);

    case PROTOCOL_BINARY_CMD_GET_VBUCKET: {
        BlockTimer timer(&stats.getVbucketCmdHisto);
        rv = getVBucket(h, cookie, request, response);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_DEL_VBUCKET: {
        BlockTimer timer(&stats.delVbucketCmdHisto);
        rv = delVBucket(h, cookie, request, response);
        // While the deletion is pending (EWOULDBLOCK), keep the session and
        // the cookie's engine-specific data for the re-invocation.
        if (rv != ENGINE_EWOULDBLOCK) {
            h->decrementSessionCtr();
            h->storeEngineSpecific(cookie, NULL);
        }
        return rv;
    }
    case PROTOCOL_BINARY_CMD_SET_VBUCKET: {
        BlockTimer timer(&stats.setVbucketCmdHisto);
        rv = setVBucket(h, cookie, request, response);
        h->decrementSessionCtr();
        return rv;
    }
    case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
        res = h->stopFlusher(&msg, &msg_size);
        break;
    case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
        res = h->startFlusher(&msg, &msg_size);
        break;
    case PROTOCOL_BINARY_CMD_SET_PARAM:
        res = h->setParam(
                reinterpret_cast<protocol_binary_request_set_param*>(request),
                dynamic_msg);
        // `msg` points into dynamic_msg, which outlives the reply below.
        msg = dynamic_msg.c_str();
        msg_size = dynamic_msg.length();
        h->decrementSessionCtr();
        break;
    case PROTOCOL_BINARY_CMD_EVICT_KEY:
        res = evictKey(h, request, &msg, &msg_size, docNamespace);
        break;
    case PROTOCOL_BINARY_CMD_OBSERVE:
        return h->observe(cookie, request, response, docNamespace);
    case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
        return h->observe_seqno(cookie, request, response);
    case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
    case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
    case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE: {
        rv = h->handleCheckpointCmds(cookie, request, response);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE: {
        rv = h->handleSeqnoCmds(cookie, request, response);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_SET_WITH_META:
    case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
    case PROTOCOL_BINARY_CMD_ADD_WITH_META:
    case PROTOCOL_BINARY_CMD_ADDQ_WITH_META: {
        rv = h->setWithMeta(cookie,
                            reinterpret_cast<protocol_binary_request_set_with_meta*>
                            (request), response,
                            docNamespace);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_DEL_WITH_META:
    case PROTOCOL_BINARY_CMD_DELQ_WITH_META: {
        rv = h->deleteWithMeta(cookie,
                               reinterpret_cast<protocol_binary_request_delete_with_meta*>
                               (request), response,
                               docNamespace);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_RETURN_META: {
        return h->returnMeta(cookie,
                             reinterpret_cast<protocol_binary_request_return_meta*>
                             (request), response,
                             docNamespace);
    }
    case PROTOCOL_BINARY_CMD_GET_REPLICA:
        // SUCCESS and NOT_MY_VBUCKET are handled by the common reply below;
        // any other error is returned as-is.
        rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res, docNamespace);
        if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
            return rv;
        }
        break;
    case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
    case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC: {
        rv = h->handleTrafficControlCmd(cookie, request, response);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_COMPACT_DB: {
        rv = compactDB(h, cookie,
                       (protocol_binary_request_compact_db*)(request),
                       response);
        // As for DEL_VBUCKET: keep session/cookie state while pending.
        if (rv != ENGINE_EWOULDBLOCK) {
            h->decrementSessionCtr();
            h->storeEngineSpecific(cookie, NULL);
        }
        return rv;
    }
    case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY: {
        // Takes no extras, key or body; the zero checks do not depend on
        // byte order.
        if (request->request.extlen != 0 ||
            request->request.keylen != 0 ||
            request->request.bodylen != 0) {
            return ENGINE_EINVAL;
        }
        return h->getRandomKey(cookie, response);
    }
    case PROTOCOL_BINARY_CMD_GET_KEYS: {
        return h->getAllKeys(cookie,
                             reinterpret_cast<protocol_binary_request_get_keys*>
                             (request), response,
                             docNamespace);
    }
        // MB-21143: Remove adjusted time/drift API, but return NOT_SUPPORTED
    case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
    case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE: {
        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
                            PROTOCOL_BINARY_RAW_BYTES,
                            PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
                            cookie);
    }
    }

    // Common reply for the commands that did not respond themselves.
    if (itm) {
        // GET_REPLICA hit: send key, flags (as extras) and value, then free
        // the item released to us by getReplicaCmd().
        uint32_t flags = itm->getFlags();
        rv = sendResponse(response,
                          static_cast<const void*>(itm->getKey().data()),
                          itm->getKey().size(),
                          (const void*)&flags, sizeof(uint32_t),
                          static_cast<const void*>(itm->getData()),
                          itm->getNBytes(), itm->getDataType(),
                          static_cast<uint16_t>(res), itm->getCas(),
                          cookie);
        delete itm;
    } else if (rv == ENGINE_NOT_MY_VBUCKET) {
        return rv;
    } else {
        // If the handler did not supply a length, derive it from `msg`.
        msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
        rv = sendResponse(response, NULL, 0, NULL, 0,
                          msg, static_cast<uint16_t>(msg_size),
                          PROTOCOL_BINARY_RAW_BYTES,
                          static_cast<uint16_t>(res), 0, cookie);

    }
    return rv;
}
1312
1313static ENGINE_ERROR_CODE EvpUnknownCommand(
1314        gsl::not_null<ENGINE_HANDLE*> handle,
1315        const void* cookie,
1316        gsl::not_null<protocol_binary_request_header*> request,
1317        ADD_RESPONSE response,
1318        DocNamespace doc_namespace) {
1319    auto engine = acquireEngine(handle);
1320    auto ret = processUnknownCommand(
1321            engine.get(), cookie, request, response, doc_namespace);
1322    return ret;
1323}
1324
1325static void EvpItemSetCas(gsl::not_null<ENGINE_HANDLE*>,
1326                          gsl::not_null<item*> itm,
1327                          uint64_t cas) {
1328    static_cast<Item*>(itm.get())->setCas(cas);
1329}
1330
1331static void EvpItemSetDatatype(gsl::not_null<ENGINE_HANDLE*>,
1332                               gsl::not_null<item*> itm,
1333                               protocol_binary_datatype_t datatype) {
1334    static_cast<Item*>(itm.get())->setDataType(datatype);
1335}
1336
1337static ENGINE_ERROR_CODE EvpDcpStep(
1338        gsl::not_null<ENGINE_HANDLE*> handle,
1339        gsl::not_null<const void*> cookie,
1340        gsl::not_null<dcp_message_producers*> producers) {
1341    auto engine = acquireEngine(handle);
1342    ConnHandler* conn = engine->getConnHandler(cookie);
1343    if (conn) {
1344        return conn->step(producers);
1345    }
1346    return ENGINE_DISCONNECT;
1347}
1348
1349static ENGINE_ERROR_CODE EvpDcpOpen(gsl::not_null<ENGINE_HANDLE*> handle,
1350                                    gsl::not_null<const void*> cookie,
1351                                    uint32_t opaque,
1352                                    uint32_t seqno,
1353                                    uint32_t flags,
1354                                    cb::const_char_buffer name,
1355                                    cb::const_byte_buffer jsonExtra) {
1356    return acquireEngine(handle)->dcpOpen(
1357            cookie, opaque, seqno, flags, name, jsonExtra);
1358}
1359
1360static ENGINE_ERROR_CODE EvpDcpAddStream(gsl::not_null<ENGINE_HANDLE*> handle,
1361                                         gsl::not_null<const void*> cookie,
1362                                         uint32_t opaque,
1363                                         uint16_t vbucket,
1364                                         uint32_t flags) {
1365    return acquireEngine(handle)->dcpAddStream(cookie, opaque, vbucket, flags);
1366}
1367
1368static ENGINE_ERROR_CODE EvpDcpCloseStream(gsl::not_null<ENGINE_HANDLE*> handle,
1369                                           gsl::not_null<const void*> cookie,
1370                                           uint32_t opaque,
1371                                           uint16_t vbucket) {
1372    auto engine = acquireEngine(handle);
1373    ConnHandler* conn = engine->getConnHandler(cookie);
1374    if (conn) {
1375        return conn->closeStream(opaque, vbucket);
1376    }
1377    return ENGINE_DISCONNECT;
1378}
1379
1380static ENGINE_ERROR_CODE EvpDcpStreamReq(gsl::not_null<ENGINE_HANDLE*> handle,
1381                                         gsl::not_null<const void*> cookie,
1382                                         uint32_t flags,
1383                                         uint32_t opaque,
1384                                         uint16_t vbucket,
1385                                         uint64_t startSeqno,
1386                                         uint64_t endSeqno,
1387                                         uint64_t vbucketUuid,
1388                                         uint64_t snapStartSeqno,
1389                                         uint64_t snapEndSeqno,
1390                                         uint64_t* rollbackSeqno,
1391                                         dcp_add_failover_log callback) {
1392    auto engine = acquireEngine(handle);
1393    ConnHandler* conn = engine->getConnHandler(cookie);
1394    if (conn) {
1395        return conn->streamRequest(flags,
1396                                   opaque,
1397                                   vbucket,
1398                                   startSeqno,
1399                                   endSeqno,
1400                                   vbucketUuid,
1401                                   snapStartSeqno,
1402                                   snapEndSeqno,
1403                                   rollbackSeqno,
1404                                   callback);
1405    }
1406    return ENGINE_DISCONNECT;
1407}
1408
1409static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(
1410        gsl::not_null<ENGINE_HANDLE*> handle,
1411        gsl::not_null<const void*> cookie,
1412        uint32_t opaque,
1413        uint16_t vbucket,
1414        dcp_add_failover_log callback) {
1415    // This function covers two commands:
1416    // 1) DCP_GET_FAILOVER_LOG
1417    //     It is valid only on a DCP Producer connection. Updates the
1418    //     'lastReceiveTime' for the Producer.
1419    // 2) GET_FAILOVER_LOG
1420    //     It does not require a DCP connection (the client has opened
1421    //     a regular MCBP connection).
1422    auto engine = acquireEngine(handle);
1423    ConnHandler* conn = engine->getConnHandler(cookie);
1424    // Note: (conn != nullptr) only if conn is a DCP connection
1425    if (conn) {
1426        auto* producer = dynamic_cast<DcpProducer*>(conn);
1427        // GetFailoverLog not supported for DcpConsumer
1428        if (!producer) {
1429            LOG(EXTENSION_LOG_WARNING,
1430                "Disconnecting - This connection doesn't support the dcp get "
1431                "failover log API");
1432            return ENGINE_DISCONNECT;
1433        }
1434        producer->setLastReceiveTime(ep_current_time());
1435        if (producer->doDisconnect()) {
1436            return ENGINE_DISCONNECT;
1437        }
1438    }
1439    VBucketPtr vb = engine->getVBucket(vbucket);
1440    if (!vb) {
1441        LOG(EXTENSION_LOG_WARNING,
1442            "%s (vb %d) Get Failover Log failed because this vbucket doesn't "
1443            "exist",
1444            conn ? conn->logHeader() : "MCBP-Connection",
1445            vbucket);
1446        return ENGINE_NOT_MY_VBUCKET;
1447    }
1448    auto failoverEntries = vb->failovers->getFailoverLog();
1449    auto* epEngine = ObjectRegistry::onSwitchThread(NULL, true);
1450    auto ret = callback(failoverEntries.data(), failoverEntries.size(), cookie);
1451    ObjectRegistry::onSwitchThread(epEngine);
1452    return ret;
1453}
1454
1455static ENGINE_ERROR_CODE EvpDcpStreamEnd(gsl::not_null<ENGINE_HANDLE*> handle,
1456                                         gsl::not_null<const void*> cookie,
1457                                         uint32_t opaque,
1458                                         uint16_t vbucket,
1459                                         uint32_t flags) {
1460    auto engine = acquireEngine(handle);
1461    ConnHandler* conn = engine->getConnHandler(cookie);
1462    if (conn) {
1463        return conn->streamEnd(opaque, vbucket, flags);
1464    }
1465    return ENGINE_DISCONNECT;
1466}
1467
1468static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(
1469        gsl::not_null<ENGINE_HANDLE*> handle,
1470        gsl::not_null<const void*> cookie,
1471        uint32_t opaque,
1472        uint16_t vbucket,
1473        uint64_t start_seqno,
1474        uint64_t end_seqno,
1475        uint32_t flags) {
1476    auto engine = acquireEngine(handle);
1477    ConnHandler* conn = engine->getConnHandler(cookie);
1478    if (conn) {
1479        return conn->snapshotMarker(
1480                opaque, vbucket, start_seqno, end_seqno, flags);
1481    }
1482    return ENGINE_DISCONNECT;
1483}
1484
1485static ENGINE_ERROR_CODE EvpDcpMutation(gsl::not_null<ENGINE_HANDLE*> handle,
1486                                        gsl::not_null<const void*> cookie,
1487                                        uint32_t opaque,
1488                                        const DocKey& key,
1489                                        cb::const_byte_buffer value,
1490                                        size_t priv_bytes,
1491                                        uint8_t datatype,
1492                                        uint64_t cas,
1493                                        uint16_t vbucket,
1494                                        uint32_t flags,
1495                                        uint64_t by_seqno,
1496                                        uint64_t rev_seqno,
1497                                        uint32_t expiration,
1498                                        uint32_t lock_time,
1499                                        cb::const_byte_buffer meta,
1500                                        uint8_t nru) {
1501    if (!mcbp::datatype::is_valid(datatype)) {
1502        LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1503            " (DCPMutation)");
1504        return ENGINE_EINVAL;
1505    }
1506    auto engine = acquireEngine(handle);
1507    ConnHandler* conn = engine->getConnHandler(cookie);
1508    if (conn) {
1509        return conn->mutation(opaque, key, value, priv_bytes, datatype, cas,
1510                              vbucket, flags, by_seqno, rev_seqno, expiration,
1511                              lock_time, meta, nru);
1512    }
1513    return ENGINE_DISCONNECT;
1514}
1515
1516static ENGINE_ERROR_CODE EvpDcpDeletion(gsl::not_null<ENGINE_HANDLE*> handle,
1517                                        gsl::not_null<const void*> cookie,
1518                                        uint32_t opaque,
1519                                        const DocKey& key,
1520                                        cb::const_byte_buffer value,
1521                                        size_t priv_bytes,
1522                                        uint8_t datatype,
1523                                        uint64_t cas,
1524                                        uint16_t vbucket,
1525                                        uint64_t by_seqno,
1526                                        uint64_t rev_seqno,
1527                                        cb::const_byte_buffer meta) {
1528    auto engine = acquireEngine(handle);
1529    ConnHandler* conn = engine->getConnHandler(cookie);
1530    if (conn) {
1531        return conn->deletion(opaque, key, value, priv_bytes, datatype, cas,
1532                              vbucket, by_seqno, rev_seqno, meta);
1533    }
1534    return ENGINE_DISCONNECT;
1535}
1536
1537static ENGINE_ERROR_CODE EvpDcpDeletionV2(gsl::not_null<ENGINE_HANDLE*> handle,
1538                                          gsl::not_null<const void*> cookie,
1539                                          uint32_t opaque,
1540                                          const DocKey& key,
1541                                          cb::const_byte_buffer value,
1542                                          size_t priv_bytes,
1543                                          uint8_t datatype,
1544                                          uint64_t cas,
1545                                          uint16_t vbucket,
1546                                          uint64_t by_seqno,
1547                                          uint64_t rev_seqno,
1548                                          uint32_t delete_time) {
1549    auto engine = acquireEngine(handle);
1550    ConnHandler* conn = engine->getConnHandler(cookie);
1551    if (conn) {
1552        return conn->deletionV2(opaque,
1553                                key,
1554                                value,
1555                                priv_bytes,
1556                                datatype,
1557                                cas,
1558                                vbucket,
1559                                by_seqno,
1560                                rev_seqno,
1561                                delete_time);
1562    }
1563    return ENGINE_DISCONNECT;
1564}
1565
1566static ENGINE_ERROR_CODE EvpDcpExpiration(gsl::not_null<ENGINE_HANDLE*> handle,
1567                                          gsl::not_null<const void*> cookie,
1568                                          uint32_t opaque,
1569                                          const DocKey& key,
1570                                          cb::const_byte_buffer value,
1571                                          size_t priv_bytes,
1572                                          uint8_t datatype,
1573                                          uint64_t cas,
1574                                          uint16_t vbucket,
1575                                          uint64_t by_seqno,
1576                                          uint64_t rev_seqno,
1577                                          cb::const_byte_buffer meta) {
1578    auto engine = acquireEngine(handle);
1579    ConnHandler* conn = engine->getConnHandler(cookie);
1580    if (conn) {
1581        return conn->expiration(opaque, key, value, priv_bytes, datatype, cas,
1582                                vbucket, by_seqno, rev_seqno, meta);
1583    }
1584    return ENGINE_DISCONNECT;
1585}
1586
1587static ENGINE_ERROR_CODE EvpDcpFlush(gsl::not_null<ENGINE_HANDLE*> handle,
1588                                     gsl::not_null<const void*> cookie,
1589                                     uint32_t opaque,
1590                                     uint16_t vbucket) {
1591    auto engine = acquireEngine(handle);
1592    ConnHandler* conn = engine->getConnHandler(cookie);
1593    if (conn) {
1594        return conn->flushall(opaque, vbucket);
1595    }
1596    return ENGINE_DISCONNECT;
1597}
1598
1599static ENGINE_ERROR_CODE EvpDcpSetVbucketState(
1600        gsl::not_null<ENGINE_HANDLE*> handle,
1601        gsl::not_null<const void*> cookie,
1602        uint32_t opaque,
1603        uint16_t vbucket,
1604        vbucket_state_t state) {
1605    auto engine = acquireEngine(handle);
1606    ConnHandler* conn = engine->getConnHandler(cookie);
1607    if (conn) {
1608        return conn->setVBucketState(opaque, vbucket, state);
1609    }
1610    return ENGINE_DISCONNECT;
1611}
1612
1613static ENGINE_ERROR_CODE EvpDcpNoop(gsl::not_null<ENGINE_HANDLE*> handle,
1614                                    gsl::not_null<const void*> cookie,
1615                                    uint32_t opaque) {
1616    auto engine = acquireEngine(handle);
1617    ConnHandler* conn = engine->getConnHandler(cookie);
1618    if (conn) {
1619        return conn->noop(opaque);
1620    }
1621    return ENGINE_DISCONNECT;
1622}
1623
1624static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(
1625        gsl::not_null<ENGINE_HANDLE*> handle,
1626        gsl::not_null<const void*> cookie,
1627        uint32_t opaque,
1628        uint16_t vbucket,
1629        uint32_t buffer_bytes) {
1630    auto engine = acquireEngine(handle);
1631    ConnHandler* conn = engine->getConnHandler(cookie);
1632    if (conn) {
1633        return conn->bufferAcknowledgement(opaque, vbucket, buffer_bytes);
1634    }
1635    return ENGINE_DISCONNECT;
1636}
1637
1638static ENGINE_ERROR_CODE EvpDcpControl(gsl::not_null<ENGINE_HANDLE*> handle,
1639                                       gsl::not_null<const void*> cookie,
1640                                       uint32_t opaque,
1641                                       const void* key,
1642                                       uint16_t nkey,
1643                                       const void* value,
1644                                       uint32_t nvalue) {
1645    auto engine = acquireEngine(handle);
1646    ConnHandler* conn = engine->getConnHandler(cookie);
1647    if (conn) {
1648        return conn->control(opaque, key, nkey, value, nvalue);
1649    }
1650    return ENGINE_DISCONNECT;
1651}
1652
1653static ENGINE_ERROR_CODE EvpDcpResponseHandler(
1654        gsl::not_null<ENGINE_HANDLE*> handle,
1655        gsl::not_null<const void*> cookie,
1656        const protocol_binary_response_header* response) {
1657    auto engine = acquireEngine(handle);
1658    ConnHandler* conn = engine->getConnHandler(cookie);
1659    if (conn) {
1660        if (conn->handleResponse(response)) {
1661            return ENGINE_SUCCESS;
1662        }
1663    }
1664    return ENGINE_DISCONNECT;
1665}
1666
1667static ENGINE_ERROR_CODE EvpDcpSystemEvent(gsl::not_null<ENGINE_HANDLE*> handle,
1668                                           gsl::not_null<const void*> cookie,
1669                                           uint32_t opaque,
1670                                           uint16_t vbucket,
1671                                           mcbp::systemevent::id event,
1672                                           uint64_t bySeqno,
1673                                           cb::const_byte_buffer key,
1674                                           cb::const_byte_buffer eventData) {
1675    auto engine = acquireEngine(handle);
1676    ConnHandler* conn = engine->getConnHandler(cookie);
1677    if (conn) {
1678        return conn->systemEvent(
1679                opaque, vbucket, event, bySeqno, key, eventData);
1680    }
1681    return ENGINE_DISCONNECT;
1682}
1683
1684static void EvpHandleDisconnect(const void* cookie,
1685                                ENGINE_EVENT_TYPE type,
1686                                const void* event_data,
1687                                const void* cb_data) {
1688    if (type != ON_DISCONNECT) {
1689        throw std::invalid_argument("EvpHandleDisconnect: type "
1690                                        "(which is" + std::to_string(type) +
1691                                    ") is not ON_DISCONNECT");
1692    }
1693    if (event_data != nullptr) {
1694        throw std::invalid_argument("EvpHandleDisconnect: event_data "
1695                                        "is not NULL");
1696    }
1697    void* c = const_cast<void*>(cb_data);
1698    acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1699}
1700
1701static void EvpHandleDeleteBucket(const void* cookie,
1702                                  ENGINE_EVENT_TYPE type,
1703                                  const void* event_data,
1704                                  const void* cb_data) {
1705    if (type != ON_DELETE_BUCKET) {
1706        throw std::invalid_argument("EvpHandleDeleteBucket: type "
1707                                        "(which is" + std::to_string(type) +
1708                                    ") is not ON_DELETE_BUCKET");
1709    }
1710    if (event_data != nullptr) {
1711        throw std::invalid_argument("EvpHandleDeleteBucket: event_data "
1712                                        "is not NULL");
1713    }
1714    void* c = const_cast<void*>(cb_data);
1715    acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDeleteBucket(cookie);
1716}
1717
1718void EvpSetLogLevel(gsl::not_null<ENGINE_HANDLE*> handle,
1719                    EXTENSION_LOG_LEVEL level) {
1720    Logger::setGlobalLogLevel(level);
1721}
1722
1723/**
1724 * The only public interface to the eventually persistent engine.
1725 * Allocate a new instance and initialize it
1726 * @param interface the highest interface the server supports (we only
1727 *                  support interface 1)
1728 * @param get_server_api callback function to get the server exported API
1729 *                  functions
1730 * @param handle Where to return the new instance
1731 * @return ENGINE_SUCCESS on success
1732 */
1733ENGINE_ERROR_CODE create_instance(uint64_t interface,
1734                                  GET_SERVER_API get_server_api,
1735                                  ENGINE_HANDLE** handle) {
1736    SERVER_HANDLE_V1* api = get_server_api();
1737    if (interface != 1 || api == NULL) {
1738        return ENGINE_ENOTSUP;
1739    }
1740
1741    Logger::setLoggerAPI(api->log);
1742
1743    MemoryTracker::getInstance(*api->alloc_hooks);
1744    ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1745
1746    std::atomic<size_t>* inital_tracking = new std::atomic<size_t>();
1747
1748    ObjectRegistry::setStats(inital_tracking);
1749    EventuallyPersistentEngine* engine;
1750    engine = new EventuallyPersistentEngine(get_server_api);
1751    ObjectRegistry::setStats(NULL);
1752
1753    if (engine == NULL) {
1754        return ENGINE_ENOMEM;
1755    }
1756
1757    if (MemoryTracker::trackingMemoryAllocations()) {
1758        engine->getEpStats().estimatedTotalMemory.get()->store(
1759                inital_tracking->load());
1760        engine->getEpStats().memoryTrackerEnabled.store(true);
1761    }
1762    delete inital_tracking;
1763
1764    initialize_time_functions(api->core);
1765
1766    *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1767
1768    return ENGINE_SUCCESS;
1769}
1770
1771/*
1772    This method is called prior to unloading of the shared-object.
1773    Global clean-up should be performed from this method.
1774*/
1775void destroy_engine() {
1776    ExecutorPool::shutdown();
1777    // A single MemoryTracker exists for *all* buckets
1778    // and must be destroyed before unloading the shared object.
1779    MemoryTracker::destroyInstance();
1780    ObjectRegistry::reset();
1781}
1782
1783static bool EvpGetItemInfo(gsl::not_null<ENGINE_HANDLE*> handle,
1784                           gsl::not_null<const item*> itm,
1785                           gsl::not_null<item_info*> itm_info) {
1786    const Item* it = reinterpret_cast<const Item*>(itm.get());
1787    auto engine = acquireEngine(handle);
1788    *itm_info = engine->getItemInfo(*it);
1789    return true;
1790}
1791
1792static cb::EngineErrorMetadataPair EvpGetMeta(
1793        gsl::not_null<ENGINE_HANDLE*> handle,
1794        gsl::not_null<const void*> cookie,
1795        const DocKey& key,
1796        uint16_t vbucket) {
1797    return acquireEngine(handle)->getMeta(cookie, key, vbucket);
1798}
1799
1800static bool EvpSetItemInfo(gsl::not_null<ENGINE_HANDLE*> handle,
1801                           gsl::not_null<item*> itm,
1802                           gsl::not_null<const item_info*> itm_info) {
1803    Item* it = reinterpret_cast<Item*>(itm.get());
1804    it->setDataType(itm_info->datatype);
1805    return true;
1806}
1807
1808static cb::engine_error EvpCollectionsSetManifest(
1809        gsl::not_null<ENGINE_HANDLE*> handle, cb::const_char_buffer json) {
1810    auto engine = acquireEngine(handle);
1811    return engine->getKVBucket()->setCollections(json);
1812}
1813
1814static cb::EngineErrorStringPair EvpCollectionsGetManifest(
1815        gsl::not_null<ENGINE_HANDLE*> handle) {
1816    auto engine = acquireEngine(handle);
1817    return engine->getKVBucket()->getCollections();
1818}
1819
1820static bool EvpIsXattrEnabled(gsl::not_null<ENGINE_HANDLE*> handle) {
1821    auto engine = acquireEngine(handle);
1822    return engine->getKVBucket()->isXattrEnabled();
1823}
1824
1825static BucketCompressionMode EvpGetCompressionMode(gsl::not_null<ENGINE_HANDLE*> handle) {
1826    auto engine = acquireEngine(handle);
1827    return engine->getCompressionMode();
1828}
1829
1830static float EvpGetMinCompressionRatio(gsl::not_null<ENGINE_HANDLE*> handle) {
1831    auto engine = acquireEngine(handle);
1832    return engine->getMinCompressionRatio();
1833}
1834
1835void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1836    va_list va;
1837    va_start(va, fmt);
1838    global_logger.vlog(severity, fmt, va);
1839    va_end(va);
1840}
1841
1842EventuallyPersistentEngine::EventuallyPersistentEngine(
1843        GET_SERVER_API get_server_api)
1844    : kvBucket(nullptr),
1845      workload(NULL),
1846      workloadPriority(NO_BUCKET_PRIORITY),
1847      getServerApiFunc(get_server_api),
1848      checkpointConfig(NULL),
1849      trafficEnabled(false),
1850      startupTime(0),
1851      taskable(this),
1852      compressionMode(BucketCompressionMode::Off),
1853      minCompressionRatio(default_min_compression_ratio) {
1854    interface.interface = 1;
1855    ENGINE_HANDLE_V1::initialize = EvpInitialize;
1856    ENGINE_HANDLE_V1::destroy = EvpDestroy;
1857    ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
1858    ENGINE_HANDLE_V1::allocate_ex = EvpItemAllocateEx;
1859    ENGINE_HANDLE_V1::remove = EvpItemDelete;
1860    ENGINE_HANDLE_V1::release = EvpItemRelease;
1861    ENGINE_HANDLE_V1::get = EvpGet;
1862    ENGINE_HANDLE_V1::get_if = EvpGetIf;
1863    ENGINE_HANDLE_V1::get_and_touch = EvpGetAndTouch;
1864    ENGINE_HANDLE_V1::get_locked = EvpGetLocked;
1865    ENGINE_HANDLE_V1::get_meta = EvpGetMeta;
1866    ENGINE_HANDLE_V1::unlock = EvpUnlock;
1867    ENGINE_HANDLE_V1::get_stats = EvpGetStats;
1868    ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
1869    ENGINE_HANDLE_V1::store = EvpStore;
1870    ENGINE_HANDLE_V1::store_if = EvpStoreIf;
1871    ENGINE_HANDLE_V1::flush = EvpFlush;
1872    ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
1873    ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
1874    ENGINE_HANDLE_V1::item_set_datatype = EvpItemSetDatatype;
1875    ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
1876    ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
1877
1878    ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
1879    ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
1880    ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
1881    ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
1882    ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
1883    ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
1884    ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
1885    ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
1886    ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
1887    ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
1888    ENGINE_HANDLE_V1::dcp.deletion_v2 = EvpDcpDeletionV2;
1889    ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
1890    ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
1891    ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
1892    ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
1893    ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
1894    ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
1895    ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
1896    ENGINE_HANDLE_V1::dcp.system_event = EvpDcpSystemEvent;
1897    ENGINE_HANDLE_V1::set_log_level = EvpSetLogLevel;
1898    ENGINE_HANDLE_V1::collections.set_manifest = EvpCollectionsSetManifest;
1899    ENGINE_HANDLE_V1::collections.get_manifest = EvpCollectionsGetManifest;
1900    ENGINE_HANDLE_V1::isXattrEnabled = EvpIsXattrEnabled;
1901    ENGINE_HANDLE_V1::getCompressionMode = EvpGetCompressionMode;
1902    ENGINE_HANDLE_V1::getMaxItemSize = EvpGetMaxItemSize;
1903    ENGINE_HANDLE_V1::getMinCompressionRatio = EvpGetMinCompressionRatio;
1904
1905    serverApi = getServerApiFunc();
1906}
1907
1908ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
1909{
1910    EventuallyPersistentEngine *epe =
1911                                    ObjectRegistry::onSwitchThread(NULL, true);
1912    ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
1913    ObjectRegistry::onSwitchThread(epe);
1914    return rv;
1915}
1916
1917ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
1918{
1919    EventuallyPersistentEngine *epe =
1920                                    ObjectRegistry::onSwitchThread(NULL, true);
1921    ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
1922    ObjectRegistry::onSwitchThread(epe);
1923    return rv;
1924}
1925
1926void EventuallyPersistentEngine::storeEngineSpecific(const void* cookie,
1927                                                     void* engine_data) {
1928    EventuallyPersistentEngine* epe =
1929            ObjectRegistry::onSwitchThread(NULL, true);
1930    serverApi->cookie->store_engine_specific(cookie, engine_data);
1931    ObjectRegistry::onSwitchThread(epe);
1932}
1933
1934void* EventuallyPersistentEngine::getEngineSpecific(const void* cookie) {
1935    EventuallyPersistentEngine* epe =
1936            ObjectRegistry::onSwitchThread(NULL, true);
1937    void* engine_data = serverApi->cookie->get_engine_specific(cookie);
1938    ObjectRegistry::onSwitchThread(epe);
1939    return engine_data;
1940}
1941
1942bool EventuallyPersistentEngine::isDatatypeSupported(
1943        const void* cookie, protocol_binary_datatype_t datatype) {
1944    EventuallyPersistentEngine* epe =
1945            ObjectRegistry::onSwitchThread(NULL, true);
1946    bool isSupported =
1947            serverApi->cookie->is_datatype_supported(cookie, datatype);
1948    ObjectRegistry::onSwitchThread(epe);
1949    return isSupported;
1950}
1951
1952bool EventuallyPersistentEngine::isMutationExtrasSupported(const void* cookie) {
1953    EventuallyPersistentEngine* epe =
1954            ObjectRegistry::onSwitchThread(NULL, true);
1955    bool isSupported = serverApi->cookie->is_mutation_extras_supported(cookie);
1956    ObjectRegistry::onSwitchThread(epe);
1957    return isSupported;
1958}
1959
1960bool EventuallyPersistentEngine::isXattrEnabled(const void* cookie) {
1961    return isDatatypeSupported(cookie, PROTOCOL_BINARY_DATATYPE_XATTR);
1962}
1963
1964bool EventuallyPersistentEngine::isCollectionsSupported(const void* cookie) {
1965    EventuallyPersistentEngine* epe =
1966            ObjectRegistry::onSwitchThread(NULL, true);
1967    bool isSupported = serverApi->cookie->is_collections_supported(cookie);
1968    ObjectRegistry::onSwitchThread(epe);
1969    return isSupported;
1970}
1971
1972uint8_t EventuallyPersistentEngine::getOpcodeIfEwouldblockSet(
1973        const void* cookie) {
1974    EventuallyPersistentEngine* epe =
1975            ObjectRegistry::onSwitchThread(NULL, true);
1976    uint8_t opcode = serverApi->cookie->get_opcode_if_ewouldblock_set(cookie);
1977    ObjectRegistry::onSwitchThread(epe);
1978    return opcode;
1979}
1980
1981bool EventuallyPersistentEngine::validateSessionCas(const uint64_t cas) {
1982    EventuallyPersistentEngine* epe =
1983            ObjectRegistry::onSwitchThread(NULL, true);
1984    bool ret = serverApi->cookie->validate_session_cas(cas);
1985    ObjectRegistry::onSwitchThread(epe);
1986    return ret;
1987}
1988
1989void EventuallyPersistentEngine::decrementSessionCtr(void) {
1990    EventuallyPersistentEngine* epe =
1991            ObjectRegistry::onSwitchThread(NULL, true);
1992    serverApi->cookie->decrement_session_ctr();
1993    ObjectRegistry::onSwitchThread(epe);
1994}
1995
1996void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
1997                                                        EVENT_CALLBACK cb,
1998                                                        const void *cb_data) {
1999    EventuallyPersistentEngine *epe =
2000                                    ObjectRegistry::onSwitchThread(NULL, true);
2001    SERVER_CALLBACK_API *sapi = getServerApi()->callback;
2002    sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
2003                            type, cb, cb_data);
2004    ObjectRegistry::onSwitchThread(epe);
2005}
2006
2007void EventuallyPersistentEngine::setErrorContext(
2008        const void* cookie, cb::const_char_buffer message) {
2009    EventuallyPersistentEngine* epe =
2010            ObjectRegistry::onSwitchThread(NULL, true);
2011    serverApi->cookie->set_error_context(const_cast<void*>(cookie), message);
2012    ObjectRegistry::onSwitchThread(epe);
2013}
2014
2015template <typename T>
2016void EventuallyPersistentEngine::notifyIOComplete(T cookies,
2017                                                  ENGINE_ERROR_CODE status) {
2018    EventuallyPersistentEngine* epe =
2019            ObjectRegistry::onSwitchThread(NULL, true);
2020    std::for_each(
2021            cookies.begin(),
2022            cookies.end(),
2023            std::bind2nd(std::ptr_fun((NOTIFY_IO_COMPLETE_T)serverApi->cookie
2024                                              ->notify_io_complete),
2025                         status));
2026    ObjectRegistry::onSwitchThread(epe);
2027}
2028
2029/**
2030 * A configuration value changed listener that responds to ep-engine
2031 * parameter changes by invoking engine-specific methods on
2032 * configuration change events.
2033 */
2034class EpEngineValueChangeListener : public ValueChangedListener {
2035public:
2036    EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
2037        // EMPTY
2038    }
2039
2040    virtual void sizeValueChanged(const std::string &key, size_t value) {
2041        if (key.compare("getl_max_timeout") == 0) {
2042            engine.setGetlMaxTimeout(value);
2043        } else if (key.compare("getl_default_timeout") == 0) {
2044            engine.setGetlDefaultTimeout(value);
2045        } else if (key.compare("max_item_size") == 0) {
2046            engine.setMaxItemSize(value);
2047        } else if (key.compare("max_item_privileged_bytes") == 0) {
2048            engine.setMaxItemPrivilegedBytes(value);
2049        }
2050    }
2051
2052    virtual void stringValueChanged(const std::string& key, const char* value) {
2053        if (key == "compression_mode") {
2054            std::string value_str{value, strlen(value)};
2055            engine.setCompressionMode(value_str);
2056        }
2057    }
2058
2059    virtual void floatValueChanged(const std::string& key, float value) {
2060        if (key == "min_compression_ratio") {
2061            engine.setMinCompressionRatio(value);
2062        }
2063    }
2064
2065private:
2066    EventuallyPersistentEngine &engine;
2067};
2068
2069ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
2070    resetStats();
2071    if (config != nullptr) {
2072        if (!configuration.parseConfiguration(config, serverApi)) {
2073            LOG(EXTENSION_LOG_WARNING, "Failed to parse the configuration config "
2074                "during bucket initialization.  config=%s", config);
2075            return ENGINE_FAILED;
2076        }
2077    }
2078
2079    name = configuration.getCouchBucket();
2080
2081    if (config != nullptr) {
2082        LOG(EXTENSION_LOG_NOTICE,
2083            R"(EPEngine::initialize: using configuration:"%s")",
2084            config);
2085    }
2086
2087    maxFailoverEntries = configuration.getMaxFailoverEntries();
2088
2089    // Start updating the variables from the config!
2090    VBucket::setMutationMemoryThreshold(
2091            configuration.getMutationMemThreshold());
2092
2093    if (configuration.getMaxSize() == 0) {
2094        configuration.setMaxSize(std::numeric_limits<size_t>::max());
2095    }
2096
2097    if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
2098        stats.mem_low_wat_percent.store(0.75);
2099        configuration.setMemLowWat(percentOf(
2100                configuration.getMaxSize(), stats.mem_low_wat_percent.load()));
2101    }
2102
2103    if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
2104        stats.mem_high_wat_percent.store(0.85);
2105        configuration.setMemHighWat(percentOf(
2106                configuration.getMaxSize(), stats.mem_high_wat_percent.load()));
2107    }
2108
2109
2110    maxItemSize = configuration.getMaxItemSize();
2111    configuration.addValueChangedListener(
2112            "max_item_size",
2113            std::make_unique<EpEngineValueChangeListener>(*this));
2114
2115    maxItemPrivilegedBytes = configuration.getMaxItemPrivilegedBytes();
2116    configuration.addValueChangedListener(
2117            "max_item_privileged_bytes",
2118            std::make_unique<EpEngineValueChangeListener>(*this));
2119
2120    getlDefaultTimeout = configuration.getGetlDefaultTimeout();
2121    configuration.addValueChangedListener(
2122            "getl_default_timeout",
2123            std::make_unique<EpEngineValueChangeListener>(*this));
2124    getlMaxTimeout = configuration.getGetlMaxTimeout();
2125    configuration.addValueChangedListener(
2126            "getl_max_timeout",
2127            std::make_unique<EpEngineValueChangeListener>(*this));
2128
2129    workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
2130                                  configuration.getMaxNumShards());
2131    if ((unsigned int)workload->getNumShards() >
2132                                              configuration.getMaxVbuckets()) {
2133        LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
2134            "equal or less than max number of vbuckets");
2135        return ENGINE_FAILED;
2136    }
2137
2138    dcpConnMap_ = std::make_unique<DcpConnMap>(*this);
2139
2140    /* Get the flow control policy */
2141    std::string flowCtlPolicy = configuration.getDcpFlowControlPolicy();
2142
2143    if (!flowCtlPolicy.compare("static")) {
2144        dcpFlowControlManager_ =
2145                std::make_unique<DcpFlowControlManagerStatic>(*this);
2146    } else if (!flowCtlPolicy.compare("dynamic")) {
2147        dcpFlowControlManager_ =
2148                std::make_unique<DcpFlowControlManagerDynamic>(*this);
2149    } else if (!flowCtlPolicy.compare("aggressive")) {
2150        dcpFlowControlManager_ =
2151                std::make_unique<DcpFlowControlManagerAggressive>(*this);
2152    } else {
2153        /* Flow control is not enabled */
2154        dcpFlowControlManager_ = std::make_unique<DcpFlowControlManager>(*this);
2155    }
2156
2157    checkpointConfig = new CheckpointConfig(*this);
2158    CheckpointConfig::addConfigChangeListener(*this);
2159
2160    kvBucket = makeBucket(configuration);
2161
2162    initializeEngineCallbacks();
2163
2164    // Complete the initialization of the ep-store
2165    if (!kvBucket->initialize()) {
2166        return ENGINE_FAILED;
2167    }
2168
2169    if(configuration.isDataTrafficEnabled()) {
2170        enableTraffic(true);
2171    }
2172
2173    dcpConnMap_->initialize();
2174
2175    // record engine initialization time
2176    startupTime.store(ep_real_time());
2177
2178    LOG(EXTENSION_LOG_NOTICE,
2179        "EP Engine: Initialization of %s bucket complete",
2180        configuration.getBucketType().c_str());
2181
2182    setCompressionMode(configuration.getCompressionMode());
2183
2184    configuration.addValueChangedListener(
2185            "compression_mode",
2186            std::make_unique<EpEngineValueChangeListener>(*this));
2187
2188    setMinCompressionRatio(configuration.getMinCompressionRatio());
2189
2190    configuration.addValueChangedListener(
2191            "min_compression_ratio",
2192            std::make_unique<EpEngineValueChangeListener>(*this));
2193
2194    return ENGINE_SUCCESS;
2195}
2196
2197void EventuallyPersistentEngine::destroy(bool force) {
2198    stats.forceShutdown = force;
2199    stats.isShutdown = true;
2200
2201    // Perform a snapshot of the stats before shutting down so we can persist
2202    // the type of shutdown (stats.forceShutdown), and consequently on the
2203    // next warmup can determine is there was a clean shutdown - see
2204    // Warmup::cleanShutdown
2205    if (kvBucket) {
2206        kvBucket->snapshotStats();
2207    }
2208    if (dcpConnMap_) {
2209        dcpConnMap_->shutdownAllConnections();
2210    }
2211}
2212
2213std::pair<cb::ExpiryLimit, rel_time_t>
2214EventuallyPersistentEngine::getExpiryParameters(rel_time_t exptime) const {
2215    cb::ExpiryLimit expiryLimit;
2216    auto limit = kvBucket->getMaxTtl();
2217    if (limit.count()) {
2218        expiryLimit = limit;
2219        // If max_ttl is more than 30 days we need to convert it to absolute so
2220        // it makes sense as an expiry time.
2221        if (exptime == 0) {
2222            if (limit.count() > (60 * 60 * 24 * 30)) {
2223                exptime = ep_abs_time(limit.count());
2224            } else {
2225                exptime = limit.count();
2226            }
2227        }
2228    }
2229
2230    return {expiryLimit, exptime};
2231}
2232
2233time_t EventuallyPersistentEngine::processExpiryTime(time_t in) const {
2234    auto limit = kvBucket->getMaxTtl();
2235    time_t out = in;
2236    if (limit.count()) {
2237        auto currentTime = ep_real_time();
2238        if (in == 0 || in > (currentTime + limit.count())) {
2239            // must expire in now + MaxTTL seconds
2240            out = currentTime + limit.count();
2241        }
2242    }
2243
2244    return out;
2245}
2246
2247ENGINE_ERROR_CODE EventuallyPersistentEngine::itemAllocate(
2248        item** itm,
2249        const DocKey& key,
2250        const size_t nbytes,
2251        const size_t priv_nbytes,
2252        const int flags,
2253        rel_time_t exptime,
2254        uint8_t datatype,
2255        uint16_t vbucket) {
2256    if (priv_nbytes > maxItemPrivilegedBytes) {
2257        return ENGINE_E2BIG;
2258    }
2259
2260    if ((nbytes - priv_nbytes) > maxItemSize) {
2261        return ENGINE_E2BIG;
2262    }
2263
2264    if (!hasMemoryForItemAllocation(sizeof(Item) + sizeof(Blob) + key.size() +
2265                                    nbytes)) {
2266        return memoryCondition();
2267    }
2268
2269    cb::ExpiryLimit expiryLimit;
2270    std::tie(expiryLimit, exptime) = getExpiryParameters(exptime);
2271    time_t expiretime =
2272            (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime, expiryLimit));
2273
2274    *itm = new Item(key,
2275                    flags,
2276                    expiretime,
2277                    nullptr,
2278                    nbytes,
2279                    datatype,
2280                    0 /*cas*/,
2281                    -1 /*seq*/,
2282                    vbucket);
2283    if (*itm == NULL) {
2284        return memoryCondition();
2285    } else {
2286        stats.itemAllocSizeHisto.add(nbytes);
2287        return ENGINE_SUCCESS;
2288    }
2289}
2290
2291ENGINE_ERROR_CODE EventuallyPersistentEngine::itemDelete(
2292        const void* cookie,
2293        const DocKey& key,
2294        uint64_t& cas,
2295        uint16_t vbucket,
2296        ItemMetaData* item_meta,
2297        mutation_descr_t& mut_info) {
2298    ENGINE_ERROR_CODE ret = kvBucket->deleteItem(key,
2299                                                 cas,
2300                                                 vbucket,
2301                                                 cookie,
2302                                                 item_meta,
2303                                                 mut_info);
2304
2305    if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
2306        if (isDegradedMode()) {
2307            return ENGINE_TMPFAIL;
2308        }
2309    } else if (ret == ENGINE_SUCCESS) {
2310        ++stats.numOpsDelete;
2311    }
2312    return ret;
2313}
2314
2315void EventuallyPersistentEngine::itemRelease(item* itm) {
2316    delete reinterpret_cast<Item*>(itm);
2317}
2318
2319ENGINE_ERROR_CODE EventuallyPersistentEngine::get(const void* cookie,
2320                                                  item** itm,
2321                                                  const DocKey& key,
2322                                                  uint16_t vbucket,
2323                                                  get_options_t options) {
2324    ScopeTimer2<MicrosecondStopwatch, TracerStopwatch> timer(
2325            MicrosecondStopwatch(stats.getCmdHisto),
2326            TracerStopwatch(cookie, cb::tracing::TraceCode::GET));
2327
2328    GetValue gv(kvBucket->get(key, vbucket, cookie, options));
2329    ENGINE_ERROR_CODE ret = gv.getStatus();
2330
2331    if (ret == ENGINE_SUCCESS) {
2332        *itm = gv.item.release();
2333        if (options & TRACK_STATISTICS) {
2334            ++stats.numOpsGet;
2335        }
2336    } else if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
2337        if (isDegradedMode()) {
2338            return ENGINE_TMPFAIL;
2339        }
2340    }
2341
2342    return ret;
2343}
2344
2345ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie){
2346    return ENGINE_ENOTSUP;
2347}
2348
2349cb::EngineErrorItemPair EventuallyPersistentEngine::get_and_touch(const void* cookie,
2350                                                           const DocKey& key,
2351                                                           uint16_t vbucket,
2352                                                           uint32_t exptime) {
2353    auto* handle = reinterpret_cast<ENGINE_HANDLE*>(this);
2354
2355    cb::ExpiryLimit expiryLimit;
2356    std::tie(expiryLimit, exptime) = getExpiryParameters(exptime);
2357
2358    time_t expiry_time =
2359            (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime, expiryLimit));
2360
2361    GetValue gv(kvBucket->getAndUpdateTtl(key, vbucket, cookie, expiry_time));
2362
2363    auto rv = gv.getStatus();
2364    if (rv == ENGINE_SUCCESS) {
2365        ++stats.numOpsGet;
2366        ++stats.numOpsStore;
2367        return cb::makeEngineErrorItemPair(
2368                cb::engine_errc::success, gv.item.release(), handle);
2369    }
2370
2371    if (isDegradedMode()) {
2372        // Remap all some of the error codes
2373        switch (rv) {
2374        case ENGINE_KEY_EEXISTS:
2375        case ENGINE_KEY_ENOENT:
2376        case ENGINE_NOT_MY_VBUCKET:
2377            rv = ENGINE_TMPFAIL;
2378            break;
2379        default:
2380            break;
2381        }
2382    }
2383
2384    if (rv == ENGINE_KEY_EEXISTS) {
2385        rv = ENGINE_LOCKED;
2386    }
2387
2388    return cb::makeEngineErrorItemPair(cb::engine_errc(rv));
2389}
2390
2391cb::EngineErrorItemPair EventuallyPersistentEngine::get_if(const void* cookie,
2392                                                       const DocKey& key,
2393                                                       uint16_t vbucket,
2394                                                       std::function<bool(const item_info&)>filter) {
2395    auto* handle = reinterpret_cast<ENGINE_HANDLE*>(this);
2396
2397    ScopeTimer2<MicrosecondStopwatch, TracerStopwatch> timer(
2398            MicrosecondStopwatch(stats.getCmdHisto),
2399            TracerStopwatch(cookie, cb::tracing::TraceCode::GETIF));
2400
2401    // Fetch an item from the hashtable (without trying to schedule a bg-fetch
2402    // and pass it through the filter. If the filter accepts the document
2403    // based on the metadata, return the document. If the document's data
2404    // isn't resident we run another iteration in the loop and retries the
2405    // action but this time we _do_ schedule a bg-fetch.
2406    for (int ii = 0; ii < 2; ++ii) {
2407        auto options = static_cast<get_options_t>(HONOR_STATES |
2408                                                  DELETE_TEMP |
2409                                                  HIDE_LOCKED_CAS);
2410
2411        // For the first pass, if we need to do a BGfetch, only fetch metadata
2412        // (no point in fetching the whole document if the filter doesn't want
2413        // it).
2414        if (ii == 0) {
2415            options = static_cast<get_options_t>(int(options) | ALLOW_META_ONLY);
2416        }
2417
2418        // For second pass, or if full eviction, we'll need to issue a BG fetch.
2419        if (ii == 1 || kvBucket->getItemEvictionPolicy() == FULL_EVICTION) {
2420            options = static_cast<get_options_t>(int(options) | QUEUE_BG_FETCH);
2421        }
2422
2423        GetValue gv(kvBucket->get(key, vbucket, cookie, options));
2424        ENGINE_ERROR_CODE status = gv.getStatus();
2425
2426        switch (status) {
2427        case ENGINE_SUCCESS:
2428            break;
2429
2430        case ENGINE_KEY_ENOENT: // FALLTHROUGH
2431        case ENGINE_NOT_MY_VBUCKET: // FALLTHROUGH
2432            if (isDegradedMode()) {
2433                status = ENGINE_TMPFAIL;
2434            }
2435            // FALLTHROUGH
2436        default:
2437            return cb::makeEngineErrorItemPair(cb::engine_errc(status));
2438        }
2439
2440        const VBucketPtr vb = getKVBucket()->getVBucket(vbucket);
2441        uint64_t vb_uuid = 0;
2442        int64_t hlcEpoch = HlcCasSeqnoUninitialised;
2443        if (vb) {
2444            vb_uuid = vb->failovers->getLatestUUID();
2445            hlcEpoch = vb->getHLCEpochSeqno();
2446        }
2447        // Apply filter; the item value isn't guaranteed to be present
2448        // (meta only) so remove it to prevent people accidentally trying to
2449        // test it.
2450        auto info = gv.item->toItemInfo(vb_uuid, hlcEpoch);
2451        info.value[0].iov_base = nullptr;
2452        info.value[0].iov_len = 0;
2453        if (filter(info)) {
2454            if (!gv.isPartial()) {
2455                return cb::makeEngineErrorItemPair(
2456                        cb::engine_errc::success, gv.item.release(), handle);
2457            }
2458            // We want this item, but we need to fetch it off disk
2459        } else {
2460            // the client don't care about this thing..
2461            return cb::makeEngineErrorItemPair(cb::engine_errc::success);
2462        }
2463    }
2464
2465    // It should not be possible to get as the second iteration in the loop
2466    // SHOULD handle backround fetches an the item should NOT be partial!
2467    throw std::logic_error("EventuallyPersistentEngine::get_if: loop terminated");
2468}
2469
2470ENGINE_ERROR_CODE EventuallyPersistentEngine::get_locked(const void* cookie,
2471                                                         item** itm,
2472                                                         const DocKey& key,
2473                                                         uint16_t vbucket,
2474                                                         uint32_t lock_timeout) {
2475    auto default_timeout = static_cast<uint32_t>(getGetlDefaultTimeout());
2476
2477    if (lock_timeout == 0) {
2478        lock_timeout = default_timeout;
2479    } else if (lock_timeout > static_cast<uint32_t>(getGetlMaxTimeout())) {
2480        LOG(EXTENSION_LOG_WARNING,
2481            "EventuallyPersistentEngine::get_locked: "
2482            "Illegal value for lock timeout specified %u. "
2483            "Using default value: %u", lock_timeout, default_timeout);
2484        lock_timeout = default_timeout;
2485    }
2486
2487    auto result = kvBucket->getLocked(key, vbucket, ep_current_time(),
2488                                      lock_timeout, cookie);
2489
2490    if (result.getStatus() == ENGINE_SUCCESS) {
2491        ++stats.numOpsGet;
2492        *itm = result.item.release();
2493    }
2494
2495    return result.getStatus();
2496}
2497
2498ENGINE_ERROR_CODE EventuallyPersistentEngine::unlock(const void* cookie,
2499                                                     const DocKey& key,
2500                                                     uint16_t vbucket,
2501                                                     uint64_t cas) {
2502    return kvBucket->unlockKey(key, vbucket, cas, ep_current_time());
2503}
2504
2505cb::EngineErrorCasPair EventuallyPersistentEngine::store_if(
2506        const void* cookie,
2507        Item& item,
2508        uint64_t cas,
2509        ENGINE_STORE_OPERATION operation,
2510        cb::StoreIfPredicate predicate) {
2511    ScopeTimer2<MicrosecondStopwatch, TracerStopwatch> timer(
2512            MicrosecondStopwatch(stats.storeCmdHisto),
2513            TracerStopwatch(cookie, cb::tracing::TraceCode::STORE));
2514
2515    ENGINE_ERROR_CODE status;
2516    switch (operation) {
2517    case OPERATION_CAS:
2518        if (item.getCas() == 0) {
2519            // Using a cas command with a cas wildcard doesn't make sense
2520            status = ENGINE_NOT_STORED;
2521            break;
2522        }
2523    // FALLTHROUGH
2524    case OPERATION_SET:
2525        if (isDegradedMode()) {
2526            return {cb::engine_errc::temporary_failure, cas};
2527        }
2528        status = kvBucket->set(item, cookie, predicate);
2529        break;
2530
2531    case OPERATION_ADD:
2532        if (isDegradedMode()) {
2533            return {cb::engine_errc::temporary_failure, cas};
2534        }
2535
2536        if (item.getCas() != 0) {
2537            // Adding an item with a cas value doesn't really make sense...
2538            return {cb::engine_errc::key_already_exists, cas};
2539        }
2540
2541        status = kvBucket->add(item, cookie);
2542        break;
2543
2544    case OPERATION_REPLACE:
2545        status = kvBucket->replace(item, cookie, predicate);
2546        break;
2547    default:
2548        status = ENGINE_ENOTSUP;
2549    }
2550
2551    switch (status) {
2552    case ENGINE_SUCCESS:
2553        ++stats.numOpsStore;
2554        // If success - check if we're now in need of some memory freeing
2555        kvBucket->checkAndMaybeFreeMemory();
2556        break;
2557    case ENGINE_ENOMEM:
2558        status = memoryCondition();
2559        break;
2560    case ENGINE_NOT_STORED:
2561    case ENGINE_NOT_MY_VBUCKET:
2562        if (isDegradedMode()) {
2563            return {cb::engine_errc::temporary_failure, cas};
2564        }
2565        break;
2566    default:
2567        break;
2568    }
2569
2570    return {cb::engine_errc(status), item.getCas()};
2571}
2572
2573ENGINE_ERROR_CODE EventuallyPersistentEngine::store(
2574        const void* cookie,
2575        item* itm,
2576        uint64_t& cas,
2577        ENGINE_STORE_OPERATION operation) {
2578    Item& item = static_cast<Item&>(*static_cast<Item*>(itm));
2579    auto rv = store_if(cookie, item, cas, operation, {});
2580    cas = rv.cas;
2581    return ENGINE_ERROR_CODE(rv.status);
2582}
2583
2584void EventuallyPersistentEngine::initializeEngineCallbacks() {
2585    // Register the ON_DISCONNECT callback
2586    registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2587    // Register the ON_DELETE_BUCKET callback
2588    registerEngineCallback(ON_DELETE_BUCKET, EvpHandleDeleteBucket, this);
2589}
2590
2591ENGINE_ERROR_CODE EventuallyPersistentEngine::memoryCondition() {
2592    // Do we think it's possible we could free something?
2593    bool haveEvidenceWeCanFreeMemory =
2594            (stats.getMaxDataSize() > stats.getMemOverhead());
2595    if (haveEvidenceWeCanFreeMemory) {
2596        // Look for more evidence by seeing if we have resident items.
2597        VBucketCountVisitor countVisitor(vbucket_state_active);
2598        kvBucket->visit(countVisitor);
2599
2600        haveEvidenceWeCanFreeMemory = countVisitor.getNonResident() <
2601            countVisitor.getNumItems();
2602    }
2603    if (haveEvidenceWeCanFreeMemory) {
2604        ++stats.tmp_oom_errors;
2605        // Wake up the item pager task as memory usage
2606        // seems to have exceeded high water mark
2607        getKVBucket()->attemptToFreeMemory();
2608        return ENGINE_TMPFAIL;
2609    } else {
2610        if (getKVBucket()->getItemEvictionPolicy() == FULL_EVICTION) {
2611            ++stats.tmp_oom_errors;
2612            getKVBucket()->wakeUpCheckpointRemover();
2613            return ENGINE_TMPFAIL;
2614        }
2615
2616        ++stats.oom_errors;
2617        return ENGINE_ENOMEM;
2618    }
2619}
2620
2621bool EventuallyPersistentEngine::hasMemoryForItemAllocation(
2622        uint32_t totalItemSize) {
2623    return (stats.getEstimatedTotalMemoryUsed() + totalItemSize) <=
2624           stats.getMaxDataSize();
2625}
2626
2627bool EventuallyPersistentEngine::enableTraffic(bool enable) {
2628    bool inverse = !enable;
2629    return trafficEnabled.compare_exchange_strong(inverse, enable);
2630}
2631
2632ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
2633                                                           ADD_STAT add_stat) {
2634
2635    configuration.addStats(add_stat, cookie);
2636
2637    EPStats &epstats = getEpStats();
2638    add_casted_stat("ep_storage_age",
2639                    epstats.dirtyAge, add_stat, cookie);
2640    add_casted_stat("ep_storage_age_highwat",
2641                    epstats.dirtyAgeHighWat, add_stat, cookie);
2642    add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
2643                    add_stat, cookie);
2644
2645    if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
2646        add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
2647    } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
2648        add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
2649    }
2650
2651    add_casted_stat("ep_total_enqueued",
2652                    epstats.totalEnqueued, add_stat, cookie);
2653    add_casted_stat("ep_expired_access", epstats.expired_access,
2654                    add_stat, cookie);
2655    add_casted_stat("ep_expired_compactor", epstats.expired_compactor,
2656                    add_stat, cookie);
2657    add_casted_stat("ep_expired_pager", epstats.expired_pager,
2658                    add_stat, cookie);
2659    add_casted_stat("ep_queue_size",
2660                    epstats.diskQueueSize, add_stat, cookie);
2661    add_casted_stat("ep_diskqueue_items",
2662                    epstats.diskQueueSize, add_stat, cookie);
2663    add_casted_stat("ep_vb_backfill_queue_size",
2664                    epstats.vbBackfillQueueSize,
2665                    add_stat,
2666                    cookie);
2667    auto* flusher = kvBucket->getFlusher(EP_PRIMARY_SHARD);
2668    if (flusher) {
2669        add_casted_stat("ep_commit_num", epstats.flusherCommits,
2670                        add_stat, cookie);
2671        add_casted_stat("ep_commit_time",
2672                        epstats.commit_time, add_stat, cookie);
2673        add_casted_stat("ep_commit_time_total",
2674                        epstats.cumulativeCommitTime, add_stat, cookie);
2675        add_casted_stat("ep_item_begin_failed",
2676                        epstats.beginFailed, add_stat, cookie);
2677        add_casted_stat("ep_item_commit_failed",
2678                        epstats.commitFailed, add_stat, cookie);
2679        add_casted_stat("ep_item_flush_expired",
2680                        epstats.flushExpired, add_stat, cookie);
2681        add_casted_stat("ep_item_flush_failed",
2682                        epstats.flushFailed, add_stat, cookie);
2683        add_casted_stat("ep_flusher_state",
2684                        flusher->stateName(), add_stat, cookie);
2685        add_casted_stat("ep_flusher_todo",
2686                        epstats.flusher_todo, add_stat, cookie);
2687        add_casted_stat("ep_total_persisted",
2688                        epstats.totalPersisted, add_stat, cookie);
2689        add_casted_stat("ep_uncommitted_items",
2690                        epstats.flusher_todo, add_stat, cookie);
2691        add_casted_stat("ep_chk_persistence_timeout",
2692                        VBucket::getCheckpointFlushTimeout().count(),
2693                        add_stat,
2694                        cookie);
2695    }
2696    add_casted_stat("ep_vbucket_del",
2697                    epstats.vbucketDeletions, add_stat, cookie);
2698    add_casted_stat("ep_vbucket_del_fail",
2699                    epstats.vbucketDeletionFail, add_stat, cookie);
2700    add_casted_stat("ep_flush_duration_total",
2701                    epstats.cumulativeFlushTime, add_stat, cookie);
2702
2703    kvBucket->getAggregatedVBucketStats(cookie, add_stat);
2704
2705    kvBucket->getFileStats(cookie, add_stat);
2706
2707    add_casted_stat("ep_persist_vbstate_total",
2708                    epstats.totalPersistVBState, add_stat, cookie);
2709
2710    size_t memUsed = stats.getPreciseTotalMemoryUsed();
2711    add_casted_stat("mem_used", memUsed, add_stat, cookie);
2712    add_casted_stat("mem_used_estimate",
2713                    stats.getEstimatedTotalMemoryUsed(),
2714                    add_stat,
2715                    cookie);
2716    add_casted_stat("ep_mem_low_wat_percent", stats.mem_low_wat_percent,
2717                    add_stat, cookie);
2718    add_casted_stat("ep_mem_high_wat_percent", stats.mem_high_wat_percent,
2719                    add_stat, cookie);
2720    add_casted_stat("bytes", memUsed, add_stat, cookie);
2721    add_casted_stat("ep_kv_size", stats.getCurrentSize(), add_stat, cookie);
2722    add_casted_stat("ep_blob_num", stats.getNumBlob(), add_stat, cookie);
2723#if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
2724    add_casted_stat(
2725            "ep_blob_overhead", stats.getBlobOverhead(), add_stat, cookie);
2726#else
2727    add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
2728#endif
2729    add_casted_stat(
2730            "ep_value_size", stats.getTotalValueSize(), add_stat, cookie);
2731    add_casted_stat(
2732            "ep_storedval_size", stats.getStoredValSize(), add_stat, cookie);
2733#if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
2734    add_casted_stat(
2735            "ep_storedval_overhead", stats.getBlobOverhead(), add_stat, cookie);
2736#else
2737    add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
2738#endif
2739    add_casted_stat(
2740            "ep_storedval_num", stats.getNumStoredVal(), add_stat, cookie);
2741    add_casted_stat("ep_overhead", stats.getMemOverhead(), add_stat, cookie);
2742    add_casted_stat("ep_item_num", stats.getNumItem(), add_stat, cookie);
2743
2744    add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
2745    add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
2746                    add_stat, cookie);
2747    add_casted_stat("ep_mem_tracker_enabled", stats.memoryTrackerEnabled,
2748                    add_stat, cookie);
2749    add_casted_stat("ep_bg_fetched", epstats.bg_fetched,
2750                    add_stat, cookie);
2751    add_casted_stat("ep_bg_meta_fetched", epstats.bg_meta_fetched,
2752                    add_stat, cookie);
2753    add_casted_stat("ep_bg_remaining_items", epstats.numRemainingBgItems,
2754                    add_stat, cookie);
2755    add_casted_stat("ep_bg_remaining_jobs", epstats.numRemainingBgJobs,
2756                    add_stat, cookie);
2757    add_casted_stat("ep_max_bg_remaining_jobs", epstats.maxRemainingBgJobs,
2758                    add_stat, cookie);
2759    add_casted_stat("ep_num_pager_runs", epstats.pagerRuns,
2760                    add_stat, cookie);
2761    add_casted_stat("ep_num_expiry_pager_runs", epstats.expiryPagerRuns,
2762                    add_stat, cookie);
2763    add_casted_stat("ep_num_freq_decayer_runs",
2764                    epstats.freqDecayerRuns,
2765                    add_stat,
2766                    cookie);
2767    add_casted_stat("ep_items_rm_from_checkpoints",
2768                    epstats.itemsRemovedFromCheckpoints,
2769                    add_stat, cookie);
2770    add_casted_stat("ep_num_value_ejects", epstats.numValueEjects,
2771                    add_stat, cookie);
2772    add_casted_stat("ep_num_eject_failures", epstats.numFailedEjects,
2773                    add_stat, cookie);
2774    add_casted_stat("ep_num_not_my_vbuckets", epstats.numNotMyVBuckets,
2775                    add_stat, cookie);
2776
2777    add_casted_stat("ep_pending_ops", epstats.pendingOps, add_stat, cookie);
2778    add_casted_stat("ep_pending_ops_total", epstats.pendingOpsTotal,
2779                    add_stat, cookie);
2780    add_casted_stat("ep_pending_ops_max", epstats.pendingOpsMax,
2781                    add_stat, cookie);
2782    add_casted_stat("ep_pending_ops_max_duration",
2783                    epstats.pendingOpsMaxDuration,
2784                    add_stat, cookie);
2785
2786    add_casted_stat("ep_pending_compactions", epstats.pendingCompactions,
2787                    add_stat, cookie);
2788    add_casted_stat("ep_rollback_count", epstats.rollbackCount,
2789                    add_stat, cookie);
2790
2791    size_t vbDeletions = epstats.vbucketDeletions.load();
2792    if (vbDeletions > 0) {
2793        add_casted_stat("ep_vbucket_del_max_walltime",
2794                        epstats.vbucketDelMaxWalltime,
2795                        add_stat, cookie);
2796        add_casted_stat("ep_vbucket_del_avg_walltime",
2797                        epstats.vbucketDelTotWalltime / vbDeletions,
2798                        add_stat, cookie);
2799    }
2800
2801    size_t numBgOps = epstats.bgNumOperations.load();
2802    if (numBgOps > 0) {
2803        add_casted_stat("ep_bg_num_samples", epstats.bgNumOperations,
2804                        add_stat, cookie);
2805        add_casted_stat("ep_bg_min_wait",
2806                        epstats.bgMinWait,
2807                        add_stat, cookie);
2808        add_casted_stat("ep_bg_max_wait",
2809                        epstats.bgMaxWait,
2810                        add_stat, cookie);
2811        add_casted_stat("ep_bg_wait_avg",
2812                        epstats.bgWait / numBgOps,
2813                        add_stat, cookie);
2814        add_casted_stat("ep_bg_min_load",
2815                        epstats.bgMinLoad,
2816                        add_stat, cookie);
2817        add_casted_stat("ep_bg_max_load",
2818                        epstats.bgMaxLoad,
2819                        add_stat, cookie);
2820        add_casted_stat("ep_bg_load_avg",
2821                        epstats.bgLoad / numBgOps,
2822                        add_stat, cookie);
2823        add_casted_stat("ep_bg_wait",
2824                        epstats.bgWait,
2825                        add_stat, cookie);
2826        add_casted_stat("ep_bg_load",
2827                        epstats.bgLoad,
2828                        add_stat, cookie);
2829    }
2830
2831    add_casted_stat("ep_degraded_mode", isDegradedMode(), add_stat, cookie);
2832
2833    add_casted_stat("ep_num_access_scanner_runs", epstats.alogRuns,
2834                    add_stat, cookie);
2835    add_casted_stat("ep_num_access_scanner_skips",
2836                    epstats.accessScannerSkips, add_stat, cookie);
2837    add_casted_stat("ep_access_scanner_last_runtime", epstats.alogRuntime,
2838                    add_stat, cookie);
2839    add_casted_stat("ep_access_scanner_num_items", epstats.alogNumItems,
2840                    add_stat, cookie);
2841
2842    if (kvBucket->isAccessScannerEnabled() && epstats.alogTime.load() != 0)
2843    {
2844        char timestr[20];
2845        struct tm alogTim;
2846        hrtime_t alogTime = epstats.alogTime.load();
2847        if (cb_gmtime_r((time_t *)&alogTime, &alogTim) == -1) {
2848            add_casted_stat("ep_access_scanner_task_time", "UNKNOWN", add_stat,
2849                            cookie);
2850        } else {
2851            strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &alogTim);
2852            add_casted_stat("ep_access_scanner_task_time", timestr, add_stat,
2853                            cookie);
2854        }
2855    } else {
2856        add_casted_stat("ep_access_scanner_task_time", "NOT_SCHEDULED",
2857                        add_stat, cookie);
2858    }
2859
2860    if (kvBucket->isExpPagerEnabled()) {
2861        char timestr[20];
2862        struct tm expPagerTim;
2863        hrtime_t expPagerTime = epstats.expPagerTime.load();
2864        if (cb_gmtime_r((time_t *)&expPagerTime, &expPagerTim) == -1) {
2865            add_casted_stat("ep_expiry_pager_task_time", "UNKNOWN", add_stat,
2866                            cookie);
2867        } else {
2868            strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &expPagerTim);
2869            add_casted_stat("ep_expiry_pager_task_time", timestr, add_stat,
2870                            cookie);
2871        }
2872    } else {
2873        add_casted_stat("ep_expiry_pager_task_time", "NOT_SCHEDULED",
2874                        add_stat, cookie);
2875    }
2876
2877    add_casted_stat("ep_startup_time", startupTime.load(), add_stat, cookie);
2878
2879    if (getConfiguration().isWarmup()) {
2880        Warmup *wp = kvBucket->getWarmup();
2881        if (wp == nullptr) {
2882            throw std::logic_error("EPEngine::doEngineStats: warmup is NULL");
2883        }
2884        if (!kvBucket->isWarmingUp()) {
2885            add_casted_stat("ep_warmup_thread", "complete", add_stat, cookie);
2886        } else {
2887            add_casted_stat("ep_warmup_thread", "running", add_stat, cookie);
2888        }
2889        if (wp->getTime() > wp->getTime().zero()) {
2890            add_casted_stat(
2891                    "ep_warmup_time",
2892                    std::chrono::duration_cast<std::chrono::microseconds>(
2893                            wp->getTime())
2894                            .count(),
2895                    add_stat,
2896                    cookie);
2897        }
2898        add_casted_stat("ep_warmup_oom", epstats.warmOOM, add_stat, cookie);
2899        add_casted_stat("ep_warmup_dups", epstats.warmDups, add_stat, cookie);
2900    }
2901
2902    add_casted_stat("ep_num_ops_get_meta", epstats.numOpsGetMeta,
2903                    add_stat, cookie);
2904    add_casted_stat("ep_num_ops_set_meta", epstats.numOpsSetMeta,
2905                    add_stat, cookie);
2906    add_casted_stat("ep_num_ops_del_meta", epstats.numOpsDelMeta,
2907                    add_stat, cookie);
2908    add_casted_stat("ep_num_ops_set_meta_res_fail",
2909                    epstats.numOpsSetMetaResolutionFailed, add_stat, cookie);
2910    add_casted_stat("ep_num_ops_del_meta_res_fail",
2911                    epstats.numOpsDelMetaResolutionFailed, add_stat, cookie);
2912    add_casted_stat("ep_num_ops_set_ret_meta", epstats.numOpsSetRetMeta,
2913                    add_stat, cookie);
2914    add_casted_stat("ep_num_ops_del_ret_meta", epstats.numOpsDelRetMeta,
2915                    add_stat, cookie);
2916    add_casted_stat("ep_num_ops_get_meta_on_set_meta",
2917                    epstats.numOpsGetMetaOnSetWithMeta, add_stat, cookie);
2918    add_casted_stat("ep_workload_pattern",
2919                    workload->stringOfWorkLoadPattern(),
2920                    add_stat, cookie);
2921
2922    add_casted_stat("ep_defragmenter_num_visited", epstats.defragNumVisited,
2923                    add_stat, cookie);
2924    add_casted_stat("ep_defragmenter_num_moved", epstats.defragNumMoved,
2925                    add_stat, cookie);
2926
2927    add_casted_stat("ep_item_compressor_num_visited",
2928                    epstats.compressorNumVisited,
2929                    add_stat,
2930                    cookie);
2931    add_casted_stat("ep_item_compressor_num_compressed",
2932                    epstats.compressorNumCompressed,
2933                    add_stat,
2934                    cookie);
2935
2936    add_casted_stat("ep_cursor_dropping_lower_threshold",
2937                    epstats.cursorDroppingLThreshold, add_stat, cookie);
2938    add_casted_stat("ep_cursor_dropping_upper_threshold",
2939                    epstats.cursorDroppingUThreshold, add_stat, cookie);
2940    add_casted_stat("ep_cursors_dropped",
2941                    epstats.cursorsDropped, add_stat, cookie);
2942    add_casted_stat("ep_cursor_memory_freed",
2943                    epstats.cursorMemoryFreed,
2944                    add_stat,
2945                    cookie);
2946
2947    // Note: These are also reported per-shard in 'kvstore' stats, however
2948    // we want to be able to graph these over time, and hence need to expose
    // to ns_server at the top level.
2950    size_t value = 0;
2951    if (kvBucket->getKVStoreStat("failure_compaction", value,
2952                                 KVBucketIface::KVSOption::BOTH)) {
2953        // Total data write failures is compaction failures plus commit failures
2954        auto writeFailure = value + epstats.commitFailed;
2955        add_casted_stat("ep_data_write_failed", writeFailure, add_stat, cookie);
2956    }
2957    if (kvBucket->getKVStoreStat("failure_get", value,
2958                                 KVBucketIface::KVSOption::BOTH)) {
2959        add_casted_stat("ep_data_read_failed",  value, add_stat, cookie);
2960    }
2961    if (kvBucket->getKVStoreStat("io_total_read_bytes", value,
2962                                 KVBucketIface::KVSOption::BOTH)) {
2963        add_casted_stat("ep_io_total_read_bytes",  value, add_stat, cookie);
2964    }
2965    if (kvBucket->getKVStoreStat("io_total_write_bytes", value,
2966                                 KVBucketIface::KVSOption::BOTH)) {
2967        add_casted_stat("ep_io_total_write_bytes",  value, add_stat, cookie);
2968    }
2969    if (kvBucket->getKVStoreStat("io_compaction_read_bytes", value,
2970                                 KVBucketIface::KVSOption::BOTH)) {
2971        add_casted_stat("ep_io_compaction_read_bytes",  value, add_stat, cookie);
2972    }
2973    if (kvBucket->getKVStoreStat("io_compaction_write_bytes", value,
2974                                 KVBucketIface::KVSOption::BOTH)) {
2975        add_casted_stat("ep_io_compaction_write_bytes",  value, add_stat, cookie);
2976    }
2977
2978    if (kvBucket->getKVStoreStat("io_bg_fetch_read_count",
2979                                 value,
2980                                 KVBucketIface::KVSOption::BOTH)) {
2981        add_casted_stat("ep_io_bg_fetch_read_count", value, add_stat, cookie);
        // Calculate read amplification (RA) in terms of disk reads:
        // the ratio of the number of reads performed to the number of docs
        // fetched.
        //
        // Note: An alternative definition would be in terms of *bytes* read -
        // count of bytes read from disk compared to sizeof(key+meta+body) for
        // fetched documents. However this is potentially misleading given
        // we perform IO buffering and always read in 4K sized chunks, so it
        // would give very large values.
2991        auto fetched = epstats.bg_fetched + epstats.bg_meta_fetched;
2992        double readAmp = fetched ? double(value) / double(fetched) : 0.0;
2993        add_casted_stat("ep_bg_fetch_avg_read_amplification",
2994                        readAmp,
2995                        add_stat,
2996                        cookie);
2997    }
2998
2999    // Specific to RocksDB. Cumulative ep-engine stats.
3000    // Note: These are also reported per-shard in 'kvstore' stats.
3001    // Memory Usage
3002    if (kvBucket->getKVStoreStat(
3003                "kMemTableTotal", value, KVBucketIface::KVSOption::RW)) {
3004        add_casted_stat("ep_rocksdb_kMemTableTotal", value, add_stat, cookie);
3005    }
3006    if (kvBucket->getKVStoreStat(
3007                "kMemTableUnFlushed", value, KVBucketIface::KVSOption::RW)) {
3008        add_casted_stat(
3009                "ep_rocksdb_kMemTableUnFlushed", value, add_stat, cookie);
3010    }
3011    if (kvBucket->getKVStoreStat(
3012                "kTableReadersTotal", value, KVBucketIface::KVSOption::RW)) {
3013        add_casted_stat(
3014                "ep_rocksdb_kTableReadersTotal", value, add_stat, cookie);
3015    }
3016    if (kvBucket->getKVStoreStat(
3017                "kCacheTotal", value, KVBucketIface::KVSOption::RW)) {
3018        add_casted_stat("ep_rocksdb_kCacheTotal", value, add_stat, cookie);
3019    }
3020    // MemTable Size per-CF
3021    if (kvBucket->getKVStoreStat("default_kSizeAllMemTables",
3022                                 value,
3023                                 KVBucketIface::KVSOption::RW)) {
3024        add_casted_stat("ep_rocksdb_default_kSizeAllMemTables",
3025                        value,
3026                        add_stat,
3027                        cookie);
3028    }
3029    if (kvBucket->getKVStoreStat("seqno_kSizeAllMemTables",
3030                                 value,
3031                                 KVBucketIface::KVSOption::RW)) {
3032        add_casted_stat(
3033                "ep_rocksdb_seqno_kSizeAllMemTables", value, add_stat, cookie);
3034    }
3035    // BlockCache Hit Ratio
3036    size_t hit = 0;
3037    size_t miss = 0;
3038    if (kvBucket->getKVStoreStat("rocksdb.block.cache.data.hit",
3039                                 hit,
3040                                 KVBucketIface::KVSOption::RW) &&
3041        kvBucket->getKVStoreStat("rocksdb.block.cache.data.miss",
3042                                 miss,
3043                                 KVBucketIface::KVSOption::RW) &&
3044        (hit + miss) != 0) {
3045        const auto ratio =
3046                gsl::narrow_cast<int>(float(hit) / (hit + miss) * 10000);
3047        add_casted_stat("ep_rocksdb_block_cache_data_hit_ratio",
3048                        ratio,
3049                        add_stat,
3050                        cookie);
3051    }
3052    if (kvBucket->getKVStoreStat("rocksdb.block.cache.index.hit",
3053                                 hit,
3054                                 KVBucketIface::KVSOption::RW) &&
3055        kvBucket->getKVStoreStat("rocksdb.block.cache.index.miss",
3056                                 miss,
3057                                 KVBucketIface::KVSOption::RW) &&
3058        (hit + miss) != 0) {
3059        const auto ratio =
3060                gsl::narrow_cast<int>(float(hit) / (hit + miss) * 10000);
3061        add_casted_stat("ep_rocksdb_block_cache_index_hit_ratio",
3062                        ratio,
3063                        add_stat,
3064                        cookie);
3065    }
3066    if (kvBucket->getKVStoreStat("rocksdb.block.cache.filter.hit",
3067                                 hit,
3068                                 KVBucketIface::KVSOption::RW) &&
3069        kvBucket->getKVStoreStat("rocksdb.block.cache.filter.miss",
3070                                 miss,
3071                                 KVBucketIface::KVSOption::RW) &&
3072        (hit + miss) != 0) {
3073        const auto ratio =
3074                gsl::narrow_cast<int>(float(hit) / (hit + miss) * 10000);
3075        add_casted_stat("ep_rocksdb_block_cache_filter_hit_ratio",
3076                        ratio,
3077                        add_stat,
3078                        cookie);
3079    }
3080    // Disk Usage per-CF
3081    if (kvBucket->getKVStoreStat("default_kTotalSstFilesSize",
3082                                 value,
3083                                 KVBucketIface::KVSOption::RW)) {
3084        add_casted_stat("ep_rocksdb_default_kTotalSstFilesSize",
3085                        value,
3086                        add_stat,
3087                        cookie);
3088    }
3089    if (kvBucket->getKVStoreStat("seqno_kTotalSstFilesSize",
3090                                 value,
3091                                 KVBucketIface::KVSOption::RW)) {
3092        add_casted_stat(
3093                "ep_rocksdb_seqno_kTotalSstFilesSize", value, add_stat, cookie);
3094    }
3095    // Scan stats
3096    if (kvBucket->getKVStoreStat(
3097                "scan_totalSeqnoHits", value, KVBucketIface::KVSOption::RW)) {
3098        add_casted_stat(
3099                "ep_rocksdb_scan_totalSeqnoHits", value, add_stat, cookie);
3100    }
3101    if (kvBucket->getKVStoreStat(
3102                "scan_oldSeqnoHits", value, KVBucketIface::KVSOption::RW)) {
3103        add_casted_stat(
3104                "ep_rocksdb_scan_oldSeqnoHits", value, add_stat, cookie);
3105    }
3106
3107    return ENGINE_SUCCESS;
3108}
3109
3110ENGINE_ERROR_CODE EventuallyPersistentEngine::doMemoryStats(const void *cookie,
3111                                                           ADD_STAT add_stat) {
3112    add_casted_stat(
3113            "bytes", stats.getPreciseTotalMemoryUsed(), add_stat, cookie);
3114    add_casted_stat(
3115            "mem_used", stats.getPreciseTotalMemoryUsed(), add_stat, cookie);
3116    add_casted_stat("mem_used_estimate",
3117                    stats.getEstimatedTotalMemoryUsed(),
3118                    add_stat,
3119                    cookie);
3120    add_casted_stat("mem_used_merge_threshold",
3121                    stats.getMemUsedMergeThreshold(),
3122                    add_stat,
3123                    cookie);
3124
3125    add_casted_stat("ep_kv_size", stats.getCurrentSize(), add_stat, cookie);
3126    add_casted_stat(
3127            "ep_value_size", stats.getTotalValueSize(), add_stat, cookie);
3128    add_casted_stat("ep_overhead", stats.getMemOverhead(), add_stat, cookie);
3129    add_casted_stat("ep_max_size", stats.getMaxDataSize(), add_stat, cookie);
3130    add_casted_stat("ep_mem_low_wat", stats.mem_low_wat, add_stat,