/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2015 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

/*
 *                "ewouldblock_engine"
 *
 * The "ewouldblock_engine" allows one to test how memcached responds when the
 * engine returns EWOULDBLOCK instead of the correct response.
 *
 * Motivation:
 *
 * The EWOULDBLOCK response code can be returned from a number of engine
 * functions, and is used to indicate that the request could not be immediately
 * fulfilled, and it "would block" if it tried to. The correct way for
 * memcached to handle this (in general) is to suspend that request until it
 * is later notified by the engine (via notify_io_complete()).
 *
 * However, engines typically return the correct response to requests
 * immediately, only rarely (and from memcached's POV non-deterministically)
 * returning EWOULDBLOCK. This makes testing of the code-paths handling
 * EWOULDBLOCK tricky.
 *
 *
 * Operation:
 * This engine, when loaded by memcached, proxies requests to a "real" engine.
 * Depending on how it is configured, it can simply pass the request on to the
 * real engine, or artificially return EWOULDBLOCK back to memcached.
 *
 * See the 'Modes' enum below for the possible modes for a connection.
The mode 44 * can be selected by sending a `request_ewouldblock_ctl` command 45 * (opcode PROTOCOL_BINARY_CMD_EWOULDBLOCK_CTL). 46 * 47 * DCP: 48 * There is a special DCP stream named "ewb_internal" which is an 49 * endless stream of items. You may also add a number at the end 50 * e.g. "ewb_internal:10" and it'll create a stream with 10 entries. 51 * It will always send the same K-V pair. 52 * Note that we don't register for disconnect events so you might 53 * experience weirdness if you first try to use the internal dcp 54 * stream, and then later on want to use the one provided by the 55 * engine. The workaround for that is to delete the bucket 56 * in between ;-) (put them in separate test suites and it'll all 57 * be handled for you. 58 * 59 * Any other stream name results in proxying the dcp request to 60 * the underlying engine's DCP implementation. 61 * 62 */ 63 64#include "ewouldblock_engine.h" 65 66#include <atomic> 67#include <condition_variable> 68#include <cstring> 69#include <gsl/gsl> 70#include <iostream> 71#include <map> 72#include <memory> 73#include <mutex> 74#include <queue> 75#include <random> 76#include <sstream> 77#include <string> 78#include <utility> 79 80#include <logger/logger.h> 81#include <memcached/engine.h> 82#include <memcached/extension.h> 83#include <platform/cb_malloc.h> 84#include <platform/dirutils.h> 85#include <platform/thread.h> 86#include <xattr/blob.h> 87 88#include "utilities/engine_loader.h" 89 90/* Public API declaration ****************************************************/ 91 92extern "C" { 93 MEMCACHED_PUBLIC_API 94 ENGINE_ERROR_CODE create_instance(uint64_t interface, GET_SERVER_API gsa, 95 ENGINE_HANDLE **handle); 96 97 MEMCACHED_PUBLIC_API 98 void destroy_engine(void); 99} 100 101 102class EWB_Engine; 103 104// Mapping from wrapped handle to EWB handles. 
105static std::map<ENGINE_HANDLE*, EWB_Engine*> engine_map; 106 107class NotificationThread : public Couchbase::Thread { 108public: 109 NotificationThread(EWB_Engine& engine_) 110 : Thread("ewb:pendingQ"), 111 engine(engine_) {} 112 113protected: 114 void run() override; 115 116protected: 117 EWB_Engine& engine; 118}; 119 120/** 121 * The BlockMonitorThread represents the thread that is 122 * monitoring the "lock" file. Once the file is no longer 123 * there it will resume the client specified with the given 124 * id. 125 */ 126class BlockMonitorThread : public Couchbase::Thread { 127public: 128 BlockMonitorThread(EWB_Engine& engine_, 129 uint32_t id_, 130 const std::string file_) 131 : Thread("ewb:BlockMon"), 132 engine(engine_), 133 id(id_), 134 file(file_) {} 135 136 /** 137 * Wait for the underlying thread to reach the zombie state 138 * (== terminated, but not reaped) 139 */ 140 ~BlockMonitorThread() { 141 waitForState(Couchbase::ThreadState::Zombie); 142 } 143 144protected: 145 void run() override; 146 147private: 148 EWB_Engine& engine; 149 const uint32_t id; 150 const std::string file; 151}; 152 153static void register_callback(ENGINE_HANDLE *, ENGINE_EVENT_TYPE, 154 EVENT_CALLBACK, const void *); 155 156static SERVER_HANDLE_V1 wrapped_api; 157static SERVER_HANDLE_V1 *real_api; 158static void init_wrapped_api(GET_SERVER_API fn) { 159 static bool init = false; 160 if (init) { 161 return; 162 } 163 164 init = true; 165 real_api = fn(); 166 wrapped_api = *real_api; 167 168 // Overrides 169 static SERVER_CALLBACK_API callback = *wrapped_api.callback; 170 callback.register_callback = register_callback; 171 wrapped_api.callback = &callback; 172} 173 174static SERVER_HANDLE_V1 *get_wrapped_gsa() { 175 return &wrapped_api; 176} 177 178/** ewouldblock_engine class */ 179class EWB_Engine : public ENGINE_HANDLE_V1 { 180 181private: 182 enum class Cmd { 183 NONE, 184 GET_INFO, 185 ALLOCATE, 186 REMOVE, 187 GET, 188 STORE, 189 CAS, 190 ARITHMETIC, 191 LOCK, 192 UNLOCK, 
193 FLUSH, 194 GET_STATS, 195 GET_META, 196 UNKNOWN_COMMAND 197 }; 198 199 const char* to_string(Cmd cmd); 200 201 uint64_t (*get_connection_id)(gsl::not_null<const void*> cookie); 202 203public: 204 EWB_Engine(GET_SERVER_API gsa_); 205 206 ~EWB_Engine(); 207 208 // Convert from a handle back to the read object. 209 static EWB_Engine* to_engine(ENGINE_HANDLE* handle) { 210 return reinterpret_cast<EWB_Engine*> (handle); 211 } 212 213 /* Returns true if the next command should have a fake error code injected. 214 * @param func Address of the command function (get, store, etc). 215 * @param cookie The cookie for the user's request. 216 * @param[out] Error code to return. 217 */ 218 bool should_inject_error(Cmd cmd, const void* cookie, 219 ENGINE_ERROR_CODE& err) { 220 221 if (is_connection_suspended(cookie)) { 222 err = ENGINE_EWOULDBLOCK; 223 return true; 224 } 225 226 uint64_t id = get_connection_id(cookie); 227 228 std::lock_guard<std::mutex> guard(cookie_map_mutex); 229 230 auto iter = connection_map.find(id); 231 if (iter == connection_map.end()) { 232 return false; 233 } 234 235 if (iter->second.first != cookie) { 236 // The cookie is different so it represents a different command 237 connection_map.erase(iter); 238 return false; 239 } 240 241 const bool inject = iter->second.second->should_inject_error(cmd, err); 242 const bool add_to_pending_io_ops = iter->second.second->add_to_pending_io_ops(); 243 244 if (inject) { 245 LOG_DEBUG("EWB_Engine: injecting error:{} for cmd:{}", 246 err, 247 to_string(cmd)); 248 249 if (err == ENGINE_EWOULDBLOCK && add_to_pending_io_ops) { 250 // The server expects that if EWOULDBLOCK is returned then the 251 // server should be notified in the future when the operation is 252 // ready - so add this op to the pending IO queue. 253 schedule_notification(iter->second.first); 254 } 255 } 256 257 return inject; 258 } 259 260 /* Implementation of all the engine functions. 
***************************/ 261 262 static ENGINE_ERROR_CODE initialize(gsl::not_null<ENGINE_HANDLE*> handle, 263 const char* config_str) { 264 EWB_Engine* ewb = to_engine(handle); 265 266 // Extract the name of the real engine we will be proxying; then 267 // create and initialize it. 268 std::string config(config_str); 269 auto seperator = config.find(";"); 270 std::string real_engine_name(config.substr(0, seperator)); 271 std::string real_engine_config; 272 if (seperator != std::string::npos) { 273 real_engine_config = config.substr(seperator + 1); 274 } 275 276 if ((ewb->real_engine_ref = load_engine( 277 real_engine_name.c_str(), NULL, NULL)) == NULL) { 278 LOG_CRITICAL( 279 "ERROR: EWB_Engine::initialize(): Failed to load real " 280 "engine '{}'", 281 real_engine_name); 282 abort(); 283 } 284 285 if (!create_engine_instance( 286 ewb->real_engine_ref, get_wrapped_gsa, &ewb->real_handle)) { 287 LOG_CRITICAL( 288 "ERROR: EWB_Engine::initialize(): Failed create " 289 "engine instance '{}'", 290 real_engine_name); 291 abort(); 292 } 293 294 if (ewb->real_handle->interface != 1) { 295 LOG_CRITICAL( 296 "ERROR: EWB_Engine::initialize(): Only support engine " 297 "with interface v1 - got v{}.", 298 ewb->real_engine->interface.interface); 299 abort(); 300 } 301 ewb->real_engine = 302 reinterpret_cast<ENGINE_HANDLE_V1*>(ewb->real_handle); 303 304 305 engine_map[ewb->real_handle] = ewb; 306 ENGINE_ERROR_CODE res = ewb->real_engine->initialize( 307 ewb->real_handle, real_engine_config.c_str()); 308 309 if (res == ENGINE_SUCCESS) { 310 // For engine interface functions which cannot return EWOULDBLOCK, 311 // and we otherwise don't want to interpose, we can simply use the 312 // real_engine's functions directly. 
313 ewb->ENGINE_HANDLE_V1::item_set_cas = ewb->real_engine->item_set_cas; 314 ewb->ENGINE_HANDLE_V1::set_item_info = ewb->real_engine->set_item_info; 315 ewb->ENGINE_HANDLE_V1::item_set_datatype = 316 ewb->real_engine->item_set_datatype; 317 } 318 319 // Register a callback on DISCONNECT events, so we can delete 320 // any stale elements from connection_map when a connection 321 // DC's. 322 real_api->callback->register_callback( 323 reinterpret_cast<ENGINE_HANDLE*>(ewb), 324 ON_DISCONNECT, 325 handle_disconnect, 326 reinterpret_cast<ENGINE_HANDLE*>(ewb)); 327 328 return res; 329 } 330 331 static void destroy(gsl::not_null<ENGINE_HANDLE*> handle, 332 const bool force) { 333 EWB_Engine* ewb = to_engine(handle); 334 ewb->real_engine->destroy(ewb->real_handle, force); 335 delete ewb; 336 } 337 338 static cb::EngineErrorItemPair allocate( 339 gsl::not_null<ENGINE_HANDLE*> handle, 340 gsl::not_null<const void*> cookie, 341 const DocKey& key, 342 const size_t nbytes, 343 const int flags, 344 const rel_time_t exptime, 345 uint8_t datatype, 346 uint16_t vbucket) { 347 EWB_Engine* ewb = to_engine(handle); 348 ENGINE_ERROR_CODE err = ENGINE_SUCCESS; 349 if (ewb->should_inject_error(Cmd::ALLOCATE, cookie, err)) { 350 return cb::makeEngineErrorItemPair(cb::engine_errc(err)); 351 } else { 352 return ewb->real_engine->allocate(ewb->real_handle, 353 cookie, 354 key, 355 nbytes, 356 flags, 357 exptime, 358 datatype, 359 vbucket); 360 } 361 } 362 363 static std::pair<cb::unique_item_ptr, item_info> allocate_ex( 364 gsl::not_null<ENGINE_HANDLE*> handle, 365 gsl::not_null<const void*> cookie, 366 const DocKey& key, 367 size_t nbytes, 368 size_t priv_nbytes, 369 int flags, 370 rel_time_t exptime, 371 uint8_t datatype, 372 uint16_t vbucket) { 373 EWB_Engine* ewb = to_engine(handle); 374 ENGINE_ERROR_CODE err = ENGINE_SUCCESS; 375 if (ewb->should_inject_error(Cmd::ALLOCATE, cookie, err)) { 376 throw cb::engine_error(cb::engine_errc(err), "ewb: injecting error"); 377 } else { 378 return 
ewb->real_engine->allocate_ex(ewb->real_handle, cookie, 379 key, nbytes, priv_nbytes, 380 flags, exptime, datatype, 381 vbucket); 382 } 383 } 384 385 static ENGINE_ERROR_CODE remove(gsl::not_null<ENGINE_HANDLE*> handle, 386 gsl::not_null<const void*> cookie, 387 const DocKey& key, 388 uint64_t& cas, 389 uint16_t vbucket, 390 mutation_descr_t& mut_info) { 391 EWB_Engine* ewb = to_engine(handle); 392 ENGINE_ERROR_CODE err = ENGINE_SUCCESS; 393 if (ewb->should_inject_error(Cmd::REMOVE, cookie, err)) { 394 return err; 395 } else { 396 return ewb->real_engine->remove(ewb->real_handle, cookie, key, cas, 397 vbucket, mut_info); 398 } 399 } 400 401 static void release(gsl::not_null<ENGINE_HANDLE*> handle, 402 gsl::not_null<item*> item) { 403 EWB_Engine* ewb = to_engine(handle); 404 LOG_DEBUG("EWB_Engine: release"); 405 406 if (item == &ewb->dcp_mutation_item) { 407 // Ignore the DCP mutation, we own it (and don't track 408 // refcounts on it). 409 } else { 410 return ewb->real_engine->release(ewb->real_handle, item); 411 } 412 } 413 414 static cb::EngineErrorItemPair get(gsl::not_null<ENGINE_HANDLE*> handle, 415 gsl::not_null<const void*> cookie, 416 const DocKey& key, 417 uint16_t vbucket, 418 DocStateFilter documentStateFilter) { 419 EWB_Engine* ewb = to_engine(handle); 420 ENGINE_ERROR_CODE err = ENGINE_SUCCESS; 421 if (ewb->should_inject_error(Cmd::GET, cookie, err)) { 422 return std::make_pair( 423 cb::engine_errc(err), 424 cb::unique_item_ptr{nullptr, cb::ItemDeleter{handle}}); 425 } else { 426 return ewb->real_engine->get(ewb->real_handle, 427 cookie, 428 key, 429 vbucket, 430 documentStateFilter); 431 } 432 } 433 434 static cb::EngineErrorItemPair get_if( 435 gsl::not_null<ENGINE_HANDLE*> handle, 436 gsl::not_null<const void*> cookie, 437 const DocKey& key, 438 uint16_t vbucket, 439 std::function<bool(const item_info&)> filter) { 440 EWB_Engine* ewb = to_engine(handle); 441 ENGINE_ERROR_CODE err = ENGINE_SUCCESS; 442 if (ewb->should_inject_error(Cmd::GET, cookie, 
err)) { 443 return cb::makeEngineErrorItemPair(cb::engine_errc::would_block); 444 } else { 445 return ewb->real_engine->get_if( 446 ewb->real_handle, cookie, key, vbucket, filter); 447 } 448 } 449 450 static cb::EngineErrorItemPair get_and_touch( 451 gsl::not_null<ENGINE_HANDLE*> handle, 452 gsl::not_null<const void*> cookie, 453 const DocKey& key, 454 uint16_t vbucket, 455 uint32_t exptime) { 456 EWB_Engine* ewb = to_engine(handle); 457 ENGINE_ERROR_CODE err = ENGINE_SUCCESS; 458 if (ewb->should_inject_error(Cmd::GET, cookie, err)) { 459 return cb::makeEngineErrorItemPair(cb::engine_errc::would_block); 460 } else { 461 return ewb->real_engine->get_and_touch( 462 ewb->real_handle, cookie, key, vbucket, exptime); 463 } 464 } 465 466 static cb::EngineErrorItemPair get_locked( 467 gsl::not_null<ENGINE_HANDLE*> handle, 468 gsl::not_null<const void*> cookie, 469 const DocKey& key, 470 uint16_t vbucket, 471 uint32_t lock_timeout) { 472 EWB_Engine* ewb = to_engine(handle); 473 ENGINE_ERROR_CODE err = ENGINE_SUCCESS; 474 if (ewb->should_inject_error(Cmd::LOCK, cookie, err)) { 475 return cb::makeEngineErrorItemPair(cb::engine_errc(err)); 476 } else { 477 return ewb->real_engine->get_locked( 478 ewb->real_handle, cookie, key, vbucket, lock_timeout); 479 } 480 } 481 482 static cb::EngineErrorMetadataPair get_meta( 483 gsl::not_null<ENGINE_HANDLE*> handle, 484 gsl::not_null<const void*> cookie, 485 const DocKey& key, 486 uint16_t vbucket) { 487 EWB_Engine* ewb = to_engine(handle); 488 ENGINE_ERROR_CODE err = ENGINE_SUCCESS; 489 if (ewb->should_inject_error(Cmd::GET_META, cookie, err)) { 490 return std::make_pair(cb::engine_errc(err), item_info()); 491 } else { 492 return ewb->real_engine->get_meta(ewb->real_handle, 493 cookie, 494 key, 495 vbucket); 496 } 497 } 498 499 static ENGINE_ERROR_CODE unlock(gsl::not_null<ENGINE_HANDLE*> handle, 500 gsl::not_null<const void*> cookie, 501 const DocKey& key, 502 uint16_t vbucket, 503 uint64_t cas) { 504 EWB_Engine* ewb = 
to_engine(handle); 505 ENGINE_ERROR_CODE err = ENGINE_SUCCESS; 506 if (ewb->should_inject_error(Cmd::UNLOCK, cookie, err)) { 507 return err; 508 } else { 509 return ewb->real_engine->unlock(ewb->real_handle, cookie, key, 510 vbucket, cas); 511 } 512 } 513 514 static ENGINE_ERROR_CODE store(gsl::not_null<ENGINE_HANDLE*> handle, 515 gsl::not_null<const void*> cookie, 516 gsl::not_null<item*> item, 517 uint64_t& cas, 518 ENGINE_STORE_OPERATION operation, 519 DocumentState document_state) { 520 EWB_Engine* ewb = to_engine(handle); 521 ENGINE_ERROR_CODE err = ENGINE_SUCCESS; 522 Cmd opcode = (operation == OPERATION_CAS) ? Cmd::CAS : Cmd::STORE; 523 if (ewb->should_inject_error(opcode, cookie, err)) { 524 return err; 525 } else { 526 return ewb->real_engine->store(ewb->real_handle, cookie, item, cas, 527 operation, document_state); 528 } 529 } 530 531 static cb::EngineErrorCasPair store_if(gsl::not_null<ENGINE_HANDLE*> handle, 532 gsl::not_null<const void*> cookie, 533 gsl::not_null<item*> item, 534 uint64_t cas, 535 ENGINE_STORE_OPERATION operation, 536 cb::StoreIfPredicate predicate, 537 DocumentState document_state) { 538 EWB_Engine* ewb = to_engine(handle); 539 ENGINE_ERROR_CODE err = ENGINE_SUCCESS; 540 Cmd opcode = (operation == OPERATION_CAS) ? Cmd::CAS : Cmd::STORE; 541 if (ewb->should_inject_error(opcode, cookie, err)) { 542 return {cb::engine_errc(err), 0}; 543 } else { 544 return ewb->real_engine->store_if(ewb->real_handle, 545 cookie, 546 item, 547 cas, 548 operation, 549 predicate, 550 document_state); 551 } 552 } 553 554 static ENGINE_ERROR_CODE flush(gsl::not_null<ENGINE_HANDLE*> handle, 555 gsl::not_null<const void*> cookie) { 556 // Flush is a little different - it often returns EWOULDBLOCK, and 557 // notify_io_complete() just tells the server it can issue it's *next* 558 // command (i.e. no need to re-flush). Therefore just pass Flush 559 // straight through for now. 
560 EWB_Engine* ewb = to_engine(handle); 561 return ewb->real_engine->flush(ewb->real_handle, cookie); 562 } 563 564 static ENGINE_ERROR_CODE get_stats(gsl::not_null<ENGINE_HANDLE*> handle, 565 gsl::not_null<const void*> cookie, 566 cb::const_char_buffer key, 567 ADD_STAT add_stat) { 568 EWB_Engine* ewb = to_engine(handle); 569 ENGINE_ERROR_CODE err = ENGINE_SUCCESS; 570 if (ewb->should_inject_error(Cmd::GET_STATS, cookie, err)) { 571 return err; 572 } else { 573 return ewb->real_engine->get_stats( 574 ewb->real_handle, cookie, key, add_stat); 575 } 576 } 577 578 static void reset_stats(gsl::not_null<ENGINE_HANDLE*> handle, 579 gsl::not_null<const void*> cookie) { 580 EWB_Engine* ewb = to_engine(handle); 581 return ewb->real_engine->reset_stats(ewb->real_handle, cookie); 582 } 583 584 /* Handle 'unknown_command'. In additional to wrapping calls to the 585 * underlying real engine, this is also used to configure 586 * ewouldblock_engine itself using he CMD_EWOULDBLOCK_CTL opcode. 587 */ 588 static ENGINE_ERROR_CODE unknown_command( 589 gsl::not_null<ENGINE_HANDLE*> handle, 590 const void* cookie, 591 gsl::not_null<protocol_binary_request_header*> request, 592 ADD_RESPONSE response, 593 DocNamespace doc_namespace) { 594 EWB_Engine* ewb = to_engine(handle); 595 596 if (request->request.opcode == PROTOCOL_BINARY_CMD_EWOULDBLOCK_CTL) { 597 auto* req = 598 reinterpret_cast<request_ewouldblock_ctl*>(request.get()); 599 const EWBEngineMode mode = static_cast<EWBEngineMode>(ntohl(req->message.body.mode)); 600 const uint32_t value = ntohl(req->message.body.value); 601 const ENGINE_ERROR_CODE injected_error = 602 static_cast<ENGINE_ERROR_CODE>(ntohl(req->message.body.inject_error)); 603 const std::string key((char*)req->bytes + sizeof(req->bytes), 604 ntohs(req->message.header.request.keylen)); 605 606 std::shared_ptr<FaultInjectMode> new_mode = nullptr; 607 608 // Validate mode, and construct new fault injector. 
609 switch (mode) { 610 case EWBEngineMode::Next_N: 611 new_mode = std::make_shared<ErrOnNextN>(injected_error, value); 612 break; 613 614 case EWBEngineMode::Random: 615 new_mode = std::make_shared<ErrRandom>(injected_error, value); 616 break; 617 618 case EWBEngineMode::First: 619 new_mode = std::make_shared<ErrOnFirst>(injected_error); 620 break; 621 622 case EWBEngineMode::Sequence: 623 new_mode = std::make_shared<ErrSequence>(injected_error, value); 624 break; 625 626 case EWBEngineMode::No_Notify: 627 new_mode = std::make_shared<ErrOnNoNotify>(injected_error); 628 break; 629 630 case EWBEngineMode::CasMismatch: 631 new_mode = std::make_shared<CASMismatch>(value); 632 break; 633 634 case EWBEngineMode::IncrementClusterMapRevno: 635 ewb->clustermap_revno++; 636 response(nullptr, 0, nullptr, 0, nullptr, 0, 637 PROTOCOL_BINARY_RAW_BYTES, 638 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie); 639 return ENGINE_SUCCESS; 640 641 case EWBEngineMode::BlockMonitorFile: 642 return ewb->handleBlockMonitorFile(cookie, value, key, 643 response); 644 645 case EWBEngineMode::Suspend: 646 return ewb->handleSuspend(cookie, value, response); 647 648 case EWBEngineMode::Resume: 649 return ewb->handleResume(cookie, value, response); 650 651 case EWBEngineMode::SetItemCas: 652 return ewb->setItemCas(cookie, key, value, response); 653 } 654 655 if (new_mode == nullptr) { 656 LOG_WARNING( 657 "EWB_Engine::unknown_command(): " 658 "Got unexpected mode={} for EWOULDBLOCK_CTL, ", 659 (unsigned int)mode); 660 response(nullptr, 0, nullptr, 0, nullptr, 0, 661 PROTOCOL_BINARY_RAW_BYTES, 662 PROTOCOL_BINARY_RESPONSE_EINVAL, /*cas*/0, cookie); 663 return ENGINE_FAILED; 664 } else { 665 try { 666 LOG_DEBUG( 667 "EWB_Engine::unknown_command(): Setting EWB mode " 668 "to " 669 "{} for cookie {}", 670 new_mode->to_string(), 671 cookie); 672 673 uint64_t id = ewb->get_connection_id(cookie); 674 675 { 676 std::lock_guard<std::mutex> guard(ewb->cookie_map_mutex); 677 ewb->connection_map.erase(id); 678 
ewb->connection_map.emplace(id, std::make_pair(cookie, new_mode)); 679 } 680 681 response(nullptr, 0, nullptr, 0, nullptr, 0, 682 PROTOCOL_BINARY_RAW_BYTES, 683 PROTOCOL_BINARY_RESPONSE_SUCCESS, /*cas*/0, cookie); 684 return ENGINE_SUCCESS; 685 } catch (std::bad_alloc&) { 686 return ENGINE_ENOMEM; 687 } 688 } 689 } else { 690 ENGINE_ERROR_CODE err = ENGINE_SUCCESS; 691 if (ewb->should_inject_error(Cmd::UNKNOWN_COMMAND, cookie, err)) { 692 return err; 693 } else { 694 return ewb->real_engine->unknown_command(ewb->real_handle, cookie, 695 request, response, 696 doc_namespace); 697 } 698 } 699 } 700 701 static void item_set_cas(gsl::not_null<ENGINE_HANDLE*> handle, 702 gsl::not_null<item*> item, 703 uint64_t cas) { 704 // Should never be called as ENGINE_HANDLE_V1::item_set_cas is updated 705 // to point to the real_engine once it is initialized. This function 706 //only exists so there is a non-NULL value for 707 // ENGINE_HANDLE_V1::item_set_cas initially to keep load_engine() 708 // happy. 709 abort(); 710 } 711 712 static void item_set_datatype(gsl::not_null<ENGINE_HANDLE*>, 713 gsl::not_null<item*> itm, 714 protocol_binary_datatype_t datatype) { 715 // Should never be called - set item_set_datatype(). 716 abort(); 717 } 718 719 static bool get_item_info(gsl::not_null<ENGINE_HANDLE*> handle, 720 gsl::not_null<const item*> item, 721 gsl::not_null<item_info*> item_info) { 722 EWB_Engine* ewb = to_engine(handle); 723 LOG_DEBUG("EWB_Engine: get_item_info"); 724 725 // This function cannot return EWOULDBLOCK - just chain to the real 726 // engine's function, unless it is a request for our special DCP item. 
727 if (item == &ewb->dcp_mutation_item) { 728 item_info->cas = 0; 729 item_info->vbucket_uuid = 0; 730 item_info->seqno = 0; 731 item_info->exptime = 0; 732 item_info->nbytes = 733 gsl::narrow<uint32_t>(ewb->dcp_mutation_item.value.size()); 734 item_info->flags = 0; 735 item_info->datatype = PROTOCOL_BINARY_DATATYPE_XATTR; 736 item_info->nkey = 737 gsl::narrow<uint16_t>(ewb->dcp_mutation_item.key.size()); 738 item_info->key = ewb->dcp_mutation_item.key.c_str(); 739 item_info->value[0].iov_base = &ewb->dcp_mutation_item.value[0]; 740 item_info->value[0].iov_len = item_info->nbytes; 741 return true; 742 } else { 743 return ewb->real_engine->get_item_info( 744 ewb->real_handle, item, item_info); 745 } 746 } 747 748 static bool set_item_info(gsl::not_null<ENGINE_HANDLE*> handle, 749 gsl::not_null<item*> item, 750 gsl::not_null<const item_info*> item_info) { 751 // Should never be called - set item_set_cas(). 752 abort(); 753 } 754 755 static void handle_disconnect(const void* cookie, 756 ENGINE_EVENT_TYPE type, 757 const void* event_data, 758 const void* cb_data) { 759 cb_assert(event_data == NULL); 760 EWB_Engine* ewb = 761 reinterpret_cast<EWB_Engine*>(const_cast<void*>(cb_data)); 762 LOG_DEBUG("EWB_Engine::handle_disconnect"); 763 764 uint64_t id = ewb->get_connection_id(cookie); 765 { 766 std::lock_guard<std::mutex> guard(ewb->cookie_map_mutex); 767 ewb->connection_map.erase(id); 768 } 769 } 770 771 GET_SERVER_API gsa; 772 773 // Actual engine we are proxying requests to. 774 ENGINE_HANDLE* real_handle; 775 ENGINE_HANDLE_V1* real_engine; 776 engine_reference* real_engine_ref; 777 778 std::atomic_int clustermap_revno; 779 780 /** 781 * The method responsible for pushing all of the notify_io_complete 782 * to the frontend. It is run by notify_io_thread and not intended to 783 * be called by anyone else!. 
*/
    void process_notifications();
    // Thread object for the notification thread (created outside this part
    // of the file).
    std::unique_ptr<Couchbase::Thread> notify_io_thread;

protected:
    /**
     * Handle the control message for block monitor file
     *
     * @param cookie The cookie executing the operation
     * @param id The identifier used to represent the cookie
     * @param file The file to monitor
     * @param response callback used to send a response to the client
     * @return The standard engine error codes
     */
    ENGINE_ERROR_CODE handleBlockMonitorFile(const void* cookie,
                                             uint32_t id,
                                             const std::string& file,
                                             ADD_RESPONSE response);

    /**
     * Handle the control message for suspend
     *
     * @param cookie The cookie executing the operation
     * @param id The identifier used to represent the cookie to resume
     *           (the use of a different id is to allow resume to
     *           be sent on a different connection)
     * @param response callback used to send a response to the client
     * @return The standard engine error codes
     */
    ENGINE_ERROR_CODE handleSuspend(const void* cookie,
                                    uint32_t id,
                                    ADD_RESPONSE response);

    /**
     * Handle the control message for resume
     *
     * @param cookie The cookie executing the operation
     * @param id The identifier representing the connection to resume
     * @param response callback used to send a response to the client
     * @return The standard engine error codes
     */
    ENGINE_ERROR_CODE handleResume(const void* cookie,
                                   uint32_t id,
                                   ADD_RESPONSE response);

    /**
     * Handle the control message for setting the CAS of an item
     * (EWBEngineMode::SetItemCas)
     *
     * @param cookie the cookie executing the operation
     * @param key ID of the item whose CAS should be changed
     * @param cas The new CAS
     * @param response Response callback used to send a response to the client
     * @return Standard engine error codes
     */
    ENGINE_ERROR_CODE setItemCas(const void *cookie,
                                 const std::string& key, uint32_t cas,
                                 ADD_RESPONSE response);

private:
    // Shared state between the main thread of execution and the background
    // thread processing pending io ops.
    std::mutex mutex;
    std::condition_variable condvar;
    std::queue<const void*> pending_io_ops;

    std::atomic<bool> stop_notification_thread;

    ///////////////////////////////////////////////////////////////////////////
    //             All of the methods used in the DCP interface              //
    //                                                                       //
    // We don't support mocking with the DCP interface yet, so all access to //
    // the DCP interface will be proxied down to the underlying engine.      //
    //                                                                       //
    // NOTE(review): the file header describes a special "ewb_internal" DCP  //
    // stream served by this engine itself; the definitions of these         //
    // functions are not visible here - confirm which statement is current.  //
    ///////////////////////////////////////////////////////////////////////////
    static ENGINE_ERROR_CODE dcp_step(
            gsl::not_null<ENGINE_HANDLE*> handle,
            gsl::not_null<const void*> cookie,
            gsl::not_null<struct dcp_message_producers*> producers);

    static ENGINE_ERROR_CODE dcp_open(gsl::not_null<ENGINE_HANDLE*> handle,
                                      gsl::not_null<const void*> cookie,
                                      uint32_t opaque,
                                      uint32_t seqno,
                                      uint32_t flags,
                                      cb::const_char_buffer name,
                                      cb::const_byte_buffer json);

    static ENGINE_ERROR_CODE dcp_add_stream(
            gsl::not_null<ENGINE_HANDLE*> handle,
            gsl::not_null<const void*> cookie,
            uint32_t opaque,
            uint16_t vbucket,
            uint32_t flags);

    static ENGINE_ERROR_CODE dcp_close_stream(
            gsl::not_null<ENGINE_HANDLE*> handle,
            gsl::not_null<const void*> cookie,
            uint32_t opaque,
            uint16_t vbucket);

    static ENGINE_ERROR_CODE dcp_stream_req(
            gsl::not_null<ENGINE_HANDLE*> handle,
            gsl::not_null<const void*> cookie,
            uint32_t flags,
            uint32_t opaque,
            uint16_t vbucket,
            uint64_t start_seqno,
            uint64_t end_seqno,
            uint64_t vbucket_uuid,
            uint64_t snap_start_seqno,
            uint64_t snap_end_seqno,
            uint64_t* rollback_seqno,
            dcp_add_failover_log callback);

    static ENGINE_ERROR_CODE dcp_get_failover_log(
            gsl::not_null<ENGINE_HANDLE*> handle,
            gsl::not_null<const void*> cookie,
            uint32_t opaque,
            uint16_t vbucket,
            dcp_add_failover_log callback);

    static ENGINE_ERROR_CODE dcp_stream_end(
            gsl::not_null<ENGINE_HANDLE*> handle,
            gsl::not_null<const void*> cookie,
            uint32_t opaque,
            uint16_t vbucket,
            uint32_t flags);

    static ENGINE_ERROR_CODE dcp_snapshot_marker(
            gsl::not_null<ENGINE_HANDLE*> handle,
            gsl::not_null<const void*> cookie,
            uint32_t opaque,
            uint16_t vbucket,
            uint64_t start_seqno,
            uint64_t end_seqno,
            uint32_t flags);

    static ENGINE_ERROR_CODE dcp_mutation(gsl::not_null<ENGINE_HANDLE*> handle,
                                          gsl::not_null<const void*> cookie,
                                          uint32_t opaque,
                                          const DocKey& key,
                                          cb::const_byte_buffer value,
                                          size_t priv_bytes,
                                          uint8_t datatype,
                                          uint64_t cas,
                                          uint16_t vbucket,
                                          uint32_t flags,
                                          uint64_t by_seqno,
                                          uint64_t rev_seqno,
                                          uint32_t expiration,
                                          uint32_t lock_time,
                                          cb::const_byte_buffer meta,
                                          uint8_t nru);

    static ENGINE_ERROR_CODE dcp_deletion(gsl::not_null<ENGINE_HANDLE*> handle,
                                          gsl::not_null<const void*> cookie,
                                          uint32_t opaque,
                                          const DocKey& key,
                                          cb::const_byte_buffer value,
                                          size_t priv_bytes,
                                          uint8_t datatype,
                                          uint64_t cas,
                                          uint16_t vbucket,
                                          uint64_t by_seqno,
                                          uint64_t rev_seqno,
                                          cb::const_byte_buffer meta);

    static ENGINE_ERROR_CODE dcp_deletion_v2(
            gsl::not_null<ENGINE_HANDLE*> handle,
            gsl::not_null<const void*> cookie,
            uint32_t opaque,
            const DocKey& key,
            cb::const_byte_buffer value,
            size_t priv_bytes,
            uint8_t datatype,
            uint64_t cas,
            uint16_t vbucket,
            uint64_t by_seqno,
            uint64_t rev_seqno,
            uint32_t delete_time);

    static ENGINE_ERROR_CODE dcp_expiration(
            gsl::not_null<ENGINE_HANDLE*> handle,
            gsl::not_null<const void*> cookie,
            uint32_t opaque,
            const DocKey& key,
            cb::const_byte_buffer value,
            size_t priv_bytes,
            uint8_t datatype,
            uint64_t cas,
            uint16_t vbucket,
            uint64_t by_seqno,
            uint64_t rev_seqno,
            cb::const_byte_buffer meta);

    static ENGINE_ERROR_CODE dcp_flush(gsl::not_null<ENGINE_HANDLE*> handle,
                                       gsl::not_null<const void*> cookie,
                                       uint32_t opaque,
                                       uint16_t vbucket);

    static ENGINE_ERROR_CODE dcp_set_vbucket_state(
            gsl::not_null<ENGINE_HANDLE*> handle,
            gsl::not_null<const void*> cookie,
            uint32_t opaque,
            uint16_t vbucket,
            vbucket_state_t state);

    static ENGINE_ERROR_CODE dcp_noop(gsl::not_null<ENGINE_HANDLE*> handle,
                                      gsl::not_null<const void*> cookie,
                                      uint32_t opaque);

    static ENGINE_ERROR_CODE dcp_buffer_acknowledgement(
            gsl::not_null<ENGINE_HANDLE*> handle,
            gsl::not_null<const void*> cookie,
            uint32_t opaque,
            uint16_t vbucket,
            uint32_t buffer_bytes);

    static ENGINE_ERROR_CODE dcp_control(gsl::not_null<ENGINE_HANDLE*> handle,
                                         gsl::not_null<const void*> cookie,
                                         uint32_t opaque,
                                         const void* key,
                                         uint16_t nkey,
                                         const void* value,
                                         uint32_t nvalue);

    static ENGINE_ERROR_CODE dcp_response_handler(
            gsl::not_null<ENGINE_HANDLE*> handle,
            gsl::not_null<const void*> cookie,
            const protocol_binary_response_header* response);

    static ENGINE_ERROR_CODE dcp_system_event(
            gsl::not_null<ENGINE_HANDLE*> handle,
            gsl::not_null<const void*> cookie,
            uint32_t opaque,
            uint16_t vbucket,
            mcbp::systemevent::id event,
            uint64_t bySeqno,
            cb::const_byte_buffer key,
            cb::const_byte_buffer eventData);

    // Collections, and bucket-level queries which take only the engine
    // handle (definitions are outside this part of the file).
    static cb::engine_error collections_set_manifest(
            gsl::not_null<ENGINE_HANDLE*> handle, cb::const_char_buffer json);

    static cb::EngineErrorStringPair collections_get_manifest(
            gsl::not_null<ENGINE_HANDLE*> handle);

    static bool isXattrEnabled(gsl::not_null<ENGINE_HANDLE*> handle);

    static BucketCompressionMode getCompressionMode(gsl::not_null<ENGINE_HANDLE*> handle);

    static size_t getMaxItemSize(gsl::not_null<ENGINE_HANDLE*> handle);

static float getMinCompressionRatio(gsl::not_null<ENGINE_HANDLE*> handle); 1035 1036 // Base class for all fault injection modes. 1037 struct FaultInjectMode { 1038 FaultInjectMode(ENGINE_ERROR_CODE injected_error_) 1039 : injected_error(injected_error_) {} 1040 1041 virtual bool add_to_pending_io_ops() { 1042 return true; 1043 } 1044 virtual bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) = 0; 1045 1046 virtual std::string to_string() const = 0; 1047 1048 protected: 1049 ENGINE_ERROR_CODE injected_error; 1050 }; 1051 1052 // Subclasses for each fault inject mode: ///////////////////////////////// 1053 1054 class ErrOnFirst : public FaultInjectMode { 1055 public: 1056 ErrOnFirst(ENGINE_ERROR_CODE injected_error_) 1057 : FaultInjectMode(injected_error_), 1058 prev_cmd(Cmd::NONE) {} 1059 1060 bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) { 1061 // Block unless the previous command from this cookie 1062 // was the same - i.e. all of a connections' commands 1063 // will EWOULDBLOCK the first time they are called. 1064 bool inject = (prev_cmd != cmd); 1065 prev_cmd = cmd; 1066 if (inject) { 1067 err = injected_error; 1068 } 1069 return inject; 1070 } 1071 1072 std::string to_string() const { 1073 return "ErrOnFirst inject_error=" + std::to_string(injected_error); 1074 } 1075 1076 private: 1077 // Last command issued by this cookie. 
1078 Cmd prev_cmd; 1079 }; 1080 1081 class ErrOnNextN : public FaultInjectMode { 1082 public: 1083 ErrOnNextN(ENGINE_ERROR_CODE injected_error_, uint32_t count_) 1084 : FaultInjectMode(injected_error_), 1085 count(count_) {} 1086 1087 bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) { 1088 if (count > 0) { 1089 --count; 1090 err = injected_error; 1091 return true; 1092 } else { 1093 return false; 1094 } 1095 } 1096 1097 std::string to_string() const { 1098 return std::string("ErrOnNextN") + 1099 " inject_error=" + std::to_string(injected_error) + 1100 " count=" + std::to_string(count); 1101 } 1102 1103 private: 1104 // The count of commands issued that should return error. 1105 uint32_t count; 1106 }; 1107 1108 class ErrRandom : public FaultInjectMode { 1109 public: 1110 ErrRandom(ENGINE_ERROR_CODE injected_error_, uint32_t percentage_) 1111 : FaultInjectMode(injected_error_), 1112 percentage_to_err(percentage_) {} 1113 1114 bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) { 1115 std::random_device rd; 1116 std::mt19937 gen(rd()); 1117 std::uniform_int_distribution<uint32_t> dis(1, 100); 1118 if (dis(gen) < percentage_to_err) { 1119 err = injected_error; 1120 return true; 1121 } else { 1122 return false; 1123 } 1124 } 1125 1126 std::string to_string() const { 1127 return std::string("ErrRandom") + 1128 " inject_error=" + std::to_string(injected_error) + 1129 " percentage=" + std::to_string(percentage_to_err); 1130 } 1131 1132 private: 1133 // Percentage chance that the specified error should be injected. 
1134 uint32_t percentage_to_err; 1135 }; 1136 1137 class ErrSequence : public FaultInjectMode { 1138 public: 1139 ErrSequence(ENGINE_ERROR_CODE injected_error_, uint32_t sequence_) 1140 : FaultInjectMode(injected_error_), 1141 sequence(sequence_), 1142 pos(0) {} 1143 1144 bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) { 1145 bool inject = false; 1146 if (pos < 32) { 1147 inject = (sequence & (1 << pos)) != 0; 1148 pos++; 1149 } 1150 if (inject) { 1151 err = injected_error; 1152 } 1153 return inject; 1154 } 1155 1156 std::string to_string() const { 1157 std::stringstream ss; 1158 ss << "ErrSequence inject_error=" << injected_error 1159 << " sequence=0x" << std::hex << sequence 1160 << " pos=" << pos; 1161 return ss.str(); 1162 } 1163 1164 private: 1165 uint32_t sequence; 1166 uint32_t pos; 1167 }; 1168 1169 class ErrOnNoNotify : public FaultInjectMode { 1170 public: 1171 ErrOnNoNotify(ENGINE_ERROR_CODE injected_error_) 1172 : FaultInjectMode(injected_error_), 1173 issued_return_error(false) {} 1174 1175 bool add_to_pending_io_ops() {return false;} 1176 bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) { 1177 if (!issued_return_error) { 1178 issued_return_error = true; 1179 err = injected_error; 1180 return true; 1181 } else { 1182 return false; 1183 } 1184 } 1185 1186 std::string to_string() const { 1187 return std::string("ErrOnNoNotify") + 1188 " inject_error=" + std::to_string(injected_error) + 1189 " issued_return_error=" + 1190 std::to_string(issued_return_error); 1191 } 1192 1193 private: 1194 // Record of whether have yet issued return error. 
1195 bool issued_return_error; 1196 }; 1197 1198 class CASMismatch : public FaultInjectMode { 1199 public: 1200 CASMismatch(uint32_t count_) 1201 : FaultInjectMode(ENGINE_KEY_EEXISTS), 1202 count(count_) {} 1203 1204 bool should_inject_error(Cmd cmd, ENGINE_ERROR_CODE& err) { 1205 if (cmd == Cmd::CAS && (count > 0)) { 1206 --count; 1207 err = injected_error; 1208 return true; 1209 } else { 1210 return false; 1211 } 1212 } 1213 1214 std::string to_string() const { 1215 return std::string("CASMismatch") + 1216 " count=" + std::to_string(count); 1217 } 1218 1219 private: 1220 uint32_t count; 1221 }; 1222 1223 // Map of connections (aka cookies) to their current mode. 1224 std::map<uint64_t, std::pair<const void*, std::shared_ptr<FaultInjectMode> > > connection_map; 1225 // Mutex for above map. 1226 std::mutex cookie_map_mutex; 1227 1228 // Current DCP mutation `item`. We return the address of this 1229 // (in the dcp step() function) back to the server, and then in 1230 // get_item_info we check if the requested item is this one. 1231 class EwbDcpKey { 1232 public: 1233 EwbDcpKey() 1234 : key("k") { 1235 cb::xattr::Blob builder; 1236 builder.set("_ewb", "{\"internal\":true}"); 1237 builder.set("meta", "{\"author\":\"jack\"}"); 1238 const auto blob = builder.finalize(); 1239 std::copy(blob.buf, blob.buf + blob.len, std::back_inserter(value)); 1240 // MB24971 - the body is large as it increases the probability of 1241 // transit returning TransmitResult::SoftError 1242 const std::string body(1000, 'x'); 1243 std::copy(body.begin(), body.end(), std::back_inserter(value)); 1244 } 1245 1246 std::string key; 1247 std::vector<uint8_t> value; 1248 } dcp_mutation_item; 1249 1250 /** 1251 * The dcp_stream map is used to map a cookie to the count of objects 1252 * it should send on the stream. 
1253 * 1254 * Each entry in here constists of a pair containing a boolean specifying 1255 * if the stream is opened or not, and a count of how many times we should 1256 * return data 1257 */ 1258 std::map<const void*, std::pair<bool, uint64_t>> dcp_stream; 1259 1260 friend class BlockMonitorThread; 1261 std::map<uint32_t, const void*> suspended_map; 1262 std::mutex suspended_map_mutex; 1263 1264 bool suspend(const void* cookie, uint32_t id) { 1265 { 1266 std::lock_guard<std::mutex> guard(suspended_map_mutex); 1267 auto iter = suspended_map.find(id); 1268 if (iter == suspended_map.cend()) { 1269 suspended_map[id] = cookie; 1270 return true; 1271 } 1272 } 1273 1274 return false; 1275 } 1276 1277 bool resume(uint32_t id) { 1278 const void* cookie = nullptr; 1279 { 1280 std::lock_guard<std::mutex> guard(suspended_map_mutex); 1281 auto iter = suspended_map.find(id); 1282 if (iter == suspended_map.cend()) { 1283 return false; 1284 } 1285 cookie = iter->second; 1286 suspended_map.erase(iter); 1287 } 1288 1289 1290 schedule_notification(cookie); 1291 return true; 1292 } 1293 1294 bool is_connection_suspended(const void* cookie) { 1295 std::lock_guard<std::mutex> guard(suspended_map_mutex); 1296 for (const auto c : suspended_map) { 1297 if (c.second == cookie) { 1298 LOG_DEBUG( 1299 "Connection {} with id {} should be suspended for " 1300 "engine {}", 1301 c.second, 1302 c.first, 1303 (void*)this); 1304 1305 return true; 1306 } 1307 } 1308 return false; 1309 } 1310 1311 void schedule_notification(const void* cookie) { 1312 { 1313 std::lock_guard<std::mutex> guard(mutex); 1314 pending_io_ops.push(cookie); 1315 } 1316 LOG_DEBUG("EWB_Engine: connection {} should be resumed for engine {}", 1317 (void*)cookie, 1318 (void*)this); 1319 1320 condvar.notify_one(); 1321 } 1322 1323 // Vector to keep track of the threads we've started to ensure 1324 // we don't leak memory ;-) 1325 std::mutex threads_mutex; 1326 std::vector<std::unique_ptr<Couchbase::Thread> > threads; 1327}; 1328 
1329EWB_Engine::EWB_Engine(GET_SERVER_API gsa_) 1330 : gsa(gsa_), 1331 real_engine(NULL), 1332 real_engine_ref(nullptr), 1333 notify_io_thread(new NotificationThread(*this)) 1334{ 1335 init_wrapped_api(gsa); 1336 1337 interface.interface = 1; 1338 ENGINE_HANDLE_V1::initialize = initialize; 1339 ENGINE_HANDLE_V1::destroy = destroy; 1340 ENGINE_HANDLE_V1::allocate = allocate; 1341 ENGINE_HANDLE_V1::allocate_ex = allocate_ex; 1342 ENGINE_HANDLE_V1::remove = remove; 1343 ENGINE_HANDLE_V1::release = release; 1344 ENGINE_HANDLE_V1::get = get; 1345 ENGINE_HANDLE_V1::get_if = get_if; 1346 ENGINE_HANDLE_V1::get_locked = get_locked; 1347 ENGINE_HANDLE_V1::get_meta = get_meta; 1348 ENGINE_HANDLE_V1::get_and_touch = get_and_touch; 1349 ENGINE_HANDLE_V1::unlock = unlock; 1350 ENGINE_HANDLE_V1::store = store; 1351 ENGINE_HANDLE_V1::store_if = store_if; 1352 ENGINE_HANDLE_V1::flush = flush; 1353 ENGINE_HANDLE_V1::get_stats = get_stats; 1354 ENGINE_HANDLE_V1::reset_stats = reset_stats; 1355 ENGINE_HANDLE_V1::unknown_command = unknown_command; 1356 ENGINE_HANDLE_V1::item_set_cas = item_set_cas; 1357 ENGINE_HANDLE_V1::item_set_datatype = item_set_datatype; 1358 ENGINE_HANDLE_V1::get_item_info = get_item_info; 1359 ENGINE_HANDLE_V1::set_item_info = set_item_info; 1360 ENGINE_HANDLE_V1::set_log_level = NULL; 1361 1362 ENGINE_HANDLE_V1::dcp = {}; 1363 ENGINE_HANDLE_V1::dcp.step = dcp_step; 1364 ENGINE_HANDLE_V1::dcp.open = dcp_open; 1365 ENGINE_HANDLE_V1::dcp.stream_req = dcp_stream_req; 1366 ENGINE_HANDLE_V1::dcp.add_stream = dcp_add_stream; 1367 ENGINE_HANDLE_V1::dcp.close_stream = dcp_close_stream; 1368 ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = dcp_buffer_acknowledgement; 1369 ENGINE_HANDLE_V1::dcp.control = dcp_control; 1370 ENGINE_HANDLE_V1::dcp.get_failover_log = dcp_get_failover_log; 1371 ENGINE_HANDLE_V1::dcp.stream_end = dcp_stream_end; 1372 ENGINE_HANDLE_V1::dcp.snapshot_marker = dcp_snapshot_marker; 1373 ENGINE_HANDLE_V1::dcp.mutation = dcp_mutation; 1374 
ENGINE_HANDLE_V1::dcp.deletion = dcp_deletion; 1375 ENGINE_HANDLE_V1::dcp.deletion_v2 = dcp_deletion_v2; 1376 ENGINE_HANDLE_V1::dcp.expiration = dcp_expiration; 1377 ENGINE_HANDLE_V1::dcp.flush = dcp_flush; 1378 ENGINE_HANDLE_V1::dcp.set_vbucket_state = dcp_set_vbucket_state; 1379 ENGINE_HANDLE_V1::dcp.noop = dcp_noop; 1380 ENGINE_HANDLE_V1::dcp.response_handler = dcp_response_handler; 1381 ENGINE_HANDLE_V1::dcp.system_event = dcp_system_event; 1382 1383 ENGINE_HANDLE_V1::collections = {}; 1384 ENGINE_HANDLE_V1::collections.set_manifest = collections_set_manifest; 1385 ENGINE_HANDLE_V1::collections.get_manifest = collections_get_manifest; 1386 1387 ENGINE_HANDLE_V1::isXattrEnabled = isXattrEnabled; 1388 ENGINE_HANDLE_V1::getCompressionMode = getCompressionMode; 1389 ENGINE_HANDLE_V1::getMaxItemSize = getMaxItemSize; 1390 ENGINE_HANDLE_V1::getMinCompressionRatio = getMinCompressionRatio; 1391 1392 clustermap_revno = 1; 1393 1394 get_connection_id = gsa()->cookie->get_connection_id; 1395 1396 stop_notification_thread.store(false); 1397 notify_io_thread->start(); 1398} 1399 1400static void register_callback(ENGINE_HANDLE *eh, ENGINE_EVENT_TYPE type, 1401 EVENT_CALLBACK cb, const void *cb_data) { 1402 const auto& p = engine_map.find(eh); 1403 if (p == engine_map.end()) { 1404 std::cerr << "Can't find EWB corresponding to " << std::hex << eh << std::endl; 1405 for (const auto& pair : engine_map) { 1406 std::cerr << "EH: " << std::hex << pair.first << " = EWB: " << std::hex << pair.second << std::endl; 1407 } 1408 abort(); 1409 } 1410 cb_assert(p != engine_map.end()); 1411 auto wrapped_eh = reinterpret_cast<ENGINE_HANDLE*>(p->second); 1412 real_api->callback->register_callback(wrapped_eh, type, cb, cb_data); 1413} 1414 1415EWB_Engine::~EWB_Engine() { 1416 engine_map.erase(real_handle); 1417 cb_free(real_engine_ref); 1418 stop_notification_thread = true; 1419 condvar.notify_all(); 1420 notify_io_thread->waitForState(Couchbase::ThreadState::Zombie); 1421} 1422 
1423ENGINE_ERROR_CODE EWB_Engine::dcp_step( 1424 gsl::not_null<ENGINE_HANDLE*> handle, 1425 gsl::not_null<const void*> cookie, 1426 gsl::not_null<struct dcp_message_producers*> producers) { 1427 EWB_Engine* ewb = to_engine(handle); 1428 auto stream = ewb->dcp_stream.find(cookie); 1429 if (stream != ewb->dcp_stream.end()) { 1430 auto& count = stream->second.second; 1431 // If the stream is enabled and we have data to send.. 1432 if (stream->second.first && count > 0) { 1433 // This is using the internal dcp implementation which always 1434 // send the same item back 1435 auto ret = producers->mutation(cookie, 1436 0xdeadbeef /*opqaue*/, 1437 &ewb->dcp_mutation_item, 1438 0 /*vb*/, 1439 0 /*by_seqno*/, 1440 0 /*rev_seqno*/, 1441 0 /*lock_time*/, 1442 nullptr /*meta*/, 1443 0 /*nmeta*/, 1444 0 /*nru*/, 1445 0 /*collection_len*/); 1446 --count; 1447 if (ret == ENGINE_SUCCESS) { 1448 return ENGINE_WANT_MORE; 1449 } 1450 return ret; 1451 } 1452 return ENGINE_SUCCESS; 1453 } 1454 1455 if (ewb->real_engine->dcp.step == nullptr) { 1456 return ENGINE_ENOTSUP; 1457 } else { 1458 return ewb->real_engine->dcp.step(ewb->real_handle, cookie, producers); 1459 } 1460} 1461 1462ENGINE_ERROR_CODE EWB_Engine::dcp_open(gsl::not_null<ENGINE_HANDLE*> handle, 1463 gsl::not_null<const void*> cookie, 1464 uint32_t opaque, 1465 uint32_t seqno, 1466 uint32_t flags, 1467 cb::const_char_buffer name, 1468 cb::const_byte_buffer json) { 1469 EWB_Engine* ewb = to_engine(handle); 1470 std::string nm = cb::to_string(name); 1471 if (nm.find("ewb_internal") == 0) { 1472 // Yeah, this is a request for the internal "magic" DCP stream 1473 // The user could specify the iteration count by adding a colon 1474 // at the end... 
1475 auto idx = nm.rfind(":"); 1476 1477 if (idx != nm.npos) { 1478 ewb->dcp_stream[cookie] = 1479 std::make_pair(false, std::stoull(nm.substr(idx + 1))); 1480 } else { 1481 ewb->dcp_stream[cookie] = 1482 std::make_pair(false, std::numeric_limits<uint64_t>::max()); 1483 } 1484 return ENGINE_SUCCESS; 1485 } 1486 1487 if (ewb->real_engine->dcp.open == nullptr) { 1488 return ENGINE_ENOTSUP; 1489 } else { 1490 return ewb->real_engine->dcp.open(ewb->real_handle, 1491 cookie, 1492 opaque, 1493 seqno, 1494 flags, 1495 name, 1496 json); 1497 } 1498} 1499 1500ENGINE_ERROR_CODE EWB_Engine::dcp_stream_req( 1501 gsl::not_null<ENGINE_HANDLE*> handle, 1502 gsl::not_null<const void*> cookie, 1503 uint32_t flags, 1504 uint32_t opaque, 1505 uint16_t vbucket, 1506 uint64_t start_seqno, 1507 uint64_t end_seqno, 1508 uint64_t vbucket_uuid, 1509 uint64_t snap_start_seqno, 1510 uint64_t snap_end_seqno, 1511 uint64_t* rollback_seqno, 1512 dcp_add_failover_log callback) { 1513 EWB_Engine* ewb = to_engine(handle); 1514 auto stream = ewb->dcp_stream.find(cookie.get()); 1515 if (stream != ewb->dcp_stream.end()) { 1516 // This is a client of our internal streams.. 
just let it pass 1517 if (start_seqno == 1) { 1518 *rollback_seqno = 0; 1519 return ENGINE_ROLLBACK; 1520 } 1521 // Start the stream 1522 stream->second.first = true; 1523 return ENGINE_SUCCESS; 1524 } 1525 1526 if (ewb->real_engine->dcp.stream_req == nullptr) { 1527 return ENGINE_ENOTSUP; 1528 } else { 1529 return ewb->real_engine->dcp.stream_req(ewb->real_handle, cookie, 1530 flags, opaque, vbucket, 1531 start_seqno, end_seqno, 1532 vbucket_uuid, snap_start_seqno, 1533 snap_end_seqno, rollback_seqno, 1534 callback); 1535 } 1536} 1537 1538ENGINE_ERROR_CODE EWB_Engine::dcp_add_stream( 1539 gsl::not_null<ENGINE_HANDLE*> handle, 1540 gsl::not_null<const void*> cookie, 1541 uint32_t opaque, 1542 uint16_t vbucket, 1543 uint32_t flags) { 1544 EWB_Engine* ewb = to_engine(handle); 1545 if (ewb->real_engine->dcp.add_stream == nullptr) { 1546 return ENGINE_ENOTSUP; 1547 } else { 1548 return ewb->real_engine->dcp.add_stream(ewb->real_handle, cookie, 1549 opaque, vbucket, flags); 1550 } 1551} 1552 1553ENGINE_ERROR_CODE EWB_Engine::dcp_close_stream( 1554 gsl::not_null<ENGINE_HANDLE*> handle, 1555 gsl::not_null<const void*> cookie, 1556 uint32_t opaque, 1557 uint16_t vbucket) { 1558 EWB_Engine* ewb = to_engine(handle); 1559 if (ewb->real_engine->dcp.close_stream == nullptr) { 1560 return ENGINE_ENOTSUP; 1561 } else { 1562 return ewb->real_engine->dcp.close_stream(ewb->real_handle, cookie, 1563 opaque, vbucket); 1564 } 1565} 1566 1567ENGINE_ERROR_CODE EWB_Engine::dcp_get_failover_log( 1568 gsl::not_null<ENGINE_HANDLE*> handle, 1569 gsl::not_null<const void*> cookie, 1570 uint32_t opaque, 1571 uint16_t vbucket, 1572 dcp_add_failover_log callback) { 1573 EWB_Engine* ewb = to_engine(handle); 1574 if (ewb->real_engine->dcp.get_failover_log == nullptr) { 1575 return ENGINE_ENOTSUP; 1576 } else { 1577 return ewb->real_engine->dcp.get_failover_log(ewb->real_handle, 1578 cookie, 1579 opaque, 1580 vbucket, 1581 callback); 1582 } 1583} 1584 1585ENGINE_ERROR_CODE 
EWB_Engine::dcp_stream_end( 1586 gsl::not_null<ENGINE_HANDLE*> handle, 1587 gsl::not_null<const void*> cookie, 1588 uint32_t opaque, 1589 uint16_t vbucket, 1590 uint32_t flags) { 1591 EWB_Engine* ewb = to_engine(handle); 1592 if (ewb->real_engine->dcp.stream_end == nullptr) { 1593 return ENGINE_ENOTSUP; 1594 } else { 1595 return ewb->real_engine->dcp.stream_end(ewb->real_handle, cookie, 1596 opaque, vbucket, flags); 1597 } 1598} 1599 1600ENGINE_ERROR_CODE EWB_Engine::dcp_snapshot_marker( 1601 gsl::not_null<ENGINE_HANDLE*> handle, 1602 gsl::not_null<const void*> cookie, 1603 uint32_t opaque, 1604 uint16_t vbucket, 1605 uint64_t start_seqno, 1606 uint64_t end_seqno, 1607 uint32_t flags) { 1608 EWB_Engine* ewb = to_engine(handle); 1609 if (ewb->real_engine->dcp.snapshot_marker == nullptr) { 1610 return ENGINE_ENOTSUP; 1611 } else { 1612 return ewb->real_engine->dcp.snapshot_marker(ewb->real_handle, cookie, 1613 opaque, vbucket, 1614 start_seqno, end_seqno, 1615 flags); 1616 } 1617} 1618 1619ENGINE_ERROR_CODE EWB_Engine::dcp_mutation(gsl::not_null<ENGINE_HANDLE*> handle, 1620 gsl::not_null<const void*> cookie, 1621 uint32_t opaque, 1622 const DocKey& key, 1623 cb::const_byte_buffer value, 1624 size_t priv_bytes, 1625 uint8_t datatype, 1626 uint64_t cas, 1627 uint16_t vbucket, 1628 uint32_t flags, 1629 uint64_t by_seqno, 1630 uint64_t rev_seqno, 1631 uint32_t expiration, 1632 uint32_t lock_time, 1633 cb::const_byte_buffer meta, 1634 uint8_t nru) { 1635 EWB_Engine* ewb = to_engine(handle); 1636 if (ewb->real_engine->dcp.mutation == nullptr) { 1637 return ENGINE_ENOTSUP; 1638 } else { 1639 return ewb->real_engine->dcp.mutation(ewb->real_handle, cookie, opaque, 1640 key, value, priv_bytes, datatype, 1641 cas, vbucket, flags, by_seqno, 1642 rev_seqno, expiration, lock_time, 1643 meta, nru); 1644 } 1645} 1646 1647ENGINE_ERROR_CODE EWB_Engine::dcp_deletion(gsl::not_null<ENGINE_HANDLE*> handle, 1648 gsl::not_null<const void*> cookie, 1649 uint32_t opaque, 1650 const DocKey& 
key, 1651 cb::const_byte_buffer value, 1652 size_t priv_bytes, 1653 uint8_t datatype, 1654 uint64_t cas, 1655 uint16_t vbucket, 1656 uint64_t by_seqno, 1657 uint64_t rev_seqno, 1658 cb::const_byte_buffer meta) { 1659 EWB_Engine* ewb = to_engine(handle); 1660 if (ewb->real_engine->dcp.deletion == nullptr) { 1661 return ENGINE_ENOTSUP; 1662 } else { 1663 return ewb->real_engine->dcp.deletion(ewb->real_handle, cookie, opaque, 1664 key, value, priv_bytes, datatype, 1665 cas, vbucket, by_seqno, rev_seqno, 1666 meta); 1667 } 1668} 1669 1670ENGINE_ERROR_CODE EWB_Engine::dcp_deletion_v2( 1671 gsl::not_null<ENGINE_HANDLE*> handle, 1672 gsl::not_null<const void*> cookie, 1673 uint32_t opaque, 1674 const DocKey& key, 1675 cb::const_byte_buffer value, 1676 size_t priv_bytes, 1677 uint8_t datatype, 1678 uint64_t cas, 1679 uint16_t vbucket, 1680 uint64_t by_seqno, 1681 uint64_t rev_seqno, 1682 uint32_t delete_time) { 1683 EWB_Engine* ewb = to_engine(handle); 1684 if (ewb->real_engine->dcp.deletion_v2 == nullptr) { 1685 return ENGINE_ENOTSUP; 1686 } else { 1687 return ewb->real_engine->dcp.deletion_v2(ewb->real_handle, 1688 cookie, 1689 opaque, 1690 key, 1691 value, 1692 priv_bytes, 1693 datatype, 1694 cas, 1695 vbucket, 1696 by_seqno, 1697 rev_seqno, 1698 delete_time); 1699 } 1700} 1701 1702ENGINE_ERROR_CODE EWB_Engine::dcp_expiration( 1703 gsl::not_null<ENGINE_HANDLE*> handle, 1704 gsl::not_null<const void*> cookie, 1705 uint32_t opaque, 1706 const DocKey& key, 1707 cb::const_byte_buffer value, 1708 size_t priv_bytes, 1709 uint8_t datatype, 1710 uint64_t cas, 1711 uint16_t vbucket, 1712 uint64_t by_seqno, 1713 uint64_t rev_seqno, 1714 cb::const_byte_buffer meta) { 1715 EWB_Engine* ewb = to_engine(handle); 1716 if (ewb->real_engine->dcp.expiration == nullptr) { 1717 return ENGINE_ENOTSUP; 1718 } else { 1719 return ewb->real_engine->dcp.expiration(ewb->real_handle, cookie, opaque, 1720 key, value, priv_bytes, datatype, 1721 cas, vbucket, by_seqno, rev_seqno, 1722 meta); 1723 } 
1724} 1725 1726ENGINE_ERROR_CODE EWB_Engine::dcp_flush(gsl::not_null<ENGINE_HANDLE*> handle, 1727 gsl::not_null<const void*> cookie, 1728 uint32_t opaque, 1729 uint16_t vbucket) { 1730 EWB_Engine* ewb = to_engine(handle); 1731 if (ewb->real_engine->dcp.flush == nullptr) { 1732 return ENGINE_ENOTSUP; 1733 } else { 1734 return ewb->real_engine->dcp.flush(ewb->real_handle, 1735 cookie, 1736 opaque, 1737 vbucket); 1738 } 1739} 1740 1741ENGINE_ERROR_CODE EWB_Engine::dcp_set_vbucket_state( 1742 gsl::not_null<ENGINE_HANDLE*> handle, 1743 gsl::not_null<const void*> cookie, 1744 uint32_t opaque, 1745 uint16_t vbucket, 1746 vbucket_state_t state) { 1747 EWB_Engine* ewb = to_engine(handle); 1748 if (ewb->real_engine->dcp.set_vbucket_state == nullptr) { 1749 return ENGINE_ENOTSUP; 1750 } else { 1751 return ewb->real_engine->dcp.set_vbucket_state(ewb->real_handle, 1752 cookie, 1753 opaque, 1754 vbucket, 1755 state); 1756 } 1757} 1758 1759ENGINE_ERROR_CODE EWB_Engine::dcp_noop(gsl::not_null<ENGINE_HANDLE*> handle, 1760 gsl::not_null<const void*> cookie, 1761 uint32_t opaque) { 1762 EWB_Engine* ewb = to_engine(handle); 1763 if (ewb->real_engine->dcp.noop == nullptr) { 1764 return ENGINE_ENOTSUP; 1765 } else { 1766 return ewb->real_engine->dcp.noop(ewb->real_handle, cookie, opaque); 1767 } 1768} 1769 1770ENGINE_ERROR_CODE EWB_Engine::dcp_buffer_acknowledgement( 1771 gsl::not_null<ENGINE_HANDLE*> handle, 1772 gsl::not_null<const void*> cookie, 1773 uint32_t opaque, 1774 uint16_t vbucket, 1775 uint32_t buffer_bytes) { 1776 EWB_Engine* ewb = to_engine(handle); 1777 if (ewb->real_engine->dcp.buffer_acknowledgement == nullptr) { 1778 return ENGINE_ENOTSUP; 1779 } else { 1780 return ewb->real_engine->dcp.buffer_acknowledgement(ewb->real_handle, 1781 cookie, 1782 opaque, 1783 vbucket, 1784 buffer_bytes); 1785 } 1786} 1787 1788ENGINE_ERROR_CODE EWB_Engine::dcp_control(gsl::not_null<ENGINE_HANDLE*> handle, 1789 gsl::not_null<const void*> cookie, 1790 uint32_t opaque, 1791 const void* key, 
1792 uint16_t nkey, 1793 const void* value, 1794 uint32_t nvalue) { 1795 EWB_Engine* ewb = to_engine(handle); 1796 if (ewb->real_engine->dcp.control == nullptr) { 1797 return ENGINE_ENOTSUP; 1798 } else { 1799 return ewb->real_engine->dcp.control(ewb->real_handle, cookie, 1800 opaque, key, nkey, value, nvalue); 1801 } 1802} 1803 1804ENGINE_ERROR_CODE EWB_Engine::dcp_response_handler( 1805 gsl::not_null<ENGINE_HANDLE*> handle, 1806 gsl::not_null<const void*> cookie, 1807 const protocol_binary_response_header* response) { 1808 EWB_Engine* ewb = to_engine(handle); 1809 if (ewb->real_engine->dcp.response_handler == nullptr) { 1810 return ENGINE_ENOTSUP; 1811 } else { 1812 return ewb->real_engine->dcp.response_handler(ewb->real_handle, 1813 cookie, 1814 response); 1815 } 1816} 1817 1818ENGINE_ERROR_CODE EWB_Engine::dcp_system_event( 1819 gsl::not_null<ENGINE_HANDLE*> handle, 1820 gsl::not_null<const void*> cookie, 1821 uint32_t opaque, 1822 uint16_t vbucket, 1823 mcbp::systemevent::id event, 1824 uint64_t bySeqno, 1825 cb::const_byte_buffer key, 1826 cb::const_byte_buffer eventData) { 1827 EWB_Engine* ewb = to_engine(handle); 1828 if (ewb->real_engine->dcp.response_handler == nullptr) { 1829 return ENGINE_ENOTSUP; 1830 } else { 1831 return ewb->real_engine->dcp.system_event(ewb->real_handle, 1832 cookie, 1833 opaque, 1834 vbucket, 1835 event, 1836 bySeqno, 1837 key, 1838 eventData); 1839 } 1840} 1841 1842cb::engine_error EWB_Engine::collections_set_manifest( 1843 gsl::not_null<ENGINE_HANDLE*> handle, cb::const_char_buffer json) { 1844 EWB_Engine* ewb = to_engine(handle); 1845 if (ewb->real_engine->collections.set_manifest == nullptr) { 1846 return {cb::engine_errc::not_supported, 1847 "EWB_Engine::collections_set_manifest"}; 1848 } else { 1849 return ewb->real_engine->collections.set_manifest(ewb->real_handle, 1850 json); 1851 } 1852} 1853 1854cb::EngineErrorStringPair EWB_Engine::collections_get_manifest( 1855 gsl::not_null<ENGINE_HANDLE*> handle) { 1856 EWB_Engine* 
ewb = to_engine(handle); 1857 if (ewb->real_engine->collections.get_manifest == nullptr) { 1858 return {cb::engine_errc::not_supported, 1859 "EWB_Engine::collections_get_manifest"}; 1860 } else { 1861 return ewb->real_engine->collections.get_manifest(ewb->real_handle); 1862 } 1863} 1864 1865bool EWB_Engine::isXattrEnabled(gsl::not_null<ENGINE_HANDLE*> handle) { 1866 EWB_Engine* ewb = to_engine(handle); 1867 if (ewb->real_engine->isXattrEnabled == nullptr) { 1868 return false; 1869 } else { 1870 return ewb->real_engine->isXattrEnabled(ewb->real_handle); 1871 } 1872} 1873 1874BucketCompressionMode EWB_Engine::getCompressionMode(gsl::not_null<ENGINE_HANDLE*> handle) { 1875 EWB_Engine* ewb = to_engine(handle); 1876 if (ewb->real_engine->getCompressionMode == nullptr) { 1877 return BucketCompressionMode::Off; 1878 } else { 1879 return ewb->real_engine->getCompressionMode(ewb->real_handle); 1880 } 1881} 1882 1883size_t EWB_Engine::getMaxItemSize(gsl::not_null<ENGINE_HANDLE*> handle) { 1884 EWB_Engine* ewb = to_engine(handle); 1885 if (ewb->real_engine->getMaxItemSize == nullptr) { 1886 return default_max_item_size; 1887 } else { 1888 return ewb->real_engine->getMaxItemSize(ewb->real_handle); 1889 } 1890} 1891 1892float EWB_Engine::getMinCompressionRatio(gsl::not_null<ENGINE_HANDLE*> handle) { 1893 EWB_Engine* ewb = to_engine(handle); 1894 if (ewb->real_engine->getMinCompressionRatio == nullptr) { 1895 return default_min_compression_ratio; 1896 } else { 1897 return ewb->real_engine->getMinCompressionRatio(ewb->real_handle); 1898 } 1899} 1900 1901ENGINE_ERROR_CODE create_instance(uint64_t interface, 1902 GET_SERVER_API gsa, 1903 ENGINE_HANDLE **handle) 1904{ 1905 if (interface != 1) { 1906 return ENGINE_ENOTSUP; 1907 } 1908 1909 try { 1910 EWB_Engine* engine = new EWB_Engine(gsa); 1911 *handle = reinterpret_cast<ENGINE_HANDLE*> (engine); 1912 return ENGINE_SUCCESS; 1913 1914 } catch (std::exception& e) { 1915 auto logger = gsa()->log->get_logger(); 1916 
logger->log(EXTENSION_LOG_WARNING, NULL, 1917 "EWB_Engine: failed to create engine: %s", e.what()); 1918 return ENGINE_FAILED; 1919 } 1920 1921} 1922 1923void destroy_engine(void) { 1924 // nothing todo. 1925} 1926 1927const char* EWB_Engine::to_string(const Cmd cmd) { 1928 switch (cmd) { 1929 case Cmd::NONE: 1930 return "NONE"; 1931 case Cmd::GET_INFO: 1932 return "GET_INFO"; 1933 case Cmd::GET_META: 1934 return "GET_META"; 1935 case Cmd::ALLOCATE: 1936 return "ALLOCATE"; 1937 case Cmd::REMOVE: 1938 return "REMOVE"; 1939 case Cmd::GET: 1940 return "GET"; 1941 case Cmd::STORE: 1942 return "STORE"; 1943 case Cmd::CAS: 1944 return "CAS"; 1945 case Cmd::ARITHMETIC: 1946 return "ARITHMETIC"; 1947 case Cmd::FLUSH: 1948 return "FLUSH"; 1949 case Cmd::GET_STATS: 1950 return "GET_STATS"; 1951 case Cmd::UNKNOWN_COMMAND: 1952 return "UNKNOWN_COMMAND"; 1953 case Cmd::LOCK: 1954 return "LOCK"; 1955 case Cmd::UNLOCK: 1956 return "UNLOCK"; 1957 } 1958 throw std::invalid_argument("EWB_Engine::to_string() Unknown command"); 1959} 1960 1961void EWB_Engine::process_notifications() { 1962 SERVER_HANDLE_V1* server = gsa(); 1963 LOG_DEBUG("EWB_Engine: notification thread running for engine {}", 1964 (void*)this); 1965 std::unique_lock<std::mutex> lk(mutex); 1966 while (!stop_notification_thread) { 1967 condvar.wait(lk); 1968 while (!pending_io_ops.empty()) { 1969 const void* cookie = pending_io_ops.front(); 1970 pending_io_ops.pop(); 1971 lk.unlock(); 1972 LOG_DEBUG("EWB_Engine: notify {}", cookie); 1973 server->cookie->notify_io_complete(cookie, ENGINE_SUCCESS); 1974 lk.lock(); 1975 } 1976 } 1977 1978 LOG_DEBUG("EWB_Engine: notification thread stopping for engine {}", 1979 (void*)this); 1980} 1981 1982void NotificationThread::run() { 1983 setRunning(); 1984 engine.process_notifications(); 1985} 1986 1987ENGINE_ERROR_CODE EWB_Engine::handleBlockMonitorFile(const void* cookie, 1988 uint32_t id, 1989 const std::string& file, 1990 ADD_RESPONSE response) { 1991 if (file.empty()) { 1992 
return ENGINE_EINVAL; 1993 } 1994 1995 if (!cb::io::isFile(file)) { 1996 return ENGINE_KEY_ENOENT; 1997 } 1998 1999 if (!suspend(cookie, id)) { 2000 LOG_WARNING( 2001 "EWB_Engine::handleBlockMonitorFile(): " 2002 "Id {} already registered", 2003 id); 2004 return ENGINE_KEY_EEXISTS; 2005 } 2006 2007 try { 2008 std::unique_ptr<Couchbase::Thread> thread( 2009 new BlockMonitorThread(*this, id, file)); 2010 thread->start(); 2011 std::lock_guard<std::mutex> guard(threads_mutex); 2012 threads.emplace_back(thread.release()); 2013 } catch (std::exception& e) { 2014 LOG_WARNING( 2015 "EWB_Engine::handleBlockMonitorFile(): Failed to create " 2016 "block monitor thread: {}", 2017 e.what()); 2018 return ENGINE_FAILED; 2019 } 2020 2021 LOG_DEBUG( 2022 "Registered connection {} (engine {}) as {} to be" 2023 " suspended. Monitor file {}", 2024 cookie, 2025 (void*)this, 2026 id, 2027 file.c_str()); 2028 2029 response(nullptr, 0, nullptr, 0, nullptr, 0, 2030 PROTOCOL_BINARY_RAW_BYTES, 2031 PROTOCOL_BINARY_RESPONSE_SUCCESS, /*cas*/0, cookie); 2032 return ENGINE_SUCCESS; 2033} 2034 2035ENGINE_ERROR_CODE EWB_Engine::handleSuspend(const void* cookie, 2036 uint32_t id, 2037 ADD_RESPONSE response) { 2038 if (suspend(cookie, id)) { 2039 LOG_DEBUG("Registered connection {} as {} to be suspended", cookie, id); 2040 response(nullptr, 0, nullptr, 0, nullptr, 0, 2041 PROTOCOL_BINARY_RAW_BYTES, 2042 PROTOCOL_BINARY_RESPONSE_SUCCESS, /*cas*/0, cookie); 2043 return ENGINE_SUCCESS; 2044 } else { 2045 LOG_WARNING("EWB_Engine::handleSuspend(): Id {} already registered", 2046 id); 2047 return ENGINE_KEY_EEXISTS; 2048 } 2049} 2050 2051ENGINE_ERROR_CODE EWB_Engine::handleResume(const void* cookie, uint32_t id, 2052 ADD_RESPONSE response) { 2053 if (resume(id)) { 2054 LOG_DEBUG("Connection with id {} will be resumed", id); 2055 response(nullptr, 0, nullptr, 0, nullptr, 0, 2056 PROTOCOL_BINARY_RAW_BYTES, 2057 PROTOCOL_BINARY_RESPONSE_SUCCESS, /*cas*/0, cookie); 2058 return ENGINE_SUCCESS; 2059 } else { 
2060 LOG_WARNING( 2061 "EWB_Engine::unknown_command(): No " 2062 "connection registered with id {}", 2063 id); 2064 return ENGINE_EINVAL; 2065 } 2066} 2067 2068ENGINE_ERROR_CODE EWB_Engine::setItemCas(const void *cookie, 2069 const std::string& key, 2070 uint32_t cas, 2071 ADD_RESPONSE response) { 2072 uint64_t cas64 = cas; 2073 if (cas == static_cast<uint32_t>(-1)) { 2074 cas64 = LOCKED_CAS; 2075 } 2076 2077 auto rv = real_engine->get(real_handle, 2078 cookie, 2079 DocKey{key, DocNamespace::DefaultCollection}, 2080 0, 2081 DocStateFilter::Alive); 2082 if (rv.first != cb::engine_errc::success) { 2083 return ENGINE_ERROR_CODE(rv.first); 2084 } 2085 2086 // item_set_cas has no return value! 2087 real_engine->item_set_cas(real_handle, rv.second.get(), cas64); 2088 response(nullptr, 0, nullptr, 0, nullptr, 0, 2089 PROTOCOL_BINARY_RAW_BYTES, 2090 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie); 2091 return ENGINE_SUCCESS; 2092} 2093 2094void BlockMonitorThread::run() { 2095 setRunning(); 2096 2097 LOG_DEBUG("Block monitor for file {} started", file); 2098 2099 // @todo Use the file monitoring API's to avoid this "busy" loop 2100 while (cb::io::isFile(file)) { 2101 usleep(100); 2102 } 2103 2104 LOG_DEBUG("Block monitor for file {} stopping (file is gone)", file); 2105 engine.resume(id); 2106} 2107