1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18/*
19 *                "ewouldblock_engine"
20 *
21 * The "ewouldblock_engine" allows one to test how memcached responds when the
22 * engine returns EWOULDBLOCK instead of the correct response.
23 *
24 * Motivation:
25 *
26 * The EWOULDBLOCK response code can be returned from a number of engine
27 * functions, and is used to indicate that the request could not be immediately
28 * fulfilled, and it "would block" if it tried to. The correct way for
29 * memcached to handle this (in general) is to suspend that request until it
30 * is later notified by the engine (via notify_io_complete()).
31 *
32 * However, engines typically return the correct response to requests
33 * immediately, only rarely (and from memcached's POV non-deterministically)
34 * returning EWOULDBLOCK. This makes testing of the code-paths handling
35 * EWOULDBLOCK tricky.
36 *
37 *
38 * Operation:
 * This engine, when loaded by memcached, proxies requests to a "real" engine.
40 * Depending on how it is configured, it can simply pass the request on to the
41 * real engine, or artificially return EWOULDBLOCK back to memcached.
42 *
43 * See the 'Modes' enum below for the possible modes for a connection. The mode
44 * can be selected by sending a `request_ewouldblock_ctl` command
45 *  (opcode PROTOCOL_BINARY_CMD_EWOULDBLOCK_CTL).
46 *
47 * DCP:
48 *    There is a special DCP stream named "ewb_internal" which is an
49 *    endless stream of items. You may also add a number at the end
50 *    e.g. "ewb_internal:10" and it'll create a stream with 10 entries.
51 *    It will always send the same K-V pair.
52 *    Note that we don't register for disconnect events so you might
53 *    experience weirdness if you first try to use the internal dcp
54 *    stream, and then later on want to use the one provided by the
55 *    engine. The workaround for that is to delete the bucket
 *    in between ;-) (put them in separate test suites and it'll all
 *    be handled for you).
58 *
59 *    Any other stream name results in proxying the dcp request to
60 *    the underlying engine's DCP implementation.
61 *
62 */
63
64#include "ewouldblock_engine.h"
65
66#include <atomic>
67#include <condition_variable>
68#include <cstring>
69#include <gsl/gsl>
70#include <iostream>
71#include <map>
72#include <memory>
73#include <mutex>
74#include <queue>
75#include <random>
76#include <sstream>
77#include <string>
78#include <utility>
79
80#include <logger/logger.h>
81#include <memcached/engine.h>
82#include <memcached/extension.h>
83#include <platform/cb_malloc.h>
84#include <platform/dirutils.h>
85#include <platform/thread.h>
86#include <xattr/blob.h>
87
88#include "utilities/engine_loader.h"
89
90/* Public API declaration ****************************************************/
91
// Entry points declared with C linkage and MEMCACHED_PUBLIC_API. Their
// definitions are not in this part of the file.
extern "C" {
    // The engine handle is passed back through the `handle` out-parameter.
    MEMCACHED_PUBLIC_API
    ENGINE_ERROR_CODE create_instance(uint64_t interface, GET_SERVER_API gsa,
                                      ENGINE_HANDLE **handle);

    MEMCACHED_PUBLIC_API
    void destroy_engine(void);
}
100
101
// Forward declaration: the thread classes below hold a reference to the
// engine, whose definition follows them.
class EWB_Engine;

// Mapping from wrapped handle to EWB handles.
// EWB_Engine::initialize() inserts an entry keyed by the real engine's
// handle.
static std::map<ENGINE_HANDLE*, EWB_Engine*> engine_map;
106
107class NotificationThread : public Couchbase::Thread {
108public:
109    NotificationThread(EWB_Engine& engine_)
110        : Thread("ewb:pendingQ"),
111          engine(engine_) {}
112
113protected:
114    void run() override;
115
116protected:
117    EWB_Engine& engine;
118};
119
120/**
121 * The BlockMonitorThread represents the thread that is
122 * monitoring the "lock" file. Once the file is no longer
123 * there it will resume the client specified with the given
124 * id.
125 */
126class BlockMonitorThread : public Couchbase::Thread {
127public:
128    BlockMonitorThread(EWB_Engine& engine_,
129                       uint32_t id_,
130                       const std::string file_)
131        : Thread("ewb:BlockMon"),
132          engine(engine_),
133          id(id_),
134          file(file_) {}
135
136    /**
137     * Wait for the underlying thread to reach the zombie state
138     * (== terminated, but not reaped)
139     */
140    ~BlockMonitorThread() {
141        waitForState(Couchbase::ThreadState::Zombie);
142    }
143
144protected:
145    void run() override;
146
147private:
148    EWB_Engine& engine;
149    const uint32_t id;
150    const std::string file;
151};
152
// Replacement for the server's register_callback; installed into
// wrapped_api by init_wrapped_api(). Defined elsewhere in this file.
static void register_callback(ENGINE_HANDLE *, ENGINE_EVENT_TYPE,
                              EVENT_CALLBACK, const void *);

// Copy of the server API with selected entries overridden; this is what
// get_wrapped_gsa() hands out.
static SERVER_HANDLE_V1 wrapped_api;
// The unmodified server API as returned by the GET_SERVER_API function.
static SERVER_HANDLE_V1 *real_api;
158static void init_wrapped_api(GET_SERVER_API fn) {
159    static bool init = false;
160    if (init) {
161        return;
162    }
163
164    init = true;
165    real_api = fn();
166    wrapped_api = *real_api;
167
168    // Overrides
169    static SERVER_CALLBACK_API callback = *wrapped_api.callback;
170    callback.register_callback = register_callback;
171    wrapped_api.callback = &callback;
172}
173
174static SERVER_HANDLE_V1 *get_wrapped_gsa() {
175    return &wrapped_api;
176}
177
178/** ewouldblock_engine class */
179class EWB_Engine : public ENGINE_HANDLE_V1 {
180
181private:
    // Identifies which engine operation is being executed; passed to
    // should_inject_error() so the active fault-injection mode can decide
    // per command whether to inject an error.
    enum class Cmd {
        NONE,
        GET_INFO,
        ALLOCATE, // used by both allocate() and allocate_ex()
        REMOVE,
        GET, // used by get(), get_if() and get_and_touch()
        STORE,
        CAS, // store()/store_if() when the operation is OPERATION_CAS
        ARITHMETIC,
        LOCK, // used by get_locked()
        UNLOCK,
        FLUSH,
        GET_STATS,
        GET_META,
        UNKNOWN_COMMAND
    };
198
    // Textual name of a Cmd, used in log messages.
    const char* to_string(Cmd cmd);

    // Function pointer mapping a cookie to a connection identifier; the
    // result is the key used for connection_map lookups.
    uint64_t (*get_connection_id)(gsl::not_null<const void*> cookie);
202
203public:
    // Constructed from the server's GET_SERVER_API function. Defined
    // elsewhere in this file.
    EWB_Engine(GET_SERVER_API gsa_);

    // Invoked via `delete` from destroy().
    ~EWB_Engine();
207
208    // Convert from a handle back to the read object.
209    static EWB_Engine* to_engine(ENGINE_HANDLE* handle) {
210        return reinterpret_cast<EWB_Engine*> (handle);
211    }
212
213    /* Returns true if the next command should have a fake error code injected.
214     * @param func Address of the command function (get, store, etc).
215     * @param cookie The cookie for the user's request.
216     * @param[out] Error code to return.
217     */
218    bool should_inject_error(Cmd cmd, const void* cookie,
219                             ENGINE_ERROR_CODE& err) {
220
221        if (is_connection_suspended(cookie)) {
222            err = ENGINE_EWOULDBLOCK;
223            return true;
224        }
225
226        uint64_t id = get_connection_id(cookie);
227
228        std::lock_guard<std::mutex> guard(cookie_map_mutex);
229
230        auto iter = connection_map.find(id);
231        if (iter == connection_map.end()) {
232            return false;
233        }
234
235        if (iter->second.first != cookie) {
236            // The cookie is different so it represents a different command
237            connection_map.erase(iter);
238            return false;
239        }
240
241        const bool inject = iter->second.second->should_inject_error(cmd, err);
242        const bool add_to_pending_io_ops = iter->second.second->add_to_pending_io_ops();
243
244        if (inject) {
245            LOG_DEBUG("EWB_Engine: injecting error:{} for cmd:{}",
246                      err,
247                      to_string(cmd));
248
249            if (err == ENGINE_EWOULDBLOCK && add_to_pending_io_ops) {
250                // The server expects that if EWOULDBLOCK is returned then the
251                // server should be notified in the future when the operation is
252                // ready - so add this op to the pending IO queue.
253                schedule_notification(iter->second.first);
254            }
255        }
256
257        return inject;
258    }
259
260    /* Implementation of all the engine functions. ***************************/
261
262    static ENGINE_ERROR_CODE initialize(gsl::not_null<ENGINE_HANDLE*> handle,
263                                        const char* config_str) {
264        EWB_Engine* ewb = to_engine(handle);
265
266        // Extract the name of the real engine we will be proxying; then
267        // create and initialize it.
268        std::string config(config_str);
269        auto seperator = config.find(";");
270        std::string real_engine_name(config.substr(0, seperator));
271        std::string real_engine_config;
272        if (seperator != std::string::npos) {
273            real_engine_config = config.substr(seperator + 1);
274        }
275
276        if ((ewb->real_engine_ref = load_engine(
277                     real_engine_name.c_str(), NULL, NULL)) == NULL) {
278            LOG_CRITICAL(
279                    "ERROR: EWB_Engine::initialize(): Failed to load real "
280                    "engine '{}'",
281                    real_engine_name);
282            abort();
283        }
284
285        if (!create_engine_instance(
286                    ewb->real_engine_ref, get_wrapped_gsa, &ewb->real_handle)) {
287            LOG_CRITICAL(
288                    "ERROR: EWB_Engine::initialize(): Failed create "
289                    "engine instance '{}'",
290                    real_engine_name);
291            abort();
292        }
293
294        if (ewb->real_handle->interface != 1) {
295            LOG_CRITICAL(
296                    "ERROR: EWB_Engine::initialize(): Only support engine "
297                    "with interface v1 - got v{}.",
298                    ewb->real_engine->interface.interface);
299            abort();
300        }
301        ewb->real_engine =
302                reinterpret_cast<ENGINE_HANDLE_V1*>(ewb->real_handle);
303
304
305        engine_map[ewb->real_handle] = ewb;
306        ENGINE_ERROR_CODE res = ewb->real_engine->initialize(
307                ewb->real_handle, real_engine_config.c_str());
308
309        if (res == ENGINE_SUCCESS) {
310            // For engine interface functions which cannot return EWOULDBLOCK,
311            // and we otherwise don't want to interpose, we can simply use the
312            // real_engine's functions directly.
313            ewb->ENGINE_HANDLE_V1::item_set_cas = ewb->real_engine->item_set_cas;
314            ewb->ENGINE_HANDLE_V1::set_item_info = ewb->real_engine->set_item_info;
315            ewb->ENGINE_HANDLE_V1::item_set_datatype =
316                    ewb->real_engine->item_set_datatype;
317        }
318
319        // Register a callback on DISCONNECT events, so we can delete
320        // any stale elements from connection_map when a connection
321        // DC's.
322        real_api->callback->register_callback(
323                reinterpret_cast<ENGINE_HANDLE*>(ewb),
324                ON_DISCONNECT,
325                handle_disconnect,
326                reinterpret_cast<ENGINE_HANDLE*>(ewb));
327
328        return res;
329    }
330
331    static void destroy(gsl::not_null<ENGINE_HANDLE*> handle,
332                        const bool force) {
333        EWB_Engine* ewb = to_engine(handle);
334        ewb->real_engine->destroy(ewb->real_handle, force);
335        delete ewb;
336    }
337
338    static cb::EngineErrorItemPair allocate(
339            gsl::not_null<ENGINE_HANDLE*> handle,
340            gsl::not_null<const void*> cookie,
341            const DocKey& key,
342            const size_t nbytes,
343            const int flags,
344            const rel_time_t exptime,
345            uint8_t datatype,
346            uint16_t vbucket) {
347        EWB_Engine* ewb = to_engine(handle);
348        ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
349        if (ewb->should_inject_error(Cmd::ALLOCATE, cookie, err)) {
350            return cb::makeEngineErrorItemPair(cb::engine_errc(err));
351        } else {
352            return ewb->real_engine->allocate(ewb->real_handle,
353                                              cookie,
354                                              key,
355                                              nbytes,
356                                              flags,
357                                              exptime,
358                                              datatype,
359                                              vbucket);
360        }
361    }
362
363    static std::pair<cb::unique_item_ptr, item_info> allocate_ex(
364            gsl::not_null<ENGINE_HANDLE*> handle,
365            gsl::not_null<const void*> cookie,
366            const DocKey& key,
367            size_t nbytes,
368            size_t priv_nbytes,
369            int flags,
370            rel_time_t exptime,
371            uint8_t datatype,
372            uint16_t vbucket) {
373        EWB_Engine* ewb = to_engine(handle);
374        ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
375        if (ewb->should_inject_error(Cmd::ALLOCATE, cookie, err)) {
376            throw cb::engine_error(cb::engine_errc(err), "ewb: injecting error");
377        } else {
378            return ewb->real_engine->allocate_ex(ewb->real_handle, cookie,
379                                                 key, nbytes, priv_nbytes,
380                                                 flags, exptime, datatype,
381                                                 vbucket);
382        }
383    }
384
385    static ENGINE_ERROR_CODE remove(gsl::not_null<ENGINE_HANDLE*> handle,
386                                    gsl::not_null<const void*> cookie,
387                                    const DocKey& key,
388                                    uint64_t& cas,
389                                    uint16_t vbucket,
390                                    mutation_descr_t& mut_info) {
391        EWB_Engine* ewb = to_engine(handle);
392        ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
393        if (ewb->should_inject_error(Cmd::REMOVE, cookie, err)) {
394            return err;
395        } else {
396            return ewb->real_engine->remove(ewb->real_handle, cookie, key, cas,
397                                            vbucket, mut_info);
398        }
399    }
400
401    static void release(gsl::not_null<ENGINE_HANDLE*> handle,
402                        gsl::not_null<item*> item) {
403        EWB_Engine* ewb = to_engine(handle);
404        LOG_DEBUG("EWB_Engine: release");
405
406        if (item == &ewb->dcp_mutation_item) {
407            // Ignore the DCP mutation, we own it (and don't track
408            // refcounts on it).
409        } else {
410            return ewb->real_engine->release(ewb->real_handle, item);
411        }
412    }
413
414    static cb::EngineErrorItemPair get(gsl::not_null<ENGINE_HANDLE*> handle,
415                                       gsl::not_null<const void*> cookie,
416                                       const DocKey& key,
417                                       uint16_t vbucket,
418                                       DocStateFilter documentStateFilter) {
419        EWB_Engine* ewb = to_engine(handle);
420        ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
421        if (ewb->should_inject_error(Cmd::GET, cookie, err)) {
422            return std::make_pair(
423                    cb::engine_errc(err),
424                    cb::unique_item_ptr{nullptr, cb::ItemDeleter{handle}});
425        } else {
426            return ewb->real_engine->get(ewb->real_handle,
427                                         cookie,
428                                         key,
429                                         vbucket,
430                                         documentStateFilter);
431        }
432    }
433
434    static cb::EngineErrorItemPair get_if(
435            gsl::not_null<ENGINE_HANDLE*> handle,
436            gsl::not_null<const void*> cookie,
437            const DocKey& key,
438            uint16_t vbucket,
439            std::function<bool(const item_info&)> filter) {
440        EWB_Engine* ewb = to_engine(handle);
441        ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
442        if (ewb->should_inject_error(Cmd::GET, cookie, err)) {
443            return cb::makeEngineErrorItemPair(cb::engine_errc::would_block);
444        } else {
445            return ewb->real_engine->get_if(
446                    ewb->real_handle, cookie, key, vbucket, filter);
447        }
448    }
449
450    static cb::EngineErrorItemPair get_and_touch(
451            gsl::not_null<ENGINE_HANDLE*> handle,
452            gsl::not_null<const void*> cookie,
453            const DocKey& key,
454            uint16_t vbucket,
455            uint32_t exptime) {
456        EWB_Engine* ewb = to_engine(handle);
457        ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
458        if (ewb->should_inject_error(Cmd::GET, cookie, err)) {
459            return cb::makeEngineErrorItemPair(cb::engine_errc::would_block);
460        } else {
461            return ewb->real_engine->get_and_touch(
462                    ewb->real_handle, cookie, key, vbucket, exptime);
463        }
464    }
465
466    static cb::EngineErrorItemPair get_locked(
467            gsl::not_null<ENGINE_HANDLE*> handle,
468            gsl::not_null<const void*> cookie,
469            const DocKey& key,
470            uint16_t vbucket,
471            uint32_t lock_timeout) {
472        EWB_Engine* ewb = to_engine(handle);
473        ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
474        if (ewb->should_inject_error(Cmd::LOCK, cookie, err)) {
475            return cb::makeEngineErrorItemPair(cb::engine_errc(err));
476        } else {
477            return ewb->real_engine->get_locked(
478                    ewb->real_handle, cookie, key, vbucket, lock_timeout);
479        }
480    }
481
482    static cb::EngineErrorMetadataPair get_meta(
483            gsl::not_null<ENGINE_HANDLE*> handle,
484            gsl::not_null<const void*> cookie,
485            const DocKey& key,
486            uint16_t vbucket) {
487        EWB_Engine* ewb = to_engine(handle);
488        ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
489        if (ewb->should_inject_error(Cmd::GET_META, cookie, err)) {
490            return std::make_pair(cb::engine_errc(err), item_info());
491        } else {
492            return ewb->real_engine->get_meta(ewb->real_handle,
493                                              cookie,
494                                              key,
495                                              vbucket);
496        }
497    }
498
499    static ENGINE_ERROR_CODE unlock(gsl::not_null<ENGINE_HANDLE*> handle,
500                                    gsl::not_null<const void*> cookie,
501                                    const DocKey& key,
502                                    uint16_t vbucket,
503                                    uint64_t cas) {
504        EWB_Engine* ewb = to_engine(handle);
505        ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
506        if (ewb->should_inject_error(Cmd::UNLOCK, cookie, err)) {
507            return err;
508        } else {
509            return ewb->real_engine->unlock(ewb->real_handle, cookie, key,
510                                            vbucket, cas);
511        }
512    }
513
514    static ENGINE_ERROR_CODE store(gsl::not_null<ENGINE_HANDLE*> handle,
515                                   gsl::not_null<const void*> cookie,
516                                   gsl::not_null<item*> item,
517                                   uint64_t& cas,
518                                   ENGINE_STORE_OPERATION operation,
519                                   DocumentState document_state) {
520        EWB_Engine* ewb = to_engine(handle);
521        ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
522        Cmd opcode = (operation == OPERATION_CAS) ? Cmd::CAS : Cmd::STORE;
523        if (ewb->should_inject_error(opcode, cookie, err)) {
524            return err;
525        } else {
526            return ewb->real_engine->store(ewb->real_handle, cookie, item, cas,
527                                           operation, document_state);
528        }
529    }
530
531    static cb::EngineErrorCasPair store_if(gsl::not_null<ENGINE_HANDLE*> handle,
532                                           gsl::not_null<const void*> cookie,
533                                           gsl::not_null<item*> item,
534                                           uint64_t cas,
535                                           ENGINE_STORE_OPERATION operation,
536                                           cb::StoreIfPredicate predicate,
537                                           DocumentState document_state) {
538        EWB_Engine* ewb = to_engine(handle);
539        ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
540        Cmd opcode = (operation == OPERATION_CAS) ? Cmd::CAS : Cmd::STORE;
541        if (ewb->should_inject_error(opcode, cookie, err)) {
542            return {cb::engine_errc(err), 0};
543        } else {
544            return ewb->real_engine->store_if(ewb->real_handle,
545                                              cookie,
546                                              item,
547                                              cas,
548                                              operation,
549                                              predicate,
550                                              document_state);
551        }
552    }
553
554    static ENGINE_ERROR_CODE flush(gsl::not_null<ENGINE_HANDLE*> handle,
555                                   gsl::not_null<const void*> cookie) {
556        // Flush is a little different - it often returns EWOULDBLOCK, and
557        // notify_io_complete() just tells the server it can issue it's *next*
558        // command (i.e. no need to re-flush). Therefore just pass Flush
559        // straight through for now.
560        EWB_Engine* ewb = to_engine(handle);
561        return ewb->real_engine->flush(ewb->real_handle, cookie);
562    }
563
564    static ENGINE_ERROR_CODE get_stats(gsl::not_null<ENGINE_HANDLE*> handle,
565                                       gsl::not_null<const void*> cookie,
566                                       cb::const_char_buffer key,
567                                       ADD_STAT add_stat) {
568        EWB_Engine* ewb = to_engine(handle);
569        ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
570        if (ewb->should_inject_error(Cmd::GET_STATS, cookie, err)) {
571            return err;
572        } else {
573            return ewb->real_engine->get_stats(
574                    ewb->real_handle, cookie, key, add_stat);
575        }
576    }
577
578    static void reset_stats(gsl::not_null<ENGINE_HANDLE*> handle,
579                            gsl::not_null<const void*> cookie) {
580        EWB_Engine* ewb = to_engine(handle);
581        return ewb->real_engine->reset_stats(ewb->real_handle, cookie);
582    }
583
584    /* Handle 'unknown_command'. In additional to wrapping calls to the
585     * underlying real engine, this is also used to configure
586     * ewouldblock_engine itself using he CMD_EWOULDBLOCK_CTL opcode.
587     */
588    static ENGINE_ERROR_CODE unknown_command(
589            gsl::not_null<ENGINE_HANDLE*> handle,
590            const void* cookie,
591            gsl::not_null<protocol_binary_request_header*> request,
592            ADD_RESPONSE response,
593            DocNamespace doc_namespace) {
594        EWB_Engine* ewb = to_engine(handle);
595
596        if (request->request.opcode == PROTOCOL_BINARY_CMD_EWOULDBLOCK_CTL) {
597            auto* req =
598                    reinterpret_cast<request_ewouldblock_ctl*>(request.get());
599            const EWBEngineMode mode = static_cast<EWBEngineMode>(ntohl(req->message.body.mode));
600            const uint32_t value = ntohl(req->message.body.value);
601            const ENGINE_ERROR_CODE injected_error =
602                    static_cast<ENGINE_ERROR_CODE>(ntohl(req->message.body.inject_error));
603            const std::string key((char*)req->bytes + sizeof(req->bytes),
604                                  ntohs(req->message.header.request.keylen));
605
606            std::shared_ptr<FaultInjectMode> new_mode = nullptr;
607
608            // Validate mode, and construct new fault injector.
609            switch (mode) {
610                case EWBEngineMode::Next_N:
611                    new_mode = std::make_shared<ErrOnNextN>(injected_error, value);
612                    break;
613
614                case EWBEngineMode::Random:
615                    new_mode = std::make_shared<ErrRandom>(injected_error, value);
616                    break;
617
618                case EWBEngineMode::First:
619                    new_mode = std::make_shared<ErrOnFirst>(injected_error);
620                    break;
621
622                case EWBEngineMode::Sequence:
623                    new_mode = std::make_shared<ErrSequence>(injected_error, value);
624                    break;
625
626                case EWBEngineMode::No_Notify:
627                    new_mode = std::make_shared<ErrOnNoNotify>(injected_error);
628                    break;
629
630                case EWBEngineMode::CasMismatch:
631                    new_mode = std::make_shared<CASMismatch>(value);
632                    break;
633
634                case EWBEngineMode::IncrementClusterMapRevno:
635                    ewb->clustermap_revno++;
636                    response(nullptr, 0, nullptr, 0, nullptr, 0,
637                             PROTOCOL_BINARY_RAW_BYTES,
638                             PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
639                    return ENGINE_SUCCESS;
640
641                case EWBEngineMode::BlockMonitorFile:
642                    return ewb->handleBlockMonitorFile(cookie, value, key,
643                                                       response);
644
645                case EWBEngineMode::Suspend:
646                    return ewb->handleSuspend(cookie, value, response);
647
648                case EWBEngineMode::Resume:
649                    return ewb->handleResume(cookie, value, response);
650
651                case EWBEngineMode::SetItemCas:
652                    return ewb->setItemCas(cookie, key, value, response);
653            }
654
655            if (new_mode == nullptr) {
656                LOG_WARNING(
657                        "EWB_Engine::unknown_command(): "
658                        "Got unexpected mode={} for EWOULDBLOCK_CTL, ",
659                        (unsigned int)mode);
660                response(nullptr, 0, nullptr, 0, nullptr, 0,
661                         PROTOCOL_BINARY_RAW_BYTES,
662                         PROTOCOL_BINARY_RESPONSE_EINVAL, /*cas*/0, cookie);
663                return ENGINE_FAILED;
664            } else {
665                try {
666                    LOG_DEBUG(
667                            "EWB_Engine::unknown_command(): Setting EWB mode "
668                            "to "
669                            "{} for cookie {}",
670                            new_mode->to_string(),
671                            cookie);
672
673                    uint64_t id = ewb->get_connection_id(cookie);
674
675                    {
676                        std::lock_guard<std::mutex> guard(ewb->cookie_map_mutex);
677                        ewb->connection_map.erase(id);
678                        ewb->connection_map.emplace(id, std::make_pair(cookie, new_mode));
679                    }
680
681                    response(nullptr, 0, nullptr, 0, nullptr, 0,
682                             PROTOCOL_BINARY_RAW_BYTES,
683                             PROTOCOL_BINARY_RESPONSE_SUCCESS, /*cas*/0, cookie);
684                    return ENGINE_SUCCESS;
685                } catch (std::bad_alloc&) {
686                    return ENGINE_ENOMEM;
687                }
688            }
689        } else {
690            ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
691            if (ewb->should_inject_error(Cmd::UNKNOWN_COMMAND, cookie, err)) {
692                return err;
693            } else {
694                return ewb->real_engine->unknown_command(ewb->real_handle, cookie,
695                                                         request, response,
696                                                         doc_namespace);
697            }
698        }
699    }
700
701    static void item_set_cas(gsl::not_null<ENGINE_HANDLE*> handle,
702                             gsl::not_null<item*> item,
703                             uint64_t cas) {
704        // Should never be called as ENGINE_HANDLE_V1::item_set_cas is updated
705        // to point to the real_engine once it is initialized. This function
706        //only exists so there is a non-NULL value for
707        // ENGINE_HANDLE_V1::item_set_cas initially to keep load_engine()
708        // happy.
709        abort();
710    }
711
712    static void item_set_datatype(gsl::not_null<ENGINE_HANDLE*>,
713                                  gsl::not_null<item*> itm,
714                                  protocol_binary_datatype_t datatype) {
715        // Should never be called - set item_set_datatype().
716        abort();
717    }
718
719    static bool get_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
720                              gsl::not_null<const item*> item,
721                              gsl::not_null<item_info*> item_info) {
722        EWB_Engine* ewb = to_engine(handle);
723        LOG_DEBUG("EWB_Engine: get_item_info");
724
725        // This function cannot return EWOULDBLOCK - just chain to the real
726        // engine's function, unless it is a request for our special DCP item.
727        if (item == &ewb->dcp_mutation_item) {
728            item_info->cas = 0;
729            item_info->vbucket_uuid = 0;
730            item_info->seqno = 0;
731            item_info->exptime = 0;
732            item_info->nbytes =
733                    gsl::narrow<uint32_t>(ewb->dcp_mutation_item.value.size());
734            item_info->flags = 0;
735            item_info->datatype = PROTOCOL_BINARY_DATATYPE_XATTR;
736            item_info->nkey =
737                    gsl::narrow<uint16_t>(ewb->dcp_mutation_item.key.size());
738            item_info->key = ewb->dcp_mutation_item.key.c_str();
739            item_info->value[0].iov_base = &ewb->dcp_mutation_item.value[0];
740            item_info->value[0].iov_len = item_info->nbytes;
741            return true;
742        } else {
743            return ewb->real_engine->get_item_info(
744                    ewb->real_handle, item, item_info);
745        }
746    }
747
748    static bool set_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
749                              gsl::not_null<item*> item,
750                              gsl::not_null<const item_info*> item_info) {
751        // Should never be called - set item_set_cas().
752        abort();
753    }
754
755    static void handle_disconnect(const void* cookie,
756                                  ENGINE_EVENT_TYPE type,
757                                  const void* event_data,
758                                  const void* cb_data) {
759        cb_assert(event_data == NULL);
760        EWB_Engine* ewb =
761                reinterpret_cast<EWB_Engine*>(const_cast<void*>(cb_data));
762        LOG_DEBUG("EWB_Engine::handle_disconnect");
763
764        uint64_t id = ewb->get_connection_id(cookie);
765        {
766            std::lock_guard<std::mutex> guard(ewb->cookie_map_mutex);
767            ewb->connection_map.erase(id);
768        }
769    }
770
771    GET_SERVER_API gsa;
772
773    // Actual engine we are proxying requests to.
774    ENGINE_HANDLE* real_handle;
775    ENGINE_HANDLE_V1* real_engine;
776    engine_reference* real_engine_ref;
777
778    std::atomic_int clustermap_revno;
779
780    /**
781     * The method responsible for pushing all of the notify_io_complete
782     * to the frontend. It is run by notify_io_thread and not intended to
783     * be called by anyone else!.
784     */
785    void process_notifications();
786    std::unique_ptr<Couchbase::Thread> notify_io_thread;
787
788protected:
789    /**
790     * Handle the control message for block monitor file
791     *
792     * @param cookie The cookie executing the operation
793     * @param id The identifier used to represent the cookie
794     * @param file The file to monitor
795     * @param response callback used to send a response to the client
796     * @return The standard engine error codes
797     */
798    ENGINE_ERROR_CODE handleBlockMonitorFile(const void* cookie,
799                                             uint32_t id,
800                                             const std::string& file,
801                                             ADD_RESPONSE response);
802
803    /**
804     * Handle the control message for suspend
805     *
806     * @param cookie The cookie executing the operation
807     * @param id The identifier used to represent the cookie to resume
808     *           (the use of a different id is to allow resume to
809     *           be sent on a different connection)
810     * @param response callback used to send a response to the client
811     * @return The standard engine error codes
812     */
813    ENGINE_ERROR_CODE handleSuspend(const void* cookie,
814                                    uint32_t id,
815                                    ADD_RESPONSE response);
816
817    /**
818     * Handle the control message for resume
819     *
820     * @param cookie The cookie executing the operation
821     * @param id The identifier representing the connection to resume
822     * @param response callback used to send a response to the client
823     * @return The standard engine error codes
824     */
825    ENGINE_ERROR_CODE handleResume(const void* cookie,
826                                   uint32_t id,
827                                   ADD_RESPONSE response);
828
829    /**
830     * @param cookie the cookie executing the operation
831     * @param key ID of the item whose CAS should be changed
832     * @param cas The new CAS
833     * @param response Response callback used to send a response to the client
834     * @return Standard engine error codes
835     */
836    ENGINE_ERROR_CODE setItemCas(const void *cookie,
837                                 const std::string& key, uint32_t cas,
838                                 ADD_RESPONSE response);
839
840private:
841    // Shared state between the main thread of execution and the background
842    // thread processing pending io ops.
843    std::mutex mutex;
844    std::condition_variable condvar;
845    std::queue<const void*> pending_io_ops;
846
847    std::atomic<bool> stop_notification_thread;
848
849    ///////////////////////////////////////////////////////////////////////////
850    //             All of the methods used in the DCP interface              //
851    //                                                                       //
852    // We don't support mocking with the DCP interface yet, so all access to //
853    // the DCP interface will be proxied down to the underlying engine.      //
854    ///////////////////////////////////////////////////////////////////////////
855    static ENGINE_ERROR_CODE dcp_step(
856            gsl::not_null<ENGINE_HANDLE*> handle,
857            gsl::not_null<const void*> cookie,
858            gsl::not_null<struct dcp_message_producers*> producers);
859
860    static ENGINE_ERROR_CODE dcp_open(gsl::not_null<ENGINE_HANDLE*> handle,
861                                      gsl::not_null<const void*> cookie,
862                                      uint32_t opaque,
863                                      uint32_t seqno,
864                                      uint32_t flags,
865                                      cb::const_char_buffer name,
866                                      cb::const_byte_buffer json);
867
868    static ENGINE_ERROR_CODE dcp_add_stream(
869            gsl::not_null<ENGINE_HANDLE*> handle,
870            gsl::not_null<const void*> cookie,
871            uint32_t opaque,
872            uint16_t vbucket,
873            uint32_t flags);
874
875    static ENGINE_ERROR_CODE dcp_close_stream(
876            gsl::not_null<ENGINE_HANDLE*> handle,
877            gsl::not_null<const void*> cookie,
878            uint32_t opaque,
879            uint16_t vbucket);
880
881    static ENGINE_ERROR_CODE dcp_stream_req(
882            gsl::not_null<ENGINE_HANDLE*> handle,
883            gsl::not_null<const void*> cookie,
884            uint32_t flags,
885            uint32_t opaque,
886            uint16_t vbucket,
887            uint64_t start_seqno,
888            uint64_t end_seqno,
889            uint64_t vbucket_uuid,
890            uint64_t snap_start_seqno,
891            uint64_t snap_end_seqno,
892            uint64_t* rollback_seqno,
893            dcp_add_failover_log callback);
894
895    static ENGINE_ERROR_CODE dcp_get_failover_log(
896            gsl::not_null<ENGINE_HANDLE*> handle,
897            gsl::not_null<const void*> cookie,
898            uint32_t opaque,
899            uint16_t vbucket,
900            dcp_add_failover_log callback);
901
902    static ENGINE_ERROR_CODE dcp_stream_end(
903            gsl::not_null<ENGINE_HANDLE*> handle,
904            gsl::not_null<const void*> cookie,
905            uint32_t opaque,
906            uint16_t vbucket,
907            uint32_t flags);
908
909    static ENGINE_ERROR_CODE dcp_snapshot_marker(
910            gsl::not_null<ENGINE_HANDLE*> handle,
911            gsl::not_null<const void*> cookie,
912            uint32_t opaque,
913            uint16_t vbucket,
914            uint64_t start_seqno,
915            uint64_t end_seqno,
916            uint32_t flags);
917
918    static ENGINE_ERROR_CODE dcp_mutation(gsl::not_null<ENGINE_HANDLE*> handle,
919                                          gsl::not_null<const void*> cookie,
920                                          uint32_t opaque,
921                                          const DocKey& key,
922                                          cb::const_byte_buffer value,
923                                          size_t priv_bytes,
924                                          uint8_t datatype,
925                                          uint64_t cas,
926                                          uint16_t vbucket,
927                                          uint32_t flags,
928                                          uint64_t by_seqno,
929                                          uint64_t rev_seqno,
930                                          uint32_t expiration,
931                                          uint32_t lock_time,
932                                          cb::const_byte_buffer meta,
933                                          uint8_t nru);
934
935    static ENGINE_ERROR_CODE dcp_deletion(gsl::not_null<ENGINE_HANDLE*> handle,
936                                          gsl::not_null<const void*> cookie,
937                                          uint32_t opaque,
938                                          const DocKey& key,
939                                          cb::const_byte_buffer value,
940                                          size_t priv_bytes,
941                                          uint8_t datatype,
942                                          uint64_t cas,
943                                          uint16_t vbucket,
944                                          uint64_t by_seqno,
945                                          uint64_t rev_seqno,
946                                          cb::const_byte_buffer meta);
947
948    static ENGINE_ERROR_CODE dcp_deletion_v2(
949            gsl::not_null<ENGINE_HANDLE*> handle,
950            gsl::not_null<const void*> cookie,
951            uint32_t opaque,
952            const DocKey& key,
953            cb::const_byte_buffer value,
954            size_t priv_bytes,
955            uint8_t datatype,
956            uint64_t cas,
957            uint16_t vbucket,
958            uint64_t by_seqno,
959            uint64_t rev_seqno,
960            uint32_t delete_time);
961
962    static ENGINE_ERROR_CODE dcp_expiration(
963            gsl::not_null<ENGINE_HANDLE*> handle,
964            gsl::not_null<const void*> cookie,
965            uint32_t opaque,
966            const DocKey& key,
967            cb::const_byte_buffer value,
968            size_t priv_bytes,
969            uint8_t datatype,
970            uint64_t cas,
971            uint16_t vbucket,
972            uint64_t by_seqno,
973            uint64_t rev_seqno,
974            cb::const_byte_buffer meta);
975
976    static ENGINE_ERROR_CODE dcp_flush(gsl::not_null<ENGINE_HANDLE*> handle,
977                                       gsl::not_null<const void*> cookie,
978                                       uint32_t opaque,
979                                       uint16_t vbucket);
980
981    static ENGINE_ERROR_CODE dcp_set_vbucket_state(
982            gsl::not_null<ENGINE_HANDLE*> handle,
983            gsl::not_null<const void*> cookie,
984            uint32_t opaque,
985            uint16_t vbucket,
986            vbucket_state_t state);
987
988    static ENGINE_ERROR_CODE dcp_noop(gsl::not_null<ENGINE_HANDLE*> handle,
989                                      gsl::not_null<const void*> cookie,
990                                      uint32_t opaque);
991
992    static ENGINE_ERROR_CODE dcp_buffer_acknowledgement(
993            gsl::not_null<ENGINE_HANDLE*> handle,
994            gsl::not_null<const void*> cookie,
995            uint32_t opaque,
996            uint16_t vbucket,
997            uint32_t buffer_bytes);
998
999    static ENGINE_ERROR_CODE dcp_control(gsl::not_null<ENGINE_HANDLE*> handle,
1000                                         gsl::not_null<const void*> cookie,
1001                                         uint32_t opaque,
1002                                         const void* key,
1003                                         uint16_t nkey,
1004                                         const void* value,
1005                                         uint32_t nvalue);
1006
1007    static ENGINE_ERROR_CODE dcp_response_handler(
1008            gsl::not_null<ENGINE_HANDLE*> handle,
1009            gsl::not_null<const void*> cookie,
1010            const protocol_binary_response_header* response);
1011
1012    static ENGINE_ERROR_CODE dcp_system_event(
1013            gsl::not_null<ENGINE_HANDLE*> handle,
1014            gsl::not_null<const void*> cookie,
1015            uint32_t opaque,
1016            uint16_t vbucket,
1017            mcbp::systemevent::id event,
1018            uint64_t bySeqno,
1019            cb::const_byte_buffer key,
1020            cb::const_byte_buffer eventData);
1021
1022    static cb::engine_error collections_set_manifest(
1023            gsl::not_null<ENGINE_HANDLE*> handle, cb::const_char_buffer json);
1024
1025    static cb::EngineErrorStringPair collections_get_manifest(
1026            gsl::not_null<ENGINE_HANDLE*> handle);
1027
1028    static bool isXattrEnabled(gsl::not_null<ENGINE_HANDLE*> handle);
1029
1030    static BucketCompressionMode getCompressionMode(gsl::not_null<ENGINE_HANDLE*> handle);
1031
1032    static size_t getMaxItemSize(gsl::not_null<ENGINE_HANDLE*> handle);
1033
1034    static float getMinCompressionRatio(gsl::not_null<ENGINE_HANDLE*> handle);
1035
1036    // Base class for all fault injection modes.
1037    struct FaultInjectMode {
1038        FaultInjectMode(ENGINE_ERROR_CODE injected_error_)
1039          : injected_error(injected_error_) {}
1040
1041        virtual bool add_to_pending_io_ops() {
1042            return true;
1043        }
1044        virtual bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) = 0;
1045
1046        virtual std::string to_string() const = 0;
1047
1048    protected:
1049        ENGINE_ERROR_CODE injected_error;
1050    };
1051
1052    // Subclasses for each fault inject mode: /////////////////////////////////
1053
1054    class ErrOnFirst : public FaultInjectMode {
1055    public:
1056        ErrOnFirst(ENGINE_ERROR_CODE injected_error_)
1057          : FaultInjectMode(injected_error_),
1058            prev_cmd(Cmd::NONE) {}
1059
1060        bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) {
1061            // Block unless the previous command from this cookie
1062            // was the same - i.e. all of a connections' commands
1063            // will EWOULDBLOCK the first time they are called.
1064            bool inject = (prev_cmd != cmd);
1065            prev_cmd = cmd;
1066            if (inject) {
1067                err = injected_error;
1068            }
1069            return inject;
1070        }
1071
1072        std::string to_string() const {
1073            return "ErrOnFirst inject_error=" + std::to_string(injected_error);
1074        }
1075
1076    private:
1077        // Last command issued by this cookie.
1078        Cmd prev_cmd;
1079    };
1080
1081    class ErrOnNextN : public FaultInjectMode {
1082    public:
1083        ErrOnNextN(ENGINE_ERROR_CODE injected_error_, uint32_t count_)
1084          : FaultInjectMode(injected_error_),
1085            count(count_) {}
1086
1087        bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) {
1088            if (count > 0) {
1089                --count;
1090                err = injected_error;
1091                return true;
1092            } else {
1093                return false;
1094            }
1095        }
1096
1097        std::string to_string() const {
1098            return std::string("ErrOnNextN") +
1099                   " inject_error=" + std::to_string(injected_error) +
1100                   " count=" + std::to_string(count);
1101        }
1102
1103    private:
1104        // The count of commands issued that should return error.
1105        uint32_t count;
1106    };
1107
1108    class ErrRandom : public FaultInjectMode {
1109    public:
1110        ErrRandom(ENGINE_ERROR_CODE injected_error_, uint32_t percentage_)
1111          : FaultInjectMode(injected_error_),
1112            percentage_to_err(percentage_) {}
1113
1114        bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) {
1115            std::random_device rd;
1116            std::mt19937 gen(rd());
1117            std::uniform_int_distribution<uint32_t> dis(1, 100);
1118            if (dis(gen) < percentage_to_err) {
1119                err = injected_error;
1120                return true;
1121            } else {
1122                return false;
1123            }
1124        }
1125
1126        std::string to_string() const {
1127            return std::string("ErrRandom") +
1128                   " inject_error=" + std::to_string(injected_error) +
1129                   " percentage=" + std::to_string(percentage_to_err);
1130        }
1131
1132    private:
1133        // Percentage chance that the specified error should be injected.
1134        uint32_t percentage_to_err;
1135    };
1136
1137    class ErrSequence : public FaultInjectMode {
1138    public:
1139        ErrSequence(ENGINE_ERROR_CODE injected_error_, uint32_t sequence_)
1140            : FaultInjectMode(injected_error_),
1141              sequence(sequence_),
1142              pos(0) {}
1143
1144        bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) {
1145            bool inject = false;
1146            if (pos < 32) {
1147                inject = (sequence & (1 << pos)) != 0;
1148                pos++;
1149            }
1150            if (inject) {
1151                err = injected_error;
1152            }
1153            return inject;
1154        }
1155
1156        std::string to_string() const {
1157            std::stringstream ss;
1158            ss << "ErrSequence inject_error=" << injected_error
1159               << " sequence=0x" << std::hex << sequence
1160               << " pos=" << pos;
1161            return ss.str();
1162        }
1163
1164    private:
1165        uint32_t sequence;
1166        uint32_t pos;
1167    };
1168
1169    class ErrOnNoNotify : public FaultInjectMode {
1170        public:
1171            ErrOnNoNotify(ENGINE_ERROR_CODE injected_error_)
1172              : FaultInjectMode(injected_error_),
1173                issued_return_error(false) {}
1174
1175            bool add_to_pending_io_ops() {return false;}
1176            bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) {
1177                if (!issued_return_error) {
1178                    issued_return_error = true;
1179                    err = injected_error;
1180                    return true;
1181                } else {
1182                    return false;
1183                }
1184            }
1185
1186            std::string to_string() const {
1187                return std::string("ErrOnNoNotify") +
1188                       " inject_error=" + std::to_string(injected_error) +
1189                       " issued_return_error=" +
1190                       std::to_string(issued_return_error);
1191            }
1192
1193        private:
1194            // Record of whether have yet issued return error.
1195            bool issued_return_error;
1196        };
1197
1198    class CASMismatch : public FaultInjectMode {
1199    public:
1200        CASMismatch(uint32_t count_)
1201          : FaultInjectMode(ENGINE_KEY_EEXISTS),
1202            count(count_) {}
1203
1204        bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) {
1205            if (cmd == Cmd::CAS && (count > 0)) {
1206                --count;
1207                err = injected_error;
1208                return true;
1209            } else {
1210                return false;
1211            }
1212        }
1213
1214        std::string to_string() const {
1215            return std::string("CASMismatch") +
1216                   " count=" + std::to_string(count);
1217        }
1218
1219    private:
1220        uint32_t count;
1221    };
1222
1223    // Map of connections (aka cookies) to their current mode.
1224    std::map<uint64_t, std::pair<const void*, std::shared_ptr<FaultInjectMode> > > connection_map;
1225    // Mutex for above map.
1226    std::mutex cookie_map_mutex;
1227
1228    // Current DCP mutation `item`. We return the address of this
1229    // (in the dcp step() function) back to the server, and then in
1230    // get_item_info we check if the requested item is this one.
1231    class EwbDcpKey {
1232    public:
1233        EwbDcpKey()
1234            : key("k") {
1235            cb::xattr::Blob builder;
1236            builder.set("_ewb", "{\"internal\":true}");
1237            builder.set("meta", "{\"author\":\"jack\"}");
1238            const auto blob = builder.finalize();
1239            std::copy(blob.buf, blob.buf + blob.len, std::back_inserter(value));
1240            // MB24971 - the body is large as it increases the probability of
1241            // transit returning TransmitResult::SoftError
1242            const std::string body(1000, 'x');
1243            std::copy(body.begin(), body.end(), std::back_inserter(value));
1244        }
1245
1246        std::string key;
1247        std::vector<uint8_t> value;
1248    } dcp_mutation_item;
1249
1250    /**
1251     * The dcp_stream map is used to map a cookie to the count of objects
1252     * it should send on the stream.
1253     *
1254     * Each entry in here constists of a pair containing a boolean specifying
1255     * if the stream is opened or not, and a count of how many times we should
1256     * return data
1257     */
1258    std::map<const void*, std::pair<bool, uint64_t>> dcp_stream;
1259
1260    friend class BlockMonitorThread;
1261    std::map<uint32_t, const void*> suspended_map;
1262    std::mutex suspended_map_mutex;
1263
1264    bool suspend(const void* cookie, uint32_t id) {
1265        {
1266            std::lock_guard<std::mutex> guard(suspended_map_mutex);
1267            auto iter = suspended_map.find(id);
1268            if (iter == suspended_map.cend()) {
1269                suspended_map[id] = cookie;
1270                return true;
1271            }
1272        }
1273
1274        return false;
1275    }
1276
1277    bool resume(uint32_t id) {
1278        const void* cookie = nullptr;
1279        {
1280            std::lock_guard<std::mutex> guard(suspended_map_mutex);
1281            auto iter = suspended_map.find(id);
1282            if (iter == suspended_map.cend()) {
1283                return false;
1284            }
1285            cookie = iter->second;
1286            suspended_map.erase(iter);
1287        }
1288
1289
1290        schedule_notification(cookie);
1291        return true;
1292    }
1293
1294    bool is_connection_suspended(const void* cookie) {
1295        std::lock_guard<std::mutex> guard(suspended_map_mutex);
1296        for (const auto c : suspended_map) {
1297            if (c.second == cookie) {
1298                LOG_DEBUG(
1299                        "Connection {} with id {} should be suspended for "
1300                        "engine {}",
1301                        c.second,
1302                        c.first,
1303                        (void*)this);
1304
1305                return true;
1306            }
1307        }
1308        return false;
1309    }
1310
1311    void schedule_notification(const void* cookie) {
1312        {
1313            std::lock_guard<std::mutex> guard(mutex);
1314            pending_io_ops.push(cookie);
1315        }
1316        LOG_DEBUG("EWB_Engine: connection {} should be resumed for engine {}",
1317                  (void*)cookie,
1318                  (void*)this);
1319
1320        condvar.notify_one();
1321    }
1322
1323    // Vector to keep track of the threads we've started to ensure
1324    // we don't leak memory ;-)
1325    std::mutex threads_mutex;
1326    std::vector<std::unique_ptr<Couchbase::Thread> > threads;
1327};
1328
1329EWB_Engine::EWB_Engine(GET_SERVER_API gsa_)
1330  : gsa(gsa_),
1331    real_engine(NULL),
1332    real_engine_ref(nullptr),
1333    notify_io_thread(new NotificationThread(*this))
1334{
1335    init_wrapped_api(gsa);
1336
1337    interface.interface = 1;
1338    ENGINE_HANDLE_V1::initialize = initialize;
1339    ENGINE_HANDLE_V1::destroy = destroy;
1340    ENGINE_HANDLE_V1::allocate = allocate;
1341    ENGINE_HANDLE_V1::allocate_ex = allocate_ex;
1342    ENGINE_HANDLE_V1::remove = remove;
1343    ENGINE_HANDLE_V1::release = release;
1344    ENGINE_HANDLE_V1::get = get;
1345    ENGINE_HANDLE_V1::get_if = get_if;
1346    ENGINE_HANDLE_V1::get_locked = get_locked;
1347    ENGINE_HANDLE_V1::get_meta = get_meta;
1348    ENGINE_HANDLE_V1::get_and_touch = get_and_touch;
1349    ENGINE_HANDLE_V1::unlock = unlock;
1350    ENGINE_HANDLE_V1::store = store;
1351    ENGINE_HANDLE_V1::store_if = store_if;
1352    ENGINE_HANDLE_V1::flush = flush;
1353    ENGINE_HANDLE_V1::get_stats = get_stats;
1354    ENGINE_HANDLE_V1::reset_stats = reset_stats;
1355    ENGINE_HANDLE_V1::unknown_command = unknown_command;
1356    ENGINE_HANDLE_V1::item_set_cas = item_set_cas;
1357    ENGINE_HANDLE_V1::item_set_datatype = item_set_datatype;
1358    ENGINE_HANDLE_V1::get_item_info = get_item_info;
1359    ENGINE_HANDLE_V1::set_item_info = set_item_info;
1360    ENGINE_HANDLE_V1::set_log_level = NULL;
1361
1362    ENGINE_HANDLE_V1::dcp = {};
1363    ENGINE_HANDLE_V1::dcp.step = dcp_step;
1364    ENGINE_HANDLE_V1::dcp.open = dcp_open;
1365    ENGINE_HANDLE_V1::dcp.stream_req = dcp_stream_req;
1366    ENGINE_HANDLE_V1::dcp.add_stream = dcp_add_stream;
1367    ENGINE_HANDLE_V1::dcp.close_stream = dcp_close_stream;
1368    ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = dcp_buffer_acknowledgement;
1369    ENGINE_HANDLE_V1::dcp.control = dcp_control;
1370    ENGINE_HANDLE_V1::dcp.get_failover_log = dcp_get_failover_log;
1371    ENGINE_HANDLE_V1::dcp.stream_end = dcp_stream_end;
1372    ENGINE_HANDLE_V1::dcp.snapshot_marker = dcp_snapshot_marker;
1373    ENGINE_HANDLE_V1::dcp.mutation = dcp_mutation;
1374    ENGINE_HANDLE_V1::dcp.deletion = dcp_deletion;
1375    ENGINE_HANDLE_V1::dcp.deletion_v2 = dcp_deletion_v2;
1376    ENGINE_HANDLE_V1::dcp.expiration = dcp_expiration;
1377    ENGINE_HANDLE_V1::dcp.flush = dcp_flush;
1378    ENGINE_HANDLE_V1::dcp.set_vbucket_state = dcp_set_vbucket_state;
1379    ENGINE_HANDLE_V1::dcp.noop = dcp_noop;
1380    ENGINE_HANDLE_V1::dcp.response_handler = dcp_response_handler;
1381    ENGINE_HANDLE_V1::dcp.system_event = dcp_system_event;
1382
1383    ENGINE_HANDLE_V1::collections = {};
1384    ENGINE_HANDLE_V1::collections.set_manifest = collections_set_manifest;
1385    ENGINE_HANDLE_V1::collections.get_manifest = collections_get_manifest;
1386
1387    ENGINE_HANDLE_V1::isXattrEnabled = isXattrEnabled;
1388    ENGINE_HANDLE_V1::getCompressionMode = getCompressionMode;
1389    ENGINE_HANDLE_V1::getMaxItemSize = getMaxItemSize;
1390    ENGINE_HANDLE_V1::getMinCompressionRatio = getMinCompressionRatio;
1391
1392    clustermap_revno = 1;
1393
1394    get_connection_id = gsa()->cookie->get_connection_id;
1395
1396    stop_notification_thread.store(false);
1397    notify_io_thread->start();
1398}
1399
1400static void register_callback(ENGINE_HANDLE *eh, ENGINE_EVENT_TYPE type,
1401                              EVENT_CALLBACK cb, const void *cb_data) {
1402    const auto& p = engine_map.find(eh);
1403    if (p == engine_map.end()) {
1404        std::cerr << "Can't find EWB corresponding to " << std::hex << eh << std::endl;
1405        for (const auto& pair : engine_map) {
1406            std::cerr << "EH: " << std::hex << pair.first << " = EWB: " << std::hex << pair.second << std::endl;
1407        }
1408        abort();
1409    }
1410    cb_assert(p != engine_map.end());
1411    auto wrapped_eh = reinterpret_cast<ENGINE_HANDLE*>(p->second);
1412    real_api->callback->register_callback(wrapped_eh, type, cb, cb_data);
1413}
1414
1415EWB_Engine::~EWB_Engine() {
1416    engine_map.erase(real_handle);
1417    cb_free(real_engine_ref);
1418    stop_notification_thread = true;
1419    condvar.notify_all();
1420    notify_io_thread->waitForState(Couchbase::ThreadState::Zombie);
1421}
1422
1423ENGINE_ERROR_CODE EWB_Engine::dcp_step(
1424        gsl::not_null<ENGINE_HANDLE*> handle,
1425        gsl::not_null<const void*> cookie,
1426        gsl::not_null<struct dcp_message_producers*> producers) {
1427    EWB_Engine* ewb = to_engine(handle);
1428    auto stream = ewb->dcp_stream.find(cookie);
1429    if (stream != ewb->dcp_stream.end()) {
1430        auto& count = stream->second.second;
1431        // If the stream is enabled and we have data to send..
1432        if (stream->second.first && count > 0) {
1433            // This is using the internal dcp implementation which always
1434            // send the same item back
1435            auto ret = producers->mutation(cookie,
1436                                           0xdeadbeef /*opqaue*/,
1437                                           &ewb->dcp_mutation_item,
1438                                           0 /*vb*/,
1439                                           0 /*by_seqno*/,
1440                                           0 /*rev_seqno*/,
1441                                           0 /*lock_time*/,
1442                                           nullptr /*meta*/,
1443                                           0 /*nmeta*/,
1444                                           0 /*nru*/,
1445                                           0 /*collection_len*/);
1446            --count;
1447            if (ret == ENGINE_SUCCESS) {
1448                return ENGINE_WANT_MORE;
1449            }
1450            return ret;
1451        }
1452        return ENGINE_SUCCESS;
1453    }
1454
1455    if (ewb->real_engine->dcp.step == nullptr) {
1456        return ENGINE_ENOTSUP;
1457    } else {
1458        return ewb->real_engine->dcp.step(ewb->real_handle, cookie, producers);
1459    }
1460}
1461
1462ENGINE_ERROR_CODE EWB_Engine::dcp_open(gsl::not_null<ENGINE_HANDLE*> handle,
1463                                       gsl::not_null<const void*> cookie,
1464                                       uint32_t opaque,
1465                                       uint32_t seqno,
1466                                       uint32_t flags,
1467                                       cb::const_char_buffer name,
1468                                       cb::const_byte_buffer json) {
1469    EWB_Engine* ewb = to_engine(handle);
1470    std::string nm = cb::to_string(name);
1471    if (nm.find("ewb_internal") == 0) {
1472        // Yeah, this is a request for the internal "magic" DCP stream
1473        // The user could specify the iteration count by adding a colon
1474        // at the end...
1475        auto idx = nm.rfind(":");
1476
1477        if (idx != nm.npos) {
1478            ewb->dcp_stream[cookie] =
1479                    std::make_pair(false, std::stoull(nm.substr(idx + 1)));
1480        } else {
1481            ewb->dcp_stream[cookie] =
1482                    std::make_pair(false, std::numeric_limits<uint64_t>::max());
1483        }
1484        return ENGINE_SUCCESS;
1485    }
1486
1487    if (ewb->real_engine->dcp.open == nullptr) {
1488        return ENGINE_ENOTSUP;
1489    } else {
1490        return ewb->real_engine->dcp.open(ewb->real_handle,
1491                                          cookie,
1492                                          opaque,
1493                                          seqno,
1494                                          flags,
1495                                          name,
1496                                          json);
1497    }
1498}
1499
1500ENGINE_ERROR_CODE EWB_Engine::dcp_stream_req(
1501        gsl::not_null<ENGINE_HANDLE*> handle,
1502        gsl::not_null<const void*> cookie,
1503        uint32_t flags,
1504        uint32_t opaque,
1505        uint16_t vbucket,
1506        uint64_t start_seqno,
1507        uint64_t end_seqno,
1508        uint64_t vbucket_uuid,
1509        uint64_t snap_start_seqno,
1510        uint64_t snap_end_seqno,
1511        uint64_t* rollback_seqno,
1512        dcp_add_failover_log callback) {
1513    EWB_Engine* ewb = to_engine(handle);
1514    auto stream = ewb->dcp_stream.find(cookie.get());
1515    if (stream != ewb->dcp_stream.end()) {
1516        // This is a client of our internal streams.. just let it pass
1517        if (start_seqno == 1) {
1518            *rollback_seqno = 0;
1519            return ENGINE_ROLLBACK;
1520        }
1521        // Start the stream
1522        stream->second.first = true;
1523        return ENGINE_SUCCESS;
1524    }
1525
1526    if (ewb->real_engine->dcp.stream_req == nullptr) {
1527        return ENGINE_ENOTSUP;
1528    } else {
1529        return ewb->real_engine->dcp.stream_req(ewb->real_handle, cookie,
1530                                                flags, opaque, vbucket,
1531                                                start_seqno, end_seqno,
1532                                                vbucket_uuid, snap_start_seqno,
1533                                                snap_end_seqno, rollback_seqno,
1534                                                callback);
1535    }
1536}
1537
1538ENGINE_ERROR_CODE EWB_Engine::dcp_add_stream(
1539        gsl::not_null<ENGINE_HANDLE*> handle,
1540        gsl::not_null<const void*> cookie,
1541        uint32_t opaque,
1542        uint16_t vbucket,
1543        uint32_t flags) {
1544    EWB_Engine* ewb = to_engine(handle);
1545    if (ewb->real_engine->dcp.add_stream == nullptr) {
1546        return ENGINE_ENOTSUP;
1547    } else {
1548        return ewb->real_engine->dcp.add_stream(ewb->real_handle, cookie,
1549                                                opaque, vbucket, flags);
1550    }
1551}
1552
1553ENGINE_ERROR_CODE EWB_Engine::dcp_close_stream(
1554        gsl::not_null<ENGINE_HANDLE*> handle,
1555        gsl::not_null<const void*> cookie,
1556        uint32_t opaque,
1557        uint16_t vbucket) {
1558    EWB_Engine* ewb = to_engine(handle);
1559    if (ewb->real_engine->dcp.close_stream == nullptr) {
1560        return ENGINE_ENOTSUP;
1561    } else {
1562        return ewb->real_engine->dcp.close_stream(ewb->real_handle, cookie,
1563                                                opaque, vbucket);
1564    }
1565}
1566
1567ENGINE_ERROR_CODE EWB_Engine::dcp_get_failover_log(
1568        gsl::not_null<ENGINE_HANDLE*> handle,
1569        gsl::not_null<const void*> cookie,
1570        uint32_t opaque,
1571        uint16_t vbucket,
1572        dcp_add_failover_log callback) {
1573    EWB_Engine* ewb = to_engine(handle);
1574    if (ewb->real_engine->dcp.get_failover_log == nullptr) {
1575        return ENGINE_ENOTSUP;
1576    } else {
1577        return ewb->real_engine->dcp.get_failover_log(ewb->real_handle,
1578                                                      cookie,
1579                                                      opaque,
1580                                                      vbucket,
1581                                                      callback);
1582    }
1583}
1584
1585ENGINE_ERROR_CODE EWB_Engine::dcp_stream_end(
1586        gsl::not_null<ENGINE_HANDLE*> handle,
1587        gsl::not_null<const void*> cookie,
1588        uint32_t opaque,
1589        uint16_t vbucket,
1590        uint32_t flags) {
1591    EWB_Engine* ewb = to_engine(handle);
1592    if (ewb->real_engine->dcp.stream_end == nullptr) {
1593        return ENGINE_ENOTSUP;
1594    } else {
1595        return ewb->real_engine->dcp.stream_end(ewb->real_handle, cookie,
1596                                                opaque, vbucket, flags);
1597    }
1598}
1599
1600ENGINE_ERROR_CODE EWB_Engine::dcp_snapshot_marker(
1601        gsl::not_null<ENGINE_HANDLE*> handle,
1602        gsl::not_null<const void*> cookie,
1603        uint32_t opaque,
1604        uint16_t vbucket,
1605        uint64_t start_seqno,
1606        uint64_t end_seqno,
1607        uint32_t flags) {
1608    EWB_Engine* ewb = to_engine(handle);
1609    if (ewb->real_engine->dcp.snapshot_marker == nullptr) {
1610        return ENGINE_ENOTSUP;
1611    } else {
1612        return ewb->real_engine->dcp.snapshot_marker(ewb->real_handle, cookie,
1613                                                     opaque, vbucket,
1614                                                     start_seqno, end_seqno,
1615                                                     flags);
1616    }
1617}
1618
1619ENGINE_ERROR_CODE EWB_Engine::dcp_mutation(gsl::not_null<ENGINE_HANDLE*> handle,
1620                                           gsl::not_null<const void*> cookie,
1621                                           uint32_t opaque,
1622                                           const DocKey& key,
1623                                           cb::const_byte_buffer value,
1624                                           size_t priv_bytes,
1625                                           uint8_t datatype,
1626                                           uint64_t cas,
1627                                           uint16_t vbucket,
1628                                           uint32_t flags,
1629                                           uint64_t by_seqno,
1630                                           uint64_t rev_seqno,
1631                                           uint32_t expiration,
1632                                           uint32_t lock_time,
1633                                           cb::const_byte_buffer meta,
1634                                           uint8_t nru) {
1635    EWB_Engine* ewb = to_engine(handle);
1636    if (ewb->real_engine->dcp.mutation == nullptr) {
1637        return ENGINE_ENOTSUP;
1638    } else {
1639        return ewb->real_engine->dcp.mutation(ewb->real_handle, cookie, opaque,
1640                                              key, value, priv_bytes, datatype,
1641                                              cas, vbucket, flags, by_seqno,
1642                                              rev_seqno, expiration, lock_time,
1643                                              meta, nru);
1644    }
1645}
1646
1647ENGINE_ERROR_CODE EWB_Engine::dcp_deletion(gsl::not_null<ENGINE_HANDLE*> handle,
1648                                           gsl::not_null<const void*> cookie,
1649                                           uint32_t opaque,
1650                                           const DocKey& key,
1651                                           cb::const_byte_buffer value,
1652                                           size_t priv_bytes,
1653                                           uint8_t datatype,
1654                                           uint64_t cas,
1655                                           uint16_t vbucket,
1656                                           uint64_t by_seqno,
1657                                           uint64_t rev_seqno,
1658                                           cb::const_byte_buffer meta) {
1659    EWB_Engine* ewb = to_engine(handle);
1660    if (ewb->real_engine->dcp.deletion == nullptr) {
1661        return ENGINE_ENOTSUP;
1662    } else {
1663        return ewb->real_engine->dcp.deletion(ewb->real_handle, cookie, opaque,
1664                                              key, value, priv_bytes, datatype,
1665                                              cas, vbucket, by_seqno, rev_seqno,
1666                                              meta);
1667    }
1668}
1669
1670ENGINE_ERROR_CODE EWB_Engine::dcp_deletion_v2(
1671        gsl::not_null<ENGINE_HANDLE*> handle,
1672        gsl::not_null<const void*> cookie,
1673        uint32_t opaque,
1674        const DocKey& key,
1675        cb::const_byte_buffer value,
1676        size_t priv_bytes,
1677        uint8_t datatype,
1678        uint64_t cas,
1679        uint16_t vbucket,
1680        uint64_t by_seqno,
1681        uint64_t rev_seqno,
1682        uint32_t delete_time) {
1683    EWB_Engine* ewb = to_engine(handle);
1684    if (ewb->real_engine->dcp.deletion_v2 == nullptr) {
1685        return ENGINE_ENOTSUP;
1686    } else {
1687        return ewb->real_engine->dcp.deletion_v2(ewb->real_handle,
1688                                                 cookie,
1689                                                 opaque,
1690                                                 key,
1691                                                 value,
1692                                                 priv_bytes,
1693                                                 datatype,
1694                                                 cas,
1695                                                 vbucket,
1696                                                 by_seqno,
1697                                                 rev_seqno,
1698                                                 delete_time);
1699    }
1700}
1701
1702ENGINE_ERROR_CODE EWB_Engine::dcp_expiration(
1703        gsl::not_null<ENGINE_HANDLE*> handle,
1704        gsl::not_null<const void*> cookie,
1705        uint32_t opaque,
1706        const DocKey& key,
1707        cb::const_byte_buffer value,
1708        size_t priv_bytes,
1709        uint8_t datatype,
1710        uint64_t cas,
1711        uint16_t vbucket,
1712        uint64_t by_seqno,
1713        uint64_t rev_seqno,
1714        cb::const_byte_buffer meta) {
1715    EWB_Engine* ewb = to_engine(handle);
1716    if (ewb->real_engine->dcp.expiration == nullptr) {
1717        return ENGINE_ENOTSUP;
1718    } else {
1719        return ewb->real_engine->dcp.expiration(ewb->real_handle, cookie, opaque,
1720                                                key, value, priv_bytes, datatype,
1721                                                cas, vbucket, by_seqno, rev_seqno,
1722                                                meta);
1723    }
1724}
1725
1726ENGINE_ERROR_CODE EWB_Engine::dcp_flush(gsl::not_null<ENGINE_HANDLE*> handle,
1727                                        gsl::not_null<const void*> cookie,
1728                                        uint32_t opaque,
1729                                        uint16_t vbucket) {
1730    EWB_Engine* ewb = to_engine(handle);
1731    if (ewb->real_engine->dcp.flush == nullptr) {
1732        return ENGINE_ENOTSUP;
1733    } else {
1734        return ewb->real_engine->dcp.flush(ewb->real_handle,
1735                                                cookie,
1736                                                opaque,
1737                                                vbucket);
1738    }
1739}
1740
1741ENGINE_ERROR_CODE EWB_Engine::dcp_set_vbucket_state(
1742        gsl::not_null<ENGINE_HANDLE*> handle,
1743        gsl::not_null<const void*> cookie,
1744        uint32_t opaque,
1745        uint16_t vbucket,
1746        vbucket_state_t state) {
1747    EWB_Engine* ewb = to_engine(handle);
1748    if (ewb->real_engine->dcp.set_vbucket_state == nullptr) {
1749        return ENGINE_ENOTSUP;
1750    } else {
1751        return ewb->real_engine->dcp.set_vbucket_state(ewb->real_handle,
1752                                                cookie,
1753                                                opaque,
1754                                                vbucket,
1755                                                state);
1756    }
1757}
1758
1759ENGINE_ERROR_CODE EWB_Engine::dcp_noop(gsl::not_null<ENGINE_HANDLE*> handle,
1760                                       gsl::not_null<const void*> cookie,
1761                                       uint32_t opaque) {
1762    EWB_Engine* ewb = to_engine(handle);
1763    if (ewb->real_engine->dcp.noop == nullptr) {
1764        return ENGINE_ENOTSUP;
1765    } else {
1766        return ewb->real_engine->dcp.noop(ewb->real_handle, cookie, opaque);
1767    }
1768}
1769
1770ENGINE_ERROR_CODE EWB_Engine::dcp_buffer_acknowledgement(
1771        gsl::not_null<ENGINE_HANDLE*> handle,
1772        gsl::not_null<const void*> cookie,
1773        uint32_t opaque,
1774        uint16_t vbucket,
1775        uint32_t buffer_bytes) {
1776    EWB_Engine* ewb = to_engine(handle);
1777    if (ewb->real_engine->dcp.buffer_acknowledgement == nullptr) {
1778        return ENGINE_ENOTSUP;
1779    } else {
1780        return ewb->real_engine->dcp.buffer_acknowledgement(ewb->real_handle,
1781                                                            cookie,
1782                                                            opaque,
1783                                                            vbucket,
1784                                                            buffer_bytes);
1785    }
1786}
1787
1788ENGINE_ERROR_CODE EWB_Engine::dcp_control(gsl::not_null<ENGINE_HANDLE*> handle,
1789                                          gsl::not_null<const void*> cookie,
1790                                          uint32_t opaque,
1791                                          const void* key,
1792                                          uint16_t nkey,
1793                                          const void* value,
1794                                          uint32_t nvalue) {
1795    EWB_Engine* ewb = to_engine(handle);
1796    if (ewb->real_engine->dcp.control == nullptr) {
1797        return ENGINE_ENOTSUP;
1798    } else {
1799        return ewb->real_engine->dcp.control(ewb->real_handle, cookie,
1800                                             opaque, key, nkey, value, nvalue);
1801    }
1802}
1803
1804ENGINE_ERROR_CODE EWB_Engine::dcp_response_handler(
1805        gsl::not_null<ENGINE_HANDLE*> handle,
1806        gsl::not_null<const void*> cookie,
1807        const protocol_binary_response_header* response) {
1808    EWB_Engine* ewb = to_engine(handle);
1809    if (ewb->real_engine->dcp.response_handler == nullptr) {
1810        return ENGINE_ENOTSUP;
1811    } else {
1812        return ewb->real_engine->dcp.response_handler(ewb->real_handle,
1813                                                      cookie,
1814                                                      response);
1815    }
1816}
1817
1818ENGINE_ERROR_CODE EWB_Engine::dcp_system_event(
1819        gsl::not_null<ENGINE_HANDLE*> handle,
1820        gsl::not_null<const void*> cookie,
1821        uint32_t opaque,
1822        uint16_t vbucket,
1823        mcbp::systemevent::id event,
1824        uint64_t bySeqno,
1825        cb::const_byte_buffer key,
1826        cb::const_byte_buffer eventData) {
1827    EWB_Engine* ewb = to_engine(handle);
1828    if (ewb->real_engine->dcp.response_handler == nullptr) {
1829        return ENGINE_ENOTSUP;
1830    } else {
1831        return ewb->real_engine->dcp.system_event(ewb->real_handle,
1832                                                  cookie,
1833                                                  opaque,
1834                                                  vbucket,
1835                                                  event,
1836                                                  bySeqno,
1837                                                  key,
1838                                                  eventData);
1839    }
1840}
1841
1842cb::engine_error EWB_Engine::collections_set_manifest(
1843        gsl::not_null<ENGINE_HANDLE*> handle, cb::const_char_buffer json) {
1844    EWB_Engine* ewb = to_engine(handle);
1845    if (ewb->real_engine->collections.set_manifest == nullptr) {
1846        return {cb::engine_errc::not_supported,
1847                "EWB_Engine::collections_set_manifest"};
1848    } else {
1849        return ewb->real_engine->collections.set_manifest(ewb->real_handle,
1850                                                          json);
1851    }
1852}
1853
1854cb::EngineErrorStringPair EWB_Engine::collections_get_manifest(
1855        gsl::not_null<ENGINE_HANDLE*> handle) {
1856    EWB_Engine* ewb = to_engine(handle);
1857    if (ewb->real_engine->collections.get_manifest == nullptr) {
1858        return {cb::engine_errc::not_supported,
1859                "EWB_Engine::collections_get_manifest"};
1860    } else {
1861        return ewb->real_engine->collections.get_manifest(ewb->real_handle);
1862    }
1863}
1864
1865bool EWB_Engine::isXattrEnabled(gsl::not_null<ENGINE_HANDLE*> handle) {
1866    EWB_Engine* ewb = to_engine(handle);
1867    if (ewb->real_engine->isXattrEnabled == nullptr) {
1868        return false;
1869    } else {
1870        return ewb->real_engine->isXattrEnabled(ewb->real_handle);
1871    }
1872}
1873
1874BucketCompressionMode EWB_Engine::getCompressionMode(gsl::not_null<ENGINE_HANDLE*> handle) {
1875    EWB_Engine* ewb = to_engine(handle);
1876    if (ewb->real_engine->getCompressionMode == nullptr) {
1877        return BucketCompressionMode::Off;
1878    } else {
1879        return ewb->real_engine->getCompressionMode(ewb->real_handle);
1880    }
1881}
1882
1883size_t EWB_Engine::getMaxItemSize(gsl::not_null<ENGINE_HANDLE*> handle) {
1884    EWB_Engine* ewb = to_engine(handle);
1885    if (ewb->real_engine->getMaxItemSize == nullptr) {
1886        return default_max_item_size;
1887    } else {
1888        return ewb->real_engine->getMaxItemSize(ewb->real_handle);
1889    }
1890}
1891
1892float EWB_Engine::getMinCompressionRatio(gsl::not_null<ENGINE_HANDLE*> handle) {
1893    EWB_Engine* ewb = to_engine(handle);
1894    if (ewb->real_engine->getMinCompressionRatio == nullptr) {
1895        return default_min_compression_ratio;
1896    } else {
1897        return ewb->real_engine->getMinCompressionRatio(ewb->real_handle);
1898    }
1899}
1900
1901ENGINE_ERROR_CODE create_instance(uint64_t interface,
1902                                  GET_SERVER_API gsa,
1903                                  ENGINE_HANDLE **handle)
1904{
1905    if (interface != 1) {
1906        return ENGINE_ENOTSUP;
1907    }
1908
1909    try {
1910        EWB_Engine* engine = new EWB_Engine(gsa);
1911        *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1912        return ENGINE_SUCCESS;
1913
1914    } catch (std::exception& e) {
1915        auto logger = gsa()->log->get_logger();
1916        logger->log(EXTENSION_LOG_WARNING, NULL,
1917                    "EWB_Engine: failed to create engine: %s", e.what());
1918        return ENGINE_FAILED;
1919    }
1920
1921}
1922
/// Engine teardown hook. Intentionally a no-op: this function itself
/// releases nothing.
void destroy_engine() {
}
1926
1927const char* EWB_Engine::to_string(const Cmd cmd) {
1928    switch (cmd) {
1929    case Cmd::NONE:
1930        return "NONE";
1931    case Cmd::GET_INFO:
1932        return "GET_INFO";
1933    case Cmd::GET_META:
1934        return "GET_META";
1935    case Cmd::ALLOCATE:
1936        return "ALLOCATE";
1937    case Cmd::REMOVE:
1938        return "REMOVE";
1939    case Cmd::GET:
1940        return "GET";
1941    case Cmd::STORE:
1942        return "STORE";
1943    case Cmd::CAS:
1944        return "CAS";
1945    case Cmd::ARITHMETIC:
1946        return "ARITHMETIC";
1947    case Cmd::FLUSH:
1948        return "FLUSH";
1949    case Cmd::GET_STATS:
1950        return "GET_STATS";
1951    case Cmd::UNKNOWN_COMMAND:
1952        return "UNKNOWN_COMMAND";
1953    case Cmd::LOCK:
1954        return "LOCK";
1955    case Cmd::UNLOCK:
1956        return "UNLOCK";
1957    }
1958    throw std::invalid_argument("EWB_Engine::to_string() Unknown command");
1959}
1960
1961void EWB_Engine::process_notifications() {
1962    SERVER_HANDLE_V1* server = gsa();
1963    LOG_DEBUG("EWB_Engine: notification thread running for engine {}",
1964              (void*)this);
1965    std::unique_lock<std::mutex> lk(mutex);
1966    while (!stop_notification_thread) {
1967        condvar.wait(lk);
1968        while (!pending_io_ops.empty()) {
1969            const void* cookie = pending_io_ops.front();
1970            pending_io_ops.pop();
1971            lk.unlock();
1972            LOG_DEBUG("EWB_Engine: notify {}", cookie);
1973            server->cookie->notify_io_complete(cookie, ENGINE_SUCCESS);
1974            lk.lock();
1975        }
1976    }
1977
1978    LOG_DEBUG("EWB_Engine: notification thread stopping for engine {}",
1979              (void*)this);
1980}
1981
// Thread entry point for the notification thread.
void NotificationThread::run() {
    // NOTE(review): setRunning() presumably tells whoever started the thread
    // that it is up - confirm against the thread base class.
    setRunning();
    // Runs the engine's notification loop; returns once that loop exits.
    engine.process_notifications();
}
1986
1987ENGINE_ERROR_CODE EWB_Engine::handleBlockMonitorFile(const void* cookie,
1988                                                     uint32_t id,
1989                                                     const std::string& file,
1990                                                     ADD_RESPONSE response) {
1991    if (file.empty()) {
1992        return ENGINE_EINVAL;
1993    }
1994
1995    if (!cb::io::isFile(file)) {
1996        return ENGINE_KEY_ENOENT;
1997    }
1998
1999    if (!suspend(cookie, id)) {
2000        LOG_WARNING(
2001                "EWB_Engine::handleBlockMonitorFile(): "
2002                "Id {} already registered",
2003                id);
2004        return ENGINE_KEY_EEXISTS;
2005    }
2006
2007    try {
2008        std::unique_ptr<Couchbase::Thread> thread(
2009                new BlockMonitorThread(*this, id, file));
2010        thread->start();
2011        std::lock_guard<std::mutex> guard(threads_mutex);
2012        threads.emplace_back(thread.release());
2013    } catch (std::exception& e) {
2014        LOG_WARNING(
2015                "EWB_Engine::handleBlockMonitorFile(): Failed to create "
2016                "block monitor thread: {}",
2017                e.what());
2018        return ENGINE_FAILED;
2019    }
2020
2021    LOG_DEBUG(
2022            "Registered connection {} (engine {}) as {} to be"
2023            " suspended. Monitor file {}",
2024            cookie,
2025            (void*)this,
2026            id,
2027            file.c_str());
2028
2029    response(nullptr, 0, nullptr, 0, nullptr, 0,
2030             PROTOCOL_BINARY_RAW_BYTES,
2031             PROTOCOL_BINARY_RESPONSE_SUCCESS, /*cas*/0, cookie);
2032    return ENGINE_SUCCESS;
2033}
2034
2035ENGINE_ERROR_CODE EWB_Engine::handleSuspend(const void* cookie,
2036                                            uint32_t id,
2037                                            ADD_RESPONSE response) {
2038    if (suspend(cookie, id)) {
2039        LOG_DEBUG("Registered connection {} as {} to be suspended", cookie, id);
2040        response(nullptr, 0, nullptr, 0, nullptr, 0,
2041                 PROTOCOL_BINARY_RAW_BYTES,
2042                 PROTOCOL_BINARY_RESPONSE_SUCCESS, /*cas*/0, cookie);
2043        return ENGINE_SUCCESS;
2044    } else {
2045        LOG_WARNING("EWB_Engine::handleSuspend(): Id {} already registered",
2046                    id);
2047        return ENGINE_KEY_EEXISTS;
2048    }
2049}
2050
2051ENGINE_ERROR_CODE EWB_Engine::handleResume(const void* cookie, uint32_t id,
2052                                           ADD_RESPONSE response) {
2053    if (resume(id)) {
2054        LOG_DEBUG("Connection with id {} will be resumed", id);
2055        response(nullptr, 0, nullptr, 0, nullptr, 0,
2056                 PROTOCOL_BINARY_RAW_BYTES,
2057                 PROTOCOL_BINARY_RESPONSE_SUCCESS, /*cas*/0, cookie);
2058        return ENGINE_SUCCESS;
2059    } else {
2060        LOG_WARNING(
2061                "EWB_Engine::unknown_command(): No "
2062                "connection registered with id {}",
2063                id);
2064        return ENGINE_EINVAL;
2065    }
2066}
2067
2068ENGINE_ERROR_CODE EWB_Engine::setItemCas(const void *cookie,
2069                                         const std::string& key,
2070                                         uint32_t cas,
2071                                         ADD_RESPONSE response) {
2072    uint64_t cas64 = cas;
2073    if (cas == static_cast<uint32_t>(-1)) {
2074        cas64 = LOCKED_CAS;
2075    }
2076
2077    auto rv = real_engine->get(real_handle,
2078                               cookie,
2079                               DocKey{key, DocNamespace::DefaultCollection},
2080                               0,
2081                               DocStateFilter::Alive);
2082    if (rv.first != cb::engine_errc::success) {
2083        return ENGINE_ERROR_CODE(rv.first);
2084    }
2085
2086    // item_set_cas has no return value!
2087    real_engine->item_set_cas(real_handle, rv.second.get(), cas64);
2088    response(nullptr, 0, nullptr, 0, nullptr, 0,
2089             PROTOCOL_BINARY_RAW_BYTES,
2090             PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
2091    return ENGINE_SUCCESS;
2092}
2093
2094void BlockMonitorThread::run() {
2095    setRunning();
2096
2097    LOG_DEBUG("Block monitor for file {} started", file);
2098
2099    // @todo Use the file monitoring API's to avoid this "busy" loop
2100    while (cb::io::isFile(file)) {
2101        usleep(100);
2102    }
2103
2104    LOG_DEBUG("Block monitor for file {} stopping (file is gone)", file);
2105    engine.resume(id);
2106}
2107