1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #pragma once
18 
19 #include <boost/optional/optional_fwd.hpp>
20 #include <engines/ewouldblock_engine/ewouldblock_engine.h>
21 #include <memcached/engine_error.h>
22 #include <memcached/openssl.h>
23 #include <memcached/protocol_binary.h>
24 #include <memcached/rbac.h>
25 #include <memcached/types.h>
26 #include <platform/dynamic.h>
27 #include <platform/sized_buffer.h>
28 #include <platform/socket.h>
29 
30 #include <nlohmann/json_fwd.hpp>
31 
32 #include <chrono>
33 #include <cstdlib>
34 #include <functional>
35 #include <stdexcept>
36 #include <string>
37 #include <unordered_set>
38 #include <utility>
39 #include <vector>
40 
41 class FrameInfo;
42 
43 using FrameInfoVector = std::vector<std::unique_ptr<FrameInfo>>;
44 using GetFrameInfoFunction = std::function<FrameInfoVector()>;
45 
46 /**
47  * The Frame class is used to represent all of the data included in the
48  * protocol unit going over the wire.
49  */
50 class Frame {
51 public:
reset()52     void reset() {
53         payload.resize(0);
54     }
55 
getMagic() const56     cb::mcbp::Magic getMagic() const {
57         const auto magic(cb::mcbp::Magic(payload.at(0)));
58         if (!cb::mcbp::is_legal(magic)) {
59             throw std::invalid_argument(
60                     "Frame::getMagic: invalid magic provided in buffer");
61         }
62 
63         return magic;
64     }
65 
getRequest() const66     const cb::mcbp::Request* getRequest() const {
67         return reinterpret_cast<const cb::mcbp::Request*>(payload.data());
68     }
69 
getResponse() const70     const cb::mcbp::Response* getResponse() const {
71         return reinterpret_cast<const cb::mcbp::Response*>(payload.data());
72     }
73 
74     std::vector<uint8_t> payload;
75     typedef std::vector<uint8_t>::size_type size_type;
76 };
77 
78 class DocumentInfo {
79 public:
80     std::string id;
81     uint32_t flags;
82     uint32_t expiration;
83     cb::mcbp::Datatype datatype;
84     uint64_t cas;
85 
DocumentInfo()86     DocumentInfo()
87         : id(""),
88           flags(0),
89           expiration(0),
90           datatype(cb::mcbp::Datatype::Raw),
91           cas(0) {
92     }
93 
operator ==(const DocumentInfo& rhs) const94     bool operator==(const DocumentInfo& rhs) const {
95         return (id == rhs.id) && (flags == rhs.flags) &&
96                (expiration == rhs.expiration) && (datatype == rhs.datatype) &&
97                (cas == rhs.cas);
98     }
99 };
100 
101 ::std::ostream& operator<<(::std::ostream& os, const DocumentInfo& info);
102 
103 class Document {
104 public:
operator ==(const Document& rhs) const105     bool operator==(const Document& rhs) const {
106         return (info == rhs.info) && (value == rhs.value);
107     }
108 
109     /**
110      * Compress this document using Snappy. Replaces the value with a compressed
111      * version, adds Snappy to the set of datatypes.
112      */
113     void compress();
114 
115     DocumentInfo info;
116     std::string value;
117 };
118 
119 ::std::ostream& operator<<(::std::ostream& os, const Document& doc);
120 
121 class MutationInfo {
122 public:
123     uint64_t cas;
124     size_t size;
125     uint64_t seqno;
126     uint64_t vbucketuuid;
127 };
128 
129 struct ObserveInfo {
130     uint8_t formatType;
131     Vbid vbId;
132     uint64_t uuid;
133     uint64_t lastPersistedSeqno;
134     uint64_t currentSeqno;
135     uint64_t failoverUUID;
136     uint64_t failoverSeqno;
137 };
138 
139 enum class BucketType : uint8_t {
140     Invalid = 0,
141     Memcached = 1,
142     Couchbase = 2,
143     EWouldBlock = 3
144 };
145 
146 enum class MutationType {
147     Add, Set, Replace, Append, Prepend
148 };
149 
150 std::string to_string(MutationType type);
151 
152 class BinprotResponse;
153 class BinprotCommand;
154 
155 class ConnectionError : public std::runtime_error {
156 public:
157     ConnectionError(const std::string& prefix, cb::mcbp::Status reason_);
158 
159     ConnectionError(const std::string& prefix, const BinprotResponse& response);
160 
getReason() const161     cb::mcbp::Status getReason() const {
162         return reason;
163     }
164 
isInvalidArguments() const165     bool isInvalidArguments() const {
166         return reason == cb::mcbp::Status::Einval;
167     }
168 
isAlreadyExists() const169     bool isAlreadyExists() const {
170         return reason == cb::mcbp::Status::KeyEexists;
171     }
172 
isNotFound() const173     bool isNotFound() const {
174         return reason == cb::mcbp::Status::KeyEnoent;
175     }
176 
isNotMyVbucket() const177     bool isNotMyVbucket() const {
178         return reason == cb::mcbp::Status::NotMyVbucket;
179     }
180 
isNotStored() const181     bool isNotStored() const {
182         return reason == cb::mcbp::Status::NotStored;
183     }
184 
isAccessDenied() const185     bool isAccessDenied() const {
186         return reason == cb::mcbp::Status::Eaccess;
187     }
188 
isDeltaBadval() const189     bool isDeltaBadval() const {
190         return reason == cb::mcbp::Status::DeltaBadval;
191     }
192 
isAuthError() const193     bool isAuthError() const {
194         return reason == cb::mcbp::Status::AuthError;
195     }
196 
isNotSupported() const197     bool isNotSupported() const {
198         return reason == cb::mcbp::Status::NotSupported;
199     }
200 
isLocked() const201     bool isLocked() const {
202         return reason == cb::mcbp::Status::Locked;
203     }
204 
isTemporaryFailure() const205     bool isTemporaryFailure() const {
206         return reason == cb::mcbp::Status::Etmpfail;
207     }
208 
isTooBig() const209     bool isTooBig() const {
210         return reason == cb::mcbp::Status::E2big;
211     }
212 
213     std::string getErrorContext() const;
214 
215     nlohmann::json getErrorJsonContext() const;
216 
217 private:
218     const cb::mcbp::Status reason;
219     const std::string payload;
220 };
221 
222 /**
223  * Exception thrown when the received response deosn't match our expections.
224  */
225 struct ValidationError : public std::runtime_error {
ValidationErrorBucketType::ValidationError226     explicit ValidationError(const std::string& msg) : std::runtime_error(msg) {
227     }
228 };
229 
230 /**
231  * The execution mode represents the mode the server executes commands
232  * retrieved over the network. In an Ordered mode (that's the default mode
233  * and how things was defined in the initial implementation of the binary
234  * protocol) the server must not start executing the next command before
235  * the execution of the current command is completed. In Unordered mode
236  * the server may start executing (and report the result back to the client)
237  * whenever it feels like.
238  */
239 enum class ExecutionMode { Ordered, Unordered };
240 
241 /**
242  * The MemcachedConnection class is an abstract class representing a
243  * connection to memcached. The concrete implementations of the class
244  * implements the Memcached binary protocol and Greenstack.
245  *
246  * By default a connection is set into a synchronous mode.
247  *
248  * All methods is expeted to work, and all failures is reported through
249  * exceptions. Unexpected packets / responses etc will use the ConnectionError,
250  * and other problems (like network error etc) use std::runtime_error.
251  *
252  */
253 class MemcachedConnection {
254 public:
255     MemcachedConnection() = delete;
256 
257     MemcachedConnection(const MemcachedConnection&) = delete;
258 
259     /**
260      * Create a new instance of the MemcachedConnection
261      *
262      * @param host the hostname to connect to (empty == localhost)
263      * @param port the port number to connect to
264      * @param family the socket family to connect as (AF_INET, AF_INET6
265      *               or use AF_UNSPEC to just pick one)
266      * @param ssl connect over SSL or not
267      */
268     MemcachedConnection(std::string host,
269                         in_port_t port,
270                         sa_family_t family,
271                         bool ssl);
272 
273     ~MemcachedConnection();
274 
275     /**
276      * Release the socket from this instance. The caller is required
277      * to close the socket when it is no longer in use!
278      *
279      * @return the underlying socket
280      */
281     SOCKET releaseSocket();
282 
283     // Set a tag / label on this connection
setTag(std::string tag)284     void setTag(std::string tag) {
285         MemcachedConnection::tag = std::move(tag);
286     }
287 
getTag() const288     const std::string& getTag() const {
289         return tag;
290     }
291 
292     // Creates clone (copy) of the given connection - i.e. a second independent
293     // channel to memcached. Used for multi-connection testing.
294     std::unique_ptr<MemcachedConnection> clone();
295 
getPort() const296     in_port_t getPort() const {
297         return port;
298     }
299 
getFamily() const300     sa_family_t getFamily() const {
301         return family;
302     }
303 
isSsl() const304     bool isSsl() const {
305         return ssl;
306     }
307 
308     /**
309      * Set the SSL Certificate file to use
310      *
311      * @throws std::system_error if the file doesn't exist
312      */
313     void setSslCertFile(const std::string& file);
314 
315     /**
316      * Set the SSL private key file to use
317      *
318      * @throws std::system_error if the file doesn't exist
319      */
320     void setSslKeyFile(const std::string& file);
321 
322     /// Set the TLS version to use
323     void setTlsProtocol(std::string protocol);
324     /// Set the ciphers to use for TLS < 1.3
325     void setTls12Ciphers(std::string ciphers);
326     /// Set the ciphers to use for TLS >= 1.3
327     void setTls13Ciphers(std::string ciphers);
328 
329     /**
330      * Try to establish a connection to the server.
331      *
332      * @thows std::exception if an error occurs
333      */
334     void connect();
335 
336     /**
337      * Close the connection to the server
338      */
339     void close();
340 
341     /**
342      * Drop the current connection to the server and re-establish the
343      * connection.
344      */
reconnect()345     void reconnect() {
346         close();
347         connect();
348     }
349 
350     /**
351      * Perform a SASL authentication to memcached
352      *
353      * @param username the username to use in authentication
354      * @param password the password to use in authentication
355      * @param mech the SASL mech to use
356      */
357     void authenticate(const std::string& username,
358                       const std::string& password,
359                       const std::string& mech);
360 
361     /**
362      * Create a bucket
363      *
364      * @param name the name of the bucket
365      * @param config the buckets configuration attributes
366      * @param type the kind of bucket to create
367      */
368     void createBucket(const std::string& name,
369                       const std::string& config,
370                       BucketType type);
371 
372     /**
373      * Delete the named bucket
374      *
375      * @param name the name of the bucket
376      */
377     void deleteBucket(const std::string& name);
378 
379     /**
380      * Select the named bucket
381      *
382      * @param name the name of the bucket to select
383      */
384     void selectBucket(const std::string& name);
385 
386     /**
387      * List all of the buckets on the server
388      *
389      * @return a vector containing all of the buckets
390      */
391     std::vector<std::string> listBuckets(
392             GetFrameInfoFunction getFrameInfo = {});
393 
394     /**
395      * Fetch a document from the server
396      *
397      * @param id the name of the document
398      * @param vbucket the vbucket the document resides in
399      * @return a document object containg the information about the
400      *         document.
401      */
402     Document get(const std::string& id,
403                  Vbid vbucket,
404                  GetFrameInfoFunction getFrameInfo = {});
405 
406     /**
407      * Fetch multiple documents
408      *
409      * Send a pipeline of (quiet) get commands to the server and fire
410      * the documentCallback with the documents found in the server.
411      *
412      * If the server returns with an error the provided error callback
413      * will be called. (note that you won't receive a callback for
414      * documents that don't exist on the server as we're using the
415      * quiet commands.
416      *
417      * Use the getFrameInfo method if you'd like the server to perform
418      * out of order requests (note: the connection must be set to
419      * allow unordered execution).
420      *
421      * @param id The key and the vbucket the document resides in
422      * @param documentCallback the callback with the document for an
423      *                         operation
424      * @param errorCallback the callback if the server returns an error
425      * @param getFrameInfo Optional FrameInfo to inject to the commands
426      * @return A vector containing all of the found documents
427      */
428     void mget(const std::vector<std::pair<const std::string, Vbid>>& id,
429               std::function<void(std::unique_ptr<Document>&)> documentCallback,
430               std::function<void(const std::string&, const cb::mcbp::Response&)>
431                       errorCallback = {},
432               GetFrameInfoFunction getFrameInfo = {});
433 
434     /**
435      * Fetch and lock a document from the server
436      *
437      * @param id the name of the document
438      * @param vbucket the vbucket the document resides in
439      * @param lock_timeout the timeout (in sec) for the lock. 0 means
440      *                     use the default lock timeout from the server
441      * @return a document object containing the information about the
442      *         document.
443      */
444     Document get_and_lock(const std::string& id,
445                           Vbid vbucket,
446                           uint32_t lock_timeout,
447                           GetFrameInfoFunction getFrameInfo = {});
448 
449     /**
450      * Get the Failover Log for a given VBucket
451      *
452      * @param vbucket
453      * @return the raw BinprotResponse
454      */
455     BinprotResponse getFailoverLog(Vbid vbucket,
456                                    GetFrameInfoFunction getFrameInfo = {});
457 
458     /**
459      * Unlock a locked document
460      *
461      * @param id the name of the document
462      * @param vbucket the vbucket the document resides in
463      * @param cas the cas identifier of the locked document
464      */
465     void unlock(const std::string& id,
466                 Vbid vbucket,
467                 uint64_t cas,
468                 GetFrameInfoFunction getFrameInfo = {});
469 
470     void dropPrivilege(cb::rbac::Privilege privilege,
471                        GetFrameInfoFunction getFrameInfo = {});
472 
473     /*
474      * Form a Frame representing a CMD_GET
475      */
476     static Frame encodeCmdGet(const std::string& id, Vbid vbucket);
477 
mutate(const Document& doc, Vbid vbucket, MutationType type, GetFrameInfoFunction getFrameInfo = {})478     MutationInfo mutate(const Document& doc,
479                         Vbid vbucket,
480                         MutationType type,
481                         GetFrameInfoFunction getFrameInfo = {}) {
482         return mutate(doc.info,
483                       vbucket,
484                       cb::const_byte_buffer(reinterpret_cast<const uint8_t*>(
485                                                     doc.value.data()),
486                                             doc.value.size()),
487                       type,
488                       getFrameInfo);
489     }
490 
491     /**
492      * Perform the mutation on the attached document.
493      *
494      * The method throws an exception upon errors
495      *
496      * @param info Document metadata
497      * @param vbucket the vbucket to operate on
498      * @param value new value for the document
499      * @param type the type of mutation to perform
500      * @return the new cas value for success
501      */
502     MutationInfo mutate(const DocumentInfo& info,
503                         Vbid vbucket,
504                         cb::const_byte_buffer value,
505                         MutationType type,
506                         GetFrameInfoFunction getFrameInfo = {});
507 
508     /**
509      * Convenience method to store (aka "upsert") an item.
510      * @param id The item's ID
511      * @param vbucket vBucket
512      * @param value Value of the item.
513      * @return The mutation result.
514      */
515     MutationInfo store(const std::string& id,
516                        Vbid vbucket,
517                        std::string value,
518                        cb::mcbp::Datatype datatype = cb::mcbp::Datatype::Raw,
519                        GetFrameInfoFunction getFrameInfo = {});
520 
521     /**
522      * Get statistics from the server, and fire a callback with the key and
523      * value of each reported stat
524      *
525      * @param callback the callback to call for each stat
526      * @param group the stats group to request
527      */
528     void
529     stats(std::function<void(const std::string&, const std::string&)> callback,
530           const std::string& group = std::string{});
531 
532     /**
533      * Get stats as a map
534      * @param subcommand
535      * @return
536      */
537     std::map<std::string, std::string> statsMap(const std::string& subcommand);
538 
539     nlohmann::json stats(const std::string& subcommand);
540 
541     /**
542      * Instruct the audit daemon to reload the configuration
543      */
544     void reloadAuditConfiguration(GetFrameInfoFunction getFrameInfo = {});
545 
546     /**
547      * Sent the given frame over this connection
548      *
549      * @param frame the frame to send to the server
550      */
551     void sendFrame(const Frame& frame);
552 
553     /** Send part of the given frame over this connection. Upon success,
554      * the frame's payload will be modified such that the sent bytes are
555      * deleted - i.e. after a successful call the frame object will only have
556      * the remaining, unsent bytes left.
557      *
558      * @param frame The frame to partially send.
559      * @param length The number of bytes to transmit. Must be less than or
560      *               equal to the size of the frame.
561      */
562     void sendPartialFrame(Frame& frame, Frame::size_type length);
563 
564     /**
565      * Receive the next frame on the connection
566      *
567      * @param frame the frame object to populate with the next frame
568      */
569     void recvFrame(Frame& frame);
570 
571     void sendCommand(const BinprotCommand& command);
572 
573     void recvResponse(BinprotResponse& response);
574 
575     /**
576      * Execute a command on the server and return the raw response packet.
577      */
578     BinprotResponse execute(const BinprotCommand& command);
579 
580     /// Execute a command on the server and return the response
581     Frame execute(const Frame& frame);
582     /**
583      * Get a textual representation of this connection
584      *
585      * @return a textual representation of the connection including the
586      *         protocol and any special attributes
587      */
588     std::string to_string();
589 
590     /**
591      * Try to configure the ewouldblock engine
592      *
593      * See the header /engines/ewouldblock_engine/ewouldblock_engine.h
594      * for a full description on the parameters.
595      */
596     void configureEwouldBlockEngine(
597             const EWBEngineMode& mode,
598             ENGINE_ERROR_CODE err_code = ENGINE_EWOULDBLOCK,
599             uint32_t value = 0,
600             const std::string& key = "");
601 
602     /**
603      * Disable the ewouldblock engine entirely.
604      */
disableEwouldBlockEngine()605     void disableEwouldBlockEngine() {
606         // We disable the engine by telling it to inject the given error
607         // the next 0 times
608         configureEwouldBlockEngine(EWBEngineMode::Next_N, ENGINE_SUCCESS, 0);
609     }
610 
611     /**
612      * Identify ourself to the server.
613      *
614      * @throws std::runtime_error if an error occurs
615      */
616     void hello(const std::string& userAgent,
617                const std::string& userAgentVersion,
618                const std::string& comment);
619 
620     /**
621      * Get the servers SASL mechanisms.
622      *
623      * @throws std::runtime_error if an error occurs
624      */
625     std::string getSaslMechanisms();
626 
627     /**
628      * Request the IOCTL value from the server
629      *
630      * @param key the IOCTL to request
631      * @return A textual representation of the key
632      */
633     std::string ioctl_get(const std::string& key,
634                           GetFrameInfoFunction getFrameInfo = {});
635 
636     /**
637      * Perform an IOCTL on the server
638      *
639      * @param key the IOCTL to set
640      * @param value the value to specify for the given key
641      */
642     void ioctl_set(const std::string& key,
643                    const std::string& value,
644                    GetFrameInfoFunction getFrameInfo = {});
645 
646     /**
647      * Perform an arithmetic operation on a document (increment or decrement)
648      *
649      * You may use this method when operating on "small" delta values which
650      * fit into a signed 64 bit integer. If you for some reason need to
651      * incr / decr values above that you must use increment and decrement
652      * directly.
653      *
654      * @param key the document to operate on
655      * @param delta The value to increment / decrement
656      * @param initial Create with the initial value (exptime must be set to
657      *                != 0xffffffff)
658      * @param exptime The expiry time for the document
659      * @param info Where to store the mutation info.
660      * @return The new value for the counter
661      */
arithmetic(const std::string& key, int64_t delta, uint64_t initial = 0, rel_time_t exptime = 0, MutationInfo* info = nullptr, GetFrameInfoFunction getFrameInfo = {})662     uint64_t arithmetic(const std::string& key,
663                         int64_t delta,
664                         uint64_t initial = 0,
665                         rel_time_t exptime = 0,
666                         MutationInfo* info = nullptr,
667                         GetFrameInfoFunction getFrameInfo = {}) {
668         if (delta < 0) {
669             return decrement(key,
670                              uint64_t(std::abs(delta)),
671                              initial,
672                              exptime,
673                              info,
674                              getFrameInfo);
675         } else {
676             return increment(
677                     key, uint64_t(delta), initial, exptime, info, getFrameInfo);
678         }
679     }
680 
681     /**
682      * Perform an increment operation on a document
683      *
684      * This method only exists in order to test the situations where you want
685      * to increment a value that wouldn't fit into a signed 64 bit integer.
686      *
687      * @param key the document to operate on
688      * @param delta The value to increment
689      * @param initial Create with the initial value (exptime must be set to
690      *                != 0xffffffff)
691      * @param exptime The expiry time for the document
692      * @param info Where to store the mutation info.
693      * @return The new value for the counter
694      */
695     uint64_t increment(const std::string& key,
696                        uint64_t delta,
697                        uint64_t initial = 0,
698                        rel_time_t exptime = 0,
699                        MutationInfo* info = nullptr,
700                        GetFrameInfoFunction getFrameInfo = {});
701 
702     /**
703      * Perform an decrement operation on a document
704      *
705      * @param key the document to operate on
706      * @param delta The value to increment / decrement
707      * @param initial Create with the initial value (exptime must be set to
708      *                != 0xffffffff)
709      * @param exptime The expiry time for the document
710      * @param info Where to store the mutation info.
711      * @return The new value for the counter
712      */
713     uint64_t decrement(const std::string& key,
714                        uint64_t delta,
715                        uint64_t initial = 0,
716                        rel_time_t exptime = 0,
717                        MutationInfo* info = nullptr,
718                        GetFrameInfoFunction getFrameInfo = {});
719 
720     /**
721      * Remove the named document
722      *
723      * @param key the document to remove
724      * @param vbucket the vbucket the document is stored in
725      * @param cas the specific version of the document or 0 for "any"
726      * @return Details about the detion
727      */
728     MutationInfo remove(const std::string& key,
729                         Vbid vbucket,
730                         uint64_t cas = 0,
731                         GetFrameInfoFunction getFrameInfo = {});
732 
733     /**
734      * Mutate with meta - stores doc into the bucket using all the metadata
735      * from doc, e.g. doc.cas will become the stored cas (on success).
736      *
737      * @param doc The document to set
738      * @param vbucket The vbucket the document is stored in
739      * @param cas The cas used for the setWithMeta (note this cas is not stored
740      *            on success)
741      * @param seqno The seqno to store the document as
742      * @param metaOption MCBP options that can be sent with the command
743      * @param metaExtras Optional - see ep/src/ext_meta_parser.h for the details
744      *                   of this.
745      */
746     MutationInfo mutateWithMeta(Document& doc,
747                                 Vbid vbucket,
748                                 uint64_t cas,
749                                 uint64_t seqno,
750                                 uint32_t metaOption,
751                                 std::vector<uint8_t> metaExtras = {},
752                                 GetFrameInfoFunction getFrameInfo = {});
753 
754     std::pair<cb::mcbp::Status, GetMetaResponse> getMeta(
755             const std::string& key,
756             Vbid vbucket,
757             GetMetaVersion version,
758             GetFrameInfoFunction getFrameInfo = {});
759 
760     /**
761      * Evict the provided key
762      *
763      * @param key The key to evict
764      * @param vbucket The vbucket the key belongs to
765      * @param getFrameInfo  Optional frame ids
766      */
767     void evict(const std::string& key,
768                Vbid vbucket,
769                GetFrameInfoFunction getFrameInfo = {});
770 
771     /**
772      * Observe Seqno command - retrieve the persistence status of the given
773      * vBucket and UUID.
774      */
775     ObserveInfo observeSeqno(Vbid vbid,
776                              uint64_t uuid,
777                              GetFrameInfoFunction getFrameInfo = {});
778 
779     /// Enable persistence for the connected bucket.
780     void enablePersistence(GetFrameInfoFunction getFrameInfo = {});
781 
782     /// Disable persistence for the connected bucket.
783     void disablePersistence(GetFrameInfoFunction getFrameInfo = {});
784 
hasFeature(cb::mcbp::Feature feature) const785     bool hasFeature(cb::mcbp::Feature feature) const {
786         return effective_features.find(uint16_t(feature)) !=
787                effective_features.end();
788     }
789 
setDatatypeJson(bool enable)790     void setDatatypeJson(bool enable) {
791         setFeature(cb::mcbp::Feature::JSON, enable);
792     }
793 
setMutationSeqnoSupport(bool enable)794     void setMutationSeqnoSupport(bool enable) {
795         setFeature(cb::mcbp::Feature::MUTATION_SEQNO, enable);
796     }
797 
setXattrSupport(bool enable)798     void setXattrSupport(bool enable) {
799         setFeature(cb::mcbp::Feature::XATTR, enable);
800     }
801 
setXerrorSupport(bool enable)802     void setXerrorSupport(bool enable) {
803         setFeature(cb::mcbp::Feature::XERROR, enable);
804     }
805 
setDuplexSupport(bool enable)806     void setDuplexSupport(bool enable) {
807         setFeature(cb::mcbp::Feature::Duplex, enable);
808     }
809 
setClustermapChangeNotification(bool enable)810     void setClustermapChangeNotification(bool enable) {
811         setFeature(cb::mcbp::Feature::ClustermapChangeNotification, enable);
812     }
813 
814     void setUnorderedExecutionMode(ExecutionMode mode);
815 
816     /**
817      * Attempts to enable or disable a feature
818      * @param feature Feature to enable or disable
819      * @param enabled whether to enable or disable
820      */
821     void setFeature(cb::mcbp::Feature feature, bool enabled);
822 
getTraceData() const823     boost::optional<std::chrono::microseconds> getTraceData() const {
824         return traceData;
825     }
826 
827     /**
828      * Set the connection features to use
829      *
830      * @param agent the agent name to report to the server
831      * @param features a vector containing all of the features to try
832      *                 to enable on the server
833      */
834     void setFeatures(const std::string& agent,
835                      const std::vector<cb::mcbp::Feature>& features);
836 
837     void setVbucket(Vbid vbid,
838                     vbucket_state_t state,
839                     const nlohmann::json& payload,
840                     GetFrameInfoFunction getFrameInfo = {});
841 
842     /// should the client automatically retry operations which fail
843     /// with a tmpfail or not (note that this is only possible when
844     /// the client object have the command frame available
setAutoRetryTmpfail(bool value)845     void setAutoRetryTmpfail(bool value) {
846         auto_retry_tmpfail = value;
847     }
848 
getAutoRetryTmpfail() const849     bool getAutoRetryTmpfail() const {
850         return auto_retry_tmpfail;
851     }
852 
853     Document getRandomKey(Vbid vbid);
854 
855 protected:
856     void read(Frame& frame, size_t bytes);
857 
858     void readPlain(Frame& frame, size_t bytes);
859 
860     void readSsl(Frame& frame, size_t bytes);
861 
862     void sendBuffer(const std::vector<iovec>& buf);
863     void sendBuffer(cb::const_byte_buffer buf);
864 
865     void sendBufferPlain(cb::const_byte_buffer buf);
866     void sendBufferPlain(const std::vector<iovec>& list);
867 
868     void sendBufferSsl(cb::const_byte_buffer buf);
869     void sendBufferSsl(const std::vector<iovec>& list);
870 
871     void applyFrameInfos(BinprotCommand& command, GetFrameInfoFunction& fi);
872 
873     /**
874      * Keep on calling the executor function until it returns true.
875      *
876      * Every time the function returns false the thread sleeps for the
877      * provided number of milliseconds. If the loop takes longer than
878      * the provided number of seconds the method throws an exception.
879      *
880      * @param executor The function to call until it returns true
881      * @param backoff The number of milliseconds to back off
882      * @param timeout The number of seconds until an exception is thrown
883      * @throws std::runtime_error for timeouts
884      */
885     void backoff_execute(
886             std::function<bool()> executor,
887             std::chrono::milliseconds backoff = std::chrono::milliseconds(10),
888             std::chrono::seconds timeout = std::chrono::seconds(30));
889 
890     std::string host;
891     in_port_t port;
892     sa_family_t family;
893     bool auto_retry_tmpfail = false;
894     bool ssl;
895     std::string tls_protocol;
896     std::string tls12_ciphers{"HIGH"};
897     std::string tls13_ciphers{
898             "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_"
899             "GCM_SHA256"};
900     std::string ssl_cert_file;
901     std::string ssl_key_file;
902     SSL_CTX* context = nullptr;
903     BIO* bio = nullptr;
904     SOCKET sock = INVALID_SOCKET;
905     std::string tag;
906     boost::optional<std::chrono::microseconds> traceData;
907 
908     typedef std::unordered_set<uint16_t> Featureset;
909 
910     uint64_t incr_decr(cb::mcbp::ClientOpcode opcode,
911                        const std::string& key,
912                        uint64_t delta,
913                        uint64_t initial,
914                        rel_time_t exptime,
915                        MutationInfo* info,
916                        GetFrameInfoFunction getFrameInfo = {});
917 
918     /**
919      * Set the features on the server by using the MCBP hello command
920      *
921      * The internal `features` array is updated with the result sent back
922      * from the server.
923      *
924      * @param agent the agent name provided by the client
925      * @param feat the featureset to enable.
926      */
927     void applyFeatures(const std::string& agent, const Featureset& features);
928 
929     Featureset effective_features;
930 };
931 
932 namespace cb {
933 namespace net {
934 /**
935  * Create a new socket and connect it to the given host
936  *
937  * @param host The name of the host to try to connect to. If
938  *             empty (or set to localhost) it'll be replaced
939  *             with "127.0.0.1" or "::1" depending on the value
940  *             of family
941  * @param port The port number to connect to
942  * @param family The socket family to create (AF_INET/AF_INET6/AF_UNSPEC)
943  * @return The connected socket or INVALID_SOCKET if we failed to connect
944  *         to the socket
945  *
946  */
947 SOCKET new_socket(const std::string& host, in_port_t port, sa_family_t family);
948 
949 /**
950  * Create a new socket and connect it to the given host
951  *
952  * @param host The name of the host to try to connect to. If
953  *             empty (or set to localhost) it'll be replaced
954  *             with "127.0.0.1" or "::1" depending on the value
955  *             of family
956  * @param port The port number to connect to
957  * @param family The socket family to create (AF_INET/AF_INET6/AF_UNSPEC)
958  * @param setup_ssl_ctx callback to configure the SSL context
959  * @return Tuple with:
960  *             SOCKET The connected socket or INVALID_SOCKET if we failed
961  *                    to connect to the socket
962  *             SSL_CTX The ssl context in use
963  *             BIO The BIO to use.
964  *
965  * The caller takes ownership of the socket, ssl_ctx and bio and must
966  * release the resources when done using them.
967  *
968  * @throws std::exception for SSL related problems
969  */
970 std::tuple<SOCKET, SSL_CTX*, BIO*> new_ssl_socket(
971         const std::string& host,
972         in_port_t port,
973         sa_family_t family,
974         std::function<void(SSL_CTX*)> setup_ssl_ctx);
975 
976 } // namespace net
977 } // namespace cb
978