1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 /*
19  *                "ewouldblock_engine"
20  *
21  * The "ewouldblock_engine" allows one to test how memcached responds when the
22  * engine returns EWOULDBLOCK instead of the correct response.
23  *
24  * Motivation:
25  *
26  * The EWOULDBLOCK response code can be returned from a number of engine
27  * functions, and is used to indicate that the request could not be immediately
28  * fulfilled, and it "would block" if it tried to. The correct way for
29  * memcached to handle this (in general) is to suspend that request until it
30  * is later notified by the engine (via notify_io_complete()).
31  *
32  * However, engines typically return the correct response to requests
33  * immediately, only rarely (and from memcached's POV non-deterministically)
34  * returning EWOULDBLOCK. This makes testing of the code-paths handling
35  * EWOULDBLOCK tricky.
36  *
37  *
38  * Operation:
39  * This engine, when loaded by memcached proxies requests to a "real" engine.
40  * Depending on how it is configured, it can simply pass the request on to the
41  * real engine, or artificially return EWOULDBLOCK back to memcached.
42  *
43  * See the 'Modes' enum below for the possible modes for a connection. The mode
44  * can be selected by sending a `request_ewouldblock_ctl` command
45  *  (opcode PROTOCOL_BINARY_CMD_EWOULDBLOCK_CTL).
46  *
47  * DCP:
48  *    There is a special DCP stream named "ewb_internal" which is an
49  *    endless stream of items. You may also add a number at the end
50  *    e.g. "ewb_internal:10" and it'll create a stream with 10 entries.
51  *    It will always send the same K-V pair.
52  *    Note that we don't register for disconnect events so you might
53  *    experience weirdness if you first try to use the internal dcp
54  *    stream, and then later on want to use the one provided by the
55  *    engine. The workaround for that is to delete the bucket
56  *    in between ;-) (put them in separate test suites and it'll all
57  *    be handled for you.
58  *
59  *    Any other stream name results in proxying the dcp request to
60  *    the underlying engine's DCP implementation.
61  *
62  */
63 
64 #include "ewouldblock_engine.h"
65 
66 #include <atomic>
67 #include <condition_variable>
68 #include <cstring>
69 #include <gsl/gsl>
70 #include <iostream>
71 #include <map>
72 #include <memory>
73 #include <mutex>
74 #include <queue>
75 #include <random>
76 #include <sstream>
77 #include <string>
78 #include <utility>
79 
80 #include <logger/logger.h>
81 #include <memcached/engine.h>
82 #include <memcached/extension.h>
83 #include <platform/cb_malloc.h>
84 #include <platform/dirutils.h>
85 #include <platform/thread.h>
86 #include <xattr/blob.h>
87 
88 #include "utilities/engine_loader.h"
89 
90 /* Public API declaration ****************************************************/
91 
92 extern "C" {
93     MEMCACHED_PUBLIC_API
94     ENGINE_ERROR_CODE create_instance(uint64_t interface, GET_SERVER_API gsa,
95                                       ENGINE_HANDLE **handle);
96 
97     MEMCACHED_PUBLIC_API
98     void destroy_engine(void);
99 }
100 
101 
102 class EWB_Engine;
103 
104 // Mapping from wrapped handle to EWB handles.
105 static std::map<ENGINE_HANDLE*, EWB_Engine*> engine_map;
106 
107 class NotificationThread : public Couchbase::Thread {
108 public:
NotificationThread(EWB_Engine& engine_)109     NotificationThread(EWB_Engine& engine_)
110         : Thread("ewb:pendingQ"),
111           engine(engine_) {}
112 
113 protected:
114     void run() override;
115 
116 protected:
117     EWB_Engine& engine;
118 };
119 
120 /**
121  * The BlockMonitorThread represents the thread that is
122  * monitoring the "lock" file. Once the file is no longer
123  * there it will resume the client specified with the given
124  * id.
125  */
126 class BlockMonitorThread : public Couchbase::Thread {
127 public:
BlockMonitorThread(EWB_Engine& engine_, uint32_t id_, const std::string file_)128     BlockMonitorThread(EWB_Engine& engine_,
129                        uint32_t id_,
130                        const std::string file_)
131         : Thread("ewb:BlockMon"),
132           engine(engine_),
133           id(id_),
134           file(file_) {}
135 
136     /**
137      * Wait for the underlying thread to reach the zombie state
138      * (== terminated, but not reaped)
139      */
~BlockMonitorThread()140     ~BlockMonitorThread() {
141         waitForState(Couchbase::ThreadState::Zombie);
142     }
143 
144 protected:
145     void run() override;
146 
147 private:
148     EWB_Engine& engine;
149     const uint32_t id;
150     const std::string file;
151 };
152 
153 static void register_callback(ENGINE_HANDLE *, ENGINE_EVENT_TYPE,
154                               EVENT_CALLBACK, const void *);
155 
156 static SERVER_HANDLE_V1 wrapped_api;
157 static SERVER_HANDLE_V1 *real_api;
init_wrapped_api(GET_SERVER_API fn)158 static void init_wrapped_api(GET_SERVER_API fn) {
159     static bool init = false;
160     if (init) {
161         return;
162     }
163 
164     init = true;
165     real_api = fn();
166     wrapped_api = *real_api;
167 
168     // Overrides
169     static SERVER_CALLBACK_API callback = *wrapped_api.callback;
170     callback.register_callback = register_callback;
171     wrapped_api.callback = &callback;
172 }
173 
get_wrapped_gsa()174 static SERVER_HANDLE_V1 *get_wrapped_gsa() {
175     return &wrapped_api;
176 }
177 
178 /** ewouldblock_engine class */
179 class EWB_Engine : public ENGINE_HANDLE_V1 {
180 
181 private:
182     enum class Cmd {
183         NONE,
184         GET_INFO,
185         ALLOCATE,
186         REMOVE,
187         GET,
188         STORE,
189         CAS,
190         ARITHMETIC,
191         LOCK,
192         UNLOCK,
193         FLUSH,
194         GET_STATS,
195         GET_META,
196         UNKNOWN_COMMAND
197     };
198 
199     const char* to_string(Cmd cmd);
200 
201     uint64_t (*get_connection_id)(gsl::not_null<const void*> cookie);
202 
203 public:
204     EWB_Engine(GET_SERVER_API gsa_);
205 
206     ~EWB_Engine();
207 
208     // Convert from a handle back to the read object.
to_engine(ENGINE_HANDLE* handle)209     static EWB_Engine* to_engine(ENGINE_HANDLE* handle) {
210         return reinterpret_cast<EWB_Engine*> (handle);
211     }
212 
213     /* Returns true if the next command should have a fake error code injected.
214      * @param func Address of the command function (get, store, etc).
215      * @param cookie The cookie for the user's request.
216      * @param[out] Error code to return.
217      */
should_inject_error(Cmd cmd, const void* cookie, ENGINE_ERROR_CODE& err)218     bool should_inject_error(Cmd cmd, const void* cookie,
219                              ENGINE_ERROR_CODE& err) {
220 
221         if (is_connection_suspended(cookie)) {
222             err = ENGINE_EWOULDBLOCK;
223             return true;
224         }
225 
226         uint64_t id = get_connection_id(cookie);
227 
228         std::lock_guard<std::mutex> guard(cookie_map_mutex);
229 
230         auto iter = connection_map.find(id);
231         if (iter == connection_map.end()) {
232             return false;
233         }
234 
235         if (iter->second.first != cookie) {
236             // The cookie is different so it represents a different command
237             connection_map.erase(iter);
238             return false;
239         }
240 
241         const bool inject = iter->second.second->should_inject_error(cmd, err);
242         const bool add_to_pending_io_ops = iter->second.second->add_to_pending_io_ops();
243 
244         if (inject) {
245             LOG_DEBUG("EWB_Engine: injecting error:{} for cmd:{}",
246                       err,
247                       to_string(cmd));
248 
249             if (err == ENGINE_EWOULDBLOCK && add_to_pending_io_ops) {
250                 // The server expects that if EWOULDBLOCK is returned then the
251                 // server should be notified in the future when the operation is
252                 // ready - so add this op to the pending IO queue.
253                 schedule_notification(iter->second.first);
254             }
255         }
256 
257         return inject;
258     }
259 
260     /* Implementation of all the engine functions. ***************************/
261 
initialize(gsl::not_null<ENGINE_HANDLE*> handle, const char* config_str)262     static ENGINE_ERROR_CODE initialize(gsl::not_null<ENGINE_HANDLE*> handle,
263                                         const char* config_str) {
264         EWB_Engine* ewb = to_engine(handle);
265 
266         // Extract the name of the real engine we will be proxying; then
267         // create and initialize it.
268         std::string config(config_str);
269         auto seperator = config.find(";");
270         std::string real_engine_name(config.substr(0, seperator));
271         std::string real_engine_config;
272         if (seperator != std::string::npos) {
273             real_engine_config = config.substr(seperator + 1);
274         }
275 
276         if ((ewb->real_engine_ref = load_engine(
277                      real_engine_name.c_str(), NULL, NULL)) == NULL) {
278             LOG_CRITICAL(
279                     "ERROR: EWB_Engine::initialize(): Failed to load real "
280                     "engine '{}'",
281                     real_engine_name);
282             abort();
283         }
284 
285         if (!create_engine_instance(
286                     ewb->real_engine_ref, get_wrapped_gsa, &ewb->real_handle)) {
287             LOG_CRITICAL(
288                     "ERROR: EWB_Engine::initialize(): Failed create "
289                     "engine instance '{}'",
290                     real_engine_name);
291             abort();
292         }
293 
294         if (ewb->real_handle->interface != 1) {
295             LOG_CRITICAL(
296                     "ERROR: EWB_Engine::initialize(): Only support engine "
297                     "with interface v1 - got v{}.",
298                     ewb->real_engine->interface.interface);
299             abort();
300         }
301         ewb->real_engine =
302                 reinterpret_cast<ENGINE_HANDLE_V1*>(ewb->real_handle);
303 
304 
305         engine_map[ewb->real_handle] = ewb;
306         ENGINE_ERROR_CODE res = ewb->real_engine->initialize(
307                 ewb->real_handle, real_engine_config.c_str());
308 
309         if (res == ENGINE_SUCCESS) {
310             // For engine interface functions which cannot return EWOULDBLOCK,
311             // and we otherwise don't want to interpose, we can simply use the
312             // real_engine's functions directly.
313             ewb->ENGINE_HANDLE_V1::item_set_cas = ewb->real_engine->item_set_cas;
314             ewb->ENGINE_HANDLE_V1::set_item_info = ewb->real_engine->set_item_info;
315             ewb->ENGINE_HANDLE_V1::item_set_datatype =
316                     ewb->real_engine->item_set_datatype;
317         }
318 
319         // Register a callback on DISCONNECT events, so we can delete
320         // any stale elements from connection_map when a connection
321         // DC's.
322         real_api->callback->register_callback(
323                 reinterpret_cast<ENGINE_HANDLE*>(ewb),
324                 ON_DISCONNECT,
325                 handle_disconnect,
326                 reinterpret_cast<ENGINE_HANDLE*>(ewb));
327 
328         return res;
329     }
330 
destroy(gsl::not_null<ENGINE_HANDLE*> handle, const bool force)331     static void destroy(gsl::not_null<ENGINE_HANDLE*> handle,
332                         const bool force) {
333         EWB_Engine* ewb = to_engine(handle);
334         ewb->real_engine->destroy(ewb->real_handle, force);
335         delete ewb;
336     }
337 
allocate( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, const DocKey& key, const size_t nbytes, const int flags, const rel_time_t exptime, uint8_t datatype, uint16_t vbucket)338     static cb::EngineErrorItemPair allocate(
339             gsl::not_null<ENGINE_HANDLE*> handle,
340             gsl::not_null<const void*> cookie,
341             const DocKey& key,
342             const size_t nbytes,
343             const int flags,
344             const rel_time_t exptime,
345             uint8_t datatype,
346             uint16_t vbucket) {
347         EWB_Engine* ewb = to_engine(handle);
348         ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
349         if (ewb->should_inject_error(Cmd::ALLOCATE, cookie, err)) {
350             return cb::makeEngineErrorItemPair(cb::engine_errc(err));
351         } else {
352             return ewb->real_engine->allocate(ewb->real_handle,
353                                               cookie,
354                                               key,
355                                               nbytes,
356                                               flags,
357                                               exptime,
358                                               datatype,
359                                               vbucket);
360         }
361     }
362 
allocate_ex( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, const DocKey& key, size_t nbytes, size_t priv_nbytes, int flags, rel_time_t exptime, uint8_t datatype, uint16_t vbucket)363     static std::pair<cb::unique_item_ptr, item_info> allocate_ex(
364             gsl::not_null<ENGINE_HANDLE*> handle,
365             gsl::not_null<const void*> cookie,
366             const DocKey& key,
367             size_t nbytes,
368             size_t priv_nbytes,
369             int flags,
370             rel_time_t exptime,
371             uint8_t datatype,
372             uint16_t vbucket) {
373         EWB_Engine* ewb = to_engine(handle);
374         ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
375         if (ewb->should_inject_error(Cmd::ALLOCATE, cookie, err)) {
376             throw cb::engine_error(cb::engine_errc(err), "ewb: injecting error");
377         } else {
378             return ewb->real_engine->allocate_ex(ewb->real_handle, cookie,
379                                                  key, nbytes, priv_nbytes,
380                                                  flags, exptime, datatype,
381                                                  vbucket);
382         }
383     }
384 
remove(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, const DocKey& key, uint64_t& cas, uint16_t vbucket, mutation_descr_t& mut_info)385     static ENGINE_ERROR_CODE remove(gsl::not_null<ENGINE_HANDLE*> handle,
386                                     gsl::not_null<const void*> cookie,
387                                     const DocKey& key,
388                                     uint64_t& cas,
389                                     uint16_t vbucket,
390                                     mutation_descr_t& mut_info) {
391         EWB_Engine* ewb = to_engine(handle);
392         ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
393         if (ewb->should_inject_error(Cmd::REMOVE, cookie, err)) {
394             return err;
395         } else {
396             return ewb->real_engine->remove(ewb->real_handle, cookie, key, cas,
397                                             vbucket, mut_info);
398         }
399     }
400 
release(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<item*> item)401     static void release(gsl::not_null<ENGINE_HANDLE*> handle,
402                         gsl::not_null<item*> item) {
403         EWB_Engine* ewb = to_engine(handle);
404         LOG_DEBUG("EWB_Engine: release");
405 
406         if (item == &ewb->dcp_mutation_item) {
407             // Ignore the DCP mutation, we own it (and don't track
408             // refcounts on it).
409         } else {
410             return ewb->real_engine->release(ewb->real_handle, item);
411         }
412     }
413 
get(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, const DocKey& key, uint16_t vbucket, DocStateFilter documentStateFilter)414     static cb::EngineErrorItemPair get(gsl::not_null<ENGINE_HANDLE*> handle,
415                                        gsl::not_null<const void*> cookie,
416                                        const DocKey& key,
417                                        uint16_t vbucket,
418                                        DocStateFilter documentStateFilter) {
419         EWB_Engine* ewb = to_engine(handle);
420         ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
421         if (ewb->should_inject_error(Cmd::GET, cookie, err)) {
422             return std::make_pair(
423                     cb::engine_errc(err),
424                     cb::unique_item_ptr{nullptr, cb::ItemDeleter{handle}});
425         } else {
426             return ewb->real_engine->get(ewb->real_handle,
427                                          cookie,
428                                          key,
429                                          vbucket,
430                                          documentStateFilter);
431         }
432     }
433 
get_if( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, const DocKey& key, uint16_t vbucket, std::function<bool(const item_info&)> filter)434     static cb::EngineErrorItemPair get_if(
435             gsl::not_null<ENGINE_HANDLE*> handle,
436             gsl::not_null<const void*> cookie,
437             const DocKey& key,
438             uint16_t vbucket,
439             std::function<bool(const item_info&)> filter) {
440         EWB_Engine* ewb = to_engine(handle);
441         ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
442         if (ewb->should_inject_error(Cmd::GET, cookie, err)) {
443             return cb::makeEngineErrorItemPair(cb::engine_errc::would_block);
444         } else {
445             return ewb->real_engine->get_if(
446                     ewb->real_handle, cookie, key, vbucket, filter);
447         }
448     }
449 
get_and_touch( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, const DocKey& key, uint16_t vbucket, uint32_t exptime)450     static cb::EngineErrorItemPair get_and_touch(
451             gsl::not_null<ENGINE_HANDLE*> handle,
452             gsl::not_null<const void*> cookie,
453             const DocKey& key,
454             uint16_t vbucket,
455             uint32_t exptime) {
456         EWB_Engine* ewb = to_engine(handle);
457         ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
458         if (ewb->should_inject_error(Cmd::GET, cookie, err)) {
459             return cb::makeEngineErrorItemPair(cb::engine_errc::would_block);
460         } else {
461             return ewb->real_engine->get_and_touch(
462                     ewb->real_handle, cookie, key, vbucket, exptime);
463         }
464     }
465 
get_locked( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, const DocKey& key, uint16_t vbucket, uint32_t lock_timeout)466     static cb::EngineErrorItemPair get_locked(
467             gsl::not_null<ENGINE_HANDLE*> handle,
468             gsl::not_null<const void*> cookie,
469             const DocKey& key,
470             uint16_t vbucket,
471             uint32_t lock_timeout) {
472         EWB_Engine* ewb = to_engine(handle);
473         ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
474         if (ewb->should_inject_error(Cmd::LOCK, cookie, err)) {
475             return cb::makeEngineErrorItemPair(cb::engine_errc(err));
476         } else {
477             return ewb->real_engine->get_locked(
478                     ewb->real_handle, cookie, key, vbucket, lock_timeout);
479         }
480     }
481 
get_meta( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, const DocKey& key, uint16_t vbucket)482     static cb::EngineErrorMetadataPair get_meta(
483             gsl::not_null<ENGINE_HANDLE*> handle,
484             gsl::not_null<const void*> cookie,
485             const DocKey& key,
486             uint16_t vbucket) {
487         EWB_Engine* ewb = to_engine(handle);
488         ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
489         if (ewb->should_inject_error(Cmd::GET_META, cookie, err)) {
490             return std::make_pair(cb::engine_errc(err), item_info());
491         } else {
492             return ewb->real_engine->get_meta(ewb->real_handle,
493                                               cookie,
494                                               key,
495                                               vbucket);
496         }
497     }
498 
unlock(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, const DocKey& key, uint16_t vbucket, uint64_t cas)499     static ENGINE_ERROR_CODE unlock(gsl::not_null<ENGINE_HANDLE*> handle,
500                                     gsl::not_null<const void*> cookie,
501                                     const DocKey& key,
502                                     uint16_t vbucket,
503                                     uint64_t cas) {
504         EWB_Engine* ewb = to_engine(handle);
505         ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
506         if (ewb->should_inject_error(Cmd::UNLOCK, cookie, err)) {
507             return err;
508         } else {
509             return ewb->real_engine->unlock(ewb->real_handle, cookie, key,
510                                             vbucket, cas);
511         }
512     }
513 
store(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, gsl::not_null<item*> item, uint64_t& cas, ENGINE_STORE_OPERATION operation, DocumentState document_state)514     static ENGINE_ERROR_CODE store(gsl::not_null<ENGINE_HANDLE*> handle,
515                                    gsl::not_null<const void*> cookie,
516                                    gsl::not_null<item*> item,
517                                    uint64_t& cas,
518                                    ENGINE_STORE_OPERATION operation,
519                                    DocumentState document_state) {
520         EWB_Engine* ewb = to_engine(handle);
521         ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
522         Cmd opcode = (operation == OPERATION_CAS) ? Cmd::CAS : Cmd::STORE;
523         if (ewb->should_inject_error(opcode, cookie, err)) {
524             return err;
525         } else {
526             return ewb->real_engine->store(ewb->real_handle, cookie, item, cas,
527                                            operation, document_state);
528         }
529     }
530 
store_if(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, gsl::not_null<item*> item, uint64_t cas, ENGINE_STORE_OPERATION operation, cb::StoreIfPredicate predicate, DocumentState document_state)531     static cb::EngineErrorCasPair store_if(gsl::not_null<ENGINE_HANDLE*> handle,
532                                            gsl::not_null<const void*> cookie,
533                                            gsl::not_null<item*> item,
534                                            uint64_t cas,
535                                            ENGINE_STORE_OPERATION operation,
536                                            cb::StoreIfPredicate predicate,
537                                            DocumentState document_state) {
538         EWB_Engine* ewb = to_engine(handle);
539         ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
540         Cmd opcode = (operation == OPERATION_CAS) ? Cmd::CAS : Cmd::STORE;
541         if (ewb->should_inject_error(opcode, cookie, err)) {
542             return {cb::engine_errc(err), 0};
543         } else {
544             return ewb->real_engine->store_if(ewb->real_handle,
545                                               cookie,
546                                               item,
547                                               cas,
548                                               operation,
549                                               predicate,
550                                               document_state);
551         }
552     }
553 
flush(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie)554     static ENGINE_ERROR_CODE flush(gsl::not_null<ENGINE_HANDLE*> handle,
555                                    gsl::not_null<const void*> cookie) {
556         // Flush is a little different - it often returns EWOULDBLOCK, and
557         // notify_io_complete() just tells the server it can issue it's *next*
558         // command (i.e. no need to re-flush). Therefore just pass Flush
559         // straight through for now.
560         EWB_Engine* ewb = to_engine(handle);
561         return ewb->real_engine->flush(ewb->real_handle, cookie);
562     }
563 
get_stats(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, cb::const_char_buffer key, ADD_STAT add_stat)564     static ENGINE_ERROR_CODE get_stats(gsl::not_null<ENGINE_HANDLE*> handle,
565                                        gsl::not_null<const void*> cookie,
566                                        cb::const_char_buffer key,
567                                        ADD_STAT add_stat) {
568         EWB_Engine* ewb = to_engine(handle);
569         ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
570         if (ewb->should_inject_error(Cmd::GET_STATS, cookie, err)) {
571             return err;
572         } else {
573             return ewb->real_engine->get_stats(
574                     ewb->real_handle, cookie, key, add_stat);
575         }
576     }
577 
reset_stats(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie)578     static void reset_stats(gsl::not_null<ENGINE_HANDLE*> handle,
579                             gsl::not_null<const void*> cookie) {
580         EWB_Engine* ewb = to_engine(handle);
581         return ewb->real_engine->reset_stats(ewb->real_handle, cookie);
582     }
583 
584     /* Handle 'unknown_command'. In additional to wrapping calls to the
585      * underlying real engine, this is also used to configure
586      * ewouldblock_engine itself using he CMD_EWOULDBLOCK_CTL opcode.
587      */
unknown_command( gsl::not_null<ENGINE_HANDLE*> handle, const void* cookie, gsl::not_null<protocol_binary_request_header*> request, ADD_RESPONSE response, DocNamespace doc_namespace)588     static ENGINE_ERROR_CODE unknown_command(
589             gsl::not_null<ENGINE_HANDLE*> handle,
590             const void* cookie,
591             gsl::not_null<protocol_binary_request_header*> request,
592             ADD_RESPONSE response,
593             DocNamespace doc_namespace) {
594         EWB_Engine* ewb = to_engine(handle);
595 
596         if (request->request.opcode == PROTOCOL_BINARY_CMD_EWOULDBLOCK_CTL) {
597             auto* req =
598                     reinterpret_cast<request_ewouldblock_ctl*>(request.get());
599             const EWBEngineMode mode = static_cast<EWBEngineMode>(ntohl(req->message.body.mode));
600             const uint32_t value = ntohl(req->message.body.value);
601             const ENGINE_ERROR_CODE injected_error =
602                     static_cast<ENGINE_ERROR_CODE>(ntohl(req->message.body.inject_error));
603             const std::string key((char*)req->bytes + sizeof(req->bytes),
604                                   ntohs(req->message.header.request.keylen));
605 
606             std::shared_ptr<FaultInjectMode> new_mode = nullptr;
607 
608             // Validate mode, and construct new fault injector.
609             switch (mode) {
610                 case EWBEngineMode::Next_N:
611                     new_mode = std::make_shared<ErrOnNextN>(injected_error, value);
612                     break;
613 
614                 case EWBEngineMode::Random:
615                     new_mode = std::make_shared<ErrRandom>(injected_error, value);
616                     break;
617 
618                 case EWBEngineMode::First:
619                     new_mode = std::make_shared<ErrOnFirst>(injected_error);
620                     break;
621 
622                 case EWBEngineMode::Sequence:
623                     new_mode = std::make_shared<ErrSequence>(injected_error, value);
624                     break;
625 
626                 case EWBEngineMode::No_Notify:
627                     new_mode = std::make_shared<ErrOnNoNotify>(injected_error);
628                     break;
629 
630                 case EWBEngineMode::CasMismatch:
631                     new_mode = std::make_shared<CASMismatch>(value);
632                     break;
633 
634                 case EWBEngineMode::IncrementClusterMapRevno:
635                     ewb->clustermap_revno++;
636                     response(nullptr, 0, nullptr, 0, nullptr, 0,
637                              PROTOCOL_BINARY_RAW_BYTES,
638                              PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
639                     return ENGINE_SUCCESS;
640 
641                 case EWBEngineMode::BlockMonitorFile:
642                     return ewb->handleBlockMonitorFile(cookie, value, key,
643                                                        response);
644 
645                 case EWBEngineMode::Suspend:
646                     return ewb->handleSuspend(cookie, value, response);
647 
648                 case EWBEngineMode::Resume:
649                     return ewb->handleResume(cookie, value, response);
650 
651                 case EWBEngineMode::SetItemCas:
652                     return ewb->setItemCas(cookie, key, value, response);
653             }
654 
655             if (new_mode == nullptr) {
656                 LOG_WARNING(
657                         "EWB_Engine::unknown_command(): "
658                         "Got unexpected mode={} for EWOULDBLOCK_CTL, ",
659                         (unsigned int)mode);
660                 response(nullptr, 0, nullptr, 0, nullptr, 0,
661                          PROTOCOL_BINARY_RAW_BYTES,
662                          PROTOCOL_BINARY_RESPONSE_EINVAL, /*cas*/0, cookie);
663                 return ENGINE_FAILED;
664             } else {
665                 try {
666                     LOG_DEBUG(
667                             "EWB_Engine::unknown_command(): Setting EWB mode "
668                             "to "
669                             "{} for cookie {}",
670                             new_mode->to_string(),
671                             cookie);
672 
673                     uint64_t id = ewb->get_connection_id(cookie);
674 
675                     {
676                         std::lock_guard<std::mutex> guard(ewb->cookie_map_mutex);
677                         ewb->connection_map.erase(id);
678                         ewb->connection_map.emplace(id, std::make_pair(cookie, new_mode));
679                     }
680 
681                     response(nullptr, 0, nullptr, 0, nullptr, 0,
682                              PROTOCOL_BINARY_RAW_BYTES,
683                              PROTOCOL_BINARY_RESPONSE_SUCCESS, /*cas*/0, cookie);
684                     return ENGINE_SUCCESS;
685                 } catch (std::bad_alloc&) {
686                     return ENGINE_ENOMEM;
687                 }
688             }
689         } else {
690             ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
691             if (ewb->should_inject_error(Cmd::UNKNOWN_COMMAND, cookie, err)) {
692                 return err;
693             } else {
694                 return ewb->real_engine->unknown_command(ewb->real_handle, cookie,
695                                                          request, response,
696                                                          doc_namespace);
697             }
698         }
699     }
700 
item_set_cas(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<item*> item, uint64_t cas)701     static void item_set_cas(gsl::not_null<ENGINE_HANDLE*> handle,
702                              gsl::not_null<item*> item,
703                              uint64_t cas) {
704         // Should never be called as ENGINE_HANDLE_V1::item_set_cas is updated
705         // to point to the real_engine once it is initialized. This function
706         //only exists so there is a non-NULL value for
707         // ENGINE_HANDLE_V1::item_set_cas initially to keep load_engine()
708         // happy.
709         abort();
710     }
711 
item_set_datatype(gsl::not_null<ENGINE_HANDLE*>, gsl::not_null<item*> itm, protocol_binary_datatype_t datatype)712     static void item_set_datatype(gsl::not_null<ENGINE_HANDLE*>,
713                                   gsl::not_null<item*> itm,
714                                   protocol_binary_datatype_t datatype) {
715         // Should never be called - set item_set_datatype().
716         abort();
717     }
718 
get_item_info(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const item*> item, gsl::not_null<item_info*> item_info)719     static bool get_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
720                               gsl::not_null<const item*> item,
721                               gsl::not_null<item_info*> item_info) {
722         EWB_Engine* ewb = to_engine(handle);
723         LOG_DEBUG("EWB_Engine: get_item_info");
724 
725         // This function cannot return EWOULDBLOCK - just chain to the real
726         // engine's function, unless it is a request for our special DCP item.
727         if (item == &ewb->dcp_mutation_item) {
728             item_info->cas = 0;
729             item_info->vbucket_uuid = 0;
730             item_info->seqno = 0;
731             item_info->exptime = 0;
732             item_info->nbytes =
733                     gsl::narrow<uint32_t>(ewb->dcp_mutation_item.value.size());
734             item_info->flags = 0;
735             item_info->datatype = PROTOCOL_BINARY_DATATYPE_XATTR;
736             item_info->nkey =
737                     gsl::narrow<uint16_t>(ewb->dcp_mutation_item.key.size());
738             item_info->key = ewb->dcp_mutation_item.key.c_str();
739             item_info->value[0].iov_base = &ewb->dcp_mutation_item.value[0];
740             item_info->value[0].iov_len = item_info->nbytes;
741             return true;
742         } else {
743             return ewb->real_engine->get_item_info(
744                     ewb->real_handle, item, item_info);
745         }
746     }
747 
set_item_info(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<item*> item, gsl::not_null<const item_info*> item_info)748     static bool set_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
749                               gsl::not_null<item*> item,
750                               gsl::not_null<const item_info*> item_info) {
751         // Should never be called - set item_set_cas().
752         abort();
753     }
754 
handle_disconnect(const void* cookie, ENGINE_EVENT_TYPE type, const void* event_data, const void* cb_data)755     static void handle_disconnect(const void* cookie,
756                                   ENGINE_EVENT_TYPE type,
757                                   const void* event_data,
758                                   const void* cb_data) {
759         cb_assert(event_data == NULL);
760         EWB_Engine* ewb =
761                 reinterpret_cast<EWB_Engine*>(const_cast<void*>(cb_data));
762         LOG_DEBUG("EWB_Engine::handle_disconnect");
763 
764         uint64_t id = ewb->get_connection_id(cookie);
765         {
766             std::lock_guard<std::mutex> guard(ewb->cookie_map_mutex);
767             ewb->connection_map.erase(id);
768         }
769     }
770 
771     GET_SERVER_API gsa;
772 
773     // Actual engine we are proxying requests to.
774     ENGINE_HANDLE* real_handle;
775     ENGINE_HANDLE_V1* real_engine;
776     engine_reference* real_engine_ref;
777 
778     std::atomic_int clustermap_revno;
779 
780     /**
781      * The method responsible for pushing all of the notify_io_complete
782      * to the frontend. It is run by notify_io_thread and not intended to
783      * be called by anyone else!.
784      */
785     void process_notifications();
786     std::unique_ptr<Couchbase::Thread> notify_io_thread;
787 
788 protected:
789     /**
790      * Handle the control message for block monitor file
791      *
792      * @param cookie The cookie executing the operation
793      * @param id The identifier used to represent the cookie
794      * @param file The file to monitor
795      * @param response callback used to send a response to the client
796      * @return The standard engine error codes
797      */
798     ENGINE_ERROR_CODE handleBlockMonitorFile(const void* cookie,
799                                              uint32_t id,
800                                              const std::string& file,
801                                              ADD_RESPONSE response);
802 
803     /**
804      * Handle the control message for suspend
805      *
806      * @param cookie The cookie executing the operation
807      * @param id The identifier used to represent the cookie to resume
808      *           (the use of a different id is to allow resume to
809      *           be sent on a different connection)
810      * @param response callback used to send a response to the client
811      * @return The standard engine error codes
812      */
813     ENGINE_ERROR_CODE handleSuspend(const void* cookie,
814                                     uint32_t id,
815                                     ADD_RESPONSE response);
816 
817     /**
818      * Handle the control message for resume
819      *
820      * @param cookie The cookie executing the operation
821      * @param id The identifier representing the connection to resume
822      * @param response callback used to send a response to the client
823      * @return The standard engine error codes
824      */
825     ENGINE_ERROR_CODE handleResume(const void* cookie,
826                                    uint32_t id,
827                                    ADD_RESPONSE response);
828 
829     /**
830      * @param cookie the cookie executing the operation
831      * @param key ID of the item whose CAS should be changed
832      * @param cas The new CAS
833      * @param response Response callback used to send a response to the client
834      * @return Standard engine error codes
835      */
836     ENGINE_ERROR_CODE setItemCas(const void *cookie,
837                                  const std::string& key, uint32_t cas,
838                                  ADD_RESPONSE response);
839 
840 private:
841     // Shared state between the main thread of execution and the background
842     // thread processing pending io ops.
843     std::mutex mutex;
844     std::condition_variable condvar;
845     std::queue<const void*> pending_io_ops;
846 
847     std::atomic<bool> stop_notification_thread;
848 
849     ///////////////////////////////////////////////////////////////////////////
850     //             All of the methods used in the DCP interface              //
851     //                                                                       //
852     // We don't support mocking with the DCP interface yet, so all access to //
853     // the DCP interface will be proxied down to the underlying engine.      //
854     ///////////////////////////////////////////////////////////////////////////
855     static ENGINE_ERROR_CODE dcp_step(
856             gsl::not_null<ENGINE_HANDLE*> handle,
857             gsl::not_null<const void*> cookie,
858             gsl::not_null<struct dcp_message_producers*> producers);
859 
860     static ENGINE_ERROR_CODE dcp_open(gsl::not_null<ENGINE_HANDLE*> handle,
861                                       gsl::not_null<const void*> cookie,
862                                       uint32_t opaque,
863                                       uint32_t seqno,
864                                       uint32_t flags,
865                                       cb::const_char_buffer name,
866                                       cb::const_byte_buffer json);
867 
868     static ENGINE_ERROR_CODE dcp_add_stream(
869             gsl::not_null<ENGINE_HANDLE*> handle,
870             gsl::not_null<const void*> cookie,
871             uint32_t opaque,
872             uint16_t vbucket,
873             uint32_t flags);
874 
875     static ENGINE_ERROR_CODE dcp_close_stream(
876             gsl::not_null<ENGINE_HANDLE*> handle,
877             gsl::not_null<const void*> cookie,
878             uint32_t opaque,
879             uint16_t vbucket);
880 
881     static ENGINE_ERROR_CODE dcp_stream_req(
882             gsl::not_null<ENGINE_HANDLE*> handle,
883             gsl::not_null<const void*> cookie,
884             uint32_t flags,
885             uint32_t opaque,
886             uint16_t vbucket,
887             uint64_t start_seqno,
888             uint64_t end_seqno,
889             uint64_t vbucket_uuid,
890             uint64_t snap_start_seqno,
891             uint64_t snap_end_seqno,
892             uint64_t* rollback_seqno,
893             dcp_add_failover_log callback);
894 
895     static ENGINE_ERROR_CODE dcp_get_failover_log(
896             gsl::not_null<ENGINE_HANDLE*> handle,
897             gsl::not_null<const void*> cookie,
898             uint32_t opaque,
899             uint16_t vbucket,
900             dcp_add_failover_log callback);
901 
902     static ENGINE_ERROR_CODE dcp_stream_end(
903             gsl::not_null<ENGINE_HANDLE*> handle,
904             gsl::not_null<const void*> cookie,
905             uint32_t opaque,
906             uint16_t vbucket,
907             uint32_t flags);
908 
909     static ENGINE_ERROR_CODE dcp_snapshot_marker(
910             gsl::not_null<ENGINE_HANDLE*> handle,
911             gsl::not_null<const void*> cookie,
912             uint32_t opaque,
913             uint16_t vbucket,
914             uint64_t start_seqno,
915             uint64_t end_seqno,
916             uint32_t flags);
917 
918     static ENGINE_ERROR_CODE dcp_mutation(gsl::not_null<ENGINE_HANDLE*> handle,
919                                           gsl::not_null<const void*> cookie,
920                                           uint32_t opaque,
921                                           const DocKey& key,
922                                           cb::const_byte_buffer value,
923                                           size_t priv_bytes,
924                                           uint8_t datatype,
925                                           uint64_t cas,
926                                           uint16_t vbucket,
927                                           uint32_t flags,
928                                           uint64_t by_seqno,
929                                           uint64_t rev_seqno,
930                                           uint32_t expiration,
931                                           uint32_t lock_time,
932                                           cb::const_byte_buffer meta,
933                                           uint8_t nru);
934 
935     static ENGINE_ERROR_CODE dcp_deletion(gsl::not_null<ENGINE_HANDLE*> handle,
936                                           gsl::not_null<const void*> cookie,
937                                           uint32_t opaque,
938                                           const DocKey& key,
939                                           cb::const_byte_buffer value,
940                                           size_t priv_bytes,
941                                           uint8_t datatype,
942                                           uint64_t cas,
943                                           uint16_t vbucket,
944                                           uint64_t by_seqno,
945                                           uint64_t rev_seqno,
946                                           cb::const_byte_buffer meta);
947 
948     static ENGINE_ERROR_CODE dcp_deletion_v2(
949             gsl::not_null<ENGINE_HANDLE*> handle,
950             gsl::not_null<const void*> cookie,
951             uint32_t opaque,
952             const DocKey& key,
953             cb::const_byte_buffer value,
954             size_t priv_bytes,
955             uint8_t datatype,
956             uint64_t cas,
957             uint16_t vbucket,
958             uint64_t by_seqno,
959             uint64_t rev_seqno,
960             uint32_t delete_time);
961 
962     static ENGINE_ERROR_CODE dcp_expiration(
963             gsl::not_null<ENGINE_HANDLE*> handle,
964             gsl::not_null<const void*> cookie,
965             uint32_t opaque,
966             const DocKey& key,
967             cb::const_byte_buffer value,
968             size_t priv_bytes,
969             uint8_t datatype,
970             uint64_t cas,
971             uint16_t vbucket,
972             uint64_t by_seqno,
973             uint64_t rev_seqno,
974             cb::const_byte_buffer meta);
975 
976     static ENGINE_ERROR_CODE dcp_flush(gsl::not_null<ENGINE_HANDLE*> handle,
977                                        gsl::not_null<const void*> cookie,
978                                        uint32_t opaque,
979                                        uint16_t vbucket);
980 
981     static ENGINE_ERROR_CODE dcp_set_vbucket_state(
982             gsl::not_null<ENGINE_HANDLE*> handle,
983             gsl::not_null<const void*> cookie,
984             uint32_t opaque,
985             uint16_t vbucket,
986             vbucket_state_t state);
987 
988     static ENGINE_ERROR_CODE dcp_noop(gsl::not_null<ENGINE_HANDLE*> handle,
989                                       gsl::not_null<const void*> cookie,
990                                       uint32_t opaque);
991 
992     static ENGINE_ERROR_CODE dcp_buffer_acknowledgement(
993             gsl::not_null<ENGINE_HANDLE*> handle,
994             gsl::not_null<const void*> cookie,
995             uint32_t opaque,
996             uint16_t vbucket,
997             uint32_t buffer_bytes);
998 
999     static ENGINE_ERROR_CODE dcp_control(gsl::not_null<ENGINE_HANDLE*> handle,
1000                                          gsl::not_null<const void*> cookie,
1001                                          uint32_t opaque,
1002                                          const void* key,
1003                                          uint16_t nkey,
1004                                          const void* value,
1005                                          uint32_t nvalue);
1006 
1007     static ENGINE_ERROR_CODE dcp_response_handler(
1008             gsl::not_null<ENGINE_HANDLE*> handle,
1009             gsl::not_null<const void*> cookie,
1010             const protocol_binary_response_header* response);
1011 
1012     static ENGINE_ERROR_CODE dcp_system_event(
1013             gsl::not_null<ENGINE_HANDLE*> handle,
1014             gsl::not_null<const void*> cookie,
1015             uint32_t opaque,
1016             uint16_t vbucket,
1017             mcbp::systemevent::id event,
1018             uint64_t bySeqno,
1019             cb::const_byte_buffer key,
1020             cb::const_byte_buffer eventData);
1021 
1022     static cb::engine_error collections_set_manifest(
1023             gsl::not_null<ENGINE_HANDLE*> handle, cb::const_char_buffer json);
1024 
1025     static cb::EngineErrorStringPair collections_get_manifest(
1026             gsl::not_null<ENGINE_HANDLE*> handle);
1027 
1028     static bool isXattrEnabled(gsl::not_null<ENGINE_HANDLE*> handle);
1029 
1030     static BucketCompressionMode getCompressionMode(gsl::not_null<ENGINE_HANDLE*> handle);
1031 
1032     static size_t getMaxItemSize(gsl::not_null<ENGINE_HANDLE*> handle);
1033 
1034     static float getMinCompressionRatio(gsl::not_null<ENGINE_HANDLE*> handle);
1035 
1036     // Base class for all fault injection modes.
1037     struct FaultInjectMode {
FaultInjectModeEWB_Engine::FaultInjectMode1038         FaultInjectMode(ENGINE_ERROR_CODE injected_error_)
1039           : injected_error(injected_error_) {}
1040 
add_to_pending_io_opsEWB_Engine::FaultInjectMode1041         virtual bool add_to_pending_io_ops() {
1042             return true;
1043         }
1044         virtual bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) = 0;
1045 
1046         virtual std::string to_string() const = 0;
1047 
1048     protected:
1049         ENGINE_ERROR_CODE injected_error;
1050     };
1051 
1052     // Subclasses for each fault inject mode: /////////////////////////////////
1053 
1054     class ErrOnFirst : public FaultInjectMode {
1055     public:
ErrOnFirst(ENGINE_ERROR_CODE injected_error_)1056         ErrOnFirst(ENGINE_ERROR_CODE injected_error_)
1057           : FaultInjectMode(injected_error_),
1058             prev_cmd(Cmd::NONE) {}
1059 
should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err)1060         bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) {
1061             // Block unless the previous command from this cookie
1062             // was the same - i.e. all of a connections' commands
1063             // will EWOULDBLOCK the first time they are called.
1064             bool inject = (prev_cmd != cmd);
1065             prev_cmd = cmd;
1066             if (inject) {
1067                 err = injected_error;
1068             }
1069             return inject;
1070         }
1071 
to_string() const1072         std::string to_string() const {
1073             return "ErrOnFirst inject_error=" + std::to_string(injected_error);
1074         }
1075 
1076     private:
1077         // Last command issued by this cookie.
1078         Cmd prev_cmd;
1079     };
1080 
1081     class ErrOnNextN : public FaultInjectMode {
1082     public:
ErrOnNextN(ENGINE_ERROR_CODE injected_error_, uint32_t count_)1083         ErrOnNextN(ENGINE_ERROR_CODE injected_error_, uint32_t count_)
1084           : FaultInjectMode(injected_error_),
1085             count(count_) {}
1086 
should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err)1087         bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) {
1088             if (count > 0) {
1089                 --count;
1090                 err = injected_error;
1091                 return true;
1092             } else {
1093                 return false;
1094             }
1095         }
1096 
to_string() const1097         std::string to_string() const {
1098             return std::string("ErrOnNextN") +
1099                    " inject_error=" + std::to_string(injected_error) +
1100                    " count=" + std::to_string(count);
1101         }
1102 
1103     private:
1104         // The count of commands issued that should return error.
1105         uint32_t count;
1106     };
1107 
1108     class ErrRandom : public FaultInjectMode {
1109     public:
ErrRandom(ENGINE_ERROR_CODE injected_error_, uint32_t percentage_)1110         ErrRandom(ENGINE_ERROR_CODE injected_error_, uint32_t percentage_)
1111           : FaultInjectMode(injected_error_),
1112             percentage_to_err(percentage_) {}
1113 
should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err)1114         bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) {
1115             std::random_device rd;
1116             std::mt19937 gen(rd());
1117             std::uniform_int_distribution<uint32_t> dis(1, 100);
1118             if (dis(gen) < percentage_to_err) {
1119                 err = injected_error;
1120                 return true;
1121             } else {
1122                 return false;
1123             }
1124         }
1125 
to_string() const1126         std::string to_string() const {
1127             return std::string("ErrRandom") +
1128                    " inject_error=" + std::to_string(injected_error) +
1129                    " percentage=" + std::to_string(percentage_to_err);
1130         }
1131 
1132     private:
1133         // Percentage chance that the specified error should be injected.
1134         uint32_t percentage_to_err;
1135     };
1136 
1137     class ErrSequence : public FaultInjectMode {
1138     public:
ErrSequence(ENGINE_ERROR_CODE injected_error_, uint32_t sequence_)1139         ErrSequence(ENGINE_ERROR_CODE injected_error_, uint32_t sequence_)
1140             : FaultInjectMode(injected_error_),
1141               sequence(sequence_),
1142               pos(0) {}
1143 
should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err)1144         bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) {
1145             bool inject = false;
1146             if (pos < 32) {
1147                 inject = (sequence & (1 << pos)) != 0;
1148                 pos++;
1149             }
1150             if (inject) {
1151                 err = injected_error;
1152             }
1153             return inject;
1154         }
1155 
to_string() const1156         std::string to_string() const {
1157             std::stringstream ss;
1158             ss << "ErrSequence inject_error=" << injected_error
1159                << " sequence=0x" << std::hex << sequence
1160                << " pos=" << pos;
1161             return ss.str();
1162         }
1163 
1164     private:
1165         uint32_t sequence;
1166         uint32_t pos;
1167     };
1168 
1169     class ErrOnNoNotify : public FaultInjectMode {
1170         public:
ErrOnNoNotify(ENGINE_ERROR_CODE injected_error_)1171             ErrOnNoNotify(ENGINE_ERROR_CODE injected_error_)
1172               : FaultInjectMode(injected_error_),
1173                 issued_return_error(false) {}
1174 
add_to_pending_io_ops()1175             bool add_to_pending_io_ops() {return false;}
should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err)1176             bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) {
1177                 if (!issued_return_error) {
1178                     issued_return_error = true;
1179                     err = injected_error;
1180                     return true;
1181                 } else {
1182                     return false;
1183                 }
1184             }
1185 
to_string() const1186             std::string to_string() const {
1187                 return std::string("ErrOnNoNotify") +
1188                        " inject_error=" + std::to_string(injected_error) +
1189                        " issued_return_error=" +
1190                        std::to_string(issued_return_error);
1191             }
1192 
1193         private:
1194             // Record of whether have yet issued return error.
1195             bool issued_return_error;
1196         };
1197 
1198     class CASMismatch : public FaultInjectMode {
1199     public:
CASMismatch(uint32_t count_)1200         CASMismatch(uint32_t count_)
1201           : FaultInjectMode(ENGINE_KEY_EEXISTS),
1202             count(count_) {}
1203 
should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err)1204         bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) {
1205             if (cmd == Cmd::CAS && (count > 0)) {
1206                 --count;
1207                 err = injected_error;
1208                 return true;
1209             } else {
1210                 return false;
1211             }
1212         }
1213 
to_string() const1214         std::string to_string() const {
1215             return std::string("CASMismatch") +
1216                    " count=" + std::to_string(count);
1217         }
1218 
1219     private:
1220         uint32_t count;
1221     };
1222 
1223     // Map of connections (aka cookies) to their current mode.
1224     std::map<uint64_t, std::pair<const void*, std::shared_ptr<FaultInjectMode> > > connection_map;
1225     // Mutex for above map.
1226     std::mutex cookie_map_mutex;
1227 
1228     // Current DCP mutation `item`. We return the address of this
1229     // (in the dcp step() function) back to the server, and then in
1230     // get_item_info we check if the requested item is this one.
1231     class EwbDcpKey {
1232     public:
EwbDcpKey()1233         EwbDcpKey()
1234             : key("k") {
1235             cb::xattr::Blob builder;
1236             builder.set("_ewb", "{\"internal\":true}");
1237             builder.set("meta", "{\"author\":\"jack\"}");
1238             const auto blob = builder.finalize();
1239             std::copy(blob.buf, blob.buf + blob.len, std::back_inserter(value));
1240             // MB24971 - the body is large as it increases the probability of
1241             // transit returning TransmitResult::SoftError
1242             const std::string body(1000, 'x');
1243             std::copy(body.begin(), body.end(), std::back_inserter(value));
1244         }
1245 
1246         std::string key;
1247         std::vector<uint8_t> value;
1248     } dcp_mutation_item;
1249 
1250     /**
1251      * The dcp_stream map is used to map a cookie to the count of objects
1252      * it should send on the stream.
1253      *
1254      * Each entry in here constists of a pair containing a boolean specifying
1255      * if the stream is opened or not, and a count of how many times we should
1256      * return data
1257      */
1258     std::map<const void*, std::pair<bool, uint64_t>> dcp_stream;
1259 
1260     friend class BlockMonitorThread;
1261     std::map<uint32_t, const void*> suspended_map;
1262     std::mutex suspended_map_mutex;
1263 
suspend(const void* cookie, uint32_t id)1264     bool suspend(const void* cookie, uint32_t id) {
1265         {
1266             std::lock_guard<std::mutex> guard(suspended_map_mutex);
1267             auto iter = suspended_map.find(id);
1268             if (iter == suspended_map.cend()) {
1269                 suspended_map[id] = cookie;
1270                 return true;
1271             }
1272         }
1273 
1274         return false;
1275     }
1276 
resume(uint32_t id)1277     bool resume(uint32_t id) {
1278         const void* cookie = nullptr;
1279         {
1280             std::lock_guard<std::mutex> guard(suspended_map_mutex);
1281             auto iter = suspended_map.find(id);
1282             if (iter == suspended_map.cend()) {
1283                 return false;
1284             }
1285             cookie = iter->second;
1286             suspended_map.erase(iter);
1287         }
1288 
1289 
1290         schedule_notification(cookie);
1291         return true;
1292     }
1293 
is_connection_suspended(const void* cookie)1294     bool is_connection_suspended(const void* cookie) {
1295         std::lock_guard<std::mutex> guard(suspended_map_mutex);
1296         for (const auto c : suspended_map) {
1297             if (c.second == cookie) {
1298                 LOG_DEBUG(
1299                         "Connection {} with id {} should be suspended for "
1300                         "engine {}",
1301                         c.second,
1302                         c.first,
1303                         (void*)this);
1304 
1305                 return true;
1306             }
1307         }
1308         return false;
1309     }
1310 
schedule_notification(const void* cookie)1311     void schedule_notification(const void* cookie) {
1312         {
1313             std::lock_guard<std::mutex> guard(mutex);
1314             pending_io_ops.push(cookie);
1315         }
1316         LOG_DEBUG("EWB_Engine: connection {} should be resumed for engine {}",
1317                   (void*)cookie,
1318                   (void*)this);
1319 
1320         condvar.notify_one();
1321     }
1322 
1323     // Vector to keep track of the threads we've started to ensure
1324     // we don't leak memory ;-)
1325     std::mutex threads_mutex;
1326     std::vector<std::unique_ptr<Couchbase::Thread> > threads;
1327 };
1328 
EWB_Engine(GET_SERVER_API gsa_)1329 EWB_Engine::EWB_Engine(GET_SERVER_API gsa_)
1330   : gsa(gsa_),
1331     real_engine(NULL),
1332     real_engine_ref(nullptr),
1333     notify_io_thread(new NotificationThread(*this))
1334 {
1335     init_wrapped_api(gsa);
1336 
1337     interface.interface = 1;
1338     ENGINE_HANDLE_V1::initialize = initialize;
1339     ENGINE_HANDLE_V1::destroy = destroy;
1340     ENGINE_HANDLE_V1::allocate = allocate;
1341     ENGINE_HANDLE_V1::allocate_ex = allocate_ex;
1342     ENGINE_HANDLE_V1::remove = remove;
1343     ENGINE_HANDLE_V1::release = release;
1344     ENGINE_HANDLE_V1::get = get;
1345     ENGINE_HANDLE_V1::get_if = get_if;
1346     ENGINE_HANDLE_V1::get_locked = get_locked;
1347     ENGINE_HANDLE_V1::get_meta = get_meta;
1348     ENGINE_HANDLE_V1::get_and_touch = get_and_touch;
1349     ENGINE_HANDLE_V1::unlock = unlock;
1350     ENGINE_HANDLE_V1::store = store;
1351     ENGINE_HANDLE_V1::store_if = store_if;
1352     ENGINE_HANDLE_V1::flush = flush;
1353     ENGINE_HANDLE_V1::get_stats = get_stats;
1354     ENGINE_HANDLE_V1::reset_stats = reset_stats;
1355     ENGINE_HANDLE_V1::unknown_command = unknown_command;
1356     ENGINE_HANDLE_V1::item_set_cas = item_set_cas;
1357     ENGINE_HANDLE_V1::item_set_datatype = item_set_datatype;
1358     ENGINE_HANDLE_V1::get_item_info = get_item_info;
1359     ENGINE_HANDLE_V1::set_item_info = set_item_info;
1360     ENGINE_HANDLE_V1::set_log_level = NULL;
1361 
1362     ENGINE_HANDLE_V1::dcp = {};
1363     ENGINE_HANDLE_V1::dcp.step = dcp_step;
1364     ENGINE_HANDLE_V1::dcp.open = dcp_open;
1365     ENGINE_HANDLE_V1::dcp.stream_req = dcp_stream_req;
1366     ENGINE_HANDLE_V1::dcp.add_stream = dcp_add_stream;
1367     ENGINE_HANDLE_V1::dcp.close_stream = dcp_close_stream;
1368     ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = dcp_buffer_acknowledgement;
1369     ENGINE_HANDLE_V1::dcp.control = dcp_control;
1370     ENGINE_HANDLE_V1::dcp.get_failover_log = dcp_get_failover_log;
1371     ENGINE_HANDLE_V1::dcp.stream_end = dcp_stream_end;
1372     ENGINE_HANDLE_V1::dcp.snapshot_marker = dcp_snapshot_marker;
1373     ENGINE_HANDLE_V1::dcp.mutation = dcp_mutation;
1374     ENGINE_HANDLE_V1::dcp.deletion = dcp_deletion;
1375     ENGINE_HANDLE_V1::dcp.deletion_v2 = dcp_deletion_v2;
1376     ENGINE_HANDLE_V1::dcp.expiration = dcp_expiration;
1377     ENGINE_HANDLE_V1::dcp.flush = dcp_flush;
1378     ENGINE_HANDLE_V1::dcp.set_vbucket_state = dcp_set_vbucket_state;
1379     ENGINE_HANDLE_V1::dcp.noop = dcp_noop;
1380     ENGINE_HANDLE_V1::dcp.response_handler = dcp_response_handler;
1381     ENGINE_HANDLE_V1::dcp.system_event = dcp_system_event;
1382 
1383     ENGINE_HANDLE_V1::collections = {};
1384     ENGINE_HANDLE_V1::collections.set_manifest = collections_set_manifest;
1385     ENGINE_HANDLE_V1::collections.get_manifest = collections_get_manifest;
1386 
1387     ENGINE_HANDLE_V1::isXattrEnabled = isXattrEnabled;
1388     ENGINE_HANDLE_V1::getCompressionMode = getCompressionMode;
1389     ENGINE_HANDLE_V1::getMaxItemSize = getMaxItemSize;
1390     ENGINE_HANDLE_V1::getMinCompressionRatio = getMinCompressionRatio;
1391 
1392     clustermap_revno = 1;
1393 
1394     get_connection_id = gsa()->cookie->get_connection_id;
1395 
1396     stop_notification_thread.store(false);
1397     notify_io_thread->start();
1398 }
1399 
register_callback(ENGINE_HANDLE *eh, ENGINE_EVENT_TYPE type, EVENT_CALLBACK cb, const void *cb_data)1400 static void register_callback(ENGINE_HANDLE *eh, ENGINE_EVENT_TYPE type,
1401                               EVENT_CALLBACK cb, const void *cb_data) {
1402     const auto& p = engine_map.find(eh);
1403     if (p == engine_map.end()) {
1404         std::cerr << "Can't find EWB corresponding to " << std::hex << eh << std::endl;
1405         for (const auto& pair : engine_map) {
1406             std::cerr << "EH: " << std::hex << pair.first << " = EWB: " << std::hex << pair.second << std::endl;
1407         }
1408         abort();
1409     }
1410     cb_assert(p != engine_map.end());
1411     auto wrapped_eh = reinterpret_cast<ENGINE_HANDLE*>(p->second);
1412     real_api->callback->register_callback(wrapped_eh, type, cb, cb_data);
1413 }
1414 
~EWB_Engine()1415 EWB_Engine::~EWB_Engine() {
1416     engine_map.erase(real_handle);
1417     cb_free(real_engine_ref);
1418     stop_notification_thread = true;
1419     condvar.notify_all();
1420     notify_io_thread->waitForState(Couchbase::ThreadState::Zombie);
1421 }
1422 
dcp_step( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, gsl::not_null<struct dcp_message_producers*> producers)1423 ENGINE_ERROR_CODE EWB_Engine::dcp_step(
1424         gsl::not_null<ENGINE_HANDLE*> handle,
1425         gsl::not_null<const void*> cookie,
1426         gsl::not_null<struct dcp_message_producers*> producers) {
1427     EWB_Engine* ewb = to_engine(handle);
1428     auto stream = ewb->dcp_stream.find(cookie);
1429     if (stream != ewb->dcp_stream.end()) {
1430         auto& count = stream->second.second;
1431         // If the stream is enabled and we have data to send..
1432         if (stream->second.first && count > 0) {
1433             // This is using the internal dcp implementation which always
1434             // send the same item back
1435             auto ret = producers->mutation(cookie,
1436                                            0xdeadbeef /*opqaue*/,
1437                                            &ewb->dcp_mutation_item,
1438                                            0 /*vb*/,
1439                                            0 /*by_seqno*/,
1440                                            0 /*rev_seqno*/,
1441                                            0 /*lock_time*/,
1442                                            nullptr /*meta*/,
1443                                            0 /*nmeta*/,
1444                                            0 /*nru*/,
1445                                            0 /*collection_len*/);
1446             --count;
1447             if (ret == ENGINE_SUCCESS) {
1448                 return ENGINE_WANT_MORE;
1449             }
1450             return ret;
1451         }
1452         return ENGINE_SUCCESS;
1453     }
1454 
1455     if (ewb->real_engine->dcp.step == nullptr) {
1456         return ENGINE_ENOTSUP;
1457     } else {
1458         return ewb->real_engine->dcp.step(ewb->real_handle, cookie, producers);
1459     }
1460 }
1461 
dcp_open(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, uint32_t seqno, uint32_t flags, cb::const_char_buffer name, cb::const_byte_buffer json)1462 ENGINE_ERROR_CODE EWB_Engine::dcp_open(gsl::not_null<ENGINE_HANDLE*> handle,
1463                                        gsl::not_null<const void*> cookie,
1464                                        uint32_t opaque,
1465                                        uint32_t seqno,
1466                                        uint32_t flags,
1467                                        cb::const_char_buffer name,
1468                                        cb::const_byte_buffer json) {
1469     EWB_Engine* ewb = to_engine(handle);
1470     std::string nm = cb::to_string(name);
1471     if (nm.find("ewb_internal") == 0) {
1472         // Yeah, this is a request for the internal "magic" DCP stream
1473         // The user could specify the iteration count by adding a colon
1474         // at the end...
1475         auto idx = nm.rfind(":");
1476 
1477         if (idx != nm.npos) {
1478             ewb->dcp_stream[cookie] =
1479                     std::make_pair(false, std::stoull(nm.substr(idx + 1)));
1480         } else {
1481             ewb->dcp_stream[cookie] =
1482                     std::make_pair(false, std::numeric_limits<uint64_t>::max());
1483         }
1484         return ENGINE_SUCCESS;
1485     }
1486 
1487     if (ewb->real_engine->dcp.open == nullptr) {
1488         return ENGINE_ENOTSUP;
1489     } else {
1490         return ewb->real_engine->dcp.open(ewb->real_handle,
1491                                           cookie,
1492                                           opaque,
1493                                           seqno,
1494                                           flags,
1495                                           name,
1496                                           json);
1497     }
1498 }
1499 
dcp_stream_req( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t flags, uint32_t opaque, uint16_t vbucket, uint64_t start_seqno, uint64_t end_seqno, uint64_t vbucket_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno, uint64_t* rollback_seqno, dcp_add_failover_log callback)1500 ENGINE_ERROR_CODE EWB_Engine::dcp_stream_req(
1501         gsl::not_null<ENGINE_HANDLE*> handle,
1502         gsl::not_null<const void*> cookie,
1503         uint32_t flags,
1504         uint32_t opaque,
1505         uint16_t vbucket,
1506         uint64_t start_seqno,
1507         uint64_t end_seqno,
1508         uint64_t vbucket_uuid,
1509         uint64_t snap_start_seqno,
1510         uint64_t snap_end_seqno,
1511         uint64_t* rollback_seqno,
1512         dcp_add_failover_log callback) {
1513     EWB_Engine* ewb = to_engine(handle);
1514     auto stream = ewb->dcp_stream.find(cookie.get());
1515     if (stream != ewb->dcp_stream.end()) {
1516         // This is a client of our internal streams.. just let it pass
1517         if (start_seqno == 1) {
1518             *rollback_seqno = 0;
1519             return ENGINE_ROLLBACK;
1520         }
1521         // Start the stream
1522         stream->second.first = true;
1523         return ENGINE_SUCCESS;
1524     }
1525 
1526     if (ewb->real_engine->dcp.stream_req == nullptr) {
1527         return ENGINE_ENOTSUP;
1528     } else {
1529         return ewb->real_engine->dcp.stream_req(ewb->real_handle, cookie,
1530                                                 flags, opaque, vbucket,
1531                                                 start_seqno, end_seqno,
1532                                                 vbucket_uuid, snap_start_seqno,
1533                                                 snap_end_seqno, rollback_seqno,
1534                                                 callback);
1535     }
1536 }
1537 
dcp_add_stream( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, uint16_t vbucket, uint32_t flags)1538 ENGINE_ERROR_CODE EWB_Engine::dcp_add_stream(
1539         gsl::not_null<ENGINE_HANDLE*> handle,
1540         gsl::not_null<const void*> cookie,
1541         uint32_t opaque,
1542         uint16_t vbucket,
1543         uint32_t flags) {
1544     EWB_Engine* ewb = to_engine(handle);
1545     if (ewb->real_engine->dcp.add_stream == nullptr) {
1546         return ENGINE_ENOTSUP;
1547     } else {
1548         return ewb->real_engine->dcp.add_stream(ewb->real_handle, cookie,
1549                                                 opaque, vbucket, flags);
1550     }
1551 }
1552 
dcp_close_stream( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, uint16_t vbucket)1553 ENGINE_ERROR_CODE EWB_Engine::dcp_close_stream(
1554         gsl::not_null<ENGINE_HANDLE*> handle,
1555         gsl::not_null<const void*> cookie,
1556         uint32_t opaque,
1557         uint16_t vbucket) {
1558     EWB_Engine* ewb = to_engine(handle);
1559     if (ewb->real_engine->dcp.close_stream == nullptr) {
1560         return ENGINE_ENOTSUP;
1561     } else {
1562         return ewb->real_engine->dcp.close_stream(ewb->real_handle, cookie,
1563                                                 opaque, vbucket);
1564     }
1565 }
1566 
dcp_get_failover_log( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, uint16_t vbucket, dcp_add_failover_log callback)1567 ENGINE_ERROR_CODE EWB_Engine::dcp_get_failover_log(
1568         gsl::not_null<ENGINE_HANDLE*> handle,
1569         gsl::not_null<const void*> cookie,
1570         uint32_t opaque,
1571         uint16_t vbucket,
1572         dcp_add_failover_log callback) {
1573     EWB_Engine* ewb = to_engine(handle);
1574     if (ewb->real_engine->dcp.get_failover_log == nullptr) {
1575         return ENGINE_ENOTSUP;
1576     } else {
1577         return ewb->real_engine->dcp.get_failover_log(ewb->real_handle,
1578                                                       cookie,
1579                                                       opaque,
1580                                                       vbucket,
1581                                                       callback);
1582     }
1583 }
1584 
dcp_stream_end( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, uint16_t vbucket, uint32_t flags)1585 ENGINE_ERROR_CODE EWB_Engine::dcp_stream_end(
1586         gsl::not_null<ENGINE_HANDLE*> handle,
1587         gsl::not_null<const void*> cookie,
1588         uint32_t opaque,
1589         uint16_t vbucket,
1590         uint32_t flags) {
1591     EWB_Engine* ewb = to_engine(handle);
1592     if (ewb->real_engine->dcp.stream_end == nullptr) {
1593         return ENGINE_ENOTSUP;
1594     } else {
1595         return ewb->real_engine->dcp.stream_end(ewb->real_handle, cookie,
1596                                                 opaque, vbucket, flags);
1597     }
1598 }
1599 
dcp_snapshot_marker( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, uint16_t vbucket, uint64_t start_seqno, uint64_t end_seqno, uint32_t flags)1600 ENGINE_ERROR_CODE EWB_Engine::dcp_snapshot_marker(
1601         gsl::not_null<ENGINE_HANDLE*> handle,
1602         gsl::not_null<const void*> cookie,
1603         uint32_t opaque,
1604         uint16_t vbucket,
1605         uint64_t start_seqno,
1606         uint64_t end_seqno,
1607         uint32_t flags) {
1608     EWB_Engine* ewb = to_engine(handle);
1609     if (ewb->real_engine->dcp.snapshot_marker == nullptr) {
1610         return ENGINE_ENOTSUP;
1611     } else {
1612         return ewb->real_engine->dcp.snapshot_marker(ewb->real_handle, cookie,
1613                                                      opaque, vbucket,
1614                                                      start_seqno, end_seqno,
1615                                                      flags);
1616     }
1617 }
1618 
dcp_mutation(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, const DocKey& key, cb::const_byte_buffer value, size_t priv_bytes, uint8_t datatype, uint64_t cas, uint16_t vbucket, uint32_t flags, uint64_t by_seqno, uint64_t rev_seqno, uint32_t expiration, uint32_t lock_time, cb::const_byte_buffer meta, uint8_t nru)1619 ENGINE_ERROR_CODE EWB_Engine::dcp_mutation(gsl::not_null<ENGINE_HANDLE*> handle,
1620                                            gsl::not_null<const void*> cookie,
1621                                            uint32_t opaque,
1622                                            const DocKey& key,
1623                                            cb::const_byte_buffer value,
1624                                            size_t priv_bytes,
1625                                            uint8_t datatype,
1626                                            uint64_t cas,
1627                                            uint16_t vbucket,
1628                                            uint32_t flags,
1629                                            uint64_t by_seqno,
1630                                            uint64_t rev_seqno,
1631                                            uint32_t expiration,
1632                                            uint32_t lock_time,
1633                                            cb::const_byte_buffer meta,
1634                                            uint8_t nru) {
1635     EWB_Engine* ewb = to_engine(handle);
1636     if (ewb->real_engine->dcp.mutation == nullptr) {
1637         return ENGINE_ENOTSUP;
1638     } else {
1639         return ewb->real_engine->dcp.mutation(ewb->real_handle, cookie, opaque,
1640                                               key, value, priv_bytes, datatype,
1641                                               cas, vbucket, flags, by_seqno,
1642                                               rev_seqno, expiration, lock_time,
1643                                               meta, nru);
1644     }
1645 }
1646 
dcp_deletion(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, const DocKey& key, cb::const_byte_buffer value, size_t priv_bytes, uint8_t datatype, uint64_t cas, uint16_t vbucket, uint64_t by_seqno, uint64_t rev_seqno, cb::const_byte_buffer meta)1647 ENGINE_ERROR_CODE EWB_Engine::dcp_deletion(gsl::not_null<ENGINE_HANDLE*> handle,
1648                                            gsl::not_null<const void*> cookie,
1649                                            uint32_t opaque,
1650                                            const DocKey& key,
1651                                            cb::const_byte_buffer value,
1652                                            size_t priv_bytes,
1653                                            uint8_t datatype,
1654                                            uint64_t cas,
1655                                            uint16_t vbucket,
1656                                            uint64_t by_seqno,
1657                                            uint64_t rev_seqno,
1658                                            cb::const_byte_buffer meta) {
1659     EWB_Engine* ewb = to_engine(handle);
1660     if (ewb->real_engine->dcp.deletion == nullptr) {
1661         return ENGINE_ENOTSUP;
1662     } else {
1663         return ewb->real_engine->dcp.deletion(ewb->real_handle, cookie, opaque,
1664                                               key, value, priv_bytes, datatype,
1665                                               cas, vbucket, by_seqno, rev_seqno,
1666                                               meta);
1667     }
1668 }
1669 
dcp_deletion_v2( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, const DocKey& key, cb::const_byte_buffer value, size_t priv_bytes, uint8_t datatype, uint64_t cas, uint16_t vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t delete_time)1670 ENGINE_ERROR_CODE EWB_Engine::dcp_deletion_v2(
1671         gsl::not_null<ENGINE_HANDLE*> handle,
1672         gsl::not_null<const void*> cookie,
1673         uint32_t opaque,
1674         const DocKey& key,
1675         cb::const_byte_buffer value,
1676         size_t priv_bytes,
1677         uint8_t datatype,
1678         uint64_t cas,
1679         uint16_t vbucket,
1680         uint64_t by_seqno,
1681         uint64_t rev_seqno,
1682         uint32_t delete_time) {
1683     EWB_Engine* ewb = to_engine(handle);
1684     if (ewb->real_engine->dcp.deletion_v2 == nullptr) {
1685         return ENGINE_ENOTSUP;
1686     } else {
1687         return ewb->real_engine->dcp.deletion_v2(ewb->real_handle,
1688                                                  cookie,
1689                                                  opaque,
1690                                                  key,
1691                                                  value,
1692                                                  priv_bytes,
1693                                                  datatype,
1694                                                  cas,
1695                                                  vbucket,
1696                                                  by_seqno,
1697                                                  rev_seqno,
1698                                                  delete_time);
1699     }
1700 }
1701 
dcp_expiration( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, const DocKey& key, cb::const_byte_buffer value, size_t priv_bytes, uint8_t datatype, uint64_t cas, uint16_t vbucket, uint64_t by_seqno, uint64_t rev_seqno, cb::const_byte_buffer meta)1702 ENGINE_ERROR_CODE EWB_Engine::dcp_expiration(
1703         gsl::not_null<ENGINE_HANDLE*> handle,
1704         gsl::not_null<const void*> cookie,
1705         uint32_t opaque,
1706         const DocKey& key,
1707         cb::const_byte_buffer value,
1708         size_t priv_bytes,
1709         uint8_t datatype,
1710         uint64_t cas,
1711         uint16_t vbucket,
1712         uint64_t by_seqno,
1713         uint64_t rev_seqno,
1714         cb::const_byte_buffer meta) {
1715     EWB_Engine* ewb = to_engine(handle);
1716     if (ewb->real_engine->dcp.expiration == nullptr) {
1717         return ENGINE_ENOTSUP;
1718     } else {
1719         return ewb->real_engine->dcp.expiration(ewb->real_handle, cookie, opaque,
1720                                                 key, value, priv_bytes, datatype,
1721                                                 cas, vbucket, by_seqno, rev_seqno,
1722                                                 meta);
1723     }
1724 }
1725 
dcp_flush(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, uint16_t vbucket)1726 ENGINE_ERROR_CODE EWB_Engine::dcp_flush(gsl::not_null<ENGINE_HANDLE*> handle,
1727                                         gsl::not_null<const void*> cookie,
1728                                         uint32_t opaque,
1729                                         uint16_t vbucket) {
1730     EWB_Engine* ewb = to_engine(handle);
1731     if (ewb->real_engine->dcp.flush == nullptr) {
1732         return ENGINE_ENOTSUP;
1733     } else {
1734         return ewb->real_engine->dcp.flush(ewb->real_handle,
1735                                                 cookie,
1736                                                 opaque,
1737                                                 vbucket);
1738     }
1739 }
1740 
dcp_set_vbucket_state( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, uint16_t vbucket, vbucket_state_t state)1741 ENGINE_ERROR_CODE EWB_Engine::dcp_set_vbucket_state(
1742         gsl::not_null<ENGINE_HANDLE*> handle,
1743         gsl::not_null<const void*> cookie,
1744         uint32_t opaque,
1745         uint16_t vbucket,
1746         vbucket_state_t state) {
1747     EWB_Engine* ewb = to_engine(handle);
1748     if (ewb->real_engine->dcp.set_vbucket_state == nullptr) {
1749         return ENGINE_ENOTSUP;
1750     } else {
1751         return ewb->real_engine->dcp.set_vbucket_state(ewb->real_handle,
1752                                                 cookie,
1753                                                 opaque,
1754                                                 vbucket,
1755                                                 state);
1756     }
1757 }
1758 
dcp_noop(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque)1759 ENGINE_ERROR_CODE EWB_Engine::dcp_noop(gsl::not_null<ENGINE_HANDLE*> handle,
1760                                        gsl::not_null<const void*> cookie,
1761                                        uint32_t opaque) {
1762     EWB_Engine* ewb = to_engine(handle);
1763     if (ewb->real_engine->dcp.noop == nullptr) {
1764         return ENGINE_ENOTSUP;
1765     } else {
1766         return ewb->real_engine->dcp.noop(ewb->real_handle, cookie, opaque);
1767     }
1768 }
1769 
dcp_buffer_acknowledgement( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, uint16_t vbucket, uint32_t buffer_bytes)1770 ENGINE_ERROR_CODE EWB_Engine::dcp_buffer_acknowledgement(
1771         gsl::not_null<ENGINE_HANDLE*> handle,
1772         gsl::not_null<const void*> cookie,
1773         uint32_t opaque,
1774         uint16_t vbucket,
1775         uint32_t buffer_bytes) {
1776     EWB_Engine* ewb = to_engine(handle);
1777     if (ewb->real_engine->dcp.buffer_acknowledgement == nullptr) {
1778         return ENGINE_ENOTSUP;
1779     } else {
1780         return ewb->real_engine->dcp.buffer_acknowledgement(ewb->real_handle,
1781                                                             cookie,
1782                                                             opaque,
1783                                                             vbucket,
1784                                                             buffer_bytes);
1785     }
1786 }
1787 
dcp_control(gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, const void* key, uint16_t nkey, const void* value, uint32_t nvalue)1788 ENGINE_ERROR_CODE EWB_Engine::dcp_control(gsl::not_null<ENGINE_HANDLE*> handle,
1789                                           gsl::not_null<const void*> cookie,
1790                                           uint32_t opaque,
1791                                           const void* key,
1792                                           uint16_t nkey,
1793                                           const void* value,
1794                                           uint32_t nvalue) {
1795     EWB_Engine* ewb = to_engine(handle);
1796     if (ewb->real_engine->dcp.control == nullptr) {
1797         return ENGINE_ENOTSUP;
1798     } else {
1799         return ewb->real_engine->dcp.control(ewb->real_handle, cookie,
1800                                              opaque, key, nkey, value, nvalue);
1801     }
1802 }
1803 
dcp_response_handler( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, const protocol_binary_response_header* response)1804 ENGINE_ERROR_CODE EWB_Engine::dcp_response_handler(
1805         gsl::not_null<ENGINE_HANDLE*> handle,
1806         gsl::not_null<const void*> cookie,
1807         const protocol_binary_response_header* response) {
1808     EWB_Engine* ewb = to_engine(handle);
1809     if (ewb->real_engine->dcp.response_handler == nullptr) {
1810         return ENGINE_ENOTSUP;
1811     } else {
1812         return ewb->real_engine->dcp.response_handler(ewb->real_handle,
1813                                                       cookie,
1814                                                       response);
1815     }
1816 }
1817 
dcp_system_event( gsl::not_null<ENGINE_HANDLE*> handle, gsl::not_null<const void*> cookie, uint32_t opaque, uint16_t vbucket, mcbp::systemevent::id event, uint64_t bySeqno, cb::const_byte_buffer key, cb::const_byte_buffer eventData)1818 ENGINE_ERROR_CODE EWB_Engine::dcp_system_event(
1819         gsl::not_null<ENGINE_HANDLE*> handle,
1820         gsl::not_null<const void*> cookie,
1821         uint32_t opaque,
1822         uint16_t vbucket,
1823         mcbp::systemevent::id event,
1824         uint64_t bySeqno,
1825         cb::const_byte_buffer key,
1826         cb::const_byte_buffer eventData) {
1827     EWB_Engine* ewb = to_engine(handle);
1828     if (ewb->real_engine->dcp.response_handler == nullptr) {
1829         return ENGINE_ENOTSUP;
1830     } else {
1831         return ewb->real_engine->dcp.system_event(ewb->real_handle,
1832                                                   cookie,
1833                                                   opaque,
1834                                                   vbucket,
1835                                                   event,
1836                                                   bySeqno,
1837                                                   key,
1838                                                   eventData);
1839     }
1840 }
1841 
collections_set_manifest( gsl::not_null<ENGINE_HANDLE*> handle, cb::const_char_buffer json)1842 cb::engine_error EWB_Engine::collections_set_manifest(
1843         gsl::not_null<ENGINE_HANDLE*> handle, cb::const_char_buffer json) {
1844     EWB_Engine* ewb = to_engine(handle);
1845     if (ewb->real_engine->collections.set_manifest == nullptr) {
1846         return {cb::engine_errc::not_supported,
1847                 "EWB_Engine::collections_set_manifest"};
1848     } else {
1849         return ewb->real_engine->collections.set_manifest(ewb->real_handle,
1850                                                           json);
1851     }
1852 }
1853 
collections_get_manifest( gsl::not_null<ENGINE_HANDLE*> handle)1854 cb::EngineErrorStringPair EWB_Engine::collections_get_manifest(
1855         gsl::not_null<ENGINE_HANDLE*> handle) {
1856     EWB_Engine* ewb = to_engine(handle);
1857     if (ewb->real_engine->collections.get_manifest == nullptr) {
1858         return {cb::engine_errc::not_supported,
1859                 "EWB_Engine::collections_get_manifest"};
1860     } else {
1861         return ewb->real_engine->collections.get_manifest(ewb->real_handle);
1862     }
1863 }
1864 
isXattrEnabled(gsl::not_null<ENGINE_HANDLE*> handle)1865 bool EWB_Engine::isXattrEnabled(gsl::not_null<ENGINE_HANDLE*> handle) {
1866     EWB_Engine* ewb = to_engine(handle);
1867     if (ewb->real_engine->isXattrEnabled == nullptr) {
1868         return false;
1869     } else {
1870         return ewb->real_engine->isXattrEnabled(ewb->real_handle);
1871     }
1872 }
1873 
getCompressionMode(gsl::not_null<ENGINE_HANDLE*> handle)1874 BucketCompressionMode EWB_Engine::getCompressionMode(gsl::not_null<ENGINE_HANDLE*> handle) {
1875     EWB_Engine* ewb = to_engine(handle);
1876     if (ewb->real_engine->getCompressionMode == nullptr) {
1877         return BucketCompressionMode::Off;
1878     } else {
1879         return ewb->real_engine->getCompressionMode(ewb->real_handle);
1880     }
1881 }
1882 
getMaxItemSize(gsl::not_null<ENGINE_HANDLE*> handle)1883 size_t EWB_Engine::getMaxItemSize(gsl::not_null<ENGINE_HANDLE*> handle) {
1884     EWB_Engine* ewb = to_engine(handle);
1885     if (ewb->real_engine->getMaxItemSize == nullptr) {
1886         return default_max_item_size;
1887     } else {
1888         return ewb->real_engine->getMaxItemSize(ewb->real_handle);
1889     }
1890 }
1891 
getMinCompressionRatio(gsl::not_null<ENGINE_HANDLE*> handle)1892 float EWB_Engine::getMinCompressionRatio(gsl::not_null<ENGINE_HANDLE*> handle) {
1893     EWB_Engine* ewb = to_engine(handle);
1894     if (ewb->real_engine->getMinCompressionRatio == nullptr) {
1895         return default_min_compression_ratio;
1896     } else {
1897         return ewb->real_engine->getMinCompressionRatio(ewb->real_handle);
1898     }
1899 }
1900 
create_instance(uint64_t interface, GET_SERVER_API gsa, ENGINE_HANDLE **handle)1901 ENGINE_ERROR_CODE create_instance(uint64_t interface,
1902                                   GET_SERVER_API gsa,
1903                                   ENGINE_HANDLE **handle)
1904 {
1905     if (interface != 1) {
1906         return ENGINE_ENOTSUP;
1907     }
1908 
1909     try {
1910         EWB_Engine* engine = new EWB_Engine(gsa);
1911         *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1912         return ENGINE_SUCCESS;
1913 
1914     } catch (std::exception& e) {
1915         auto logger = gsa()->log->get_logger();
1916         logger->log(EXTENSION_LOG_WARNING, NULL,
1917                     "EWB_Engine: failed to create engine: %s", e.what());
1918         return ENGINE_FAILED;
1919     }
1920 
1921 }
1922 
destroy_engine(void)1923 void destroy_engine(void) {
1924     // nothing todo.
1925 }
1926 
to_string(const Cmd cmd)1927 const char* EWB_Engine::to_string(const Cmd cmd) {
1928     switch (cmd) {
1929     case Cmd::NONE:
1930         return "NONE";
1931     case Cmd::GET_INFO:
1932         return "GET_INFO";
1933     case Cmd::GET_META:
1934         return "GET_META";
1935     case Cmd::ALLOCATE:
1936         return "ALLOCATE";
1937     case Cmd::REMOVE:
1938         return "REMOVE";
1939     case Cmd::GET:
1940         return "GET";
1941     case Cmd::STORE:
1942         return "STORE";
1943     case Cmd::CAS:
1944         return "CAS";
1945     case Cmd::ARITHMETIC:
1946         return "ARITHMETIC";
1947     case Cmd::FLUSH:
1948         return "FLUSH";
1949     case Cmd::GET_STATS:
1950         return "GET_STATS";
1951     case Cmd::UNKNOWN_COMMAND:
1952         return "UNKNOWN_COMMAND";
1953     case Cmd::LOCK:
1954         return "LOCK";
1955     case Cmd::UNLOCK:
1956         return "UNLOCK";
1957     }
1958     throw std::invalid_argument("EWB_Engine::to_string() Unknown command");
1959 }
1960 
process_notifications()1961 void EWB_Engine::process_notifications() {
1962     SERVER_HANDLE_V1* server = gsa();
1963     LOG_DEBUG("EWB_Engine: notification thread running for engine {}",
1964               (void*)this);
1965     std::unique_lock<std::mutex> lk(mutex);
1966     while (!stop_notification_thread) {
1967         condvar.wait(lk);
1968         while (!pending_io_ops.empty()) {
1969             const void* cookie = pending_io_ops.front();
1970             pending_io_ops.pop();
1971             lk.unlock();
1972             LOG_DEBUG("EWB_Engine: notify {}", cookie);
1973             server->cookie->notify_io_complete(cookie, ENGINE_SUCCESS);
1974             lk.lock();
1975         }
1976     }
1977 
1978     LOG_DEBUG("EWB_Engine: notification thread stopping for engine {}",
1979               (void*)this);
1980 }
1981 
run()1982 void NotificationThread::run() {
1983     setRunning();
1984     engine.process_notifications();
1985 }
1986 
handleBlockMonitorFile(const void* cookie, uint32_t id, const std::string& file, ADD_RESPONSE response)1987 ENGINE_ERROR_CODE EWB_Engine::handleBlockMonitorFile(const void* cookie,
1988                                                      uint32_t id,
1989                                                      const std::string& file,
1990                                                      ADD_RESPONSE response) {
1991     if (file.empty()) {
1992         return ENGINE_EINVAL;
1993     }
1994 
1995     if (!cb::io::isFile(file)) {
1996         return ENGINE_KEY_ENOENT;
1997     }
1998 
1999     if (!suspend(cookie, id)) {
2000         LOG_WARNING(
2001                 "EWB_Engine::handleBlockMonitorFile(): "
2002                 "Id {} already registered",
2003                 id);
2004         return ENGINE_KEY_EEXISTS;
2005     }
2006 
2007     try {
2008         std::unique_ptr<Couchbase::Thread> thread(
2009                 new BlockMonitorThread(*this, id, file));
2010         thread->start();
2011         std::lock_guard<std::mutex> guard(threads_mutex);
2012         threads.emplace_back(thread.release());
2013     } catch (std::exception& e) {
2014         LOG_WARNING(
2015                 "EWB_Engine::handleBlockMonitorFile(): Failed to create "
2016                 "block monitor thread: {}",
2017                 e.what());
2018         return ENGINE_FAILED;
2019     }
2020 
2021     LOG_DEBUG(
2022             "Registered connection {} (engine {}) as {} to be"
2023             " suspended. Monitor file {}",
2024             cookie,
2025             (void*)this,
2026             id,
2027             file.c_str());
2028 
2029     response(nullptr, 0, nullptr, 0, nullptr, 0,
2030              PROTOCOL_BINARY_RAW_BYTES,
2031              PROTOCOL_BINARY_RESPONSE_SUCCESS, /*cas*/0, cookie);
2032     return ENGINE_SUCCESS;
2033 }
2034 
handleSuspend(const void* cookie, uint32_t id, ADD_RESPONSE response)2035 ENGINE_ERROR_CODE EWB_Engine::handleSuspend(const void* cookie,
2036                                             uint32_t id,
2037                                             ADD_RESPONSE response) {
2038     if (suspend(cookie, id)) {
2039         LOG_DEBUG("Registered connection {} as {} to be suspended", cookie, id);
2040         response(nullptr, 0, nullptr, 0, nullptr, 0,
2041                  PROTOCOL_BINARY_RAW_BYTES,
2042                  PROTOCOL_BINARY_RESPONSE_SUCCESS, /*cas*/0, cookie);
2043         return ENGINE_SUCCESS;
2044     } else {
2045         LOG_WARNING("EWB_Engine::handleSuspend(): Id {} already registered",
2046                     id);
2047         return ENGINE_KEY_EEXISTS;
2048     }
2049 }
2050 
handleResume(const void* cookie, uint32_t id, ADD_RESPONSE response)2051 ENGINE_ERROR_CODE EWB_Engine::handleResume(const void* cookie, uint32_t id,
2052                                            ADD_RESPONSE response) {
2053     if (resume(id)) {
2054         LOG_DEBUG("Connection with id {} will be resumed", id);
2055         response(nullptr, 0, nullptr, 0, nullptr, 0,
2056                  PROTOCOL_BINARY_RAW_BYTES,
2057                  PROTOCOL_BINARY_RESPONSE_SUCCESS, /*cas*/0, cookie);
2058         return ENGINE_SUCCESS;
2059     } else {
2060         LOG_WARNING(
2061                 "EWB_Engine::unknown_command(): No "
2062                 "connection registered with id {}",
2063                 id);
2064         return ENGINE_EINVAL;
2065     }
2066 }
2067 
setItemCas(const void *cookie, const std::string& key, uint32_t cas, ADD_RESPONSE response)2068 ENGINE_ERROR_CODE EWB_Engine::setItemCas(const void *cookie,
2069                                          const std::string& key,
2070                                          uint32_t cas,
2071                                          ADD_RESPONSE response) {
2072     uint64_t cas64 = cas;
2073     if (cas == static_cast<uint32_t>(-1)) {
2074         cas64 = LOCKED_CAS;
2075     }
2076 
2077     auto rv = real_engine->get(real_handle,
2078                                cookie,
2079                                DocKey{key, DocNamespace::DefaultCollection},
2080                                0,
2081                                DocStateFilter::Alive);
2082     if (rv.first != cb::engine_errc::success) {
2083         return ENGINE_ERROR_CODE(rv.first);
2084     }
2085 
2086     // item_set_cas has no return value!
2087     real_engine->item_set_cas(real_handle, rv.second.get(), cas64);
2088     response(nullptr, 0, nullptr, 0, nullptr, 0,
2089              PROTOCOL_BINARY_RAW_BYTES,
2090              PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
2091     return ENGINE_SUCCESS;
2092 }
2093 
run()2094 void BlockMonitorThread::run() {
2095     setRunning();
2096 
2097     LOG_DEBUG("Block monitor for file {} started", file);
2098 
2099     // @todo Use the file monitoring API's to avoid this "busy" loop
2100     while (cb::io::isFile(file)) {
2101         usleep(100);
2102     }
2103 
2104     LOG_DEBUG("Block monitor for file {} stopping (file is gone)", file);
2105     engine.resume(id);
2106 }
2107