1397bfe65SDave Rigby /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2397bfe65SDave Rigby /*
3397bfe65SDave Rigby * Copyright 2015 Couchbase, Inc
4397bfe65SDave Rigby *
5397bfe65SDave Rigby * Licensed under the Apache License, Version 2.0 (the "License");
6397bfe65SDave Rigby * you may not use this file except in compliance with the License.
7397bfe65SDave Rigby * You may obtain a copy of the License at
8397bfe65SDave Rigby *
9397bfe65SDave Rigby * http://www.apache.org/licenses/LICENSE-2.0
10397bfe65SDave Rigby *
11397bfe65SDave Rigby * Unless required by applicable law or agreed to in writing, software
12397bfe65SDave Rigby * distributed under the License is distributed on an "AS IS" BASIS,
13397bfe65SDave Rigby * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14397bfe65SDave Rigby * See the License for the specific language governing permissions and
15397bfe65SDave Rigby * limitations under the License.
16397bfe65SDave Rigby */
17397bfe65SDave Rigby
18397bfe65SDave Rigby /*
19397bfe65SDave Rigby * "ewouldblock_engine"
20397bfe65SDave Rigby *
21397bfe65SDave Rigby * The "ewouldblock_engine" allows one to test how memcached responds when the
22397bfe65SDave Rigby * engine returns EWOULDBLOCK instead of the correct response.
23397bfe65SDave Rigby *
24397bfe65SDave Rigby * Motivation:
25397bfe65SDave Rigby *
26397bfe65SDave Rigby * The EWOULDBLOCK response code can be returned from a number of engine
27397bfe65SDave Rigby * functions, and is used to indicate that the request could not be immediately
28397bfe65SDave Rigby * fulfilled, and it "would block" if it tried to. The correct way for
29397bfe65SDave Rigby * memcached to handle this (in general) is to suspend that request until it
30397bfe65SDave Rigby * is later notified by the engine (via notify_io_complete()).
31397bfe65SDave Rigby *
32397bfe65SDave Rigby * However, engines typically return the correct response to requests
33397bfe65SDave Rigby * immediately, only rarely (and from memcached's POV non-deterministically)
34397bfe65SDave Rigby * returning EWOULDBLOCK. This makes testing of the code-paths handling
35397bfe65SDave Rigby * EWOULDBLOCK tricky.
36397bfe65SDave Rigby *
37397bfe65SDave Rigby *
38397bfe65SDave Rigby * Operation:
 * This engine, when loaded by memcached, proxies requests to a "real" engine.
 * Depending on how it is configured, it can simply pass the request on to the
 * real engine, or artificially return EWOULDBLOCK back to memcached.
42397bfe65SDave Rigby *
437eba45a8SDave Rigby * See the 'Modes' enum below for the possible modes for a connection. The mode
447eba45a8SDave Rigby * can be selected by sending a `request_ewouldblock_ctl` command
45397bfe65SDave Rigby * (opcode PROTOCOL_BINARY_CMD_EWOULDBLOCK_CTL).
46d84f664bSDave Rigby *
47d84f664bSDave Rigby * DCP:
 * There is a special DCP stream named "ewb_internal" which is an
 * endless stream of items. You may also add a number at the end,
 * e.g. "ewb_internal:10", and it'll create a stream with 10 entries.
 * It will always send the same K-V pair.
 * Note that we don't register for disconnect events, so you might
 * experience weirdness if you first try to use the internal DCP
 * stream and then later on want to use the one provided by the
 * engine. The workaround for that is to delete the bucket
 * in between ;-) (put them in separate test suites and it'll all
 * be handled for you).
58d84f664bSDave Rigby *
596750cc57STrond Norbye * Any other stream name results in proxying the dcp request to
606750cc57STrond Norbye * the underlying engine's DCP implementation.
616750cc57STrond Norbye *
62397bfe65SDave Rigby */
63397bfe65SDave Rigby
647eba45a8SDave Rigby #include "ewouldblock_engine.h"
65397bfe65SDave Rigby
66397bfe65SDave Rigby #include <atomic>
67397bfe65SDave Rigby #include <condition_variable>
68397bfe65SDave Rigby #include <cstring>
69fa526109STim Bradgate #include <gsl/gsl>
708ca49c14SMark Nunberg #include <iostream>
71397bfe65SDave Rigby #include <map>
72397bfe65SDave Rigby #include <memory>
73397bfe65SDave Rigby #include <mutex>
74397bfe65SDave Rigby #include <queue>
75397bfe65SDave Rigby #include <random>
76934353e8SDave Rigby #include <sstream>
77397bfe65SDave Rigby #include <string>
78386f37fcSTrond Norbye #include <utility>
79397bfe65SDave Rigby
804dd9b0f4STrond Norbye #include <logger/logger.h>
81397bfe65SDave Rigby #include <memcached/engine.h>
82d93edd49STrond Norbye #include <memcached/extension.h>
83aa327187SDave Rigby #include <platform/cb_malloc.h>
84d93edd49STrond Norbye #include <platform/dirutils.h>
851c9bbeaaSTrond Norbye #include <platform/thread.h>
86f9e30e01STrond Norbye #include <xattr/blob.h>
87f9e30e01STrond Norbye
88397bfe65SDave Rigby #include "utilities/engine_loader.h"
89397bfe65SDave Rigby
90397bfe65SDave Rigby /* Public API declaration ****************************************************/
91397bfe65SDave Rigby
92e25e5f8bSjim extern "C" {
93397bfe65SDave Rigby MEMCACHED_PUBLIC_API
94397bfe65SDave Rigby ENGINE_ERROR_CODE create_instance(uint64_t interface, GET_SERVER_API gsa,
95397bfe65SDave Rigby ENGINE_HANDLE **handle);
96397bfe65SDave Rigby
97e25e5f8bSjim MEMCACHED_PUBLIC_API
98e25e5f8bSjim void destroy_engine(void);
99e25e5f8bSjim }
100397bfe65SDave Rigby
1011c9bbeaaSTrond Norbye
1021c9bbeaaSTrond Norbye class EWB_Engine;
1031c9bbeaaSTrond Norbye
1048ca49c14SMark Nunberg // Mapping from wrapped handle to EWB handles.
1058ca49c14SMark Nunberg static std::map<ENGINE_HANDLE*, EWB_Engine*> engine_map;
1068ca49c14SMark Nunberg
1071c9bbeaaSTrond Norbye class NotificationThread : public Couchbase::Thread {
1081c9bbeaaSTrond Norbye public:
NotificationThread(EWB_Engine & engine_)1091c9bbeaaSTrond Norbye NotificationThread(EWB_Engine& engine_)
1101c9bbeaaSTrond Norbye : Thread("ewb:pendingQ"),
1111c9bbeaaSTrond Norbye engine(engine_) {}
1121c9bbeaaSTrond Norbye
1131c9bbeaaSTrond Norbye protected:
114fbd739aeSTrond Norbye void run() override;
1151c9bbeaaSTrond Norbye
1161c9bbeaaSTrond Norbye protected:
1171c9bbeaaSTrond Norbye EWB_Engine& engine;
1181c9bbeaaSTrond Norbye };
1191c9bbeaaSTrond Norbye
120d93edd49STrond Norbye /**
121d93edd49STrond Norbye * The BlockMonitorThread represents the thread that is
122d93edd49STrond Norbye * monitoring the "lock" file. Once the file is no longer
123d93edd49STrond Norbye * there it will resume the client specified with the given
124d93edd49STrond Norbye * id.
125d93edd49STrond Norbye */
126d93edd49STrond Norbye class BlockMonitorThread : public Couchbase::Thread {
127d93edd49STrond Norbye public:
BlockMonitorThread(EWB_Engine & engine_,uint32_t id_,const std::string file_)128d93edd49STrond Norbye BlockMonitorThread(EWB_Engine& engine_,
129d93edd49STrond Norbye uint32_t id_,
130d93edd49STrond Norbye const std::string file_)
13118821d13SDave Rigby : Thread("ewb:BlockMon"),
132d93edd49STrond Norbye engine(engine_),
133d93edd49STrond Norbye id(id_),
134d93edd49STrond Norbye file(file_) {}
135d93edd49STrond Norbye
136d93edd49STrond Norbye /**
137d93edd49STrond Norbye * Wait for the underlying thread to reach the zombie state
138d93edd49STrond Norbye * (== terminated, but not reaped)
139d93edd49STrond Norbye */
~BlockMonitorThread()140d93edd49STrond Norbye ~BlockMonitorThread() {
141d93edd49STrond Norbye waitForState(Couchbase::ThreadState::Zombie);
142d93edd49STrond Norbye }
143d93edd49STrond Norbye
144d93edd49STrond Norbye protected:
145fbd739aeSTrond Norbye void run() override;
146d93edd49STrond Norbye
147d93edd49STrond Norbye private:
148d93edd49STrond Norbye EWB_Engine& engine;
149d93edd49STrond Norbye const uint32_t id;
150d93edd49STrond Norbye const std::string file;
151d93edd49STrond Norbye };
1521c9bbeaaSTrond Norbye
1538ca49c14SMark Nunberg static void register_callback(ENGINE_HANDLE *, ENGINE_EVENT_TYPE,
1548ca49c14SMark Nunberg EVENT_CALLBACK, const void *);
1558ca49c14SMark Nunberg
1568ca49c14SMark Nunberg static SERVER_HANDLE_V1 wrapped_api;
1578ca49c14SMark Nunberg static SERVER_HANDLE_V1 *real_api;
init_wrapped_api(GET_SERVER_API fn)1588ca49c14SMark Nunberg static void init_wrapped_api(GET_SERVER_API fn) {
1598ca49c14SMark Nunberg static bool init = false;
1608ca49c14SMark Nunberg if (init) {
1618ca49c14SMark Nunberg return;
1628ca49c14SMark Nunberg }
1638ca49c14SMark Nunberg
1648ca49c14SMark Nunberg init = true;
1658ca49c14SMark Nunberg real_api = fn();
1668ca49c14SMark Nunberg wrapped_api = *real_api;
1678ca49c14SMark Nunberg
1688ca49c14SMark Nunberg // Overrides
1698ca49c14SMark Nunberg static SERVER_CALLBACK_API callback = *wrapped_api.callback;
1708ca49c14SMark Nunberg callback.register_callback = register_callback;
1718ca49c14SMark Nunberg wrapped_api.callback = &callback;
1728ca49c14SMark Nunberg }
1738ca49c14SMark Nunberg
get_wrapped_gsa()1748ca49c14SMark Nunberg static SERVER_HANDLE_V1 *get_wrapped_gsa() {
1758ca49c14SMark Nunberg return &wrapped_api;
1768ca49c14SMark Nunberg }
1778ca49c14SMark Nunberg
178397bfe65SDave Rigby /** ewouldblock_engine class */
179397bfe65SDave Rigby class EWB_Engine : public ENGINE_HANDLE_V1 {
180397bfe65SDave Rigby
181397bfe65SDave Rigby private:
/// Identifies the engine operation a request is performing; passed to
/// should_inject_error() by the individual engine functions.
enum class Cmd {
    NONE,
    GET_INFO,
    ALLOCATE,
    REMOVE,
    GET,
    STORE,
    CAS,
    ARITHMETIC,
    LOCK,
    UNLOCK,
    FLUSH,
    GET_STATS,
    GET_META,
    UNKNOWN_COMMAND,
};
198397bfe65SDave Rigby
1992a203429SDave Rigby const char* to_string(Cmd cmd);
2000ead7f70SDave Rigby
201929e6fbdSDave Rigby uint64_t (*get_connection_id)(gsl::not_null<const void*> cookie);
202c063ab99STrond Norbye
203397bfe65SDave Rigby public:
204397bfe65SDave Rigby EWB_Engine(GET_SERVER_API gsa_);
205397bfe65SDave Rigby
206397bfe65SDave Rigby ~EWB_Engine();
207397bfe65SDave Rigby
208397bfe65SDave Rigby // Convert from a handle back to the read object.
to_engine(ENGINE_HANDLE * handle)209397bfe65SDave Rigby static EWB_Engine* to_engine(ENGINE_HANDLE* handle) {
210397bfe65SDave Rigby return reinterpret_cast<EWB_Engine*> (handle);
211397bfe65SDave Rigby }
212397bfe65SDave Rigby
2130736971eSDave Rigby /* Returns true if the next command should have a fake error code injected.
214397bfe65SDave Rigby * @param func Address of the command function (get, store, etc).
215397bfe65SDave Rigby * @param cookie The cookie for the user's request.
2160736971eSDave Rigby * @param[out] Error code to return.
217397bfe65SDave Rigby */
should_inject_error(Cmd cmd,const void * cookie,ENGINE_ERROR_CODE & err)2180736971eSDave Rigby bool should_inject_error(Cmd cmd, const void* cookie,
2190736971eSDave Rigby ENGINE_ERROR_CODE& err) {
220c063ab99STrond Norbye
221d93edd49STrond Norbye if (is_connection_suspended(cookie)) {
222d93edd49STrond Norbye err = ENGINE_EWOULDBLOCK;
223d93edd49STrond Norbye return true;
224d93edd49STrond Norbye }
225d93edd49STrond Norbye
226c063ab99STrond Norbye uint64_t id = get_connection_id(cookie);
227c063ab99STrond Norbye
2287eba45a8SDave Rigby std::lock_guard<std::mutex> guard(cookie_map_mutex);
229c063ab99STrond Norbye
230c063ab99STrond Norbye auto iter = connection_map.find(id);
231c063ab99STrond Norbye if (iter == connection_map.end()) {
2320ead7f70SDave Rigby return false;
233397bfe65SDave Rigby }
2344e7e5778SDave Rigby
23585ea0554STrond Norbye if (iter->second.first != cookie) {
23685ea0554STrond Norbye // The cookie is different so it represents a different command
23785ea0554STrond Norbye connection_map.erase(iter);
23885ea0554STrond Norbye return false;
23985ea0554STrond Norbye }
24085ea0554STrond Norbye
241c063ab99STrond Norbye const bool inject = iter->second.second->should_inject_error(cmd, err);
242ce3c861dSDaniel Owen const bool add_to_pending_io_ops = iter->second.second->add_to_pending_io_ops();
243397bfe65SDave Rigby
2440736971eSDave Rigby if (inject) {
245*7245345dSTim Bradgate LOG_DEBUG("EWB_Engine: injecting error:{} for cmd:{}",
2464dd9b0f4STrond Norbye err,
2474dd9b0f4STrond Norbye to_string(cmd));
24874411436SDave Rigby
249ce3c861dSDaniel Owen if (err == ENGINE_EWOULDBLOCK && add_to_pending_io_ops) {
2500736971eSDave Rigby // The server expects that if EWOULDBLOCK is returned then the
2510736971eSDave Rigby // server should be notified in the future when the operation is
2520736971eSDave Rigby // ready - so add this op to the pending IO queue.
253d93edd49STrond Norbye schedule_notification(iter->second.first);
254397bfe65SDave Rigby }
2550736971eSDave Rigby }
2560736971eSDave Rigby
2570736971eSDave Rigby return inject;
258397bfe65SDave Rigby }
259397bfe65SDave Rigby
260397bfe65SDave Rigby /* Implementation of all the engine functions. ***************************/
261397bfe65SDave Rigby
initialize(gsl::not_null<ENGINE_HANDLE * > handle,const char * config_str)262aacf24fdSTrond Norbye static ENGINE_ERROR_CODE initialize(gsl::not_null<ENGINE_HANDLE*> handle,
263397bfe65SDave Rigby const char* config_str) {
264397bfe65SDave Rigby EWB_Engine* ewb = to_engine(handle);
265397bfe65SDave Rigby
266397bfe65SDave Rigby // Extract the name of the real engine we will be proxying; then
267397bfe65SDave Rigby // create and initialize it.
268397bfe65SDave Rigby std::string config(config_str);
269397bfe65SDave Rigby auto seperator = config.find(";");
270397bfe65SDave Rigby std::string real_engine_name(config.substr(0, seperator));
271397bfe65SDave Rigby std::string real_engine_config;
272397bfe65SDave Rigby if (seperator != std::string::npos) {
273f6b14cbaSTrond Norbye real_engine_config = config.substr(seperator + 1);
274397bfe65SDave Rigby }
275397bfe65SDave Rigby
2764dd9b0f4STrond Norbye if ((ewb->real_engine_ref = load_engine(
2774dd9b0f4STrond Norbye real_engine_name.c_str(), NULL, NULL)) == NULL) {
278*7245345dSTim Bradgate LOG_CRITICAL(
279c49c8b68STrond Norbye "ERROR: EWB_Engine::initialize(): Failed to load real "
2804dd9b0f4STrond Norbye "engine '{}'",
2814dd9b0f4STrond Norbye real_engine_name);
282397bfe65SDave Rigby abort();
283397bfe65SDave Rigby }
284397bfe65SDave Rigby
2854dd9b0f4STrond Norbye if (!create_engine_instance(
2864dd9b0f4STrond Norbye ewb->real_engine_ref, get_wrapped_gsa, &ewb->real_handle)) {
287*7245345dSTim Bradgate LOG_CRITICAL(
288c49c8b68STrond Norbye "ERROR: EWB_Engine::initialize(): Failed create "
2894dd9b0f4STrond Norbye "engine instance '{}'",
2904dd9b0f4STrond Norbye real_engine_name);
291e25e5f8bSjim abort();
292e25e5f8bSjim }
293e25e5f8bSjim
294397bfe65SDave Rigby if (ewb->real_handle->interface != 1) {
295*7245345dSTim Bradgate LOG_CRITICAL(
296c49c8b68STrond Norbye "ERROR: EWB_Engine::initialize(): Only support engine "
2974dd9b0f4STrond Norbye "with interface v1 - got v{}.",
298397bfe65SDave Rigby ewb->real_engine->interface.interface);
299397bfe65SDave Rigby abort();
300397bfe65SDave Rigby }
301397bfe65SDave Rigby ewb->real_engine =
302397bfe65SDave Rigby reinterpret_cast<ENGINE_HANDLE_V1*>(ewb->real_handle);
3038ca49c14SMark Nunberg
3048ca49c14SMark Nunberg
3058ca49c14SMark Nunberg engine_map[ewb->real_handle] = ewb;
306397bfe65SDave Rigby ENGINE_ERROR_CODE res = ewb->real_engine->initialize(
307397bfe65SDave Rigby ewb->real_handle, real_engine_config.c_str());
308397bfe65SDave Rigby
309397bfe65SDave Rigby if (res == ENGINE_SUCCESS) {
310d84f664bSDave Rigby // For engine interface functions which cannot return EWOULDBLOCK,
311d84f664bSDave Rigby // and we otherwise don't want to interpose, we can simply use the
312d84f664bSDave Rigby // real_engine's functions directly.
313397bfe65SDave Rigby ewb->ENGINE_HANDLE_V1::item_set_cas = ewb->real_engine->item_set_cas;
314397bfe65SDave Rigby ewb->ENGINE_HANDLE_V1::set_item_info = ewb->real_engine->set_item_info;
3156f86aa1eSTim Bradgate ewb->ENGINE_HANDLE_V1::item_set_datatype =
3166f86aa1eSTim Bradgate ewb->real_engine->item_set_datatype;
317397bfe65SDave Rigby }
31896535e7fSDave Rigby
31996535e7fSDave Rigby // Register a callback on DISCONNECT events, so we can delete
32096535e7fSDave Rigby // any stale elements from connection_map when a connection
32196535e7fSDave Rigby // DC's.
32296535e7fSDave Rigby real_api->callback->register_callback(
32396535e7fSDave Rigby reinterpret_cast<ENGINE_HANDLE*>(ewb),
32496535e7fSDave Rigby ON_DISCONNECT,
32596535e7fSDave Rigby handle_disconnect,
32696535e7fSDave Rigby reinterpret_cast<ENGINE_HANDLE*>(ewb));
32796535e7fSDave Rigby
328397bfe65SDave Rigby return res;
329397bfe65SDave Rigby }
330397bfe65SDave Rigby
destroy(gsl::not_null<ENGINE_HANDLE * > handle,const bool force)331aacf24fdSTrond Norbye static void destroy(gsl::not_null<ENGINE_HANDLE*> handle,
332aacf24fdSTrond Norbye const bool force) {
333397bfe65SDave Rigby EWB_Engine* ewb = to_engine(handle);
334397bfe65SDave Rigby ewb->real_engine->destroy(ewb->real_handle, force);
335397bfe65SDave Rigby delete ewb;
336397bfe65SDave Rigby }
337397bfe65SDave Rigby
allocate(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,const size_t nbytes,const int flags,const rel_time_t exptime,uint8_t datatype,uint16_t vbucket)338aacf24fdSTrond Norbye static cb::EngineErrorItemPair allocate(
339aacf24fdSTrond Norbye gsl::not_null<ENGINE_HANDLE*> handle,
3400d8ac478STrond Norbye gsl::not_null<const void*> cookie,
341b67a513dSolivermd const DocKey& key,
342b67a513dSolivermd const size_t nbytes,
343b67a513dSolivermd const int flags,
3447c1ef08bSJim Walker const rel_time_t exptime,
345b67a513dSolivermd uint8_t datatype,
346b67a513dSolivermd uint16_t vbucket) {
347397bfe65SDave Rigby EWB_Engine* ewb = to_engine(handle);
3480736971eSDave Rigby ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
3490736971eSDave Rigby if (ewb->should_inject_error(Cmd::ALLOCATE, cookie, err)) {
350b67a513dSolivermd return cb::makeEngineErrorItemPair(cb::engine_errc(err));
351397bfe65SDave Rigby } else {
352b67a513dSolivermd return ewb->real_engine->allocate(ewb->real_handle,
353b67a513dSolivermd cookie,
354b67a513dSolivermd key,
355b67a513dSolivermd nbytes,
356b67a513dSolivermd flags,
357b67a513dSolivermd exptime,
358b67a513dSolivermd datatype,
359b67a513dSolivermd vbucket);
360397bfe65SDave Rigby }
361397bfe65SDave Rigby }
362