xref: /6.0.3/kv_engine/daemon/connection.h (revision ea5da9f2)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17#pragma once
18
19#include "config.h"
20
21#include "datatype.h"
22#include "dynamic_buffer.h"
23#include "log_macros.h"
24#include "settings.h"
25#include "ssl_context.h"
26#include "statemachine_mcbp.h"
27#include "task.h"
28#include "stats.h"
29
30#include <cJSON.h>
31#include <cbsasl/cbsasl.h>
32#include <daemon/protocol/mcbp/command_context.h>
33#include <memcached/openssl.h>
34#include <memcached/rbac.h>
35#include <platform/sized_buffer.h>
36
37#include <array>
38#include <chrono>
39#include <memory>
40#include <queue>
41#include <string>
42#include <vector>
43
44struct LIBEVENT_THREAD;
45class Cookie;
46class ListeningPort;
47class Bucket;
48class ServerEvent;
49
50/**
51 * Adjust a message header structure by "consuming" nbytes of data.
52 *
53 * The msghdr structure contains an io-vector of data to send, and
54 * by consuming data, we "rebuild" the io-vector by moving the
55 * base pointer to the io-vector past all of the fully transferred
56 * elements, and move the last iov_base pointer the resulting bytes
57 * forward (and reduce the last iov_len the same number of bytes)
58 *
59 * @param pipe The pipe structure where we may have stored data pointed
60 *             to in the io-vector. We need to mark those as consumed
61 *             when we skip them in the io-vector.
62 * @param m The message header structure to update
63 * @param nbytes The number of bytes to skip
64 * @return The number of bytes left in the first element in the io-vector
65 */
66size_t adjust_msghdr(cb::Pipe& pipe, struct msghdr* m, ssize_t nbytes);
67
68/**
 * The maximum number of characters the core preserves for the
70 * agent name for each connection
71 */
constexpr size_t MaxSavedAgentName = 33;
73
74/**
 * The maximum number of characters the core preserves for the
76 * connection identifier for each connection
77 */
constexpr size_t MaxSavedConnectionId = 34;
79
80/**
81 * The structure representing a connection in memcached.
82 */
83class Connection {
84public:
85    enum class Priority : uint8_t {
86        High,
87        Medium,
88        Low
89    };
90
91    Connection(const Connection&) = delete;
92
93    Connection(SOCKET sfd, event_base* b, const ListeningPort& ifc);
94
95    ~Connection();
96
97    /**
98     * Return an identifier for this connection. To be backwards compatible
99     * this is the socket filedescriptor (or the socket handle casted to an
100     * unsigned integer on windows).
101     */
102    uint32_t getId() const {
103        return uint32_t(socketDescriptor);
104    }
105
106    /**
107     *  Get the socket descriptor used by this connection.
108     */
109    SOCKET getSocketDescriptor() const {
110        return socketDescriptor;
111    }
112
113    /**
114     * Set the socket descriptor used by this connection
115     */
116    void setSocketDescriptor(SOCKET sfd) {
117        Connection::socketDescriptor = sfd;
118    }
119
120    bool isSocketClosed() const {
121        return socketDescriptor == INVALID_SOCKET;
122    }
123
124    const std::string& getPeername() const {
125        return peername;
126    }
127
128    const std::string& getSockname() const {
129        return sockname;
130    }
131
132    /**
133     * Returns a descriptive name for the connection, of the form:
134     *   "[peer_name - local_name ]"
135     * (system) is appended to the string for system connections.
136     */
137    const std::string& getDescription() const {
138        return description;
139    }
140
141    /**
142     * Signal a connection if it's idle
143     *
144     * @param logbusy set to true if you want to log the connection details
145     *                if the connection isn't idle
     * @param workerthread the id of the workerthread (for logging purposes)
147     */
148    void signalIfIdle(bool logbusy, size_t workerthread);
149
150    /**
151     * Terminate the eventloop for the current event base. This method doesn't
152     * really fit as a member for the class, but I don't want clients to access
     * the libevent details from outside the class (so I didn't want to make
     * a "getEventBase()" method).
155     */
156    void eventBaseLoopbreak() {
157        event_base_loopbreak(base);
158    }
159
160    /**
161     * Is the connection representing a system internal user
162     */
163    bool isInternal() const {
164        return internal;
165    }
166
167    /**
168     * Specify if this connection is representing an internal user.
169     * An internal user is a user which is used by one of the components
170     * in Couchbase (like ns_server, indexer etc).
171     */
172    void setInternal(bool internal) {
173        Connection::internal = internal;
174    }
175
176    /**
177     * Update the username to reflect what the user used from the SASL
178     * authentication.
179     */
180    void resetUsernameCache();
181
182
183    bool isAuthenticated() const {
184        return authenticated;
185    }
186
187    void setAuthenticated(bool authenticated) {
188        Connection::authenticated = authenticated;
189        if (authenticated) {
190            updateDescription();
191            privilegeContext = cb::rbac::createContext(username, "");
192        } else {
193            resetUsernameCache();
194            privilegeContext = cb::rbac::PrivilegeContext{};
195        }
196    }
197
198    void setPriority(const Priority& priority);
199
200    /**
201     * Create a cJSON representation of the members of the connection
202     */
203    unique_cJSON_ptr toJSON() const;
204
205    /**
206     * Enable or disable TCP NoDelay on the underlying socket
207     *
208     * @return true on success, false otherwise
209     */
210    bool setTcpNoDelay(bool enable);
211
212    /**
213     * Get the username this connection is authenticated as
214     *
     * NOTE: the returned pointer is username.c_str(), i.e. storage owned
     *       by the connection; the caller must not free it
216     */
217    const char* getUsername() const {
218        return username.c_str();
219    }
220
221    /**
222     * Get the domain where the user is defined (builtin or saslauthd)
223     */
224    cb::sasl::Domain getDomain() const {
225        return domain;
226    }
227
228    cbsasl_conn_t* getSaslConn() const {
229        return sasl_conn.get();
230    }
231
232    /**
233     * Get the current reference count
234     */
235    uint8_t getRefcount() const {
236        return refcount;
237    }
238
239    void incrementRefcount() {
240        ++refcount;
241    }
242
243    void decrementRefcount() {
244        --refcount;
245    }
246
247    LIBEVENT_THREAD* getThread() const {
248        return thread.load(std::memory_order_relaxed);
249    }
250
251    void setThread(LIBEVENT_THREAD* thread) {
252        Connection::thread.store(thread,
253                                 std::memory_order::memory_order_relaxed);
254    }
255
256    in_port_t getParentPort() const {
257        return parent_port;
258    }
259
260    /**
     * Check if this connection is in possession of the requested privilege
262     *
263     * @param privilege the privilege to check for
264     * @return Ok - the connection holds the privilege
265     *         Fail - the connection is missing the privilege
266     *         Stale - the authentication context is stale
267     */
268    cb::rbac::PrivilegeAccess checkPrivilege(cb::rbac::Privilege privilege,
269                                             Cookie& cookie);
270
271    /**
272     * Try to drop the specified privilege from the current context
273     *
274     * @param privilege the privilege to drop
275     * @return The appropriate error code to return back to the client
276     */
277    cb::engine_errc dropPrivilege(cb::rbac::Privilege privilege);
278
279    int getBucketIndex() const {
280        return bucketIndex.load(std::memory_order_relaxed);
281    }
282
283    void setBucketIndex(int bucketIndex);
284
285    Bucket& getBucket() const;
286
287    ENGINE_HANDLE_V1* getBucketEngine() const {
288        return bucketEngine;
289    };
290
291    ENGINE_HANDLE* getBucketEngineAsV0() const {
292        return reinterpret_cast<ENGINE_HANDLE*>(bucketEngine);
293    }
294
295    void setBucketEngine(ENGINE_HANDLE_V1* bucketEngine) {
296        Connection::bucketEngine = bucketEngine;
297    };
298
299    void* getEngineStorage() const {
300        return engine_storage;
301    }
302
303    void setEngineStorage(void* engine_storage) {
304        Connection::engine_storage = engine_storage;
305    }
306
307    int getClustermapRevno() const {
308        return clustermap_revno;
309    }
310
311    void setClustermapRevno(int clustermap_revno) {
312        Connection::clustermap_revno = clustermap_revno;
313    }
314
315    /**
316     * Restart the authentication (this clears all of the authentication
317     * data...)
318     */
319    void restartAuthentication();
320
321    bool isXerrorSupport() const {
322        return xerror_support;
323    }
324
325    void setXerrorSupport(bool xerror_support) {
326        Connection::xerror_support = xerror_support;
327    }
328
329    bool isCollectionsSupported() const {
330        return collections_support;
331    }
332
333    void setCollectionsSupported(bool collections_support) {
334        Connection::collections_support = collections_support;
335    }
336
337    DocNamespace getDocNamespace() const {
338        if (isCollectionsSupported()) {
339            return DocNamespace::Collections;
340        } else {
341            return DocNamespace::DefaultCollection;
342        }
343    }
344
345    bool isDuplexSupported() const {
346        return duplex_support;
347    }
348
349    void setDuplexSupported(bool duplex_support) {
350        Connection::duplex_support = duplex_support;
351    }
352
353    bool isClustermapChangeNotificationSupported() const {
354        return cccp.load(std::memory_order_acquire);
355    }
356
357    void setClustermapChangeNotificationSupported(bool cccp) {
358        Connection::cccp.store(cccp, std::memory_order_release);
359    }
360
361    bool allowUnorderedExecution() const {
362        return allow_unordered_execution;
363    }
364
365    void setAllowUnorderedExecution(bool allow_unordered_execution) {
366        Connection::allow_unordered_execution = allow_unordered_execution;
367    }
368
369    /**
370     * Remap the current error code
371     *
     * The code is passed by value, so the caller's variable is not
     * modified; the mapped value is returned.
     *
     * Depending on which features the client has enabled the method
     * may either just return the input value, or map it to a different
     * value (like ENGINE_DISCONNECT if the client hasn't enabled the
     * extended error codes).
     *
     * @param code The code to map
381     * @return the mapped value.
382     */
383    ENGINE_ERROR_CODE remapErrorCode(ENGINE_ERROR_CODE code) const;
384
385    /**
     * Add the specified number of ns to the amount of CPU time this
     * connection has used on the CPU. (We could alternatively have
     * separate "ON_CPU" and "OFF_CPU" events and record all of this
     * within the connection object instead, but it seemed easier to
     * just wrap it from the method driving the event loop, as we
     * also want to record the delta to the thread scheduler histogram.)
392     *
393     * @param ns The number of nanoseconds spent in this iteration.
394     */
395    void addCpuTime(std::chrono::nanoseconds ns);
396
397    /**
398     * Enqueue a new server event
399     *
400     * @param event
401     */
402    void enqueueServerEvent(std::unique_ptr<ServerEvent> event);
403
404    /**
     * Close the connection. If there are any references to the connection
406     * or the cookies we'll enter the "pending close" state to wait for
407     * these operations to complete before changing state to immediate
408     * close.
409     */
410    void close();
411
412    /**
413     * fire ON_DISCONNECT for all of the cookie objects (in case the
414     * underlying engine keeps track of any of them)
415     */
416    void propagateDisconnect() const;
417
418    void setState(McbpStateMachine::State next_state);
419
420    McbpStateMachine::State getState() const {
421        return stateMachine.getCurrentState();
422    }
423
424    const char* getStateName() const {
425        return stateMachine.getCurrentStateName();
426    }
427
428    bool isDCP() const {
429        return dcp;
430    }
431
432    void setDCP(bool dcp) {
433        Connection::dcp = dcp;
434    }
435
436    bool isDcpXattrAware() const {
437        return dcpXattrAware;
438    }
439
440    void setDcpXattrAware(bool dcpXattrAware) {
441        Connection::dcpXattrAware = dcpXattrAware;
442    }
443
444    bool isDcpCollectionAware() const {
445        return dcpCollectionAware;
446    }
447
448    void setDcpCollectionAware(bool dcpCollectionAware) {
449        Connection::dcpCollectionAware = dcpCollectionAware;
450    }
451
452    void setDcpDeleteTimeEnabled(bool dcpDeleteTimeEnabled) {
453        Connection::dcpDeleteTimeEnabled = dcpDeleteTimeEnabled;
454    }
455
456    bool isDcpDeleteTimeEnabled() const {
457        return dcpDeleteTimeEnabled;
458    }
459
460    /// returns true if either collections or delete_time is enabled
461    bool isDcpDeleteV2() const {
462        return isDcpCollectionAware() || isDcpDeleteTimeEnabled();
463    }
464
465    /**
466     * Get the DocNamespace for a DcpMessage (mutation/deletion/expiration)
467     * If the connection is dcp aware and the passed length is not zero, then
468     * the document belongs to a collection.
469     * @param collectionLength the length sent by the producer
470     * @return the DocNamespace (DefaultCollection or Collections)
471     */
472    DocNamespace getDocNamespaceForDcpMessage(uint8_t collectionLength) const {
473        DocNamespace ns = DocNamespace::DefaultCollection;
474        if (isDcpCollectionAware() && collectionLength != 0) {
475            // Collection aware DCP sends non-zero collectionLength for
476            // documents that belong to a collection.
477            ns = DocNamespace::Collections;
478        }
479        return ns;
480    }
481
482    bool isDcpNoValue() const {
483        return dcpNoValue;
484    }
485
486    void setDcpNoValue(bool dcpNoValue) {
487        Connection::dcpNoValue = dcpNoValue;
488    }
489
490    /**
491     * Decrement the number of events to process and return the new value
492     */
493    int decrementNumEvents() {
494        return --numEvents;
495    }
496
497    /**
498     * Set the number of events to process per timeslice of the worker
499     * thread before yielding.
500     */
501    void setNumEvents(int nevents) {
502        Connection::numEvents = nevents;
503    }
504
505    /**
506     * Get the maximum number of events we should process per invocation
507     * for a connection object (to avoid starvation of other connections)
508     */
509    int getMaxReqsPerEvent() const {
510        return max_reqs_per_event;
511    }
512
513    /**
514     * Update the settings in libevent for this connection
515     *
     * @param new_flags the new event mask to get notified about
517     */
518    bool updateEvent(const short new_flags);
519
520    /**
521     * Reapply the event mask (in case of a timeout we might want to do
522     * that)
523     */
524    bool reapplyEventmask();
525
526    /**
527     * Unregister the event structure from libevent
528     * @return true if success, false otherwise
529     */
530    bool unregisterEvent();
531
532    /**
533     * Register the event structure in libevent
534     * @return true if success, false otherwise
535     */
536    bool registerEvent();
537
538    bool isRegisteredInLibevent() const {
539        return registered_in_libevent;
540    }
541
542    void setCurrentEvent(short ev) {
543        currentEvent = ev;
544    }
545
546    /** Is the current event a readevent? */
547    bool isReadEvent() const {
548        return currentEvent & EV_READ;
549    }
550
551    /** Is the current event a writeevent? */
552    bool isWriteEvent() const {
553        return currentEvent & EV_WRITE;
554    }
555
556    /**
557     * Shrinks a connection's buffers if they're too big.  This prevents
558     * periodic large "get" requests from permanently chewing lots of server
559     * memory.
560     *
561     * This should only be called in between requests since it can wipe output
562     * buffers!
563     */
564    void shrinkBuffers();
565
566    /**
567     * Receive data from the socket
568     *
     * @param dest where to store the result
570     * @param nbytes the size of the buffer
571     *
572     * @return the number of bytes read, or -1 for an error
573     */
574    int recv(char* dest, size_t nbytes);
575
576    /**
577     * Send data over the socket
578     *
579     * @param m the message header to send
580     * @return the number of bytes sent, or -1 for an error
581     */
582    ssize_t sendmsg(struct msghdr* m);
583
584    enum class TransmitResult {
585        /** All done writing. */
586        Complete,
587        /** More data remaining to write. */
588        Incomplete,
589        /** Can't write any more right now. */
590        SoftError,
591        /** Can't write (c->state is set to conn_closing) */
592        HardError
593    };
594
595    /**
596     * Transmit the next chunk of data from our list of msgbuf structures.
597     *
598     * Returns:
599     *   Complete   All done writing.
600     *   Incomplete More data remaining to write.
601     *   SoftError Can't write any more right now.
602     *   HardError Can't write (c->state is set to conn_closing)
603     */
604    TransmitResult transmit();
605
606    enum class TryReadResult {
607        /** Data received on the socket and ready to parse */
608        DataReceived,
609        /** No data received on the socket */
610        NoDataReceived,
611        /** The client closed the connection */
612        SocketClosed,
613        /** An error occurred on the socket */
614        SocketError,
615        /** Failed to allocate more memory for the input buffer */
616        MemoryError
617    };
618
619    /**
620     * read from network as much as we can, handle buffer overflow and
621     * connection close. Before reading, move the remaining incomplete fragment
622     * of a command (if any) to the beginning of the buffer.
623     *
     * @return a TryReadResult describing the outcome
625     */
626    TryReadResult tryReadNetwork();
627
628    const McbpStateMachine::State getWriteAndGo() const {
629        return write_and_go;
630    }
631
632    void setWriteAndGo(McbpStateMachine::State write_and_go) {
633        Connection::write_and_go = write_and_go;
634    }
635
636    /**
637     * Get the number of entries in use in the IO Vector
638     */
639    size_t getIovUsed() const {
640        return iovused;
641    }
642
643    /**
644     * Adds a message header to a connection.
645     *
646     * @param reset set to true to reset all message headers
647     * @throws std::bad_alloc
648     */
649    void addMsgHdr(bool reset);
650
651    /**
     * Add a chunk of memory to the IO vector to send
653     *
654     * @param buf pointer to the data to send
655     * @param len number of bytes to send
656     * @throws std::bad_alloc
657     */
658    void addIov(const void* buf, size_t len);
659
660    /**
661     * Release all of the items we've saved a reference to
662     */
663    void releaseReservedItems() {
664        ENGINE_HANDLE* handle = reinterpret_cast<ENGINE_HANDLE*>(bucketEngine);
665        for (auto* it : reservedItems) {
666            bucketEngine->release(handle, it);
667        }
668        reservedItems.clear();
669    }
670
671    /**
672     * Put an item on our list of reserved items (which we should release
673     * at a later time through releaseReservedItems).
674     *
675     * @return true if success, false otherwise
676     */
677    bool reserveItem(void* item) {
678        try {
679            reservedItems.push_back(item);
680            return true;
681        } catch (const std::bad_alloc&) {
682            return false;
683        }
684    }
685
686    void releaseTempAlloc() {
687        for (auto* ptr : temp_alloc) {
688            cb_free(ptr);
689        }
690        temp_alloc.resize(0);
691    }
692
693    void pushTempAlloc(char* ptr) {
694        temp_alloc.push_back(ptr);
695    }
696
697    /**
698     * Enable the datatype which corresponds to the feature
699     *
700     * @param feature mcbp::Feature::JSON|XATTR|SNAPPY
701     * @throws if feature does not correspond to a datatype
702     */
703    void enableDatatype(cb::mcbp::Feature feature) {
704        datatype.enable(feature);
705    }
706
707    /**
708     * Disable all the datatypes
709     */
710    void disableAllDatatypes() {
711        datatype.disableAll();
712    }
713
714    /**
715     * Given the input datatype, return only those which are enabled for the
716     * connection.
717     *
718     * @param dtype the set to intersect against the enabled set
719     * @returns the intersection of the enabled bits and dtype
720     */
721    protocol_binary_datatype_t getEnabledDatatypes(
722            protocol_binary_datatype_t dtype) const {
723        return datatype.getIntersection(dtype);
724    }
725
726    /**
     * @return true if all of the datatypes in dtype are enabled (xattr
     *         also requires the selected bucket to have xattr enabled)
728     */
729    bool isDatatypeEnabled(protocol_binary_datatype_t dtype) const {
730        bool rv = datatype.isEnabled(dtype);
731
732        // If the bucket has disabled xattr, then we must reflect that in the
733        // returned value
734        if (rv && mcbp::datatype::is_xattr(dtype) &&
735            !selectedBucketIsXattrEnabled()) {
736            rv = false;
737        }
738        return rv;
739    }
740
741    /**
742     * @return true if JSON datatype is enabled
743     */
744    bool isJsonEnabled() const {
745        return datatype.isJsonEnabled();
746    }
747
748    /**
749     * @return true if compression datatype is enabled
750     */
751    bool isSnappyEnabled() const {
752        return datatype.isSnappyEnabled();
753    }
754
755    /**
756     * @return true if the XATTR datatype is enabled
757     */
758    bool isXattrEnabled() const {
759        return datatype.isXattrEnabled();
760    }
761
762    bool isSupportsMutationExtras() const {
763        return supports_mutation_extras;
764    }
765
766    void setSupportsMutationExtras(bool supports_mutation_extras) {
767        Connection::supports_mutation_extras = supports_mutation_extras;
768    }
769
770    const ENGINE_ERROR_CODE& getAiostat() const {
771        return aiostat;
772    }
773
774    void setAiostat(const ENGINE_ERROR_CODE& aiostat) {
775        Connection::aiostat = aiostat;
776    }
777
778    bool isTracingEnabled() const {
779        return tracingEnabled;
780    }
781
782    void setTracingEnabled(bool enable) {
783        tracingEnabled = enable;
784    }
785
786    bool isEwouldblock() const {
787        return ewouldblock;
788    }
789
790    void setEwouldblock(bool ewouldblock) {
791        Connection::ewouldblock = ewouldblock;
792    }
793
794    /**
795     * Is SSL enabled for this connection or not?
796     *
797     * @return true if the connection is running over SSL, false otherwise
798     */
799    bool isSslEnabled() const {
800        return ssl.isEnabled();
801    }
802
803    /**
804     * Do we have any pending input data on this connection?
805     */
806    bool havePendingInputData() {
807        return (!read->empty() || ssl.havePendingInputData());
808    }
809
810    /**
811     * Try to find RBAC user from the client ssl cert
812     *
813     * @return true if username has been linked to RBAC or ssl cert was not
814     * presented by the client.
815     */
816    bool tryAuthFromSslCert(const std::string& userName);
817
818    bool shouldDelete();
819
820    void runEventLoop(short which);
821
822    /**
     * Input buffer containing the data we've read off the socket. It is
824     * assigned to the connection when the connection is to be served, and
825     * returned to the thread context if the pipe is empty when we're done
826     * serving this connection.
827     */
828    std::unique_ptr<cb::Pipe> read;
829
830    /** Write buffer */
831    std::unique_ptr<cb::Pipe> write;
832
833    Cookie& getCookieObject() {
834        return *cookies.front();
835    }
836
837    /**
838     * Get the number of cookies currently bound to this connection
839     */
840    size_t getNumberOfCookies() const;
841
842    /**
843     * Check to see if the next packet to process is completely received
844     * and available in the input pipe.
845     *
846     * @return true if we've got the entire packet, false otherwise
847     */
848    bool isPacketAvailable() const {
849        auto buffer = read->rdata();
850
851        if (buffer.size() < sizeof(cb::mcbp::Request)) {
852            // we don't have the header, so we can't even look at the body
853            // length
854            return false;
855        }
856
857        const auto* req =
858                reinterpret_cast<const cb::mcbp::Request*>(buffer.data());
859        return buffer.size() >= sizeof(cb::mcbp::Request) + req->getBodylen();
860    }
861
862    /**
     * Is SASL authentication enabled for this connection or not? (a
     * connection authenticated with SSL certificates will disable the
     * possibility to re-authenticate over SASL)
866     */
867    bool isSaslAuthEnabled() const {
868        return saslAuthEnabled;
869    }
870
871    bool selectedBucketIsXattrEnabled() const;
872
873    /**
874     * Try to process some of the server events. This may _ONLY_ be performed
875     * after we've completely transferred the response for one command, and
876     * before we start executing the next one.
877     *
878     * @return true if processing server events set changed the path in the
879     *              state machine (and the current task should be
880     *              terminated immediately)
881     */
882    bool processServerEvents();
883
884    /**
885     * Set the name of the connected agent
886     */
887    void setAgentName(cb::const_char_buffer name);
888
889    const std::array<char, MaxSavedAgentName>& getAgentName() const {
890        return agentName;
891    }
892
893    /**
894     * Get the Identifier specified for this connection.
895     */
896    const std::array<char, MaxSavedConnectionId>& getConnectionId() {
897        return connectionId;
898    }
899
900    /**
901     * Set the identifier for this connection. By default the
902     * identifier is set to the peername, but the client
903     * may set it to whatever it likes (truncated at 33
904     * characters)
905     *
906     * @param uuid the uuid to use
907     */
908    void setConnectionId(cb::const_char_buffer uuid);
909
910    /// Notify that this connection is going to yield the CPU to allow
911    /// other connections to perform operations
912    void yield() {
913        yields++;
914        // Update the aggregated stat
915        get_thread_stats(this)->conn_yields++;
916    }
917
918protected:
919    /**
920     * Protected constructor so that it may only be used by MockSubclasses
921     */
922    Connection();
923
924    /**
925     * Resolve the name of the local socket and the peer for the connected
     * socket.
928     */
929    void resolveConnectionName();
930
931    /**
932     * Update the description string for the connection. This
933     * method should be called every time the authentication data
934     * (or the sockname/peername) changes
935     */
936    void updateDescription();
937
938    void runStateMachinery();
939
940    /**
941     * Initialize the event structure and add it to libevent
942     *
943     * @return true upon success, false otherwise
944     */
945    bool initializeEvent();
946
947    /**
948     * Ensures that there is room for another struct iovec in a connection's
949     * iov list.
950     *
951     * @throws std::bad_alloc
952     */
953    void ensureIovSpace();
954
955    /**
956     * Try to enable SSL for this connection
957     *
958     * @param cert the SSL certificate to use
959     * @param pkey the SSL private key to use
960     * @return true if successful, false otherwise
961     */
962    bool enableSSL(const std::string& cert, const std::string& pkey) {
963        if (ssl.enable(cert, pkey)) {
964            if (settings.getVerbose() > 1) {
965                ssl.dumpCipherList(getId());
966            }
967
968            return true;
969        }
970
971        return false;
972    }
973
974    /**
975     * Log error information from the SSL_accept/read/write call
976     *
977     * @param method the method which caused the error
     * @param rval the return value for that call
979     */
980    void logSslErrorInfo(const std::string& method, int rval);
981
982    /**
983     * Read data over the SSL connection
984     *
985     * @param dest where to store the data
986     * @param nbytes the size of the destination buffer
987     * @return the number of bytes read
988     */
989    int sslRead(char* dest, size_t nbytes);
990
991    /**
992     * Write data over the SSL stream
993     *
994     * @param src the source of the data
995     * @param nbytes the number of bytes to send
996     * @return the number of bytes written
997     */
998    int sslWrite(const char* src, size_t nbytes);
999
1000    /**
1001     * Handle the state for the ssl connection before the ssl connection
1002     * is fully established
1003     */
1004    int sslPreConnection();
1005
1006    /**
1007     * The actual socket descriptor used by this connection
1008     */
1009    SOCKET socketDescriptor;
1010
1011    // The number of times we've been backing off and yielding
1012    // to allow other threads to run
1013    Couchbase::RelaxedAtomic<uint64_t> yields;
1014
1015    /**
1016     * The event base this connection is bound to
1017     */
1018    event_base *base;
1019
1020    /**
1021     * The current privilege context
1022     */
1023    cb::rbac::PrivilegeContext privilegeContext;
1024
1025    /**
1026     * The SASL object used to do sasl authentication
1027     */
1028    unique_cbsasl_conn_t sasl_conn;
1029
1030    /** Is this a system-internal connection? */
1031    bool internal{false};
1032
1033    /** Is the connection authenticated or not? */
1034    bool authenticated{false};
1035
1036    /** The username the connection authenticated as ("unknown" by default) */
1037    std::string username{"unknown"};
1038
1039    /** The domain where the user is defined (Local by default) */
1040    cb::sasl::Domain domain{cb::sasl::Domain::Local};
1041
1042    /** The description of the connection */
1043    std::string description;
1044
1045    /** Is TCP nodelay enabled or not? */
1046    bool nodelay{false};
1047
1048    /** Number of references to the object (8-bit counter) */
1049    uint8_t refcount{0};
1050
1051    /**
1052     * Pointer to engine-specific data which the engine has requested the server
1053     * to persist for the life of the connection.
1054     * See SERVER_COOKIE_API::{get,store}_engine_specific()
1055     */
1056    void* engine_storage{nullptr};
1057
1058    /** Pointer to the thread object serving this connection */
1059    std::atomic<LIBEVENT_THREAD*> thread{nullptr};
1060
1061    /** Listening port that created this connection instance */
1062    const in_port_t parent_port{0};
1063
1064    /**
1065     * The index of the connected bucket (0 by default)
1066     */
1067    std::atomic_int bucketIndex{0};
1068
1069    /**
1070     * The engine interface for the connected bucket (nullptr by default)
1071     */
1072    ENGINE_HANDLE_V1* bucketEngine{nullptr};
1073
1074    /** Name of the peer if known (initialized to "unknown") */
1075    std::string peername{"unknown"};
1076
1077    /** Name of the local socket if known (initialized to "unknown") */
1078    std::string sockname{"unknown"};
1079
1080    /** The connection's priority (Medium by default) */
1081    Priority priority{Priority::Medium};
1082
1083    /** The cluster map revision used by this client (initialized to -2) */
1084    int clustermap_revno{-2};
1085
1086    /**
1087     * Is XERROR supported for this connection or not (or should we just
1088     * silently disconnect the client)
1089     */
1090    bool xerror_support{false};
1091
1092    /**
1093     * Is COLLECTIONS supported for this connection or not. Collection aware
1094     * clients are allowed to encode operations to occur against their defined
1095     * collections or the legacy default collection (and receive new errors).
1096     * Collection aware clients also see mutations/deletions for all collections
1097     * if they are subscribed to DCP.
1098     * Collection unaware clients can only target operations at the legacy
1099     * default collection and receive no new errors. They also only ever see
1100     * default collection mutations/deletions etc... when subscribed to DCP.
1101     */
1102    bool collections_support{false};
1103
1104    /**
1105     * Is duplex mode supported by this client? (does the server allow sending
1106     * commands)
1107     */
1108    bool duplex_support{false};
1109
1110    std::atomic_bool cccp{false};
1111
1112    bool allow_unordered_execution{false};
1113
1114    std::queue<std::unique_ptr<ServerEvent>> server_events;
1115
1116    /**
1117     * The total time this connection has been on the CPU (starts at zero)
1118     */
1119    std::chrono::nanoseconds total_cpu_time = std::chrono::nanoseconds::zero();
1120    /**
1121     * The shortest time this connection occupied the thread (starts at max)
1122     */
1123    std::chrono::nanoseconds min_sched_time = std::chrono::nanoseconds::max();
1124    /**
1125     * The longest time this connection occupied the thread (starts at zero)
1126     */
1127    std::chrono::nanoseconds max_sched_time = std::chrono::nanoseconds::zero();
1128
1129    /**
1130     * The name of the client as provided to us by hello (fixed-size buffer)
1131     */
1132    std::array<char, MaxSavedAgentName> agentName{};
1133
1134    /**
1135     * The connection id as specified by the client (fixed-size buffer).
1136     *
1137     * The connection UUID is defined to be a string of 33 characters
1138     * (two 8 byte integers separated with a /). To ease the printout
1139     * of the string we allocate room for the termination character.
1140     */
1141    std::array<char, MaxSavedConnectionId> connectionId{};
1142
1143    /**
1144     * The state machine this connection is currently using
1145     */
1146    McbpStateMachine stateMachine;
1147
1148    /** Is this connection used by a DCP connection? */
1149    bool dcp = false;
1150
1151    /** Is this DCP channel XAttrAware? */
1152    bool dcpXattrAware = false;
1153
1154    /** Should values be stripped off? */
1155    bool dcpNoValue = false;
1156
1157    /** Is this DCP channel collection aware? */
1158    bool dcpCollectionAware = false;
1159
1160    /** Is tracing enabled for this connection? */
1161    bool tracingEnabled = false;
1162
1163    /** Should DCP replicate the time a delete was created? */
1164    bool dcpDeleteTimeEnabled = false;
1165
1166    /** The maximum requests we can process in a worker thread timeslice */
1167    int max_reqs_per_event =
1168            settings.getRequestsPerEventNotification(EventPriority::Default);
1169
1170    /**
1171     * Number of events this connection can process in a single worker
1172     * thread timeslice
1173     */
1174    int numEvents = 0;
1175
1176    // Members related to libevent
1177
1178    /** Is the connection currently registered in libevent? */
1179    bool registered_in_libevent = false;
1180    /** The libevent object */
1181    struct event event = {};
1182    /** The current flags we've registered in libevent */
1183    short ev_flags = 0;
1184    /** Which events were just triggered */
1185    short currentEvent = 0;
1186
1187    /** Which state to go into after finishing the current write */
1188    McbpStateMachine::State write_and_go = McbpStateMachine::State::new_cmd;
1189
1190    /** Data for the mwrite state */
1191    std::vector<iovec> iov;
1192    /** Number of elements used in iov[] */
1193    size_t iovused = 0;
1194
1195    /** The message list being used for transfer */
1196    std::vector<struct msghdr> msglist;
1197    /** Element in msglist[] being transmitted now */
1198    size_t msgcurr = 0;
1199    /** Number of bytes in the current msg */
1200    size_t msgbytes = 0;
1201
1202    /**
1203     * List of items we've reserved during the command (should call
1204     * item_release when transmit is complete)
1205     */
1206    std::vector<void*> reservedItems;
1207
1208    /**
1209     * A vector of temporary allocations that should be freed when the
1210     * connection is done sending all of the data. Use pushTempAlloc to
1211     * push a pointer to this list (must be allocated with malloc/calloc/strdup
1212     * etc.; it will be freed by calling "free")
1213     */
1214    std::vector<char*> temp_alloc;
1215
1216    /**
1217     * If the client enabled the mutation seqno feature, each mutation
1218     * command will return the vbucket UUID and sequence number for the
1219     * mutation.
1220     */
1221    bool supports_mutation_extras = false;
1222
1223    /**
1224     * The status for the async io operation (ENGINE_SUCCESS by default)
1225     */
1226    ENGINE_ERROR_CODE aiostat = ENGINE_SUCCESS;
1227
1228    /**
1229     * Is this connection currently in an "ewouldblock" state?
1230     */
1231    bool ewouldblock = false;
1232
1233    /**
1234     * The SSL context used by this connection (if enabled)
1235     */
1236    SslContext ssl;
1237
1238    // Total number of bytes received from the network
1239    size_t totalRecv = 0;
1240    // Total number of bytes sent to the network
1241    size_t totalSend = 0;
1242
1243    /**
1244     * The list of commands currently being processed. Currently we
1245     * only use a single entry in this vector (and always reuse that
1246     * object for all commands), but when the client tries to
1247     * enable unordered execution we may operate with multiple
1248     * commands at the same time and they're all stored in this
1249     * vector.
1250     */
1251    std::vector<std::unique_ptr<Cookie>> cookies;
1252
1253    Datatype datatype;
1254
1255    /**
1256     * It is possible to disable the SASL authentication for some
1257     * connections after they've been established (enabled by default).
1258     */
1259    bool saslAuthEnabled = true;
1260};
1261
1262/**
1263 * Convert a connection priority to its textual representation (a C string)
1264 */
1265const char* to_string(const Connection::Priority& priority);
1266