1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2 /* 3 * Copyright 2015 Couchbase, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 #pragma once 18 19 #include <boost/optional/optional_fwd.hpp> 20 #include <engines/ewouldblock_engine/ewouldblock_engine.h> 21 #include <memcached/engine_error.h> 22 #include <memcached/openssl.h> 23 #include <memcached/protocol_binary.h> 24 #include <memcached/rbac.h> 25 #include <memcached/types.h> 26 #include <platform/dynamic.h> 27 #include <platform/sized_buffer.h> 28 #include <platform/socket.h> 29 30 #include <nlohmann/json_fwd.hpp> 31 32 #include <chrono> 33 #include <cstdlib> 34 #include <functional> 35 #include <stdexcept> 36 #include <string> 37 #include <unordered_set> 38 #include <utility> 39 #include <vector> 40 41 class FrameInfo; 42 43 using FrameInfoVector = std::vector<std::unique_ptr<FrameInfo>>; 44 using GetFrameInfoFunction = std::function<FrameInfoVector()>; 45 46 /** 47 * The Frame class is used to represent all of the data included in the 48 * protocol unit going over the wire. 49 */ 50 class Frame { 51 public: reset()52 void reset() { 53 payload.resize(0); 54 } 55 getMagic() const56 cb::mcbp::Magic getMagic() const { 57 const auto magic(cb::mcbp::Magic(payload.at(0))); 58 if (!cb::mcbp::is_legal(magic)) { 59 throw std::invalid_argument( 60 "Frame::getMagic: invalid magic provided in buffer"); 61 } 62 63 return magic; 64 } 65 getRequest() const66 const cb::mcbp::Request* getRequest() const { 67 return reinterpret_cast<const cb::mcbp::Request*>(payload.data()); 68 } 69 getResponse() const70 const cb::mcbp::Response* getResponse() const { 71 return reinterpret_cast<const cb::mcbp::Response*>(payload.data()); 72 } 73 74 std::vector<uint8_t> payload; 75 typedef std::vector<uint8_t>::size_type size_type; 76 }; 77 78 class DocumentInfo { 79 public: 80 std::string id; 81 uint32_t flags; 82 uint32_t expiration; 83 cb::mcbp::Datatype datatype; 84 uint64_t cas; 85 DocumentInfo()86 DocumentInfo() 87 : id(""), 88 flags(0), 89 expiration(0), 90 datatype(cb::mcbp::Datatype::Raw), 91 cas(0) { 92 } 93 operator ==(const DocumentInfo& rhs) const94 bool operator==(const DocumentInfo& rhs) const { 95 return (id == rhs.id) && (flags == rhs.flags) && 96 (expiration == rhs.expiration) && (datatype == rhs.datatype) && 97 (cas == rhs.cas); 98 } 99 }; 100 101 ::std::ostream& operator<<(::std::ostream& os, const DocumentInfo& info); 102 103 class Document { 104 public: operator ==(const Document& rhs) const105 bool operator==(const Document& rhs) const { 106 return (info == rhs.info) && (value == rhs.value); 107 } 108 109 /** 110 * Compress this document using Snappy. Replaces the value with a compressed 111 * version, adds Snappy to the set of datatypes. 112 */ 113 void compress(); 114 115 DocumentInfo info; 116 std::string value; 117 }; 118 119 ::std::ostream& operator<<(::std::ostream& os, const Document& doc); 120 121 class MutationInfo { 122 public: 123 uint64_t cas; 124 size_t size; 125 uint64_t seqno; 126 uint64_t vbucketuuid; 127 }; 128 129 struct ObserveInfo { 130 uint8_t formatType; 131 Vbid vbId; 132 uint64_t uuid; 133 uint64_t lastPersistedSeqno; 134 uint64_t currentSeqno; 135 uint64_t failoverUUID; 136 uint64_t failoverSeqno; 137 }; 138 139 enum class BucketType : uint8_t { 140 Invalid = 0, 141 Memcached = 1, 142 Couchbase = 2, 143 EWouldBlock = 3 144 }; 145 146 enum class MutationType { 147 Add, Set, Replace, Append, Prepend 148 }; 149 150 std::string to_string(MutationType type); 151 152 class BinprotResponse; 153 class BinprotCommand; 154 155 class ConnectionError : public std::runtime_error { 156 public: 157 ConnectionError(const std::string& prefix, cb::mcbp::Status reason_); 158 159 ConnectionError(const std::string& prefix, const BinprotResponse& response); 160 getReason() const161 cb::mcbp::Status getReason() const { 162 return reason; 163 } 164 isInvalidArguments() const165 bool isInvalidArguments() const { 166 return reason == cb::mcbp::Status::Einval; 167 } 168 isAlreadyExists() const169 bool isAlreadyExists() const { 170 return reason == cb::mcbp::Status::KeyEexists; 171 } 172 isNotFound() const173 bool isNotFound() const { 174 return reason == cb::mcbp::Status::KeyEnoent; 175 } 176 isNotMyVbucket() const177 bool isNotMyVbucket() const { 178 return reason == cb::mcbp::Status::NotMyVbucket; 179 } 180 isNotStored() const181 bool isNotStored() const { 182 return reason == cb::mcbp::Status::NotStored; 183 } 184 isAccessDenied() const185 bool isAccessDenied() const { 186 return reason == cb::mcbp::Status::Eaccess; 187 } 188 isDeltaBadval() const189 bool isDeltaBadval() const { 190 return reason == cb::mcbp::Status::DeltaBadval; 191 } 192 isAuthError() const193 bool isAuthError() const { 194 return reason == cb::mcbp::Status::AuthError; 195 } 196 isNotSupported() const197 bool isNotSupported() const { 198 return reason == cb::mcbp::Status::NotSupported; 199 } 200 isLocked() const201 bool isLocked() const { 202 return reason == cb::mcbp::Status::Locked; 203 } 204 isTemporaryFailure() const205 bool isTemporaryFailure() const { 206 return reason == cb::mcbp::Status::Etmpfail; 207 } 208 isTooBig() const209 bool isTooBig() const { 210 return reason == cb::mcbp::Status::E2big; 211 } 212 213 std::string getErrorContext() const; 214 215 nlohmann::json getErrorJsonContext() const; 216 217 private: 218 const cb::mcbp::Status reason; 219 const std::string payload; 220 }; 221 222 /** 223 * Exception thrown when the received response deosn't match our expections. 224 */ 225 struct ValidationError : public std::runtime_error { ValidationErrorBucketType::ValidationError226 explicit ValidationError(const std::string& msg) : std::runtime_error(msg) { 227 } 228 }; 229 230 /** 231 * The execution mode represents the mode the server executes commands 232 * retrieved over the network. In an Ordered mode (that's the default mode 233 * and how things was defined in the initial implementation of the binary 234 * protocol) the server must not start executing the next command before 235 * the execution of the current command is completed. In Unordered mode 236 * the server may start executing (and report the result back to the client) 237 * whenever it feels like. 238 */ 239 enum class ExecutionMode { Ordered, Unordered }; 240 241 /** 242 * The MemcachedConnection class is an abstract class representing a 243 * connection to memcached. The concrete implementations of the class 244 * implements the Memcached binary protocol and Greenstack. 245 * 246 * By default a connection is set into a synchronous mode. 247 * 248 * All methods is expeted to work, and all failures is reported through 249 * exceptions. Unexpected packets / responses etc will use the ConnectionError, 250 * and other problems (like network error etc) use std::runtime_error. 251 * 252 */ 253 class MemcachedConnection { 254 public: 255 MemcachedConnection() = delete; 256 257 MemcachedConnection(const MemcachedConnection&) = delete; 258 259 /** 260 * Create a new instance of the MemcachedConnection 261 * 262 * @param host the hostname to connect to (empty == localhost) 263 * @param port the port number to connect to 264 * @param family the socket family to connect as (AF_INET, AF_INET6 265 * or use AF_UNSPEC to just pick one) 266 * @param ssl connect over SSL or not 267 */ 268 MemcachedConnection(std::string host, 269 in_port_t port, 270 sa_family_t family, 271 bool ssl); 272 273 ~MemcachedConnection(); 274 275 /** 276 * Release the socket from this instance. The caller is required 277 * to close the socket when it is no longer in use! 278 * 279 * @return the underlying socket 280 */ 281 SOCKET releaseSocket(); 282 283 // Set a tag / label on this connection setTag(std::string tag)284 void setTag(std::string tag) { 285 MemcachedConnection::tag = std::move(tag); 286 } 287 getTag() const288 const std::string& getTag() const { 289 return tag; 290 } 291 292 // Creates clone (copy) of the given connection - i.e. a second independent 293 // channel to memcached. Used for multi-connection testing. 294 std::unique_ptr<MemcachedConnection> clone(); 295 getPort() const296 in_port_t getPort() const { 297 return port; 298 } 299 getFamily() const300 sa_family_t getFamily() const { 301 return family; 302 } 303 isSsl() const304 bool isSsl() const { 305 return ssl; 306 } 307 308 /** 309 * Set the SSL Certificate file to use 310 * 311 * @throws std::system_error if the file doesn't exist 312 */ 313 void setSslCertFile(const std::string& file); 314 315 /** 316 * Set the SSL private key file to use 317 * 318 * @throws std::system_error if the file doesn't exist 319 */ 320 void setSslKeyFile(const std::string& file); 321 322 /// Set the TLS version to use 323 void setTlsProtocol(std::string protocol); 324 /// Set the ciphers to use for TLS < 1.3 325 void setTls12Ciphers(std::string ciphers); 326 /// Set the ciphers to use for TLS >= 1.3 327 void setTls13Ciphers(std::string ciphers); 328 329 /** 330 * Try to establish a connection to the server. 331 * 332 * @thows std::exception if an error occurs 333 */ 334 void connect(); 335 336 /** 337 * Close the connection to the server 338 */ 339 void close(); 340 341 /** 342 * Drop the current connection to the server and re-establish the 343 * connection. 344 */ reconnect()345 void reconnect() { 346 close(); 347 connect(); 348 } 349 350 /** 351 * Perform a SASL authentication to memcached 352 * 353 * @param username the username to use in authentication 354 * @param password the password to use in authentication 355 * @param mech the SASL mech to use 356 */ 357 void authenticate(const std::string& username, 358 const std::string& password, 359 const std::string& mech); 360 361 /** 362 * Create a bucket 363 * 364 * @param name the name of the bucket 365 * @param config the buckets configuration attributes 366 * @param type the kind of bucket to create 367 */ 368 void createBucket(const std::string& name, 369 const std::string& config, 370 BucketType type); 371 372 /** 373 * Delete the named bucket 374 * 375 * @param name the name of the bucket 376 */ 377 void deleteBucket(const std::string& name); 378 379 /** 380 * Select the named bucket 381 * 382 * @param name the name of the bucket to select 383 */ 384 void selectBucket(const std::string& name); 385 386 /** 387 * List all of the buckets on the server 388 * 389 * @return a vector containing all of the buckets 390 */ 391 std::vector<std::string> listBuckets( 392 GetFrameInfoFunction getFrameInfo = {}); 393 394 /** 395 * Fetch a document from the server 396 * 397 * @param id the name of the document 398 * @param vbucket the vbucket the document resides in 399 * @return a document object containg the information about the 400 * document. 401 */ 402 Document get(const std::string& id, 403 Vbid vbucket, 404 GetFrameInfoFunction getFrameInfo = {}); 405 406 /** 407 * Fetch multiple documents 408 * 409 * Send a pipeline of (quiet) get commands to the server and fire 410 * the documentCallback with the documents found in the server. 411 * 412 * If the server returns with an error the provided error callback 413 * will be called. (note that you won't receive a callback for 414 * documents that don't exist on the server as we're using the 415 * quiet commands. 416 * 417 * Use the getFrameInfo method if you'd like the server to perform 418 * out of order requests (note: the connection must be set to 419 * allow unordered execution). 420 * 421 * @param id The key and the vbucket the document resides in 422 * @param documentCallback the callback with the document for an 423 * operation 424 * @param errorCallback the callback if the server returns an error 425 * @param getFrameInfo Optional FrameInfo to inject to the commands 426 * @return A vector containing all of the found documents 427 */ 428 void mget(const std::vector<std::pair<const std::string, Vbid>>& id, 429 std::function<void(std::unique_ptr<Document>&)> documentCallback, 430 std::function<void(const std::string&, const cb::mcbp::Response&)> 431 errorCallback = {}, 432 GetFrameInfoFunction getFrameInfo = {}); 433 434 /** 435 * Fetch and lock a document from the server 436 * 437 * @param id the name of the document 438 * @param vbucket the vbucket the document resides in 439 * @param lock_timeout the timeout (in sec) for the lock. 0 means 440 * use the default lock timeout from the server 441 * @return a document object containing the information about the 442 * document. 443 */ 444 Document get_and_lock(const std::string& id, 445 Vbid vbucket, 446 uint32_t lock_timeout, 447 GetFrameInfoFunction getFrameInfo = {}); 448 449 /** 450 * Get the Failover Log for a given VBucket 451 * 452 * @param vbucket 453 * @return the raw BinprotResponse 454 */ 455 BinprotResponse getFailoverLog(Vbid vbucket, 456 GetFrameInfoFunction getFrameInfo = {}); 457 458 /** 459 * Unlock a locked document 460 * 461 * @param id the name of the document 462 * @param vbucket the vbucket the document resides in 463 * @param cas the cas identifier of the locked document 464 */ 465 void unlock(const std::string& id, 466 Vbid vbucket, 467 uint64_t cas, 468 GetFrameInfoFunction getFrameInfo = {}); 469 470 void dropPrivilege(cb::rbac::Privilege privilege, 471 GetFrameInfoFunction getFrameInfo = {}); 472 473 /* 474 * Form a Frame representing a CMD_GET 475 */ 476 static Frame encodeCmdGet(const std::string& id, Vbid vbucket); 477 mutate(const Document& doc, Vbid vbucket, MutationType type, GetFrameInfoFunction getFrameInfo = {})478 MutationInfo mutate(const Document& doc, 479 Vbid vbucket, 480 MutationType type, 481 GetFrameInfoFunction getFrameInfo = {}) { 482 return mutate(doc.info, 483 vbucket, 484 cb::const_byte_buffer(reinterpret_cast<const uint8_t*>( 485 doc.value.data()), 486 doc.value.size()), 487 type, 488 getFrameInfo); 489 } 490 491 /** 492 * Perform the mutation on the attached document. 493 * 494 * The method throws an exception upon errors 495 * 496 * @param info Document metadata 497 * @param vbucket the vbucket to operate on 498 * @param value new value for the document 499 * @param type the type of mutation to perform 500 * @return the new cas value for success 501 */ 502 MutationInfo mutate(const DocumentInfo& info, 503 Vbid vbucket, 504 cb::const_byte_buffer value, 505 MutationType type, 506 GetFrameInfoFunction getFrameInfo = {}); 507 508 /** 509 * Convenience method to store (aka "upsert") an item. 510 * @param id The item's ID 511 * @param vbucket vBucket 512 * @param value Value of the item. 513 * @return The mutation result. 514 */ 515 MutationInfo store(const std::string& id, 516 Vbid vbucket, 517 std::string value, 518 cb::mcbp::Datatype datatype = cb::mcbp::Datatype::Raw, 519 GetFrameInfoFunction getFrameInfo = {}); 520 521 /** 522 * Get statistics from the server, and fire a callback with the key and 523 * value of each reported stat 524 * 525 * @param callback the callback to call for each stat 526 * @param group the stats group to request 527 */ 528 void 529 stats(std::function<void(const std::string&, const std::string&)> callback, 530 const std::string& group = std::string{}); 531 532 /** 533 * Get stats as a map 534 * @param subcommand 535 * @return 536 */ 537 std::map<std::string, std::string> statsMap(const std::string& subcommand); 538 539 nlohmann::json stats(const std::string& subcommand); 540 541 /** 542 * Instruct the audit daemon to reload the configuration 543 */ 544 void reloadAuditConfiguration(GetFrameInfoFunction getFrameInfo = {}); 545 546 /** 547 * Sent the given frame over this connection 548 * 549 * @param frame the frame to send to the server 550 */ 551 void sendFrame(const Frame& frame); 552 553 /** Send part of the given frame over this connection. Upon success, 554 * the frame's payload will be modified such that the sent bytes are 555 * deleted - i.e. after a successful call the frame object will only have 556 * the remaining, unsent bytes left. 557 * 558 * @param frame The frame to partially send. 559 * @param length The number of bytes to transmit. Must be less than or 560 * equal to the size of the frame. 561 */ 562 void sendPartialFrame(Frame& frame, Frame::size_type length); 563 564 /** 565 * Receive the next frame on the connection 566 * 567 * @param frame the frame object to populate with the next frame 568 */ 569 void recvFrame(Frame& frame); 570 571 void sendCommand(const BinprotCommand& command); 572 573 void recvResponse(BinprotResponse& response); 574 575 /** 576 * Execute a command on the server and return the raw response packet. 577 */ 578 BinprotResponse execute(const BinprotCommand& command); 579 580 /// Execute a command on the server and return the response 581 Frame execute(const Frame& frame); 582 /** 583 * Get a textual representation of this connection 584 * 585 * @return a textual representation of the connection including the 586 * protocol and any special attributes 587 */ 588 std::string to_string(); 589 590 /** 591 * Try to configure the ewouldblock engine 592 * 593 * See the header /engines/ewouldblock_engine/ewouldblock_engine.h 594 * for a full description on the parameters. 595 */ 596 void configureEwouldBlockEngine( 597 const EWBEngineMode& mode, 598 ENGINE_ERROR_CODE err_code = ENGINE_EWOULDBLOCK, 599 uint32_t value = 0, 600 const std::string& key = ""); 601 602 /** 603 * Disable the ewouldblock engine entirely. 604 */ disableEwouldBlockEngine()605 void disableEwouldBlockEngine() { 606 // We disable the engine by telling it to inject the given error 607 // the next 0 times 608 configureEwouldBlockEngine(EWBEngineMode::Next_N, ENGINE_SUCCESS, 0); 609 } 610 611 /** 612 * Identify ourself to the server. 613 * 614 * @throws std::runtime_error if an error occurs 615 */ 616 void hello(const std::string& userAgent, 617 const std::string& userAgentVersion, 618 const std::string& comment); 619 620 /** 621 * Get the servers SASL mechanisms. 622 * 623 * @throws std::runtime_error if an error occurs 624 */ 625 std::string getSaslMechanisms(); 626 627 /** 628 * Request the IOCTL value from the server 629 * 630 * @param key the IOCTL to request 631 * @return A textual representation of the key 632 */ 633 std::string ioctl_get(const std::string& key, 634 GetFrameInfoFunction getFrameInfo = {}); 635 636 /** 637 * Perform an IOCTL on the server 638 * 639 * @param key the IOCTL to set 640 * @param value the value to specify for the given key 641 */ 642 void ioctl_set(const std::string& key, 643 const std::string& value, 644 GetFrameInfoFunction getFrameInfo = {}); 645 646 /** 647 * Perform an arithmetic operation on a document (increment or decrement) 648 * 649 * You may use this method when operating on "small" delta values which 650 * fit into a signed 64 bit integer. If you for some reason need to 651 * incr / decr values above that you must use increment and decrement 652 * directly. 653 * 654 * @param key the document to operate on 655 * @param delta The value to increment / decrement 656 * @param initial Create with the initial value (exptime must be set to 657 * != 0xffffffff) 658 * @param exptime The expiry time for the document 659 * @param info Where to store the mutation info. 660 * @return The new value for the counter 661 */ arithmetic(const std::string& key, int64_t delta, uint64_t initial = 0, rel_time_t exptime = 0, MutationInfo* info = nullptr, GetFrameInfoFunction getFrameInfo = {})662 uint64_t arithmetic(const std::string& key, 663 int64_t delta, 664 uint64_t initial = 0, 665 rel_time_t exptime = 0, 666 MutationInfo* info = nullptr, 667 GetFrameInfoFunction getFrameInfo = {}) { 668 if (delta < 0) { 669 return decrement(key, 670 uint64_t(std::abs(delta)), 671 initial, 672 exptime, 673 info, 674 getFrameInfo); 675 } else { 676 return increment( 677 key, uint64_t(delta), initial, exptime, info, getFrameInfo); 678 } 679 } 680 681 /** 682 * Perform an increment operation on a document 683 * 684 * This method only exists in order to test the situations where you want 685 * to increment a value that wouldn't fit into a signed 64 bit integer. 686 * 687 * @param key the document to operate on 688 * @param delta The value to increment 689 * @param initial Create with the initial value (exptime must be set to 690 * != 0xffffffff) 691 * @param exptime The expiry time for the document 692 * @param info Where to store the mutation info. 693 * @return The new value for the counter 694 */ 695 uint64_t increment(const std::string& key, 696 uint64_t delta, 697 uint64_t initial = 0, 698 rel_time_t exptime = 0, 699 MutationInfo* info = nullptr, 700 GetFrameInfoFunction getFrameInfo = {}); 701 702 /** 703 * Perform an decrement operation on a document 704 * 705 * @param key the document to operate on 706 * @param delta The value to increment / decrement 707 * @param initial Create with the initial value (exptime must be set to 708 * != 0xffffffff) 709 * @param exptime The expiry time for the document 710 * @param info Where to store the mutation info. 711 * @return The new value for the counter 712 */ 713 uint64_t decrement(const std::string& key, 714 uint64_t delta, 715 uint64_t initial = 0, 716 rel_time_t exptime = 0, 717 MutationInfo* info = nullptr, 718 GetFrameInfoFunction getFrameInfo = {}); 719 720 /** 721 * Remove the named document 722 * 723 * @param key the document to remove 724 * @param vbucket the vbucket the document is stored in 725 * @param cas the specific version of the document or 0 for "any" 726 * @return Details about the detion 727 */ 728 MutationInfo remove(const std::string& key, 729 Vbid vbucket, 730 uint64_t cas = 0, 731 GetFrameInfoFunction getFrameInfo = {}); 732 733 /** 734 * Mutate with meta - stores doc into the bucket using all the metadata 735 * from doc, e.g. doc.cas will become the stored cas (on success). 736 * 737 * @param doc The document to set 738 * @param vbucket The vbucket the document is stored in 739 * @param cas The cas used for the setWithMeta (note this cas is not stored 740 * on success) 741 * @param seqno The seqno to store the document as 742 * @param metaOption MCBP options that can be sent with the command 743 * @param metaExtras Optional - see ep/src/ext_meta_parser.h for the details 744 * of this. 745 */ 746 MutationInfo mutateWithMeta(Document& doc, 747 Vbid vbucket, 748 uint64_t cas, 749 uint64_t seqno, 750 uint32_t metaOption, 751 std::vector<uint8_t> metaExtras = {}, 752 GetFrameInfoFunction getFrameInfo = {}); 753 754 std::pair<cb::mcbp::Status, GetMetaResponse> getMeta( 755 const std::string& key, 756 Vbid vbucket, 757 GetMetaVersion version, 758 GetFrameInfoFunction getFrameInfo = {}); 759 760 /** 761 * Evict the provided key 762 * 763 * @param key The key to evict 764 * @param vbucket The vbucket the key belongs to 765 * @param getFrameInfo Optional frame ids 766 */ 767 void evict(const std::string& key, 768 Vbid vbucket, 769 GetFrameInfoFunction getFrameInfo = {}); 770 771 /** 772 * Observe Seqno command - retrieve the persistence status of the given 773 * vBucket and UUID. 774 */ 775 ObserveInfo observeSeqno(Vbid vbid, 776 uint64_t uuid, 777 GetFrameInfoFunction getFrameInfo = {}); 778 779 /// Enable persistence for the connected bucket. 780 void enablePersistence(GetFrameInfoFunction getFrameInfo = {}); 781 782 /// Disable persistence for the connected bucket. 783 void disablePersistence(GetFrameInfoFunction getFrameInfo = {}); 784 hasFeature(cb::mcbp::Feature feature) const785 bool hasFeature(cb::mcbp::Feature feature) const { 786 return effective_features.find(uint16_t(feature)) != 787 effective_features.end(); 788 } 789 setDatatypeJson(bool enable)790 void setDatatypeJson(bool enable) { 791 setFeature(cb::mcbp::Feature::JSON, enable); 792 } 793 setMutationSeqnoSupport(bool enable)794 void setMutationSeqnoSupport(bool enable) { 795 setFeature(cb::mcbp::Feature::MUTATION_SEQNO, enable); 796 } 797 setXattrSupport(bool enable)798 void setXattrSupport(bool enable) { 799 setFeature(cb::mcbp::Feature::XATTR, enable); 800 } 801 setXerrorSupport(bool enable)802 void setXerrorSupport(bool enable) { 803 setFeature(cb::mcbp::Feature::XERROR, enable); 804 } 805 setDuplexSupport(bool enable)806 void setDuplexSupport(bool enable) { 807 setFeature(cb::mcbp::Feature::Duplex, enable); 808 } 809 setClustermapChangeNotification(bool enable)810 void setClustermapChangeNotification(bool enable) { 811 setFeature(cb::mcbp::Feature::ClustermapChangeNotification, enable); 812 } 813 814 void setUnorderedExecutionMode(ExecutionMode mode); 815 816 /** 817 * Attempts to enable or disable a feature 818 * @param feature Feature to enable or disable 819 * @param enabled whether to enable or disable 820 */ 821 void setFeature(cb::mcbp::Feature feature, bool enabled); 822 getTraceData() const823 boost::optional<std::chrono::microseconds> getTraceData() const { 824 return traceData; 825 } 826 827 /** 828 * Set the connection features to use 829 * 830 * @param agent the agent name to report to the server 831 * @param features a vector containing all of the features to try 832 * to enable on the server 833 */ 834 void setFeatures(const std::string& agent, 835 const std::vector<cb::mcbp::Feature>& features); 836 837 void setVbucket(Vbid vbid, 838 vbucket_state_t state, 839 const nlohmann::json& payload, 840 GetFrameInfoFunction getFrameInfo = {}); 841 842 /// should the client automatically retry operations which fail 843 /// with a tmpfail or not (note that this is only possible when 844 /// the client object have the command frame available setAutoRetryTmpfail(bool value)845 void setAutoRetryTmpfail(bool value) { 846 auto_retry_tmpfail = value; 847 } 848 getAutoRetryTmpfail() const849 bool getAutoRetryTmpfail() const { 850 return auto_retry_tmpfail; 851 } 852 853 Document getRandomKey(Vbid vbid); 854 855 protected: 856 void read(Frame& frame, size_t bytes); 857 858 void readPlain(Frame& frame, size_t bytes); 859 860 void readSsl(Frame& frame, size_t bytes); 861 862 void sendBuffer(const std::vector<iovec>& buf); 863 void sendBuffer(cb::const_byte_buffer buf); 864 865 void sendBufferPlain(cb::const_byte_buffer buf); 866 void sendBufferPlain(const std::vector<iovec>& list); 867 868 void sendBufferSsl(cb::const_byte_buffer buf); 869 void sendBufferSsl(const std::vector<iovec>& list); 870 871 void applyFrameInfos(BinprotCommand& command, GetFrameInfoFunction& fi); 872 873 /** 874 * Keep on calling the executor function until it returns true. 875 * 876 * Every time the function returns false the thread sleeps for the 877 * provided number of milliseconds. If the loop takes longer than 878 * the provided number of seconds the method throws an exception. 879 * 880 * @param executor The function to call until it returns true 881 * @param backoff The number of milliseconds to back off 882 * @param timeout The number of seconds until an exception is thrown 883 * @throws std::runtime_error for timeouts 884 */ 885 void backoff_execute( 886 std::function<bool()> executor, 887 std::chrono::milliseconds backoff = std::chrono::milliseconds(10), 888 std::chrono::seconds timeout = std::chrono::seconds(30)); 889 890 std::string host; 891 in_port_t port; 892 sa_family_t family; 893 bool auto_retry_tmpfail = false; 894 bool ssl; 895 std::string tls_protocol; 896 std::string tls12_ciphers{"HIGH"}; 897 std::string tls13_ciphers{ 898 "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_" 899 "GCM_SHA256"}; 900 std::string ssl_cert_file; 901 std::string ssl_key_file; 902 SSL_CTX* context = nullptr; 903 BIO* bio = nullptr; 904 SOCKET sock = INVALID_SOCKET; 905 std::string tag; 906 boost::optional<std::chrono::microseconds> traceData; 907 908 typedef std::unordered_set<uint16_t> Featureset; 909 910 uint64_t incr_decr(cb::mcbp::ClientOpcode opcode, 911 const std::string& key, 912 uint64_t delta, 913 uint64_t initial, 914 rel_time_t exptime, 915 MutationInfo* info, 916 GetFrameInfoFunction getFrameInfo = {}); 917 918 /** 919 * Set the features on the server by using the MCBP hello command 920 * 921 * The internal `features` array is updated with the result sent back 922 * from the server. 923 * 924 * @param agent the agent name provided by the client 925 * @param feat the featureset to enable. 926 */ 927 void applyFeatures(const std::string& agent, const Featureset& features); 928 929 Featureset effective_features; 930 }; 931 932 namespace cb { 933 namespace net { 934 /** 935 * Create a new socket and connect it to the given host 936 * 937 * @param host The name of the host to try to connect to. If 938 * empty (or set to localhost) it'll be replaced 939 * with "127.0.0.1" or "::1" depending on the value 940 * of family 941 * @param port The port number to connect to 942 * @param family The socket family to create (AF_INET/AF_INET6/AF_UNSPEC) 943 * @return The connected socket or INVALID_SOCKET if we failed to connect 944 * to the socket 945 * 946 */ 947 SOCKET new_socket(const std::string& host, in_port_t port, sa_family_t family); 948 949 /** 950 * Create a new socket and connect it to the given host 951 * 952 * @param host The name of the host to try to connect to. If 953 * empty (or set to localhost) it'll be replaced 954 * with "127.0.0.1" or "::1" depending on the value 955 * of family 956 * @param port The port number to connect to 957 * @param family The socket family to create (AF_INET/AF_INET6/AF_UNSPEC) 958 * @param setup_ssl_ctx callback to configure the SSL context 959 * @return Tuple with: 960 * SOCKET The connected socket or INVALID_SOCKET if we failed 961 * to connect to the socket 962 * SSL_CTX The ssl context in use 963 * BIO The BIO to use. 964 * 965 * The caller takes ownership of the socket, ssl_ctx and bio and must 966 * release the resources when done using them. 967 * 968 * @throws std::exception for SSL related problems 969 */ 970 std::tuple<SOCKET, SSL_CTX*, BIO*> new_ssl_socket( 971 const std::string& host, 972 in_port_t port, 973 sa_family_t family, 974 std::function<void(SSL_CTX*)> setup_ssl_ctx); 975 976 } // namespace net 977 } // namespace cb 978