xref: /6.0.3/kv_engine/engines/ep/src/ep_engine.cc (revision 90bd9f55)
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include "ep_engine.h"
21 #include "kv_bucket.h"
22 
23 #include "checkpoint.h"
24 #include "collections/manager.h"
25 #include "common.h"
26 #include "connmap.h"
27 #include "dcp/consumer.h"
28 #include "dcp/dcpconnmap.h"
29 #include "dcp/flow-control-manager.h"
30 #include "dcp/producer.h"
31 #include "ep_bucket.h"
32 #include "ep_vb.h"
33 #include "ephemeral_bucket.h"
34 #include "failover-table.h"
35 #include "flusher.h"
36 #include "htresizer.h"
37 #include "logger.h"
38 #include "memory_tracker.h"
39 #include "replicationthrottle.h"
40 #include "stats-info.h"
41 #include "statwriter.h"
42 #include "string_utils.h"
43 #include "vb_count_visitor.h"
44 #include "warmup.h"
45 
46 #include <JSON_checker.h>
47 #include <cJSON_utils.h>
48 #include <memcached/engine.h>
49 #include <memcached/extension.h>
50 #include <memcached/protocol_binary.h>
51 #include <memcached/server_api.h>
52 #include <memcached/util.h>
53 #include <platform/cb_malloc.h>
54 #include <platform/checked_snprintf.h>
55 #include <platform/compress.h>
56 #include <platform/make_unique.h>
57 #include <platform/platform.h>
58 #include <platform/processclock.h>
59 #include <platform/scope_timer.h>
60 #include <tracing/trace_helpers.h>
61 #include <utilities/logtags.h>
62 #include <xattr/utils.h>
63 
64 #include <cstdio>
65 #include <cstring>
66 #include <fcntl.h>
67 #include <fstream>
68 #include <iostream>
69 #include <limits>
70 #include <mutex>
71 #include <stdarg.h>
72 #include <string>
73 #include <vector>
74 
75 using cb::tracing::TraceCode;
76 
/**
 * Scale a size by a floating point multiplier.
 *
 * @param val the base quantity
 * @param percent multiplier applied to val (e.g. setFlushParam passes
 *        EPStats::mem_low_wat_percent / mem_high_wat_percent)
 * @return val * percent, evaluated as a double and then converted back to
 *         size_t (any fractional part is discarded by the conversion)
 */
static size_t percentOf(size_t val, double percent) {
    const double scaled = static_cast<double>(val) * percent;
    return static_cast<size_t>(scaled);
}
80 
81 struct EPHandleReleaser {
operator ()EPHandleReleaser82     void operator()(EventuallyPersistentEngine*) {
83         ObjectRegistry::onSwitchThread(nullptr);
84     }
85 };
86 
87 using EPHandle = std::unique_ptr<EventuallyPersistentEngine, EPHandleReleaser>;
88 
89 /**
90  * Helper function to acquire a handle to the engine which allows access to
91  * the engine while the handle is in scope.
92  * @param handle pointer to the engine
93  * @return EPHandle which is a unique_ptr to an EventuallyPersistentEngine
94  * with a custom deleter (EPHandleReleaser) which performs the required
95  * ObjectRegistry release.
96  */
97 
acquireEngine(ENGINE_HANDLE * handle)98 static inline EPHandle acquireEngine(ENGINE_HANDLE* handle) {
99     auto ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
100     ObjectRegistry::onSwitchThread(ret);
101 
102     return EPHandle(ret);
103 }
104 
105 /**
106  * Call the response callback and return the appropriate value so that
107  * the core knows what to do..
108  */
sendResponse(ADD_RESPONSE response,const void * key,uint16_t keylen,const void * ext,uint8_t extlen,const void * body,uint32_t bodylen,uint8_t datatype,uint16_t status,uint64_t cas,const void * cookie)109 static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
110                                       uint16_t keylen,
111                                       const void *ext, uint8_t extlen,
112                                       const void *body, uint32_t bodylen,
113                                       uint8_t datatype, uint16_t status,
114                                       uint64_t cas, const void *cookie)
115 {
116     ENGINE_ERROR_CODE rv = ENGINE_FAILED;
117     EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
118     if (response(key, keylen, ext, extlen, body, bodylen, datatype,
119                  status, cas, cookie)) {
120         rv = ENGINE_SUCCESS;
121     }
122     ObjectRegistry::onSwitchThread(e);
123     return rv;
124 }
125 
/**
 * Check that a value lies within an inclusive range.
 *
 * @param v value to check
 * @param l lowest permitted value (inclusive)
 * @param h highest permitted value (inclusive)
 * @throws std::runtime_error ("Value out of range.") if v < l or v > h
 */
template <typename T>
static void validate(T v, T l, T h) {
    const bool belowMin = v < l;
    const bool aboveMax = v > h;
    if (belowMin || aboveMax) {
        throw std::runtime_error("Value out of range.");
    }
}
132 
133 
/**
 * Verify that a NUL-terminated string consists only of decimal digits,
 * optionally preceded by a single leading '-'.
 *
 * Note: a string with no digits at all ("" or "-") is accepted, matching the
 * historical behaviour of this helper.
 *
 * @param str NUL-terminated string to inspect (must not be null)
 * @throws std::runtime_error ("Value is not numeric") on the first character
 *         that is not in '0'..'9'
 */
static void checkNumeric(const char* str) {
    const char* cursor = str;
    if (*cursor == '-') {
        ++cursor;
    }
    for (; *cursor != '\0'; ++cursor) {
        // Compare against the digit range directly rather than calling
        // isdigit(): passing a negative plain char (any byte >= 0x80 where
        // char is signed) to isdigit() is undefined behaviour.
        if (*cursor < '0' || *cursor > '9') {
            throw std::runtime_error("Value is not numeric");
        }
    }
}
146 
EvpInitialize(gsl::not_null<ENGINE_HANDLE * > handle,const char * config_str)147 static ENGINE_ERROR_CODE EvpInitialize(gsl::not_null<ENGINE_HANDLE*> handle,
148                                        const char* config_str) {
149     return acquireEngine(handle)->initialize(config_str);
150 }
151 
EvpDestroy(gsl::not_null<ENGINE_HANDLE * > handle,const bool force)152 static void EvpDestroy(gsl::not_null<ENGINE_HANDLE*> handle, const bool force) {
153     auto eng = acquireEngine(handle);
154     eng->destroy(force);
155     delete eng.get();
156 }
157 
EvpItemAllocate(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,const size_t nbytes,const int flags,const rel_time_t exptime,uint8_t datatype,uint16_t vbucket)158 static cb::EngineErrorItemPair EvpItemAllocate(
159         gsl::not_null<ENGINE_HANDLE*> handle,
160         gsl::not_null<const void*> cookie,
161         const DocKey& key,
162         const size_t nbytes,
163         const int flags,
164         const rel_time_t exptime,
165         uint8_t datatype,
166         uint16_t vbucket) {
167     if (!mcbp::datatype::is_valid(datatype)) {
168         LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
169             " (ItemAllocate)");
170         return cb::makeEngineErrorItemPair(cb::engine_errc::invalid_arguments);
171     }
172 
173     item* itm = nullptr;
174     auto ret = acquireEngine(handle)->itemAllocate(&itm,
175                                                    key,
176                                                    nbytes,
177                                                    0, // No privileged bytes
178                                                    flags,
179                                                    exptime,
180                                                    datatype,
181                                                    vbucket);
182     return cb::makeEngineErrorItemPair(cb::engine_errc(ret), itm, handle);
183 }
184 
185 static bool EvpGetItemInfo(gsl::not_null<ENGINE_HANDLE*> handle,
186                            gsl::not_null<const item*> itm,
187                            gsl::not_null<item_info*> itm_info);
188 static void EvpItemRelease(gsl::not_null<ENGINE_HANDLE*> handle,
189                            gsl::not_null<item*> itm);
190 
EvpItemAllocateEx(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,size_t nbytes,size_t priv_nbytes,int flags,rel_time_t exptime,uint8_t datatype,uint16_t vbucket)191 static std::pair<cb::unique_item_ptr, item_info> EvpItemAllocateEx(
192         gsl::not_null<ENGINE_HANDLE*> handle,
193         gsl::not_null<const void*> cookie,
194         const DocKey& key,
195         size_t nbytes,
196         size_t priv_nbytes,
197         int flags,
198         rel_time_t exptime,
199         uint8_t datatype,
200         uint16_t vbucket) {
201     item* it = nullptr;
202     auto err = acquireEngine(handle)->itemAllocate(
203             &it, key, nbytes, priv_nbytes, flags, exptime, datatype, vbucket);
204 
205     if (err != ENGINE_SUCCESS) {
206         throw cb::engine_error(cb::engine_errc(err),
207                                "EvpItemAllocateEx: failed to allocate memory");
208     }
209 
210     item_info info;
211     if (!EvpGetItemInfo(handle, it, &info)) {
212         EvpItemRelease(handle, it);
213         throw cb::engine_error(cb::engine_errc::failed,
214                                "EvpItemAllocateEx: EvpGetItemInfo failed");
215     }
216 
217     return std::make_pair(cb::unique_item_ptr{it, cb::ItemDeleter{handle}},
218                           info);
219 }
220 
EvpItemDelete(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint64_t & cas,uint16_t vbucket,mutation_descr_t & mut_info)221 static ENGINE_ERROR_CODE EvpItemDelete(gsl::not_null<ENGINE_HANDLE*> handle,
222                                        gsl::not_null<const void*> cookie,
223                                        const DocKey& key,
224                                        uint64_t& cas,
225                                        uint16_t vbucket,
226                                        mutation_descr_t& mut_info) {
227     return acquireEngine(handle)->itemDelete(
228             cookie, key, cas, vbucket, nullptr, mut_info);
229 }
230 
EvpItemRelease(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<item * > itm)231 static void EvpItemRelease(gsl::not_null<ENGINE_HANDLE*> handle,
232                            gsl::not_null<item*> itm) {
233     acquireEngine(handle)->itemRelease(itm);
234 }
235 
EvpGet(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,DocStateFilter documentStateFilter)236 static cb::EngineErrorItemPair EvpGet(gsl::not_null<ENGINE_HANDLE*> handle,
237                                       gsl::not_null<const void*> cookie,
238                                       const DocKey& key,
239                                       uint16_t vbucket,
240                                       DocStateFilter documentStateFilter) {
241     get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
242                                                        HONOR_STATES |
243                                                        TRACK_REFERENCE |
244                                                        DELETE_TEMP |
245                                                        HIDE_LOCKED_CAS |
246                                                        TRACK_STATISTICS);
247 
248     switch (documentStateFilter) {
249     case DocStateFilter::Alive:
250         break;
251     case DocStateFilter::Deleted:
252         // MB-23640 was caused by this bug as the frontend asked for
253         // Alive and Deleted documents. The internals don't have a
254         // way of requesting just deleted documents, and luckily for
255         // us no part of our code is using this yet. Return an error
256         // if anyone start using it
257         return std::make_pair(
258                 cb::engine_errc::not_supported,
259                 cb::unique_item_ptr{nullptr, cb::ItemDeleter{handle}});
260     case DocStateFilter::AliveOrDeleted:
261         options = static_cast<get_options_t>(options | GET_DELETED_VALUE);
262         break;
263     }
264 
265     item* itm = nullptr;
266     ENGINE_ERROR_CODE ret =
267             acquireEngine(handle)->get(cookie, &itm, key, vbucket, options);
268     return cb::makeEngineErrorItemPair(cb::engine_errc(ret), itm, handle);
269 }
270 
EvpGetIf(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,std::function<bool (const item_info &)> filter)271 static cb::EngineErrorItemPair EvpGetIf(
272         gsl::not_null<ENGINE_HANDLE*> handle,
273         gsl::not_null<const void*> cookie,
274         const DocKey& key,
275         uint16_t vbucket,
276         std::function<bool(const item_info&)> filter) {
277     return acquireEngine(handle)->get_if(cookie, key, vbucket, filter);
278 }
279 
EvpGetAndTouch(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,uint32_t expiry_time)280 static cb::EngineErrorItemPair EvpGetAndTouch(
281         gsl::not_null<ENGINE_HANDLE*> handle,
282         gsl::not_null<const void*> cookie,
283         const DocKey& key,
284         uint16_t vbucket,
285         uint32_t expiry_time) {
286     return acquireEngine(handle)->get_and_touch(cookie, key, vbucket,
287                                                 expiry_time);
288 }
289 
EvpGetLocked(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,uint32_t lock_timeout)290 static cb::EngineErrorItemPair EvpGetLocked(
291         gsl::not_null<ENGINE_HANDLE*> handle,
292         gsl::not_null<const void*> cookie,
293         const DocKey& key,
294         uint16_t vbucket,
295         uint32_t lock_timeout) {
296     item* itm = nullptr;
297     auto ret = acquireEngine(handle)->get_locked(
298             cookie, &itm, key, vbucket, lock_timeout);
299     return cb::makeEngineErrorItemPair(cb::engine_errc(ret), itm, handle);
300 }
301 
EvpGetMaxItemSize(gsl::not_null<ENGINE_HANDLE * > handle)302 static size_t EvpGetMaxItemSize(
303         gsl::not_null<ENGINE_HANDLE*> handle) {
304     return acquireEngine(handle)->getMaxItemSize();
305 }
306 
EvpUnlock(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,uint64_t cas)307 static ENGINE_ERROR_CODE EvpUnlock(gsl::not_null<ENGINE_HANDLE*> handle,
308                                    gsl::not_null<const void*> cookie,
309                                    const DocKey& key,
310                                    uint16_t vbucket,
311                                    uint64_t cas) {
312     return acquireEngine(handle)->unlock(cookie, key, vbucket, cas);
313 }
314 
EvpGetStats(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,cb::const_char_buffer key,ADD_STAT add_stat)315 static ENGINE_ERROR_CODE EvpGetStats(gsl::not_null<ENGINE_HANDLE*> handle,
316                                      gsl::not_null<const void*> cookie,
317                                      cb::const_char_buffer key,
318                                      ADD_STAT add_stat) {
319     return acquireEngine(handle)->getStats(
320             cookie, key.data(), gsl::narrow_cast<int>(key.size()), add_stat);
321 }
322 
EvpStore(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,gsl::not_null<item * > itm,uint64_t & cas,ENGINE_STORE_OPERATION operation,DocumentState document_state)323 static ENGINE_ERROR_CODE EvpStore(gsl::not_null<ENGINE_HANDLE*> handle,
324                                   gsl::not_null<const void*> cookie,
325                                   gsl::not_null<item*> itm,
326                                   uint64_t& cas,
327                                   ENGINE_STORE_OPERATION operation,
328                                   DocumentState document_state) {
329     auto engine = acquireEngine(handle);
330 
331     if (document_state == DocumentState::Deleted) {
332         Item* item = static_cast<Item*>(itm.get());
333         item->setDeleted();
334     }
335 
336     return engine->store(cookie, itm, cas, operation);
337 }
338 
EvpStoreIf(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,gsl::not_null<item * > itm,uint64_t cas,ENGINE_STORE_OPERATION operation,cb::StoreIfPredicate predicate,DocumentState document_state)339 static cb::EngineErrorCasPair EvpStoreIf(gsl::not_null<ENGINE_HANDLE*> handle,
340                                          gsl::not_null<const void*> cookie,
341                                          gsl::not_null<item*> itm,
342                                          uint64_t cas,
343                                          ENGINE_STORE_OPERATION operation,
344                                          cb::StoreIfPredicate predicate,
345                                          DocumentState document_state) {
346     auto engine = acquireEngine(handle);
347 
348     Item& item = static_cast<Item&>(*static_cast<Item*>(itm.get()));
349 
350     if (document_state == DocumentState::Deleted) {
351         item.setDeleted();
352     }
353     return engine->store_if(cookie, item, cas, operation, predicate);
354 }
355 
EvpFlush(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie)356 static ENGINE_ERROR_CODE EvpFlush(gsl::not_null<ENGINE_HANDLE*> handle,
357                                   gsl::not_null<const void*> cookie) {
358     return acquireEngine(handle)->flush(cookie);
359 }
360 
EvpResetStats(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie)361 static void EvpResetStats(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie) {
362     acquireEngine(handle)->resetStats();
363 }
364 
setReplicationParam(const char * keyz,const char * valz,std::string & msg)365 protocol_binary_response_status EventuallyPersistentEngine::setReplicationParam(
366         const char* keyz, const char* valz, std::string& msg) {
367     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
368 
369     try {
370         if (strcmp(keyz, "replication_throttle_threshold") == 0) {
371             getConfiguration().setReplicationThrottleThreshold(
372                     std::stoull(valz));
373         } else if (strcmp(keyz, "replication_throttle_queue_cap") == 0) {
374             getConfiguration().setReplicationThrottleQueueCap(std::stoll(valz));
375         } else if (strcmp(keyz, "replication_throttle_cap_pcnt") == 0) {
376             getConfiguration().setReplicationThrottleCapPcnt(std::stoull(valz));
377         } else {
378             msg = "Unknown config param";
379             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
380         }
381         // Handles exceptions thrown by the standard
382         // library stoi/stoul style functions when not numeric
383     } catch (std::invalid_argument&) {
384         msg = "Argument was not numeric";
385         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
386 
387         // Handles exceptions thrown by the standard library stoi/stoul
388         // style functions when the conversion does not fit in the datatype
389     } catch (std::out_of_range&) {
390         msg = "Argument was out of range";
391         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
392 
393         // Handles any miscellaenous exceptions in addition to the range_error
394         // exceptions thrown by the configuration::set<param>() methods
395     } catch (std::exception& error) {
396         msg = error.what();
397         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
398     }
399 
400     return rv;
401 }
402 
setCheckpointParam(const char * keyz,const char * valz,std::string & msg)403 protocol_binary_response_status EventuallyPersistentEngine::setCheckpointParam(
404         const char* keyz, const char* valz, std::string& msg) {
405     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
406 
407     try {
408         if (strcmp(keyz, "chk_max_items") == 0) {
409             size_t v = std::stoull(valz);
410             validate(v, size_t(MIN_CHECKPOINT_ITEMS),
411                      size_t(MAX_CHECKPOINT_ITEMS));
412             getConfiguration().setChkMaxItems(v);
413         } else if (strcmp(keyz, "chk_period") == 0) {
414             size_t v = std::stoull(valz);
415             validate(v, size_t(MIN_CHECKPOINT_PERIOD),
416                      size_t(MAX_CHECKPOINT_PERIOD));
417             getConfiguration().setChkPeriod(v);
418         } else if (strcmp(keyz, "max_checkpoints") == 0) {
419             size_t v = std::stoull(valz);
420             validate(v, size_t(DEFAULT_MAX_CHECKPOINTS),
421                      size_t(MAX_CHECKPOINTS_UPPER_BOUND));
422             getConfiguration().setMaxCheckpoints(v);
423         } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
424             getConfiguration().setItemNumBasedNewChk(cb_stob(valz));
425         } else if (strcmp(keyz, "keep_closed_chks") == 0) {
426             getConfiguration().setKeepClosedChks(cb_stob(valz));
427         } else if (strcmp(keyz, "enable_chk_merge") == 0) {
428             getConfiguration().setEnableChkMerge(cb_stob(valz));
429         } else if (strcmp(keyz, "cursor_dropping_checkpoint_mem_upper_mark") ==
430                    0) {
431             size_t v = std::stoull(valz);
432             validate(v,
433                      getConfiguration().getCursorDroppingCheckpointMemLowerMark(),
434                      size_t(100));
435             getConfiguration().setCursorDroppingCheckpointMemUpperMark(v);
436         } else if (strcmp(keyz, "cursor_dropping_checkpoint_mem_lower_mark") ==
437                    0) {
438             size_t v = std::stoull(valz);
439             validate(
440                     v, size_t(0), getConfiguration().getCursorDroppingCheckpointMemUpperMark());
441             getConfiguration().setCursorDroppingCheckpointMemLowerMark(v);
442         } else {
443             msg = "Unknown config param";
444             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
445         }
446 
447         // Handles exceptions thrown by the cb_stob function
448     } catch (invalid_argument_bool& error) {
449         msg = error.what();
450         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
451 
452         // Handles exceptions thrown by the standard
453         // library stoi/stoul style functions when not numeric
454     } catch (std::invalid_argument&) {
455         msg = "Argument was not numeric";
456         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
457 
458         // Handles exceptions thrown by the standard library stoi/stoul
459         // style functions when the conversion does not fit in the datatype
460     } catch (std::out_of_range&) {
461         msg = "Argument was out of range";
462         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
463 
464         // Handles any miscellaenous exceptions in addition to the range_error
465         // exceptions thrown by the configuration::set<param>() methods
466     } catch (std::exception& error) {
467         msg = error.what();
468         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
469     }
470 
471     return rv;
472 }
473 
setFlushParam(const char * keyz,const char * valz,std::string & msg)474 protocol_binary_response_status EventuallyPersistentEngine::setFlushParam(
475         const char* keyz, const char* valz, std::string& msg) {
476     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
477 
478     // Handle the actual mutation.
479     try {
480         if (strcmp(keyz, "bg_fetch_delay") == 0) {
481             getConfiguration().setBgFetchDelay(std::stoull(valz));
482         } else if (strcmp(keyz, "max_size") == 0) {
483             size_t vsize = std::stoull(valz);
484 
485             getConfiguration().setMaxSize(vsize);
486             EPStats& st = getEpStats();
487             getConfiguration().setMemLowWat(
488                     percentOf(vsize, st.mem_low_wat_percent));
489             getConfiguration().setMemHighWat(
490                     percentOf(vsize, st.mem_high_wat_percent));
491         } else if (strcmp(keyz, "mem_low_wat") == 0) {
492             getConfiguration().setMemLowWat(std::stoull(valz));
493         } else if (strcmp(keyz, "mem_high_wat") == 0) {
494             getConfiguration().setMemHighWat(std::stoull(valz));
495         } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
496             getConfiguration().setBackfillMemThreshold(std::stoull(valz));
497         } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
498             getConfiguration().setCompactionExpMemThreshold(std::stoull(valz));
499         } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
500             getConfiguration().setMutationMemThreshold(std::stoull(valz));
501         } else if (strcmp(keyz, "timing_log") == 0) {
502             EPStats& stats = getEpStats();
503             std::ostream* old = stats.timingLog;
504             stats.timingLog = NULL;
505             delete old;
506             if (strcmp(valz, "off") == 0) {
507                 LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
508             } else {
509                 std::ofstream* tmp(new std::ofstream(valz));
510                 if (tmp->good()) {
511                     LOG(EXTENSION_LOG_INFO,
512                         "Logging detailed timings to ``%s''.", valz);
513                     stats.timingLog = tmp;
514                 } else {
515                     LOG(EXTENSION_LOG_WARNING,
516                         "Error setting detailed timing log to ``%s'':  %s",
517                         valz, strerror(errno));
518                     delete tmp;
519                 }
520             }
521         } else if (strcmp(keyz, "exp_pager_enabled") == 0) {
522             getConfiguration().setExpPagerEnabled(cb_stob(valz));
523         } else if (strcmp(keyz, "exp_pager_stime") == 0) {
524             getConfiguration().setExpPagerStime(std::stoull(valz));
525         } else if (strcmp(keyz, "exp_pager_initial_run_time") == 0) {
526             getConfiguration().setExpPagerInitialRunTime(std::stoll(valz));
527         } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
528             getConfiguration().requirementsMetOrThrow("access_scanner_enabled");
529             getConfiguration().setAccessScannerEnabled(cb_stob(valz));
530         } else if (strcmp(keyz, "alog_sleep_time") == 0) {
531             getConfiguration().requirementsMetOrThrow("alog_sleep_time");
532             getConfiguration().setAlogSleepTime(std::stoull(valz));
533         } else if (strcmp(keyz, "alog_task_time") == 0) {
534             getConfiguration().requirementsMetOrThrow("alog_task_time");
535             getConfiguration().setAlogTaskTime(std::stoull(valz));
536             /* Start of ItemPager parameters */
537         } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
538             getConfiguration().setPagerActiveVbPcnt(std::stoull(valz));
539         } else if (strcmp(keyz, "pager_sleep_time_ms") == 0) {
540             getConfiguration().setPagerSleepTimeMs(std::stoull(valz));
541         } else if (strcmp(keyz, "ht_eviction_policy") == 0) {
542             getConfiguration().setHtEvictionPolicy(valz);
543         } else if (strcmp(keyz, "item_eviction_age_percentage") == 0) {
544             getConfiguration().setItemEvictionAgePercentage(std::stoull(valz));
545         } else if (strcmp(keyz, "item_eviction_freq_counter_age_threshold") ==
546                    0) {
547             getConfiguration().setItemEvictionFreqCounterAgeThreshold(
548                     std::stoull(valz));
549         } else if (strcmp(keyz, "item_freq_decayer_chunk_duration") == 0) {
550             getConfiguration().setItemFreqDecayerChunkDuration(
551                     std::stoull(valz));
552         } else if (strcmp(keyz, "item_freq_decayer_percent") == 0) {
553             getConfiguration().setItemFreqDecayerPercent(std::stoull(valz));
554             /* End of ItemPager parameters */
555         } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
556             getConfiguration().setWarmupMinMemoryThreshold(std::stoull(valz));
557         } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
558             getConfiguration().setWarmupMinItemsThreshold(std::stoull(valz));
559         } else if (strcmp(keyz, "num_reader_threads") == 0) {
560             size_t value = std::stoull(valz);
561             getConfiguration().setNumReaderThreads(value);
562             ExecutorPool::get()->setNumReaders(value);
563         } else if (strcmp(keyz, "num_writer_threads") == 0) {
564             size_t value = std::stoull(valz);
565             getConfiguration().setNumWriterThreads(value);
566             ExecutorPool::get()->setNumWriters(value);
567         } else if (strcmp(keyz, "num_auxio_threads") == 0) {
568             size_t value = std::stoull(valz);
569             getConfiguration().setNumAuxioThreads(value);
570             ExecutorPool::get()->setNumAuxIO(value);
571         } else if (strcmp(keyz, "num_nonio_threads") == 0) {
572             size_t value = std::stoull(valz);
573             getConfiguration().setNumNonioThreads(value);
574             ExecutorPool::get()->setNumNonIO(value);
575         } else if (strcmp(keyz, "bfilter_enabled") == 0) {
576             getConfiguration().setBfilterEnabled(cb_stob(valz));
577         } else if (strcmp(keyz, "bfilter_residency_threshold") == 0) {
578             getConfiguration().setBfilterResidencyThreshold(std::stof(valz));
579         } else if (strcmp(keyz, "defragmenter_enabled") == 0) {
580             getConfiguration().setDefragmenterEnabled(cb_stob(valz));
581         } else if (strcmp(keyz, "defragmenter_interval") == 0) {
582             auto v = std::stod(valz);
583             getConfiguration().setDefragmenterInterval(v);
584         } else if (strcmp(keyz, "item_compressor_interval") == 0) {
585             size_t v = std::stoull(valz);
586             // Adding separate validation as external limit is minimum 1
587             // to prevent setting item compressor to constantly run
588             validate(v, size_t(1), std::numeric_limits<size_t>::max());
589             getConfiguration().setItemCompressorInterval(v);
590         } else if (strcmp(keyz, "item_compressor_chunk_duration") == 0) {
591             getConfiguration().setItemCompressorChunkDuration(
592                     std::stoull(valz));
593         } else if (strcmp(keyz, "defragmenter_age_threshold") == 0) {
594             getConfiguration().setDefragmenterAgeThreshold(std::stoull(valz));
595         } else if (strcmp(keyz, "defragmenter_stored_value_age_threshold") ==
596                    0) {
597             getConfiguration().setDefragmenterStoredValueAgeThreshold(
598                     std::stoull(valz));
599         } else if (strcmp(keyz, "defragmenter_chunk_duration") == 0) {
600             getConfiguration().setDefragmenterChunkDuration(std::stoull(valz));
601         } else if (strcmp(keyz, "defragmenter_run") == 0) {
602             runDefragmenterTask();
603         } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
604             getConfiguration().setCompactionWriteQueueCap(std::stoull(valz));
605         } else if (strcmp(keyz, "dcp_min_compression_ratio") == 0) {
606             getConfiguration().setDcpMinCompressionRatio(std::stof(valz));
607         } else if (strcmp(keyz, "dcp_noop_mandatory_for_v5_features") == 0) {
608             getConfiguration().setDcpNoopMandatoryForV5Features(cb_stob(valz));
609         } else if (strcmp(keyz, "access_scanner_run") == 0) {
610             if (!(runAccessScannerTask())) {
611                 rv = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
612             }
613         } else if (strcmp(keyz, "vb_state_persist_run") == 0) {
614             runVbStatePersistTask(std::stoi(valz));
615         } else if (strcmp(keyz, "ephemeral_full_policy") == 0) {
616             getConfiguration().requirementsMetOrThrow("ephemeral_full_policy");
617             getConfiguration().setEphemeralFullPolicy(valz);
618         } else if (strcmp(keyz, "ephemeral_metadata_purge_age") == 0) {
619             getConfiguration().requirementsMetOrThrow(
620                     "ephemeral_metadata_purge_age");
621             getConfiguration().setEphemeralMetadataPurgeAge(std::stoull(valz));
622         } else if (strcmp(keyz, "ephemeral_metadata_purge_interval") == 0) {
623             getConfiguration().requirementsMetOrThrow("ephemeral_metadata_purge_interval");
624             getConfiguration().setEphemeralMetadataPurgeInterval(
625                     std::stoull(valz));
626         } else if (strcmp(keyz, "fsync_after_every_n_bytes_written") == 0) {
627             getConfiguration().setFsyncAfterEveryNBytesWritten(
628                     std::stoull(valz));
629         } else if (strcmp(keyz, "xattr_enabled") == 0) {
630             getConfiguration().setXattrEnabled(cb_stob(valz));
631         } else if (strcmp(keyz, "compression_mode") == 0) {
632             getConfiguration().setCompressionMode(valz);
633         } else if (strcmp(keyz, "min_compression_ratio") == 0) {
634             float min_comp_ratio;
635             if (safe_strtof(valz, min_comp_ratio)) {
636                 getConfiguration().setMinCompressionRatio(min_comp_ratio);
637             } else {
638                 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
639             }
640         } else if (strcmp(keyz, "max_ttl") == 0) {
641             getConfiguration().setMaxTtl(std::stoull(valz));
642         } else if (strcmp(keyz, "mem_used_merge_threshold_percent") == 0) {
643             getConfiguration().setMemUsedMergeThresholdPercent(std::stof(valz));
644         } else if (strcmp(keyz, "couchstore_tracing") == 0) {
645             getConfiguration().setCouchstoreTracing(cb_stob(valz));
646         } else if (strcmp(keyz, "couchstore_write_validation") == 0) {
647             getConfiguration().setCouchstoreWriteValidation(cb_stob(valz));
648         } else if (strcmp(keyz, "couchstore_mprotect") == 0) {
649             getConfiguration().setCouchstoreMprotect(cb_stob(valz));
650         } else {
651             msg = "Unknown config param";
652             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
653         }
654         // Handles exceptions thrown by the cb_stob function
655     } catch (invalid_argument_bool& error) {
656         msg = error.what();
657         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
658 
659         // Handles exceptions thrown by the standard
660         // library stoi/stoul style functions when not numeric
661     } catch (std::invalid_argument&) {
662         msg = "Argument was not numeric";
663         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
664 
665         // Handles exceptions thrown by the standard library stoi/stoul
666         // style functions when the conversion does not fit in the datatype
667     } catch (std::out_of_range&) {
668         msg = "Argument was out of range";
669         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
670 
671         // Handles any miscellaneous exceptions in addition to the range_error
672         // exceptions thrown by the configuration::set<param>() methods
673     } catch (std::exception& error) {
674         msg = error.what();
675         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
676     }
677 
678     return rv;
679 }
680 
setDcpParam(const char * keyz,const char * valz,std::string & msg)681 protocol_binary_response_status EventuallyPersistentEngine::setDcpParam(
682         const char* keyz, const char* valz, std::string& msg) {
683     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
684     try {
685 
686         if (strcmp(keyz,
687                    "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
688             size_t v = atoi(valz);
689             checkNumeric(valz);
690             validate(v, size_t(1), std::numeric_limits<size_t>::max());
691             getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(
692                     v);
693         } else if (
694             strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") ==
695             0) {
696             size_t v = atoi(valz);
697             checkNumeric(valz);
698             validate(v, size_t(1), std::numeric_limits<size_t>::max());
699             getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(
700                     v);
701         } else {
702             msg = "Unknown config param";
703             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
704         }
705     } catch (std::runtime_error&) {
706         msg = "Value out of range.";
707         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
708     }
709 
710     return rv;
711 }
712 
setVbucketParam(uint16_t vbucket,const char * keyz,const char * valz,std::string & msg)713 protocol_binary_response_status EventuallyPersistentEngine::setVbucketParam(
714         uint16_t vbucket,
715         const char* keyz,
716         const char* valz,
717         std::string& msg) {
718     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
719     try {
720         if (strcmp(keyz, "hlc_drift_ahead_threshold_us") == 0) {
721             uint64_t v = std::strtoull(valz, nullptr, 10);
722             checkNumeric(valz);
723             getConfiguration().setHlcDriftAheadThresholdUs(v);
724         } else if (strcmp(keyz, "hlc_drift_behind_threshold_us") == 0) {
725             uint64_t v = std::strtoull(valz, nullptr, 10);
726             checkNumeric(valz);
727             getConfiguration().setHlcDriftBehindThresholdUs(v);
728         } else if (strcmp(keyz, "max_cas") == 0) {
729             uint64_t v = std::strtoull(valz, nullptr, 10);
730             checkNumeric(valz);
731             LOG(EXTENSION_LOG_WARNING,
732                 "setVbucketParam: max_cas:%" PRIu64
733                 " "
734                 "vb:%" PRIu16,
735                 v,
736                 vbucket);
737             if (getKVBucket()->forceMaxCas(vbucket, v) != ENGINE_SUCCESS) {
738                 rv = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
739                 msg = "Not my vbucket";
740             }
741         } else {
742             msg = "Unknown config param";
743             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
744         }
745     } catch (std::runtime_error&) {
746         msg = "Value out of range.";
747         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
748     }
749     return rv;
750 }
751 
evictKey(EventuallyPersistentEngine * e,protocol_binary_request_header * request,const char ** msg,size_t * msg_size,DocNamespace docNamespace)752 static protocol_binary_response_status evictKey(
753     EventuallyPersistentEngine* e,
754     protocol_binary_request_header
755     * request,
756     const char** msg,
757     size_t* msg_size,
758     DocNamespace docNamespace) {
759     protocol_binary_request_no_extras* req =
760         (protocol_binary_request_no_extras*)request;
761 
762     const uint8_t* keyPtr = reinterpret_cast<const uint8_t*>(request) +
763                             sizeof(*request);
764     size_t keylen = ntohs(req->message.header.request.keylen);
765     uint16_t vbucket = ntohs(request->request.vbucket);
766 
767     LOG(EXTENSION_LOG_DEBUG,
768         "Manually evicting object with key %s",
769         cb::logtags::tagUserData(std::string{(const char*)keyPtr, keylen})
770                 .c_str());
771     msg_size = 0;
772     auto rv = e->evictKey(DocKey(keyPtr, keylen, docNamespace), vbucket, msg);
773     if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
774         rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
775         if (e->isDegradedMode()) {
776             return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
777         }
778     }
779     return rv;
780 }
781 
setParam(protocol_binary_request_set_param * req,std::string & msg)782 protocol_binary_response_status EventuallyPersistentEngine::setParam(
783         protocol_binary_request_set_param* req, std::string& msg) {
784     size_t keylen = ntohs(req->message.header.request.keylen);
785     uint8_t extlen = req->message.header.request.extlen;
786     size_t vallen = ntohl(req->message.header.request.bodylen);
787     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
788     protocol_binary_engine_param_t paramtype =
789         static_cast<protocol_binary_engine_param_t>(ntohl(
790             req->message.body.param_type));
791 
792     if (keylen == 0 || (vallen - keylen - extlen) == 0) {
793         return PROTOCOL_BINARY_RESPONSE_EINVAL;
794     }
795 
796     const char* keyp = reinterpret_cast<const char*>(req->bytes)
797                        + sizeof(req->bytes);
798     const char* valuep = keyp + keylen;
799     vallen -= (keylen + extlen);
800 
801     char keyz[128];
802     char valz[512];
803 
804     // Read the key.
805     if (keylen >= sizeof(keyz)) {
806         msg = "Key is too large.";
807         return PROTOCOL_BINARY_RESPONSE_EINVAL;
808     }
809     memcpy(keyz, keyp, keylen);
810     keyz[keylen] = 0x00;
811 
812     // Read the value.
813     if (vallen >= sizeof(valz)) {
814         msg = "Value is too large.";
815         return PROTOCOL_BINARY_RESPONSE_EINVAL;
816     }
817     memcpy(valz, valuep, vallen);
818     valz[vallen] = 0x00;
819 
820     protocol_binary_response_status rv;
821 
822     switch (paramtype) {
823     case protocol_binary_engine_param_flush:
824         rv = setFlushParam(keyz, valz, msg);
825         break;
826     case protocol_binary_engine_param_replication:
827         rv = setReplicationParam(keyz, valz, msg);
828         break;
829     case protocol_binary_engine_param_checkpoint:
830         rv = setCheckpointParam(keyz, valz, msg);
831         break;
832     case protocol_binary_engine_param_dcp:
833         rv = setDcpParam(keyz, valz, msg);
834         break;
835     case protocol_binary_engine_param_vbucket:
836         rv = setVbucketParam(vbucket, keyz, valz, msg);
837         break;
838     default:
839         rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
840     }
841 
842     return rv;
843 }
844 
getVBucket(EventuallyPersistentEngine * e,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)845 static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine* e,
846                                     const void* cookie,
847                                     protocol_binary_request_header* request,
848                                     ADD_RESPONSE response) {
849     protocol_binary_request_get_vbucket* req =
850         reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
851     if (req == nullptr) {
852         throw std::invalid_argument("getVBucket: Unable to convert req"
853                                         " to protocol_binary_request_get_vbucket");
854     }
855 
856     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
857     VBucketPtr vb = e->getVBucket(vbucket);
858     if (!vb) {
859         return ENGINE_NOT_MY_VBUCKET;
860     } else {
861         vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
862         return sendResponse(response, NULL, 0, NULL, 0, &state,
863                             sizeof(state),
864                             PROTOCOL_BINARY_RAW_BYTES,
865                             PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
866     }
867 }
868 
setVBucket(EventuallyPersistentEngine * e,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)869 static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine* e,
870                                     const void* cookie,
871                                     protocol_binary_request_header* request,
872                                     ADD_RESPONSE response) {
873 
874     protocol_binary_request_set_vbucket* req =
875         reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
876 
877     uint64_t cas = ntohll(req->message.header.request.cas);
878 
879     size_t bodylen = ntohl(req->message.header.request.bodylen)
880                      - ntohs(req->message.header.request.keylen);
881     if (bodylen != sizeof(vbucket_state_t)) {
882         e->setErrorContext(cookie, "Body too short");
883         return sendResponse(response, NULL, 0, NULL, 0, NULL,
884                             0, PROTOCOL_BINARY_RAW_BYTES,
885                             PROTOCOL_BINARY_RESPONSE_EINVAL,
886                             cas, cookie);
887     }
888 
889     vbucket_state_t state;
890     memcpy(&state, &req->message.body.state, sizeof(state));
891     state = static_cast<vbucket_state_t>(ntohl(state));
892 
893     if (!is_valid_vbucket_state_t(state)) {
894         e->setErrorContext(cookie, "Invalid vbucket state");
895         return sendResponse(response, NULL, 0, NULL, 0, NULL,
896                             0, PROTOCOL_BINARY_RAW_BYTES,
897                             PROTOCOL_BINARY_RESPONSE_EINVAL,
898                             cas, cookie);
899     }
900 
901     uint16_t vb = ntohs(req->message.header.request.vbucket);
902     return e->setVBucketState(cookie, response, vb, state, false, cas);
903 }
904 
delVBucket(EventuallyPersistentEngine * e,const void * cookie,protocol_binary_request_header * req,ADD_RESPONSE response)905 static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine* e,
906                                     const void* cookie,
907                                     protocol_binary_request_header* req,
908                                     ADD_RESPONSE response) {
909 
910     uint64_t cas = ntohll(req->request.cas);
911 
912     protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
913     uint16_t vbucket = ntohs(req->request.vbucket);
914 
915     if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
916         e->setErrorContext(cookie, "Key and extras required");
917         return sendResponse(response, NULL, 0, NULL, 0, NULL,
918                             0,
919                             PROTOCOL_BINARY_RAW_BYTES,
920                             PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
921     }
922 
923     bool sync = false;
924     uint32_t bodylen = ntohl(req->request.bodylen);
925     if (bodylen > 0) {
926         const char* ptr = reinterpret_cast<const char*>(req->bytes) +
927                           sizeof(req->bytes);
928         if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
929             sync = true;
930         }
931     }
932 
933     ENGINE_ERROR_CODE err;
934     void* es = e->getEngineSpecific(cookie);
935     if (sync) {
936         if (es == NULL) {
937             err = e->deleteVBucket(vbucket, cookie);
938             e->storeEngineSpecific(cookie, e);
939         } else {
940             e->storeEngineSpecific(cookie, NULL);
941             LOG(EXTENSION_LOG_INFO,
942                 "Completed sync deletion of vbucket %u",
943                 (unsigned)vbucket);
944             err = ENGINE_SUCCESS;
945         }
946     } else {
947         err = e->deleteVBucket(vbucket);
948     }
949     switch (err) {
950     case ENGINE_SUCCESS:
951         LOG(EXTENSION_LOG_NOTICE,
952             "Deletion of vbucket %d was completed.", vbucket);
953         break;
954     case ENGINE_NOT_MY_VBUCKET:
955         LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
956             "because the vbucket doesn't exist!!!", vbucket);
957         res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
958         break;
959     case ENGINE_EINVAL:
960         LOG(EXTENSION_LOG_WARNING,
961             "Deletion of vbucket %d failed "
962             "because the vbucket is not in a dead state",
963             vbucket);
964         e->setErrorContext(
965                 cookie,
966                 "Failed to delete vbucket.  Must be in the dead state.");
967         res = PROTOCOL_BINARY_RESPONSE_EINVAL;
968         break;
969     case ENGINE_EWOULDBLOCK:
970         LOG(EXTENSION_LOG_NOTICE, "Request for vbucket %d deletion is in"
971                 " EWOULDBLOCK until the database file is removed from disk",
972             vbucket);
973         e->storeEngineSpecific(cookie, req);
974         return ENGINE_EWOULDBLOCK;
975     default:
976         LOG(EXTENSION_LOG_WARNING,
977             "Deletion of vbucket %d failed "
978             "because of unknown reasons",
979             vbucket);
980         e->setErrorContext(cookie, "Failed to delete vbucket.  Unknown reason.");
981         res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
982     }
983 
984     if (err == ENGINE_NOT_MY_VBUCKET) {
985         return err;
986     }
987 
988     return sendResponse(response,
989                         NULL,
990                         0,
991                         NULL,
992                         0,
993                         NULL,
994                         0,
995                         PROTOCOL_BINARY_RAW_BYTES,
996                         res,
997                         cas,
998                         cookie);
999 }
1000 
getReplicaCmd(EventuallyPersistentEngine * e,protocol_binary_request_header * request,const void * cookie,Item ** it,const char ** msg,protocol_binary_response_status * res,DocNamespace docNamespace)1001 static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine* e,
1002                                        protocol_binary_request_header* request,
1003                                        const void* cookie,
1004                                        Item** it,
1005                                        const char** msg,
1006                                        protocol_binary_response_status* res,
1007                                        DocNamespace docNamespace) {
1008     KVBucketIface* kvb = e->getKVBucket();
1009     protocol_binary_request_no_extras* req =
1010         (protocol_binary_request_no_extras*)request;
1011     int keylen = ntohs(req->message.header.request.keylen);
1012     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
1013     ENGINE_ERROR_CODE error_code;
1014     DocKey key(reinterpret_cast<const uint8_t*>(request) + sizeof(*request),
1015                keylen, docNamespace);
1016 
1017     GetValue rv(kvb->getReplica(key, vbucket, cookie));
1018 
1019     if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
1020         if (error_code == ENGINE_NOT_MY_VBUCKET) {
1021             *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1022             return error_code;
1023         } else if (error_code == ENGINE_TMPFAIL) {
1024             *msg = "NOT_FOUND";
1025             *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1026         } else {
1027             return error_code;
1028         }
1029     } else {
1030         *it = rv.item.release();
1031         *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1032     }
1033     ++(e->getEpStats().numOpsGet);
1034     return ENGINE_SUCCESS;
1035 }
1036 
compactDB(EventuallyPersistentEngine * e,const void * cookie,protocol_binary_request_compact_db * req,ADD_RESPONSE response)1037 static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine* e,
1038                                    const void* cookie,
1039                                    protocol_binary_request_compact_db* req,
1040                                    ADD_RESPONSE response) {
1041 
1042     protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1043     compaction_ctx compactreq;
1044     uint64_t cas = ntohll(req->message.header.request.cas);
1045 
1046     if (ntohs(req->message.header.request.keylen) > 0 ||
1047         req->message.header.request.extlen != 24) {
1048         LOG(EXTENSION_LOG_WARNING,
1049             "Compaction received bad ext/key len %d/%d.",
1050             req->message.header.request.extlen,
1051             ntohs(req->message.header.request.keylen));
1052         e->setErrorContext(cookie, "Key and correct extras required");
1053         return sendResponse(response, NULL, 0, NULL, 0, NULL,
1054                             0,
1055                             PROTOCOL_BINARY_RAW_BYTES,
1056                             PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
1057     }
1058     EPStats& stats = e->getEpStats();
1059     compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
1060     compactreq.purge_before_seq =
1061         ntohll(req->message.body.purge_before_seq);
1062     compactreq.drop_deletes = req->message.body.drop_deletes;
1063     compactreq.db_file_id = e->getKVBucket()->getDBFileId(*req);
1064     uint16_t vbid = ntohs(req->message.header.request.vbucket);
1065 
1066     ENGINE_ERROR_CODE err;
1067     void* es = e->getEngineSpecific(cookie);
1068     if (es == NULL) {
1069         ++stats.pendingCompactions;
1070         e->storeEngineSpecific(cookie, e);
1071         err = e->compactDB(vbid, compactreq, cookie);
1072     } else {
1073         e->storeEngineSpecific(cookie, NULL);
1074         err = ENGINE_SUCCESS;
1075     }
1076 
1077     switch (err) {
1078     case ENGINE_SUCCESS:
1079         break;
1080     case ENGINE_NOT_MY_VBUCKET:
1081         --stats.pendingCompactions;
1082         LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1083             "because the db file doesn't exist!!!", compactreq.db_file_id);
1084         res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1085         break;
1086     case ENGINE_EINVAL:
1087         --stats.pendingCompactions;
1088         LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1089             "because of an invalid argument", compactreq.db_file_id);
1090         res = PROTOCOL_BINARY_RESPONSE_EINVAL;
1091         break;
1092     case ENGINE_EWOULDBLOCK:
1093         LOG(EXTENSION_LOG_NOTICE,
1094             "Compaction of db file id:%d, purge_before_ts:%" PRIu64
1095             ", purge_before_seq:%" PRIu64
1096             ", drop_deletes:%d scheduled "
1097             "(awaiting completion).",
1098             compactreq.db_file_id,
1099             compactreq.purge_before_ts,
1100             compactreq.purge_before_seq,
1101             compactreq.drop_deletes);
1102         e->storeEngineSpecific(cookie, req);
1103         return ENGINE_EWOULDBLOCK;
1104     case ENGINE_TMPFAIL:
1105         LOG(EXTENSION_LOG_WARNING, "Request to compact db file id: %d hit"
1106                 " a temporary failure and may need to be retried",
1107             compactreq.db_file_id);
1108         e->setErrorContext(cookie, "Temporary failure in compacting db file.");
1109         res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1110         break;
1111     default:
1112         --stats.pendingCompactions;
1113         LOG(EXTENSION_LOG_WARNING,
1114             "Compaction of db file id: %d failed "
1115             "because of unknown reasons",
1116             compactreq.db_file_id);
1117         e->setErrorContext(cookie, "Failed to compact db file.  Unknown reason.");
1118         res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1119         break;
1120     }
1121 
1122     if (err == ENGINE_NOT_MY_VBUCKET) {
1123         return err;
1124     }
1125 
1126     return sendResponse(response,
1127                         NULL,
1128                         0,
1129                         NULL,
1130                         0,
1131                         NULL,
1132                         0,
1133                         PROTOCOL_BINARY_RAW_BYTES,
1134                         res,
1135                         cas,
1136                         cookie);
1137 }
1138 
processUnknownCommand(EventuallyPersistentEngine * h,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response,DocNamespace docNamespace)1139 static ENGINE_ERROR_CODE processUnknownCommand(
1140     EventuallyPersistentEngine* h,
1141     const void* cookie,
1142     protocol_binary_request_header* request,
1143     ADD_RESPONSE response,
1144     DocNamespace docNamespace) {
1145     protocol_binary_response_status res =
1146         PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1147     std::string dynamic_msg;
1148     const char* msg = NULL;
1149     size_t msg_size = 0;
1150     Item* itm = NULL;
1151 
1152     EPStats& stats = h->getEpStats();
1153     ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
1154 
1155     /**
1156      * Session validation
1157      * (For ns_server commands only)
1158      */
1159     switch (request->request.opcode) {
1160     case PROTOCOL_BINARY_CMD_SET_PARAM:
1161     case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1162     case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1163     case PROTOCOL_BINARY_CMD_COMPACT_DB: {
1164         if (h->getEngineSpecific(cookie) == NULL) {
1165             uint64_t cas = ntohll(request->request.cas);
1166             if (!h->validateSessionCas(cas)) {
1167                 h->setErrorContext(cookie, "Invalid session token");
1168                 return sendResponse(response, NULL, 0, NULL, 0,
1169                                     NULL, 0,
1170                                     PROTOCOL_BINARY_RAW_BYTES,
1171                                     PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
1172                                     cas, cookie);
1173             }
1174         }
1175         break;
1176     }
1177     default:
1178         break;
1179     }
1180 
1181     switch (request->request.opcode) {
1182     case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
1183         return h->getAllVBucketSequenceNumbers(cookie, request, response);
1184 
1185     case PROTOCOL_BINARY_CMD_GET_VBUCKET: {
1186         BlockTimer timer(&stats.getVbucketCmdHisto);
1187         rv = getVBucket(h, cookie, request, response);
1188         return rv;
1189     }
1190     case PROTOCOL_BINARY_CMD_DEL_VBUCKET: {
1191         BlockTimer timer(&stats.delVbucketCmdHisto);
1192         rv = delVBucket(h, cookie, request, response);
1193         if (rv != ENGINE_EWOULDBLOCK) {
1194             h->decrementSessionCtr();
1195             h->storeEngineSpecific(cookie, NULL);
1196         }
1197         return rv;
1198     }
1199     case PROTOCOL_BINARY_CMD_SET_VBUCKET: {
1200         BlockTimer timer(&stats.setVbucketCmdHisto);
1201         rv = setVBucket(h, cookie, request, response);
1202         h->decrementSessionCtr();
1203         return rv;
1204     }
1205     case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
1206         res = h->stopFlusher(&msg, &msg_size);
1207         break;
1208     case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
1209         res = h->startFlusher(&msg, &msg_size);
1210         break;
1211     case PROTOCOL_BINARY_CMD_SET_PARAM:
1212         res = h->setParam(
1213                 reinterpret_cast<protocol_binary_request_set_param*>(request),
1214                 dynamic_msg);
1215         msg = dynamic_msg.c_str();
1216         msg_size = dynamic_msg.length();
1217         h->decrementSessionCtr();
1218         break;
1219     case PROTOCOL_BINARY_CMD_EVICT_KEY:
1220         res = evictKey(h, request, &msg, &msg_size, docNamespace);
1221         break;
1222     case PROTOCOL_BINARY_CMD_OBSERVE:
1223         return h->observe(cookie, request, response, docNamespace);
1224     case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
1225         return h->observe_seqno(cookie, request, response);
1226     case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
1227     case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
1228     case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE: {
1229         rv = h->handleCheckpointCmds(cookie, request, response);
1230         return rv;
1231     }
1232     case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE: {
1233         rv = h->handleSeqnoCmds(cookie, request, response);
1234         return rv;
1235     }
1236     case PROTOCOL_BINARY_CMD_SET_WITH_META:
1237     case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
1238     case PROTOCOL_BINARY_CMD_ADD_WITH_META:
1239     case PROTOCOL_BINARY_CMD_ADDQ_WITH_META: {
1240         rv = h->setWithMeta(cookie,
1241                             reinterpret_cast<protocol_binary_request_set_with_meta*>
1242                             (request), response,
1243                             docNamespace);
1244         return rv;
1245     }
1246     case PROTOCOL_BINARY_CMD_DEL_WITH_META:
1247     case PROTOCOL_BINARY_CMD_DELQ_WITH_META: {
1248         rv = h->deleteWithMeta(cookie,
1249                                reinterpret_cast<protocol_binary_request_delete_with_meta*>
1250                                (request), response,
1251                                docNamespace);
1252         return rv;
1253     }
1254     case PROTOCOL_BINARY_CMD_RETURN_META: {
1255         return h->returnMeta(cookie,
1256                              reinterpret_cast<protocol_binary_request_return_meta*>
1257                              (request), response,
1258                              docNamespace);
1259     }
1260     case PROTOCOL_BINARY_CMD_GET_REPLICA:
1261         rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res, docNamespace);
1262         if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
1263             return rv;
1264         }
1265         break;
1266     case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
1267     case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC: {
1268         rv = h->handleTrafficControlCmd(cookie, request, response);
1269         return rv;
1270     }
1271     case PROTOCOL_BINARY_CMD_COMPACT_DB: {
1272         rv = compactDB(h, cookie,
1273                        (protocol_binary_request_compact_db*)(request),
1274                        response);
1275         if (rv != ENGINE_EWOULDBLOCK) {
1276             h->decrementSessionCtr();
1277             h->storeEngineSpecific(cookie, NULL);
1278         }
1279         return rv;
1280     }
1281     case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY: {
1282         if (request->request.extlen != 0 ||
1283             request->request.keylen != 0 ||
1284             request->request.bodylen != 0) {
1285             return ENGINE_EINVAL;
1286         }
1287         return h->getRandomKey(cookie, response);
1288     }
1289     case PROTOCOL_BINARY_CMD_GET_KEYS: {
1290         return h->getAllKeys(cookie,
1291                              reinterpret_cast<protocol_binary_request_get_keys*>
1292                              (request), response,
1293                              docNamespace);
1294     }
1295         // MB-21143: Remove adjusted time/drift API, but return NOT_SUPPORTED
1296     case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
1297     case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE: {
1298         return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
1299                             PROTOCOL_BINARY_RAW_BYTES,
1300                             PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
1301                             cookie);
1302     }
1303     }
1304 
1305     if (itm) {
1306         uint32_t flags = itm->getFlags();
1307         rv = sendResponse(response,
1308                           static_cast<const void*>(itm->getKey().data()),
1309                           itm->getKey().size(),
1310                           (const void*)&flags, sizeof(uint32_t),
1311                           static_cast<const void*>(itm->getData()),
1312                           itm->getNBytes(), itm->getDataType(),
1313                           static_cast<uint16_t>(res), itm->getCas(),
1314                           cookie);
1315         delete itm;
1316     } else if (rv == ENGINE_NOT_MY_VBUCKET) {
1317         return rv;
1318     } else {
1319         msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
1320         rv = sendResponse(response, NULL, 0, NULL, 0,
1321                           msg, static_cast<uint16_t>(msg_size),
1322                           PROTOCOL_BINARY_RAW_BYTES,
1323                           static_cast<uint16_t>(res), 0, cookie);
1324 
1325     }
1326     return rv;
1327 }
1328 
EvpUnknownCommand(gsl::not_null<ENGINE_HANDLE * > handle,const void * cookie,gsl::not_null<protocol_binary_request_header * > request,ADD_RESPONSE response,DocNamespace doc_namespace)1329 static ENGINE_ERROR_CODE EvpUnknownCommand(
1330         gsl::not_null<ENGINE_HANDLE*> handle,
1331         const void* cookie,
1332         gsl::not_null<protocol_binary_request_header*> request,
1333         ADD_RESPONSE response,
1334         DocNamespace doc_namespace) {
1335     auto engine = acquireEngine(handle);
1336     auto ret = processUnknownCommand(
1337             engine.get(), cookie, request, response, doc_namespace);
1338     return ret;
1339 }
1340 
EvpItemSetCas(gsl::not_null<ENGINE_HANDLE * >,gsl::not_null<item * > itm,uint64_t cas)1341 static void EvpItemSetCas(gsl::not_null<ENGINE_HANDLE*>,
1342                           gsl::not_null<item*> itm,
1343                           uint64_t cas) {
1344     static_cast<Item*>(itm.get())->setCas(cas);
1345 }
1346 
EvpItemSetDatatype(gsl::not_null<ENGINE_HANDLE * >,gsl::not_null<item * > itm,protocol_binary_datatype_t datatype)1347 static void EvpItemSetDatatype(gsl::not_null<ENGINE_HANDLE*>,
1348                                gsl::not_null<item*> itm,
1349                                protocol_binary_datatype_t datatype) {
1350     static_cast<Item*>(itm.get())->setDataType(datatype);
1351 }
1352 
EvpDcpStep(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,gsl::not_null<dcp_message_producers * > producers)1353 static ENGINE_ERROR_CODE EvpDcpStep(
1354         gsl::not_null<ENGINE_HANDLE*> handle,
1355         gsl::not_null<const void*> cookie,
1356         gsl::not_null<dcp_message_producers*> producers) {
1357     auto engine = acquireEngine(handle);
1358     ConnHandler* conn = engine->getConnHandler(cookie);
1359     if (conn) {
1360         return conn->step(producers);
1361     }
1362     return ENGINE_DISCONNECT;
1363 }
1364 
EvpDcpOpen(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,uint32_t seqno,uint32_t flags,cb::const_char_buffer name,cb::const_byte_buffer jsonExtra)1365 static ENGINE_ERROR_CODE EvpDcpOpen(gsl::not_null<ENGINE_HANDLE*> handle,
1366                                     gsl::not_null<const void*> cookie,
1367                                     uint32_t opaque,
1368                                     uint32_t seqno,
1369                                     uint32_t flags,
1370                                     cb::const_char_buffer name,
1371                                     cb::const_byte_buffer jsonExtra) {
1372     return acquireEngine(handle)->dcpOpen(
1373             cookie, opaque, seqno, flags, name, jsonExtra);
1374 }
1375 
EvpDcpAddStream(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,uint16_t vbucket,uint32_t flags)1376 static ENGINE_ERROR_CODE EvpDcpAddStream(gsl::not_null<ENGINE_HANDLE*> handle,
1377                                          gsl::not_null<const void*> cookie,
1378                                          uint32_t opaque,
1379                                          uint16_t vbucket,
1380                                          uint32_t flags) {
1381     return acquireEngine(handle)->dcpAddStream(cookie, opaque, vbucket, flags);
1382 }
1383 
EvpDcpCloseStream(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,uint16_t vbucket)1384 static ENGINE_ERROR_CODE EvpDcpCloseStream(gsl::not_null<ENGINE_HANDLE*> handle,
1385                                            gsl::not_null<const void*> cookie,
1386                                            uint32_t opaque,
1387                                            uint16_t vbucket) {
1388     auto engine = acquireEngine(handle);
1389     ConnHandler* conn = engine->getConnHandler(cookie);
1390     if (conn) {
1391         return conn->closeStream(opaque, vbucket);
1392     }
1393     return ENGINE_DISCONNECT;
1394 }
1395 
EvpDcpStreamReq(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t flags,uint32_t opaque,uint16_t vbucket,uint64_t startSeqno,uint64_t endSeqno,uint64_t vbucketUuid,uint64_t snapStartSeqno,uint64_t snapEndSeqno,uint64_t * rollbackSeqno,dcp_add_failover_log callback)1396 static ENGINE_ERROR_CODE EvpDcpStreamReq(gsl::not_null<ENGINE_HANDLE*> handle,
1397                                          gsl::not_null<const void*> cookie,
1398                                          uint32_t flags,
1399                                          uint32_t opaque,
1400                                          uint16_t vbucket,
1401                                          uint64_t startSeqno,
1402                                          uint64_t endSeqno,
1403                                          uint64_t vbucketUuid,
1404                                          uint64_t snapStartSeqno,
1405                                          uint64_t snapEndSeqno,
1406                                          uint64_t* rollbackSeqno,
1407                                          dcp_add_failover_log callback) {
1408     auto engine = acquireEngine(handle);
1409     ConnHandler* conn = engine->getConnHandler(cookie);
1410     if (conn) {
1411         return conn->streamRequest(flags,
1412                                    opaque,
1413                                    vbucket,
1414                                    startSeqno,
1415                                    endSeqno,
1416                                    vbucketUuid,
1417                                    snapStartSeqno,
1418                                    snapEndSeqno,
1419                                    rollbackSeqno,
1420                                    callback);
1421     }
1422     return ENGINE_DISCONNECT;
1423 }
1424 
EvpDcpGetFailoverLog(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,uint16_t vbucket,dcp_add_failover_log callback)1425 static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(
1426         gsl::not_null<ENGINE_HANDLE*> handle,
1427         gsl::not_null<const void*> cookie,
1428         uint32_t opaque,
1429         uint16_t vbucket,
1430         dcp_add_failover_log callback) {
1431     // This function covers two commands:
1432     // 1) DCP_GET_FAILOVER_LOG
1433     //     It is valid only on a DCP Producer connection. Updates the
1434     //     'lastReceiveTime' for the Producer.
1435     // 2) GET_FAILOVER_LOG
1436     //     It does not require a DCP connection (the client has opened
1437     //     a regular MCBP connection).
1438     auto engine = acquireEngine(handle);
1439     ConnHandler* conn = engine->getConnHandler(cookie);
1440     // Note: (conn != nullptr) only if conn is a DCP connection
1441     if (conn) {
1442         auto* producer = dynamic_cast<DcpProducer*>(conn);
1443         // GetFailoverLog not supported for DcpConsumer
1444         if (!producer) {
1445             LOG(EXTENSION_LOG_WARNING,
1446                 "Disconnecting - This connection doesn't support the dcp get "
1447                 "failover log API");
1448             return ENGINE_DISCONNECT;
1449         }
1450         producer->setLastReceiveTime(ep_current_time());
1451         if (producer->doDisconnect()) {
1452             return ENGINE_DISCONNECT;
1453         }
1454     }
1455     VBucketPtr vb = engine->getVBucket(vbucket);
1456     if (!vb) {
1457         LOG(EXTENSION_LOG_WARNING,
1458             "%s (vb %d) Get Failover Log failed because this vbucket doesn't "
1459             "exist",
1460             conn ? conn->logHeader() : "MCBP-Connection",
1461             vbucket);
1462         return ENGINE_NOT_MY_VBUCKET;
1463     }
1464     auto failoverEntries = vb->failovers->getFailoverLog();
1465     auto* epEngine = ObjectRegistry::onSwitchThread(NULL, true);
1466     auto ret = callback(failoverEntries.data(), failoverEntries.size(), cookie);
1467     ObjectRegistry::onSwitchThread(epEngine);
1468     return ret;
1469 }
1470 
EvpDcpStreamEnd(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,uint16_t vbucket,uint32_t flags)1471 static ENGINE_ERROR_CODE EvpDcpStreamEnd(gsl::not_null<ENGINE_HANDLE*> handle,
1472                                          gsl::not_null<const void*> cookie,
1473                                          uint32_t opaque,
1474                                          uint16_t vbucket,
1475                                          uint32_t flags) {
1476     auto engine = acquireEngine(handle);
1477     ConnHandler* conn = engine->getConnHandler(cookie);
1478     if (conn) {
1479         return conn->streamEnd(opaque, vbucket, flags);
1480     }
1481     return ENGINE_DISCONNECT;
1482 }
1483 
EvpDcpSnapshotMarker(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,uint16_t vbucket,uint64_t start_seqno,uint64_t end_seqno,uint32_t flags)1484 static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(
1485         gsl::not_null<ENGINE_HANDLE*> handle,
1486         gsl::not_null<const void*> cookie,
1487         uint32_t opaque,
1488         uint16_t vbucket,
1489         uint64_t start_seqno,
1490         uint64_t end_seqno,
1491         uint32_t flags) {
1492     auto engine = acquireEngine(handle);
1493     ConnHandler* conn = engine->getConnHandler(cookie);
1494     if (conn) {
1495         return conn->snapshotMarker(
1496                 opaque, vbucket, start_seqno, end_seqno, flags);
1497     }
1498     return ENGINE_DISCONNECT;
1499 }
1500 
EvpDcpMutation(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,const DocKey & key,cb::const_byte_buffer value,size_t priv_bytes,uint8_t datatype,uint64_t cas,uint16_t vbucket,uint32_t flags,uint64_t by_seqno,uint64_t rev_seqno,uint32_t expiration,uint32_t lock_time,cb::const_byte_buffer meta,uint8_t nru)1501 static ENGINE_ERROR_CODE EvpDcpMutation(gsl::not_null<ENGINE_HANDLE*> handle,
1502                                         gsl::not_null<const void*> cookie,
1503                                         uint32_t opaque,
1504                                         const DocKey& key,
1505                                         cb::const_byte_buffer value,
1506                                         size_t priv_bytes,
1507                                         uint8_t datatype,
1508                                         uint64_t cas,
1509                                         uint16_t vbucket,
1510                                         uint32_t flags,
1511                                         uint64_t by_seqno,
1512                                         uint64_t rev_seqno,
1513                                         uint32_t expiration,
1514                                         uint32_t lock_time,
1515                                         cb::const_byte_buffer meta,
1516                                         uint8_t nru) {
1517     if (!mcbp::datatype::is_valid(datatype)) {
1518         LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1519             " (DCPMutation)");
1520         return ENGINE_EINVAL;
1521     }
1522     auto engine = acquireEngine(handle);
1523     ConnHandler* conn = engine->getConnHandler(cookie);
1524     if (conn) {
1525         return conn->mutation(opaque, key, value, priv_bytes, datatype, cas,
1526                               vbucket, flags, by_seqno, rev_seqno, expiration,
1527                               lock_time, meta, nru);
1528     }
1529     return ENGINE_DISCONNECT;
1530 }
1531 
EvpDcpDeletion(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,const DocKey & key,cb::const_byte_buffer value,size_t priv_bytes,uint8_t datatype,uint64_t cas,uint16_t vbucket,uint64_t by_seqno,uint64_t rev_seqno,cb::const_byte_buffer meta)1532 static ENGINE_ERROR_CODE EvpDcpDeletion(gsl::not_null<ENGINE_HANDLE*> handle,
1533                                         gsl::not_null<const void*> cookie,
1534                                         uint32_t opaque,
1535                                         const DocKey& key,
1536                                         cb::const_byte_buffer value,
1537                                         size_t priv_bytes,
1538                                         uint8_t datatype,
1539                                         uint64_t cas,
1540                                         uint16_t vbucket,
1541                                         uint64_t by_seqno,
1542                                         uint64_t rev_seqno,
1543                                         cb::const_byte_buffer meta) {
1544     auto engine = acquireEngine(handle);
1545     ConnHandler* conn = engine->getConnHandler(cookie);
1546     if (conn) {
1547         return conn->deletion(opaque, key, value, priv_bytes, datatype, cas,
1548                               vbucket, by_seqno, rev_seqno, meta);
1549     }
1550     return ENGINE_DISCONNECT;
1551 }
1552 
EvpDcpDeletionV2(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,const DocKey & key,cb::const_byte_buffer value,size_t priv_bytes,uint8_t datatype,uint64_t cas,uint16_t vbucket,uint64_t by_seqno,uint64_t rev_seqno,uint32_t delete_time)1553 static ENGINE_ERROR_CODE EvpDcpDeletionV2(gsl::not_null<ENGINE_HANDLE*> handle,
1554                                           gsl::not_null<const void*> cookie,
1555                                           uint32_t opaque,
1556                                           const DocKey& key,
1557                                           cb::const_byte_buffer value,
1558                                           size_t priv_bytes,
1559                                           uint8_t datatype,
1560                                           uint64_t cas,
1561                                           uint16_t vbucket,
1562                                           uint64_t by_seqno,
1563                                           uint64_t rev_seqno,
1564                                           uint32_t delete_time) {
1565     auto engine = acquireEngine(handle);
1566     ConnHandler* conn = engine->getConnHandler(cookie);
1567     if (conn) {
1568         return conn->deletionV2(opaque,
1569                                 key,
1570                                 value,
1571                                 priv_bytes,
1572                                 datatype,
1573                                 cas,
1574                                 vbucket,
1575                                 by_seqno,
1576                                 rev_seqno,
1577                                 delete_time);
1578     }
1579     return ENGINE_DISCONNECT;
1580 }
1581 
EvpDcpExpiration(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,const DocKey & key,cb::const_byte_buffer value,size_t priv_bytes,uint8_t datatype,uint64_t cas,uint16_t vbucket,uint64_t by_seqno,uint64_t rev_seqno,cb::const_byte_buffer meta)1582 static ENGINE_ERROR_CODE EvpDcpExpiration(gsl::not_null<ENGINE_HANDLE*> handle,
1583                                           gsl::not_null<const void*> cookie,
1584                                           uint32_t opaque,
1585                                           const DocKey& key,
1586                                           cb::const_byte_buffer value,
1587                                           size_t priv_bytes,
1588                                           uint8_t datatype,
1589                                           uint64_t cas,
1590                                           uint16_t vbucket,
1591                                           uint64_t by_seqno,
1592                                           uint64_t rev_seqno,
1593                                           cb::const_byte_buffer meta) {
1594     auto engine = acquireEngine(handle);
1595     ConnHandler* conn = engine->getConnHandler(cookie);
1596     if (conn) {
1597         return conn->expiration(opaque, key, value, priv_bytes, datatype, cas,
1598                                 vbucket, by_seqno, rev_seqno, meta);
1599     }
1600     return ENGINE_DISCONNECT;
1601 }
1602 
EvpDcpFlush(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,uint16_t vbucket)1603 static ENGINE_ERROR_CODE EvpDcpFlush(gsl::not_null<ENGINE_HANDLE*> handle,
1604                                      gsl::not_null<const void*> cookie,
1605                                      uint32_t opaque,
1606                                      uint16_t vbucket) {
1607     auto engine = acquireEngine(handle);
1608     ConnHandler* conn = engine->getConnHandler(cookie);
1609     if (conn) {
1610         return conn->flushall(opaque, vbucket);
1611     }
1612     return ENGINE_DISCONNECT;
1613 }
1614 
EvpDcpSetVbucketState(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,uint16_t vbucket,vbucket_state_t state)1615 static ENGINE_ERROR_CODE EvpDcpSetVbucketState(
1616         gsl::not_null<ENGINE_HANDLE*> handle,
1617         gsl::not_null<const void*> cookie,
1618         uint32_t opaque,
1619         uint16_t vbucket,
1620         vbucket_state_t state) {
1621     auto engine = acquireEngine(handle);
1622     ConnHandler* conn = engine->getConnHandler(cookie);
1623     if (conn) {
1624         return conn->setVBucketState(opaque, vbucket, state);
1625     }
1626     return ENGINE_DISCONNECT;
1627 }
1628 
EvpDcpNoop(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque)1629 static ENGINE_ERROR_CODE EvpDcpNoop(gsl::not_null<ENGINE_HANDLE*> handle,
1630                                     gsl::not_null<const void*> cookie,
1631                                     uint32_t opaque) {
1632     auto engine = acquireEngine(handle);
1633     ConnHandler* conn = engine->getConnHandler(cookie);
1634     if (conn) {
1635         return conn->noop(opaque);
1636     }
1637     return ENGINE_DISCONNECT;
1638 }
1639 
EvpDcpBufferAcknowledgement(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,uint16_t vbucket,uint32_t buffer_bytes)1640 static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(
1641         gsl::not_null<ENGINE_HANDLE*> handle,
1642         gsl::not_null<const void*> cookie,
1643         uint32_t opaque,
1644         uint16_t vbucket,
1645         uint32_t buffer_bytes) {
1646     auto engine = acquireEngine(handle);
1647     ConnHandler* conn = engine->getConnHandler(cookie);
1648     if (conn) {
1649         return conn->bufferAcknowledgement(opaque, vbucket, buffer_bytes);
1650     }
1651     return ENGINE_DISCONNECT;
1652 }
1653 
EvpDcpControl(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,const void * key,uint16_t nkey,const void * value,uint32_t nvalue)1654 static ENGINE_ERROR_CODE EvpDcpControl(gsl::not_null<ENGINE_HANDLE*> handle,
1655                                        gsl::not_null<const void*> cookie,
1656                                        uint32_t opaque,
1657                                        const void* key,
1658                                        uint16_t nkey,
1659                                        const void* value,
1660                                        uint32_t nvalue) {
1661     auto engine = acquireEngine(handle);
1662     ConnHandler* conn = engine->getConnHandler(cookie);
1663     if (conn) {
1664         return conn->control(opaque, key, nkey, value, nvalue);
1665     }
1666     return ENGINE_DISCONNECT;
1667 }
1668 
EvpDcpResponseHandler(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const protocol_binary_response_header * response)1669 static ENGINE_ERROR_CODE EvpDcpResponseHandler(
1670         gsl::not_null<ENGINE_HANDLE*> handle,
1671         gsl::not_null<const void*> cookie,
1672         const protocol_binary_response_header* response) {
1673     auto engine = acquireEngine(handle);
1674     ConnHandler* conn = engine->getConnHandler(cookie);
1675     if (conn) {
1676         if (conn->handleResponse(response)) {
1677             return ENGINE_SUCCESS;
1678         }
1679     }
1680     return ENGINE_DISCONNECT;
1681 }
1682 
EvpDcpSystemEvent(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,uint32_t opaque,uint16_t vbucket,mcbp::systemevent::id event,uint64_t bySeqno,cb::const_byte_buffer key,cb::const_byte_buffer eventData)1683 static ENGINE_ERROR_CODE EvpDcpSystemEvent(gsl::not_null<ENGINE_HANDLE*> handle,
1684                                            gsl::not_null<const void*> cookie,
1685                                            uint32_t opaque,
1686                                            uint16_t vbucket,
1687                                            mcbp::systemevent::id event,
1688                                            uint64_t bySeqno,
1689                                            cb::const_byte_buffer key,
1690                                            cb::const_byte_buffer eventData) {
1691     auto engine = acquireEngine(handle);
1692     ConnHandler* conn = engine->getConnHandler(cookie);
1693     if (conn) {
1694         return conn->systemEvent(
1695                 opaque, vbucket, event, bySeqno, key, eventData);
1696     }
1697     return ENGINE_DISCONNECT;
1698 }
1699 
EvpHandleDisconnect(const void * cookie,ENGINE_EVENT_TYPE type,const void * event_data,const void * cb_data)1700 static void EvpHandleDisconnect(const void* cookie,
1701                                 ENGINE_EVENT_TYPE type,
1702                                 const void* event_data,
1703                                 const void* cb_data) {
1704     if (type != ON_DISCONNECT) {
1705         throw std::invalid_argument("EvpHandleDisconnect: type "
1706                                         "(which is" + std::to_string(type) +
1707                                     ") is not ON_DISCONNECT");
1708     }
1709     if (event_data != nullptr) {
1710         throw std::invalid_argument("EvpHandleDisconnect: event_data "
1711                                         "is not NULL");
1712     }
1713     void* c = const_cast<void*>(cb_data);
1714     acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1715 }
1716 
EvpHandleDeleteBucket(const void * cookie,ENGINE_EVENT_TYPE type,const void * event_data,const void * cb_data)1717 static void EvpHandleDeleteBucket(const void* cookie,
1718                                   ENGINE_EVENT_TYPE type,
1719                                   const void* event_data,
1720                                   const void* cb_data) {
1721     if (type != ON_DELETE_BUCKET) {
1722         throw std::invalid_argument("EvpHandleDeleteBucket: type "
1723                                         "(which is" + std::to_string(type) +
1724                                     ") is not ON_DELETE_BUCKET");
1725     }
1726     if (event_data != nullptr) {
1727         throw std::invalid_argument("EvpHandleDeleteBucket: event_data "
1728                                         "is not NULL");
1729     }
1730     void* c = const_cast<void*>(cb_data);
1731     acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDeleteBucket(cookie);
1732 }
1733 
EvpSetLogLevel(gsl::not_null<ENGINE_HANDLE * > handle,EXTENSION_LOG_LEVEL level)1734 void EvpSetLogLevel(gsl::not_null<ENGINE_HANDLE*> handle,
1735                     EXTENSION_LOG_LEVEL level) {
1736     Logger::setGlobalLogLevel(level);
1737 }
1738 
1739 /**
1740  * The only public interface to the eventually persistent engine.
1741  * Allocate a new instance and initialize it
1742  * @param interface the highest interface the server supports (we only
1743  *                  support interface 1)
1744  * @param get_server_api callback function to get the server exported API
1745  *                  functions
1746  * @param handle Where to return the new instance
1747  * @return ENGINE_SUCCESS on success
1748  */
create_instance(uint64_t interface,GET_SERVER_API get_server_api,ENGINE_HANDLE ** handle)1749 ENGINE_ERROR_CODE create_instance(uint64_t interface,
1750                                   GET_SERVER_API get_server_api,
1751                                   ENGINE_HANDLE** handle) {
1752     SERVER_HANDLE_V1* api = get_server_api();
1753     if (interface != 1 || api == NULL) {
1754         return ENGINE_ENOTSUP;
1755     }
1756 
1757     Logger::setLoggerAPI(api->log);
1758 
1759     MemoryTracker::getInstance(*api->alloc_hooks);
1760     ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1761 
1762     std::atomic<size_t>* inital_tracking = new std::atomic<size_t>();
1763 
1764     ObjectRegistry::setStats(inital_tracking);
1765     EventuallyPersistentEngine* engine;
1766     engine = new EventuallyPersistentEngine(get_server_api);
1767     ObjectRegistry::setStats(NULL);
1768 
1769     if (engine == NULL) {
1770         return ENGINE_ENOMEM;
1771     }
1772 
1773     if (MemoryTracker::trackingMemoryAllocations()) {
1774         engine->getEpStats().estimatedTotalMemory.get()->store(
1775                 inital_tracking->load());
1776         engine->getEpStats().memoryTrackerEnabled.store(true);
1777     }
1778     delete inital_tracking;
1779 
1780     initialize_time_functions(api->core);
1781 
1782     *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1783 
1784     return ENGINE_SUCCESS;
1785 }
1786 
1787 /*
1788     This method is called prior to unloading of the shared-object.
1789     Global clean-up should be performed from this method.
1790 */
destroy_engine()1791 void destroy_engine() {
1792     ExecutorPool::shutdown();
1793     // A single MemoryTracker exists for *all* buckets
1794     // and must be destroyed before unloading the shared object.
1795     MemoryTracker::destroyInstance();
1796     ObjectRegistry::reset();
1797 }
1798 
EvpGetItemInfo(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const item * > itm,gsl::not_null<item_info * > itm_info)1799 static bool EvpGetItemInfo(gsl::not_null<ENGINE_HANDLE*> handle,
1800                            gsl::not_null<const item*> itm,
1801                            gsl::not_null<item_info*> itm_info) {
1802     const Item* it = reinterpret_cast<const Item*>(itm.get());
1803     auto engine = acquireEngine(handle);
1804     *itm_info = engine->getItemInfo(*it);
1805     return true;
1806 }
1807 
EvpGetMeta(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket)1808 static cb::EngineErrorMetadataPair EvpGetMeta(
1809         gsl::not_null<ENGINE_HANDLE*> handle,
1810         gsl::not_null<const void*> cookie,
1811         const DocKey& key,
1812         uint16_t vbucket) {
1813     return acquireEngine(handle)->getMeta(cookie, key, vbucket);
1814 }
1815 
EvpSetItemInfo(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<item * > itm,gsl::not_null<const item_info * > itm_info)1816 static bool EvpSetItemInfo(gsl::not_null<ENGINE_HANDLE*> handle,
1817                            gsl::not_null<item*> itm,
1818                            gsl::not_null<const item_info*> itm_info) {
1819     Item* it = reinterpret_cast<Item*>(itm.get());
1820     it->setDataType(itm_info->datatype);
1821     return true;
1822 }
1823 
EvpCollectionsSetManifest(gsl::not_null<ENGINE_HANDLE * > handle,cb::const_char_buffer json)1824 static cb::engine_error EvpCollectionsSetManifest(
1825         gsl::not_null<ENGINE_HANDLE*> handle, cb::const_char_buffer json) {
1826     auto engine = acquireEngine(handle);
1827     return engine->getKVBucket()->setCollections(json);
1828 }
1829 
EvpCollectionsGetManifest(gsl::not_null<ENGINE_HANDLE * > handle)1830 static cb::EngineErrorStringPair EvpCollectionsGetManifest(
1831         gsl::not_null<ENGINE_HANDLE*> handle) {
1832     auto engine = acquireEngine(handle);
1833     return engine->getKVBucket()->getCollections();
1834 }
1835 
EvpIsXattrEnabled(gsl::not_null<ENGINE_HANDLE * > handle)1836 static bool EvpIsXattrEnabled(gsl::not_null<ENGINE_HANDLE*> handle) {
1837     auto engine = acquireEngine(handle);
1838     return engine->getKVBucket()->isXattrEnabled();
1839 }
1840 
EvpGetCompressionMode(gsl::not_null<ENGINE_HANDLE * > handle)1841 static BucketCompressionMode EvpGetCompressionMode(gsl::not_null<ENGINE_HANDLE*> handle) {
1842     auto engine = acquireEngine(handle);
1843     return engine->getCompressionMode();
1844 }
1845 
EvpGetMinCompressionRatio(gsl::not_null<ENGINE_HANDLE * > handle)1846 static float EvpGetMinCompressionRatio(gsl::not_null<ENGINE_HANDLE*> handle) {
1847     auto engine = acquireEngine(handle);
1848     return engine->getMinCompressionRatio();
1849 }
1850 
LOG(EXTENSION_LOG_LEVEL severity,const char * fmt,...)1851 void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1852     va_list va;
1853     va_start(va, fmt);
1854     global_logger.vlog(severity, fmt, va);
1855     va_end(va);
1856 }
1857 
EventuallyPersistentEngine(GET_SERVER_API get_server_api)1858 EventuallyPersistentEngine::EventuallyPersistentEngine(
1859         GET_SERVER_API get_server_api)
1860     : kvBucket(nullptr),
1861       workload(NULL),
1862       workloadPriority(NO_BUCKET_PRIORITY),
1863       getServerApiFunc(get_server_api),
1864       checkpointConfig(NULL),
1865       trafficEnabled(false),
1866       startupTime(0),
1867       taskable(this),
1868       compressionMode(BucketCompressionMode::Off),
1869       minCompressionRatio(default_min_compression_ratio) {
1870     interface.interface = 1;
1871     ENGINE_HANDLE_V1::initialize = EvpInitialize;
1872     ENGINE_HANDLE_V1::destroy = EvpDestroy;
1873     ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
1874     ENGINE_HANDLE_V1::allocate_ex = EvpItemAllocateEx;
1875     ENGINE_HANDLE_V1::remove = EvpItemDelete;
1876     ENGINE_HANDLE_V1::release = EvpItemRelease;
1877     ENGINE_HANDLE_V1::get = EvpGet;
1878     ENGINE_HANDLE_V1::get_if = EvpGetIf;
1879     ENGINE_HANDLE_V1::get_and_touch = EvpGetAndTouch;
1880     ENGINE_HANDLE_V1::get_locked = EvpGetLocked;
1881     ENGINE_HANDLE_V1::get_meta = EvpGetMeta;
1882     ENGINE_HANDLE_V1::unlock = EvpUnlock;
1883     ENGINE_HANDLE_V1::get_stats = EvpGetStats;
1884     ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
1885     ENGINE_HANDLE_V1::store = EvpStore;
1886     ENGINE_HANDLE_V1::store_if = EvpStoreIf;
1887     ENGINE_HANDLE_V1::flush = EvpFlush;
1888     ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
1889     ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
1890     ENGINE_HANDLE_V1::item_set_datatype = EvpItemSetDatatype;
1891     ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
1892     ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
1893 
1894     ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
1895     ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
1896     ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
1897     ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
1898     ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
1899     ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
1900     ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
1901     ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
1902     ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
1903     ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
1904     ENGINE_HANDLE_V1::dcp.deletion_v2 = EvpDcpDeletionV2;
1905     ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
1906     ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
1907     ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
1908     ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
1909     ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
1910     ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
1911     ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
1912     ENGINE_HANDLE_V1::dcp.system_event = EvpDcpSystemEvent;
1913     ENGINE_HANDLE_V1::set_log_level = EvpSetLogLevel;
1914     ENGINE_HANDLE_V1::collections.set_manifest = EvpCollectionsSetManifest;
1915     ENGINE_HANDLE_V1::collections.get_manifest = EvpCollectionsGetManifest;
1916     ENGINE_HANDLE_V1::isXattrEnabled = EvpIsXattrEnabled;
1917     ENGINE_HANDLE_V1::getCompressionMode = EvpGetCompressionMode;
1918     ENGINE_HANDLE_V1::getMaxItemSize = EvpGetMaxItemSize;
1919     ENGINE_HANDLE_V1::getMinCompressionRatio = EvpGetMinCompressionRatio;
1920 
1921     serverApi = getServerApiFunc();
1922 }
1923 
reserveCookie(const void * cookie)1924 ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
1925 {
1926     EventuallyPersistentEngine *epe =
1927                                     ObjectRegistry::onSwitchThread(NULL, true);
1928     ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
1929     ObjectRegistry::onSwitchThread(epe);
1930     return rv;
1931 }
1932 
releaseCookie(const void * cookie)1933 ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
1934 {
1935     EventuallyPersistentEngine *epe =
1936                                     ObjectRegistry::onSwitchThread(NULL, true);
1937     ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
1938     ObjectRegistry::onSwitchThread(epe);
1939     return rv;
1940 }
1941 
storeEngineSpecific(const void * cookie,void * engine_data)1942 void EventuallyPersistentEngine::storeEngineSpecific(const void* cookie,
1943                                                      void* engine_data) {
1944     EventuallyPersistentEngine* epe =
1945             ObjectRegistry::onSwitchThread(NULL, true);
1946     serverApi->cookie->store_engine_specific(cookie, engine_data);
1947     ObjectRegistry::onSwitchThread(epe);
1948 }
1949 
getEngineSpecific(const void * cookie)1950 void* EventuallyPersistentEngine::getEngineSpecific(const void* cookie) {
1951     EventuallyPersistentEngine* epe =
1952             ObjectRegistry::onSwitchThread(NULL, true);
1953     void* engine_data = serverApi->cookie->get_engine_specific(cookie);
1954     ObjectRegistry::onSwitchThread(epe);
1955     return engine_data;
1956 }
1957 
isDatatypeSupported(const void * cookie,protocol_binary_datatype_t datatype)1958 bool EventuallyPersistentEngine::isDatatypeSupported(
1959         const void* cookie, protocol_binary_datatype_t datatype) {
1960     EventuallyPersistentEngine* epe =
1961             ObjectRegistry::onSwitchThread(NULL, true);
1962     bool isSupported =
1963             serverApi->cookie->is_datatype_supported(cookie, datatype);
1964     ObjectRegistry::onSwitchThread(epe);
1965     return isSupported;
1966 }
1967 
isMutationExtrasSupported(const void * cookie)1968 bool EventuallyPersistentEngine::isMutationExtrasSupported(const void* cookie) {
1969     EventuallyPersistentEngine* epe =
1970             ObjectRegistry::onSwitchThread(NULL, true);
1971     bool isSupported = serverApi->cookie->is_mutation_extras_supported(cookie);
1972     ObjectRegistry::onSwitchThread(epe);
1973     return isSupported;
1974 }
1975 
isXattrEnabled(const void * cookie)1976 bool EventuallyPersistentEngine::isXattrEnabled(const void* cookie) {
1977     return isDatatypeSupported(cookie, PROTOCOL_BINARY_DATATYPE_XATTR);
1978 }
1979 
isCollectionsSupported(const void * cookie)1980 bool EventuallyPersistentEngine::isCollectionsSupported(const void* cookie) {
1981     EventuallyPersistentEngine* epe =
1982             ObjectRegistry::onSwitchThread(NULL, true);
1983     bool isSupported = serverApi->cookie->is_collections_supported(cookie);
1984     ObjectRegistry::onSwitchThread(epe);
1985     return isSupported;
1986 }
1987 
getOpcodeIfEwouldblockSet(const void * cookie)1988 uint8_t EventuallyPersistentEngine::getOpcodeIfEwouldblockSet(
1989         const void* cookie) {
1990     EventuallyPersistentEngine* epe =
1991             ObjectRegistry::onSwitchThread(NULL, true);
1992     uint8_t opcode = serverApi->cookie->get_opcode_if_ewouldblock_set(cookie);
1993     ObjectRegistry::onSwitchThread(epe);
1994     return opcode;
1995 }
1996 
validateSessionCas(const uint64_t cas)1997 bool EventuallyPersistentEngine::validateSessionCas(const uint64_t cas) {
1998     EventuallyPersistentEngine* epe =
1999             ObjectRegistry::onSwitchThread(NULL, true);
2000     bool ret = serverApi->cookie->validate_session_cas(cas);
2001     ObjectRegistry::onSwitchThread(epe);
2002     return ret;
2003 }
2004 
decrementSessionCtr(void)2005 void EventuallyPersistentEngine::decrementSessionCtr(void) {
2006     EventuallyPersistentEngine* epe =
2007             ObjectRegistry::onSwitchThread(NULL, true);
2008     serverApi->cookie->decrement_session_ctr();
2009     ObjectRegistry::onSwitchThread(epe);
2010 }
2011 
registerEngineCallback(ENGINE_EVENT_TYPE type,EVENT_CALLBACK cb,const void * cb_data)2012 void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
2013                                                         EVENT_CALLBACK cb,
2014                                                         const void *cb_data) {
2015     EventuallyPersistentEngine *epe =
2016                                     ObjectRegistry::onSwitchThread(NULL, true);
2017     SERVER_CALLBACK_API *sapi = getServerApi()->callback;
2018     sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
2019                             type, cb, cb_data);
2020     ObjectRegistry::onSwitchThread(epe);
2021 }
2022 
setErrorContext(const void * cookie,cb::const_char_buffer message)2023 void EventuallyPersistentEngine::setErrorContext(
2024         const void* cookie, cb::const_char_buffer message) {
2025     EventuallyPersistentEngine* epe =
2026             ObjectRegistry::onSwitchThread(NULL, true);
2027     serverApi->cookie->set_error_context(const_cast<void*>(cookie), message);
2028     ObjectRegistry::onSwitchThread(epe);
2029 }
2030 
2031 template <typename T>
notifyIOComplete(T cookies,ENGINE_ERROR_CODE status)2032 void EventuallyPersistentEngine::notifyIOComplete(T cookies,
2033                                                   ENGINE_ERROR_CODE status) {
2034     EventuallyPersistentEngine* epe =
2035             ObjectRegistry::onSwitchThread(NULL, true);
2036     std::for_each(
2037             cookies.begin(),
2038             cookies.end(),
2039             std::bind2nd(std::ptr_fun((NOTIFY_IO_COMPLETE_T)serverApi->cookie
2040                                               ->notify_io_complete),
2041                          status));
2042     ObjectRegistry::onSwitchThread(epe);
2043 }
2044 
2045 /**
2046  * A configuration value changed listener that responds to ep-engine
2047  * parameter changes by invoking engine-specific methods on
2048  * configuration change events.
2049  */
2050 class EpEngineValueChangeListener : public ValueChangedListener {
2051 public:
EpEngineValueChangeListener(EventuallyPersistentEngine & e)2052     EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
2053         // EMPTY
2054     }
2055 
sizeValueChanged(const std::string & key,size_t value)2056     virtual void sizeValueChanged(const std::string &key, size_t value) {
2057         if (key.compare("getl_max_timeout") == 0) {
2058             engine.setGetlMaxTimeout(value);
2059         } else if (key.compare("getl_default_timeout") == 0) {
2060             engine.setGetlDefaultTimeout(value);
2061         } else if (key.compare("max_item_size") == 0) {
2062             engine.setMaxItemSize(value);
2063         } else if (key.compare("max_item_privileged_bytes") == 0) {
2064             engine.setMaxItemPrivilegedBytes(value);
2065         }
2066     }
2067 
stringValueChanged(const std::string & key,const char * value)2068     virtual void stringValueChanged(const std::string& key, const char* value) {
2069         if (key == "compression_mode") {
2070             std::string value_str{value, strlen(value)};
2071             engine.setCompressionMode(value_str);
2072         }
2073     }
2074 
floatValueChanged(const std::string & key,float value)2075     virtual void floatValueChanged(const std::string& key, float value) {
2076         if (key == "min_compression_ratio") {
2077             engine.setMinCompressionRatio(value);
2078         }
2079     }
2080 
2081 private:
2082     EventuallyPersistentEngine &engine;
2083 };
2084 
/**
 * Initialise the engine from an (optional) configuration string.
 *
 * Resets the stats, parses @p config (when non-null), copies the relevant
 * configuration values into the engine, registers value-changed listeners
 * for the runtime-tunable parameters, and creates and initialises the
 * workload policy, DCP connection map, DCP flow control manager, checkpoint
 * config and the KV bucket.
 *
 * @param config configuration string to parse; may be nullptr, in which
 *               case parsing and the "using configuration" log are skipped
 * @return ENGINE_SUCCESS on success; ENGINE_FAILED if the configuration
 *         cannot be parsed, if the number of shards exceeds the maximum
 *         number of vbuckets, or if kvBucket->initialize() fails
 */
ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
    resetStats();
    if (config != nullptr) {
        if (!configuration.parseConfiguration(config, serverApi)) {
            LOG(EXTENSION_LOG_WARNING, "Failed to parse the configuration config "
                "during bucket initialization.  config=%s", config);
            return ENGINE_FAILED;
        }
    }

    name = configuration.getCouchBucket();

    if (config != nullptr) {
        LOG(EXTENSION_LOG_NOTICE,
            R"(EPEngine::initialize: using configuration:"%s")",
            config);
    }

    maxFailoverEntries = configuration.getMaxFailoverEntries();

    // Start updating the variables from the config!
    VBucket::setMutationMemoryThreshold(
            configuration.getMutationMemThreshold());

    // A max size of 0 is replaced by the largest representable size_t.
    if (configuration.getMaxSize() == 0) {
        configuration.setMaxSize(std::numeric_limits<size_t>::max());
    }

    // A low watermark equal to size_t max is replaced by 75% of the max
    // size.
    if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
        stats.mem_low_wat_percent.store(0.75);
        configuration.setMemLowWat(percentOf(
                configuration.getMaxSize(), stats.mem_low_wat_percent.load()));
    }

    // A high watermark equal to size_t max is replaced by 85% of the max
    // size.
    if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
        stats.mem_high_wat_percent.store(0.85);
        configuration.setMemHighWat(percentOf(
                configuration.getMaxSize(), stats.mem_high_wat_percent.load()));
    }


    // Cache the runtime-tunable limits and register a listener for each so
    // later configuration changes are forwarded to the engine.
    maxItemSize = configuration.getMaxItemSize();
    configuration.addValueChangedListener(
            "max_item_size",
            std::make_unique<EpEngineValueChangeListener>(*this));

    maxItemPrivilegedBytes = configuration.getMaxItemPrivilegedBytes();
    configuration.addValueChangedListener(
            "max_item_privileged_bytes",
            std::make_unique<EpEngineValueChangeListener>(*this));

    getlDefaultTimeout = configuration.getGetlDefaultTimeout();
    configuration.addValueChangedListener(
            "getl_default_timeout",
            std::make_unique<EpEngineValueChangeListener>(*this));
    getlMaxTimeout = configuration.getGetlMaxTimeout();
    configuration.addValueChangedListener(
            "getl_max_timeout",
            std::make_unique<EpEngineValueChangeListener>(*this));

    workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
                                  configuration.getMaxNumShards());
    // Reject configurations with more shards than vbuckets.
    if ((unsigned int)workload->getNumShards() >
                                              configuration.getMaxVbuckets()) {
        LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
            "equal or less than max number of vbuckets");
        return ENGINE_FAILED;
    }

    dcpConnMap_ = std::make_unique<DcpConnMap>(*this);

    /* Get the flow control policy */
    std::string flowCtlPolicy = configuration.getDcpFlowControlPolicy();

    // std::string::compare() returns 0 on equality, hence the negation.
    if (!flowCtlPolicy.compare("static")) {
        dcpFlowControlManager_ =
                std::make_unique<DcpFlowControlManagerStatic>(*this);
    } else if (!flowCtlPolicy.compare("dynamic")) {
        dcpFlowControlManager_ =
                std::make_unique<DcpFlowControlManagerDynamic>(*this);
    } else if (!flowCtlPolicy.compare("aggressive")) {
        dcpFlowControlManager_ =
                std::make_unique<DcpFlowControlManagerAggressive>(*this);
    } else {
        /* Flow control is not enabled */
        dcpFlowControlManager_ = std::make_unique<DcpFlowControlManager>(*this);
    }

    checkpointConfig = new CheckpointConfig(*this);
    CheckpointConfig::addConfigChangeListener(*this);

    kvBucket = makeBucket(configuration);

    initializeEngineCallbacks();

    // Complete the initialization of the ep-store
    if (!kvBucket->initialize()) {
        return ENGINE_FAILED;
    }

    if(configuration.isDataTrafficEnabled()) {
        enableTraffic(true);
    }

    dcpConnMap_->initialize();

    // record engine initialization time
    startupTime.store(ep_real_time());

    LOG(EXTENSION_LOG_NOTICE,
        "EP Engine: Initialization of %s bucket complete",
        configuration.getBucketType().c_str());

    // Apply the configured compression settings and listen for later
    // changes to them.
    setCompressionMode(configuration.getCompressionMode());

    configuration.addValueChangedListener(
            "compression_mode",
            std::make_unique<EpEngineValueChangeListener>(*this));

    setMinCompressionRatio(configuration.getMinCompressionRatio());

    configuration.addValueChangedListener(
            "min_compression_ratio",
            std::make_unique<EpEngineValueChangeListener>(*this));

    return ENGINE_SUCCESS;
}
2212 
destroy(bool force)2213 void EventuallyPersistentEngine::destroy(bool force) {
2214     stats.forceShutdown = force;
2215     stats.isShutdown = true;
2216 
2217     // Perform a snapshot of the stats before shutting down so we can persist
2218     // the type of shutdown (stats.forceShutdown), and consequently on the
2219     // next warmup can determine is there was a clean shutdown - see
2220     // Warmup::cleanShutdown
2221     if (kvBucket) {
2222         kvBucket->snapshotStats();
2223     }
2224     if (dcpConnMap_) {
2225         dcpConnMap_->shutdownAllConnections();
2226     }
2227 }
2228 
2229 std::pair<cb::ExpiryLimit, rel_time_t>
getExpiryParameters(rel_time_t exptime) const2230 EventuallyPersistentEngine::getExpiryParameters(rel_time_t exptime) const {
2231     cb::ExpiryLimit expiryLimit;
2232     auto limit = kvBucket->getMaxTtl();
2233     if (limit.count()) {
2234         expiryLimit = limit;
2235         // If max_ttl is more than 30 days we need to convert it to absolute so
2236         // it makes sense as an expiry time.
2237         if (exptime == 0) {
2238             if (limit.count() > (60 * 60 * 24 * 30)) {
2239                 exptime = ep_abs_time(limit.count());
2240             } else {
2241                 exptime = limit.count();
2242             }
2243         }
2244     }
2245 
2246     return {expiryLimit, exptime};
2247 }
2248 
processExpiryTime(time_t in) const2249 time_t EventuallyPersistentEngine::processExpiryTime(time_t in) const {
2250     auto limit = kvBucket->getMaxTtl();
2251     time_t out = in;
2252     if (limit.count()) {
2253         auto currentTime = ep_real_time();
2254         if (in == 0 || in > (currentTime + limit.count())) {
2255             // must expire in now + MaxTTL seconds
2256             out = currentTime + limit.count();
2257         }
2258     }
2259 
2260     return out;
2261 }
2262 
itemAllocate(item ** itm,const DocKey & key,const size_t nbytes,const size_t priv_nbytes,const int flags,rel_time_t exptime,uint8_t datatype,uint16_t vbucket)2263 ENGINE_ERROR_CODE EventuallyPersistentEngine::itemAllocate(
2264         item** itm,
2265         const DocKey& key,
2266         const size_t nbytes,
2267         const size_t priv_nbytes,
2268         const int flags,
2269         rel_time_t exptime,
2270         uint8_t datatype,
2271         uint16_t vbucket) {
2272     if (priv_nbytes > maxItemPrivilegedBytes) {
2273         return ENGINE_E2BIG;
2274     }
2275 
2276     if ((nbytes - priv_nbytes) > maxItemSize) {
2277         return ENGINE_E2BIG;
2278     }
2279 
2280     if (!hasMemoryForItemAllocation(sizeof(Item) + sizeof(Blob) + key.size() +
2281                                     nbytes)) {
2282         return memoryCondition();
2283     }
2284 
2285     cb::ExpiryLimit expiryLimit;
2286     std::tie(expiryLimit, exptime) = getExpiryParameters(exptime);
2287     time_t expiretime =
2288             (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime, expiryLimit));
2289 
2290     *itm = new Item(key,
2291                     flags,
2292                     expiretime,
2293                     nullptr,
2294                     nbytes,
2295                     datatype,
2296                     0 /*cas*/,
2297                     -1 /*seq*/,
2298                     vbucket);
2299     if (*itm == NULL) {
2300         return memoryCondition();
2301     } else {
2302         stats.itemAllocSizeHisto.add(nbytes);
2303         return ENGINE_SUCCESS;
2304     }
2305 }
2306 
itemDelete(const void * cookie,const DocKey & key,uint64_t & cas,uint16_t vbucket,ItemMetaData * item_meta,mutation_descr_t & mut_info)2307 ENGINE_ERROR_CODE EventuallyPersistentEngine::itemDelete(
2308         const void* cookie,
2309         const DocKey& key,
2310         uint64_t& cas,
2311         uint16_t vbucket,
2312         ItemMetaData* item_meta,
2313         mutation_descr_t& mut_info) {
2314     ENGINE_ERROR_CODE ret = kvBucket->deleteItem(key,
2315                                                  cas,
2316                                                  vbucket,
2317                                                  cookie,
2318                                                  item_meta,
2319                                                  mut_info);
2320 
2321     if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
2322         if (isDegradedMode()) {
2323             return ENGINE_TMPFAIL;
2324         }
2325     } else if (ret == ENGINE_SUCCESS) {
2326         ++stats.numOpsDelete;
2327     }
2328     return ret;
2329 }
2330 
itemRelease(item * itm)2331 void EventuallyPersistentEngine::itemRelease(item* itm) {
2332     delete reinterpret_cast<Item*>(itm);
2333 }
2334 
get(const void * cookie,item ** itm,const DocKey & key,uint16_t vbucket,get_options_t options)2335 ENGINE_ERROR_CODE EventuallyPersistentEngine::get(const void* cookie,
2336                                                   item** itm,
2337                                                   const DocKey& key,
2338                                                   uint16_t vbucket,
2339                                                   get_options_t options) {
2340     ScopeTimer2<MicrosecondStopwatch, TracerStopwatch> timer(
2341             MicrosecondStopwatch(stats.getCmdHisto),
2342             TracerStopwatch(cookie, cb::tracing::TraceCode::GET));
2343 
2344     GetValue gv(kvBucket->get(key, vbucket, cookie, options));
2345     ENGINE_ERROR_CODE ret = gv.getStatus();
2346 
2347     if (ret == ENGINE_SUCCESS) {
2348         *itm = gv.item.release();
2349         if (options & TRACK_STATISTICS) {
2350             ++stats.numOpsGet;
2351         }
2352     } else if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
2353         if (isDegradedMode()) {
2354             return ENGINE_TMPFAIL;
2355         }
2356     }
2357 
2358     return ret;
2359 }
2360 
flush(const void * cookie)2361 ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie){
2362     return ENGINE_ENOTSUP;
2363 }
2364 
get_and_touch(const void * cookie,const DocKey & key,uint16_t vbucket,uint32_t exptime)2365 cb::EngineErrorItemPair EventuallyPersistentEngine::get_and_touch(const void* cookie,
2366                                                            const DocKey& key,
2367                                                            uint16_t vbucket,
2368                                                            uint32_t exptime) {
2369     auto* handle = reinterpret_cast<ENGINE_HANDLE*>(this);
2370 
2371     cb::ExpiryLimit expiryLimit;
2372     std::tie(expiryLimit, exptime) = getExpiryParameters(exptime);
2373 
2374     time_t expiry_time =
2375             (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime, expiryLimit));
2376 
2377     GetValue gv(kvBucket->getAndUpdateTtl(key, vbucket, cookie, expiry_time));
2378 
2379     auto rv = gv.getStatus();
2380     if (rv == ENGINE_SUCCESS) {
2381         ++stats.numOpsGet;
2382         ++stats.numOpsStore;
2383         return cb::makeEngineErrorItemPair(
2384                 cb::engine_errc::success, gv.item.release(), handle);
2385     }
2386 
2387     if (isDegradedMode()) {
2388         // Remap all some of the error codes
2389         switch (rv) {
2390         case ENGINE_KEY_EEXISTS:
2391         case ENGINE_KEY_ENOENT:
2392         case ENGINE_NOT_MY_VBUCKET:
2393             rv = ENGINE_TMPFAIL;
2394             break;
2395         default:
2396             break;
2397         }
2398     }
2399 
2400     if (rv == ENGINE_KEY_EEXISTS) {
2401         rv = ENGINE_LOCKED;
2402     }
2403 
2404     return cb::makeEngineErrorItemPair(cb::engine_errc(rv));
2405 }
2406 
get_if(const void * cookie,const DocKey & key,uint16_t vbucket,std::function<bool (const item_info &)> filter)2407 cb::EngineErrorItemPair EventuallyPersistentEngine::get_if(const void* cookie,
2408                                                        const DocKey& key,
2409                                                        uint16_t vbucket,
2410                                                        std::function<bool(const item_info&)>filter) {
2411     auto* handle = reinterpret_cast<ENGINE_HANDLE*>(this);
2412 
2413     ScopeTimer2<MicrosecondStopwatch, TracerStopwatch> timer(
2414             MicrosecondStopwatch(stats.getCmdHisto),
2415             TracerStopwatch(cookie, cb::tracing::TraceCode::GETIF));
2416 
2417     // Fetch an item from the hashtable (without trying to schedule a bg-fetch
2418     // and pass it through the filter. If the filter accepts the document
2419     // based on the metadata, return the document. If the document's data
2420     // isn't resident we run another iteration in the loop and retries the
2421     // action but this time we _do_ schedule a bg-fetch.
2422     for (int ii = 0; ii < 2; ++ii) {
2423         auto options = static_cast<get_options_t>(HONOR_STATES |
2424                                                   DELETE_TEMP |
2425                                                   HIDE_LOCKED_CAS);
2426 
2427         // For the first pass, if we need to do a BGfetch, only fetch metadata
2428         // (no point in fetching the whole document if the filter doesn't want
2429         // it).
2430         if (ii == 0) {
2431             options = static_cast<get_options_t>(int(options) | ALLOW_META_ONLY);
2432         }
2433 
2434         // For second pass, or if full eviction, we'll need to issue a BG fetch.
2435         if (ii == 1 || kvBucket->getItemEvictionPolicy() == FULL_EVICTION) {
2436             options = static_cast<get_options_t>(int(options) | QUEUE_BG_FETCH);
2437         }
2438 
2439         GetValue gv(kvBucket->get(key, vbucket, cookie, options));
2440         ENGINE_ERROR_CODE status = gv.getStatus();
2441 
2442         switch (status) {
2443         case ENGINE_SUCCESS:
2444             break;
2445 
2446         case ENGINE_KEY_ENOENT: // FALLTHROUGH
2447         case ENGINE_NOT_MY_VBUCKET: // FALLTHROUGH
2448             if (isDegradedMode()) {
2449                 status = ENGINE_TMPFAIL;
2450             }
2451             // FALLTHROUGH
2452         default:
2453             return cb::makeEngineErrorItemPair(cb::engine_errc(status));
2454         }
2455 
2456         const VBucketPtr vb = getKVBucket()->getVBucket(vbucket);
2457         uint64_t vb_uuid = 0;
2458         int64_t hlcEpoch = HlcCasSeqnoUninitialised;
2459         if (vb) {
2460             vb_uuid = vb->failovers->getLatestUUID();
2461             hlcEpoch = vb->getHLCEpochSeqno();
2462         }
2463         // Apply filter; the item value isn't guaranteed to be present
2464         // (meta only) so remove it to prevent people accidentally trying to
2465         // test it.
2466         auto info = gv.item->toItemInfo(vb_uuid, hlcEpoch);
2467         info.value[0].iov_base = nullptr;
2468         info.value[0].iov_len = 0;
2469         if (filter(info)) {
2470             if (!gv.isPartial()) {
2471                 return cb::makeEngineErrorItemPair(
2472                         cb::engine_errc::success, gv.item.release(), handle);
2473             }
2474             // We want this item, but we need to fetch it off disk
2475         } else {
2476             // the client don't care about this thing..
2477             return cb::makeEngineErrorItemPair(cb::engine_errc::success);
2478         }
2479     }
2480 
2481     // It should not be possible to get as the second iteration in the loop
2482     // SHOULD handle backround fetches an the item should NOT be partial!
2483     throw std::logic_error("EventuallyPersistentEngine::get_if: loop terminated");
2484 }
2485 
get_locked(const void * cookie,item ** itm,const DocKey & key,uint16_t vbucket,uint32_t lock_timeout)2486 ENGINE_ERROR_CODE EventuallyPersistentEngine::get_locked(const void* cookie,
2487                                                          item** itm,
2488                                                          const DocKey& key,
2489                                                          uint16_t vbucket,
2490                                                          uint32_t lock_timeout) {
2491     auto default_timeout = static_cast<uint32_t>(getGetlDefaultTimeout());
2492 
2493     if (lock_timeout == 0) {
2494         lock_timeout = default_timeout;
2495     } else if (lock_timeout > static_cast<uint32_t>(getGetlMaxTimeout())) {
2496         LOG(EXTENSION_LOG_WARNING,
2497             "EventuallyPersistentEngine::get_locked: "
2498             "Illegal value for lock timeout specified %u. "
2499             "Using default value: %u", lock_timeout, default_timeout);
2500         lock_timeout = default_timeout;
2501     }
2502 
2503     auto result = kvBucket->getLocked(key, vbucket, ep_current_time(),
2504                                       lock_timeout, cookie);
2505 
2506     if (result.getStatus() == ENGINE_SUCCESS) {
2507         ++stats.numOpsGet;
2508         *itm = result.item.release();
2509     }
2510 
2511     return result.getStatus();
2512 }
2513 
unlock(const void * cookie,const DocKey & key,uint16_t vbucket,uint64_t cas)2514 ENGINE_ERROR_CODE EventuallyPersistentEngine::unlock(const void* cookie,
2515                                                      const DocKey& key,
2516                                                      uint16_t vbucket,
2517                                                      uint64_t cas) {
2518     return kvBucket->unlockKey(key, vbucket, cas, ep_current_time());
2519 }
2520 
store_if(const void * cookie,Item & item,uint64_t cas,ENGINE_STORE_OPERATION operation,cb::StoreIfPredicate predicate)2521 cb::EngineErrorCasPair EventuallyPersistentEngine::store_if(
2522         const void* cookie,
2523         Item& item,
2524         uint64_t cas,
2525         ENGINE_STORE_OPERATION operation,
2526         cb::StoreIfPredicate predicate) {
2527     ScopeTimer2<MicrosecondStopwatch, TracerStopwatch> timer(
2528             MicrosecondStopwatch(stats.storeCmdHisto),
2529             TracerStopwatch(cookie, cb::tracing::TraceCode::STORE));
2530 
2531     ENGINE_ERROR_CODE status;
2532     switch (operation) {
2533     case OPERATION_CAS:
2534         if (item.getCas() == 0) {
2535             // Using a cas command with a cas wildcard doesn't make sense
2536             status = ENGINE_NOT_STORED;
2537             break;
2538         }
2539     // FALLTHROUGH
2540     case OPERATION_SET:
2541         if (isDegradedMode()) {
2542             return {cb::engine_errc::temporary_failure, cas};
2543         }
2544         status = kvBucket->set(item, cookie, predicate);
2545         break;
2546 
2547     case OPERATION_ADD:
2548         if (isDegradedMode()) {
2549             return {cb::engine_errc::temporary_failure, cas};
2550         }
2551 
2552         if (item.getCas() != 0) {
2553             // Adding an item with a cas value doesn't really make sense...
2554             return {cb::engine_errc::key_already_exists, cas};
2555         }
2556 
2557         status = kvBucket->add(item, cookie);
2558         break;
2559 
2560     case OPERATION_REPLACE:
2561         status = kvBucket->replace(item, cookie, predicate);
2562         break;
2563     default:
2564         status = ENGINE_ENOTSUP;
2565     }
2566 
2567     switch (status) {
2568     case ENGINE_SUCCESS:
2569         ++stats.numOpsStore;
2570         // If success - check if we're now in need of some memory freeing
2571         kvBucket->checkAndMaybeFreeMemory();
2572         break;
2573     case ENGINE_ENOMEM:
2574         status = memoryCondition();
2575         break;
2576     case ENGINE_NOT_STORED:
2577     case ENGINE_NOT_MY_VBUCKET:
2578         if (isDegradedMode()) {
2579             return {cb::engine_errc::temporary_failure, cas};
2580         }
2581         break;
2582     default:
2583         break;
2584     }
2585 
2586     return {cb::engine_errc(status), item.getCas()};
2587 }
2588 
store(const void * cookie,item * itm,uint64_t & cas,ENGINE_STORE_OPERATION operation)2589 ENGINE_ERROR_CODE EventuallyPersistentEngine::store(
2590         const void* cookie,
2591         item* itm,
2592         uint64_t& cas,
2593         ENGINE_STORE_OPERATION operation) {
2594     Item& item = static_cast<Item&>(*static_cast<Item*>(itm));
2595     auto rv = store_if(cookie, item, cas, operation, {});
2596     cas = rv.cas;
2597     return ENGINE_ERROR_CODE(rv.status);
2598 }
2599 
initializeEngineCallbacks()2600 void EventuallyPersistentEngine::initializeEngineCallbacks() {
2601     // Register the ON_DISCONNECT callback
2602     registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2603     // Register the ON_DELETE_BUCKET callback
2604     registerEngineCallback(ON_DELETE_BUCKET, EvpHandleDeleteBucket, this);
2605 }
2606 
memoryCondition()2607 ENGINE_ERROR_CODE EventuallyPersistentEngine::memoryCondition() {
2608     // Do we think it's possible we could free something?
2609     bool haveEvidenceWeCanFreeMemory =
2610             (stats.getMaxDataSize() > stats.getMemOverhead());
2611     if (haveEvidenceWeCanFreeMemory) {
2612         // Look for more evidence by seeing if we have resident items.
2613         VBucketCountVisitor countVisitor(vbucket_state_active);
2614         kvBucket->visit(countVisitor);
2615 
2616         haveEvidenceWeCanFreeMemory = countVisitor.getNonResident() <
2617             countVisitor.getNumItems();
2618     }
2619     if (haveEvidenceWeCanFreeMemory) {
2620         ++stats.tmp_oom_errors;
2621         // Wake up the item pager task as memory usage
2622         // seems to have exceeded high water mark
2623         getKVBucket()->attemptToFreeMemory();
2624         return ENGINE_TMPFAIL;
2625     } else {
2626         if (getKVBucket()->getItemEvictionPolicy() == FULL_EVICTION) {
2627             ++stats.tmp_oom_errors;
2628             getKVBucket()->wakeUpCheckpointRemover();
2629             return ENGINE_TMPFAIL;
2630         }
2631 
2632         ++stats.oom_errors;
2633         return ENGINE_ENOMEM;
2634     }
2635 }
2636 
hasMemoryForItemAllocation(uint32_t totalItemSize)2637 bool EventuallyPersistentEngine::hasMemoryForItemAllocation(
2638         uint32_t totalItemSize) {
2639     return (stats.getEstimatedTotalMemoryUsed() + totalItemSize) <=
2640            stats.getMaxDataSize();
2641 }
2642 
enableTraffic(bool enable)2643 bool EventuallyPersistentEngine::enableTraffic(bool enable) {
2644     bool inverse = !enable;
2645     return trafficEnabled.compare_exchange_strong(inverse, enable);
2646 }
2647 
doEngineStats(const void * cookie,ADD_STAT add_stat)2648 ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
2649                                                            ADD_STAT add_stat) {
2650 
2651     configuration.addStats(add_stat, cookie);
2652 
2653     EPStats &epstats = getEpStats();
2654     add_casted_stat("ep_storage_age",
2655                     epstats.dirtyAge, add_stat, cookie);
2656     add_casted_stat("ep_storage_age_highwat",
2657                     epstats.