16bb87f99STrond Norbye/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
26bb87f99STrond Norbye/*
36bb87f99STrond Norbye *     Copyright 2015 Couchbase, Inc.
46bb87f99STrond Norbye *
56bb87f99STrond Norbye *   Licensed under the Apache License, Version 2.0 (the "License");
66bb87f99STrond Norbye *   you may not use this file except in compliance with the License.
76bb87f99STrond Norbye *   You may obtain a copy of the License at
86bb87f99STrond Norbye *
96bb87f99STrond Norbye *       http://www.apache.org/licenses/LICENSE-2.0
106bb87f99STrond Norbye *
116bb87f99STrond Norbye *   Unless required by applicable law or agreed to in writing, software
126bb87f99STrond Norbye *   distributed under the License is distributed on an "AS IS" BASIS,
136bb87f99STrond Norbye *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
146bb87f99STrond Norbye *   See the License for the specific language governing permissions and
156bb87f99STrond Norbye *   limitations under the License.
166bb87f99STrond Norbye */
176bb87f99STrond Norbye#pragma once
186bb87f99STrond Norbye
1964828eb2SDave Rigby#include "config.h"
2064828eb2SDave Rigby
21bf74734dSTrond Norbye#include "datatype.h"
22bf74734dSTrond Norbye#include "dynamic_buffer.h"
23bf74734dSTrond Norbye#include "log_macros.h"
24c630a8c2SJim Walker#include "settings.h"
25bf74734dSTrond Norbye#include "ssl_context.h"
26bf74734dSTrond Norbye#include "statemachine_mcbp.h"
27bf74734dSTrond Norbye#include "task.h"
28034291b0STrond Norbye#include "stats.h"
29c630a8c2SJim Walker
302d5fe4daSTrond Norbye#include <cJSON.h>
312d5fe4daSTrond Norbye#include <cbsasl/cbsasl.h>
32bf74734dSTrond Norbye#include <daemon/protocol/mcbp/command_context.h>
33bf74734dSTrond Norbye#include <memcached/openssl.h>
34c5efff7bSTrond Norbye#include <memcached/rbac.h>
35bf74734dSTrond Norbye#include <platform/sized_buffer.h>
36bf74734dSTrond Norbye
37bf74734dSTrond Norbye#include <array>
38bf3926e0STrond Norbye#include <chrono>
39bf74734dSTrond Norbye#include <memory>
40bf3926e0STrond Norbye#include <queue>
4164828eb2SDave Rigby#include <string>
42bf74734dSTrond Norbye#include <vector>
4364828eb2SDave Rigby
442d5fe4daSTrond Norbyestruct LIBEVENT_THREAD;
459738c807STrond Norbyeclass Cookie;
46c133abeeSTrond Norbyeclass ListeningPort;
4737950946STrond Norbyeclass Bucket;
48bf3926e0STrond Norbyeclass ServerEvent;
496bb87f99STrond Norbye
50bf74734dSTrond Norbye/**
51bf74734dSTrond Norbye * Adjust a message header structure by "consuming" nbytes of data.
52bf74734dSTrond Norbye *
53bf74734dSTrond Norbye * The msghdr structure contains an io-vector of data to send, and
54bf74734dSTrond Norbye * by consuming data, we "rebuild" the io-vector by moving the
55bf74734dSTrond Norbye * base pointer to the io-vector past all of the fully transferred
56bf74734dSTrond Norbye * elements, and move the last iov_base pointer the resulting bytes
57bf74734dSTrond Norbye * forward (and reduce the last iov_len the same number of bytes)
58bf74734dSTrond Norbye *
59bf74734dSTrond Norbye * @param pipe The pipe structure where we may have stored data pointed
60bf74734dSTrond Norbye *             to in the io-vector. We need to mark those as consumed
61bf74734dSTrond Norbye *             when we skip them in the io-vector.
62bf74734dSTrond Norbye * @param m The message header structure to update
63bf74734dSTrond Norbye * @param nbytes The number of bytes to skip
64bf74734dSTrond Norbye * @return The number of bytes left in the first element in the io-vector
65bf74734dSTrond Norbye */
66bf74734dSTrond Norbyesize_t adjust_msghdr(cb::Pipe& pipe, struct msghdr* m, ssize_t nbytes);
67bf74734dSTrond Norbye
68bf74734dSTrond Norbye/**
69bf74734dSTrond Norbye * The maximum number of character the core preserves for the
70bf74734dSTrond Norbye * agent name for each connection
71bf74734dSTrond Norbye */
72bf74734dSTrond Norbyeconst size_t MaxSavedAgentName = 33;
73bf74734dSTrond Norbye
74bf74734dSTrond Norbye/**
75bf74734dSTrond Norbye * The maximum number of character the core preserves for the
76bf74734dSTrond Norbye * connection identifier for each connection
77bf74734dSTrond Norbye */
78bf74734dSTrond Norbyeconst size_t MaxSavedConnectionId = 34;
79bf74734dSTrond Norbye
806bb87f99STrond Norbye/**
816bb87f99STrond Norbye * The structure representing a connection in memcached.
826bb87f99STrond Norbye */
836bb87f99STrond Norbyeclass Connection {
846bb87f99STrond Norbyepublic:
85668794c4STrond Norbye    enum class Priority : uint8_t {
86668794c4STrond Norbye        High,
87668794c4STrond Norbye        Medium,
88668794c4STrond Norbye        Low
89668794c4STrond Norbye    };
906bb87f99STrond Norbye
916bb87f99STrond Norbye    Connection(const Connection&) = delete;
926bb87f99STrond Norbye
93bf74734dSTrond Norbye    Connection(SOCKET sfd, event_base* b, const ListeningPort& ifc);
94bf74734dSTrond Norbye
95bf74734dSTrond Norbye    ~Connection();
96bf74734dSTrond Norbye
978629facbSTrond Norbye    /**
988629facbSTrond Norbye     * Return an identifier for this connection. To be backwards compatible
998629facbSTrond Norbye     * this is the socket filedescriptor (or the socket handle casted to an
1008629facbSTrond Norbye     * unsigned integer on windows).
1018629facbSTrond Norbye     */
1028629facbSTrond Norbye    uint32_t getId() const {
1038629facbSTrond Norbye        return uint32_t(socketDescriptor);
1048629facbSTrond Norbye    }
1058629facbSTrond Norbye
1068629facbSTrond Norbye    /**
1078629facbSTrond Norbye     *  Get the socket descriptor used by this connection.
1088629facbSTrond Norbye     */
1098629facbSTrond Norbye    SOCKET getSocketDescriptor() const {
1108629facbSTrond Norbye        return socketDescriptor;
1118629facbSTrond Norbye    }
1128629facbSTrond Norbye
1138629facbSTrond Norbye    /**
1148629facbSTrond Norbye     * Set the socket descriptor used by this connection
1158629facbSTrond Norbye     */
1168629facbSTrond Norbye    void setSocketDescriptor(SOCKET sfd) {
1178629facbSTrond Norbye        Connection::socketDescriptor = sfd;
1188629facbSTrond Norbye    }
1198629facbSTrond Norbye
1208629facbSTrond Norbye    bool isSocketClosed() const {
1218629facbSTrond Norbye        return socketDescriptor == INVALID_SOCKET;
1228629facbSTrond Norbye    }
1238629facbSTrond Norbye
1246bb87f99STrond Norbye    const std::string& getPeername() const {
1256bb87f99STrond Norbye        return peername;
1266bb87f99STrond Norbye    }
1276bb87f99STrond Norbye
1286bb87f99STrond Norbye    const std::string& getSockname() const {
1296bb87f99STrond Norbye        return sockname;
1306bb87f99STrond Norbye    }
1316bb87f99STrond Norbye
132b73a9caaSDave Rigby    /**
133b73a9caaSDave Rigby     * Returns a descriptive name for the connection, of the form:
134b73a9caaSDave Rigby     *   "[peer_name - local_name ]"
135ca701e11STrond Norbye     * (system) is appended to the string for system connections.
136b73a9caaSDave Rigby     */
137a969c7dcSTrond Norbye    const std::string& getDescription() const {
138a969c7dcSTrond Norbye        return description;
139a969c7dcSTrond Norbye    }
140b73a9caaSDave Rigby
1416bb87f99STrond Norbye    /**
142668794c4STrond Norbye     * Signal a connection if it's idle
1436bb87f99STrond Norbye     *
144668794c4STrond Norbye     * @param logbusy set to true if you want to log the connection details
145668794c4STrond Norbye     *                if the connection isn't idle
146668794c4STrond Norbye     * @param workerthead the id of the workerthread (for logging purposes)
1476bb87f99STrond Norbye     */
148bf74734dSTrond Norbye    void signalIfIdle(bool logbusy, size_t workerthread);
1496bb87f99STrond Norbye
1506bb87f99STrond Norbye    /**
1516bb87f99STrond Norbye     * Terminate the eventloop for the current event base. This method doesn't
1526bb87f99STrond Norbye     * really fit as a member for the class, but I don't want clients to access
1536bb87f99STrond Norbye     * the libevent details from outside the class (so I didn't want to make
1546bb87f99STrond Norbye     * a "getEventBase()" method.
1556bb87f99STrond Norbye     */
1566bb87f99STrond Norbye    void eventBaseLoopbreak() {
157668794c4STrond Norbye        event_base_loopbreak(base);
1586bb87f99STrond Norbye    }
1596bb87f99STrond Norbye
160ca701e11STrond Norbye    /**
161ca701e11STrond Norbye     * Is the connection representing a system internal user
162ca701e11STrond Norbye     */
163ca701e11STrond Norbye    bool isInternal() const {
164ca701e11STrond Norbye        return internal;
1656bb87f99STrond Norbye    }
1666bb87f99STrond Norbye
167ca701e11STrond Norbye    /**
168ca701e11STrond Norbye     * Specify if this connection is representing an internal user.
169ca701e11STrond Norbye     * An internal user is a user which is used by one of the components
170ca701e11STrond Norbye     * in Couchbase (like ns_server, indexer etc).
171ca701e11STrond Norbye     */
172ca701e11STrond Norbye    void setInternal(bool internal) {
173ca701e11STrond Norbye        Connection::internal = internal;
1746bb87f99STrond Norbye    }
1756bb87f99STrond Norbye
176b177f95dSTrond Norbye    /**
177b177f95dSTrond Norbye     * Update the username to reflect what the user used from the SASL
178b177f95dSTrond Norbye     * authentication.
179b177f95dSTrond Norbye     */
180b177f95dSTrond Norbye    void resetUsernameCache();
181b177f95dSTrond Norbye
182b177f95dSTrond Norbye
183629eb1f8STrond Norbye    bool isAuthenticated() const {
184629eb1f8STrond Norbye        return authenticated;
185629eb1f8STrond Norbye    }
186629eb1f8STrond Norbye
187629eb1f8STrond Norbye    void setAuthenticated(bool authenticated) {
188629eb1f8STrond Norbye        Connection::authenticated = authenticated;
18988898db5STrond Norbye        if (authenticated) {
190f823a51fSManik Taneja            updateDescription();
19188898db5STrond Norbye            privilegeContext = cb::rbac::createContext(username, "");
19288898db5STrond Norbye        } else {
193f823a51fSManik Taneja            resetUsernameCache();
19488898db5STrond Norbye            privilegeContext = cb::rbac::PrivilegeContext{};
19588898db5STrond Norbye        }
196629eb1f8STrond Norbye    }
197629eb1f8STrond Norbye
198bf74734dSTrond Norbye    void setPriority(const Priority& priority);
1996bb87f99STrond Norbye
2006bb87f99STrond Norbye    /**
2016bb87f99STrond Norbye     * Create a cJSON representation of the members of the connection
2026bb87f99STrond Norbye     */
203bf74734dSTrond Norbye    unique_cJSON_ptr toJSON() const;
2046bb87f99STrond Norbye
2056bb87f99STrond Norbye    /**
2066bb87f99STrond Norbye     * Enable or disable TCP NoDelay on the underlying socket
2076bb87f99STrond Norbye     *
2086bb87f99STrond Norbye     * @return true on success, false otherwise
2096bb87f99STrond Norbye     */
2106bb87f99STrond Norbye    bool setTcpNoDelay(bool enable);
2116bb87f99STrond Norbye
21267d9d01fSTrond Norbye    /**
21367d9d01fSTrond Norbye     * Get the username this connection is authenticated as
2147c2e90d4STrond Norbye     *
2157c2e90d4STrond Norbye     * NOTE: the return value should not be returned by the client
21667d9d01fSTrond Norbye     */
21767d9d01fSTrond Norbye    const char* getUsername() const {
2187c2e90d4STrond Norbye        return username.c_str();
21967d9d01fSTrond Norbye    }
2206bb87f99STrond Norbye
221d887bf62STrond Norbye    /**
222d887bf62STrond Norbye     * Get the domain where the user is defined (builtin or saslauthd)
223d887bf62STrond Norbye     */
224d887bf62STrond Norbye    cb::sasl::Domain getDomain() const {
225d887bf62STrond Norbye        return domain;
226d887bf62STrond Norbye    }
227d887bf62STrond Norbye
2286bb87f99STrond Norbye    cbsasl_conn_t* getSaslConn() const {
229dfaadc06STrond Norbye        return sasl_conn.get();
2306bb87f99STrond Norbye    }
2316bb87f99STrond Norbye
2327e2cfc10STrond Norbye    /**
2337e2cfc10STrond Norbye     * Get the current reference count
2347e2cfc10STrond Norbye     */
2356bb87f99STrond Norbye    uint8_t getRefcount() const {
2366bb87f99STrond Norbye        return refcount;
2376bb87f99STrond Norbye    }
2386bb87f99STrond Norbye
2397e2cfc10STrond Norbye    void incrementRefcount() {
2407e2cfc10STrond Norbye        ++refcount;
2417e2cfc10STrond Norbye    }
2427e2cfc10STrond Norbye
2437e2cfc10STrond Norbye    void decrementRefcount() {
2447e2cfc10STrond Norbye        --refcount;
2456bb87f99STrond Norbye    }
2466bb87f99STrond Norbye
2476bb87f99STrond Norbye    LIBEVENT_THREAD* getThread() const {
248f23df968STrond Norbye        return thread.load(std::memory_order_relaxed);
2496bb87f99STrond Norbye    }
2506bb87f99STrond Norbye
2516bb87f99STrond Norbye    void setThread(LIBEVENT_THREAD* thread) {
252668794c4STrond Norbye        Connection::thread.store(thread,
253668794c4STrond Norbye                                 std::memory_order::memory_order_relaxed);
2546bb87f99STrond Norbye    }
2556bb87f99STrond Norbye
2566bb87f99STrond Norbye    in_port_t getParentPort() const {
2576bb87f99STrond Norbye        return parent_port;
2586bb87f99STrond Norbye    }
2596bb87f99STrond Norbye
260ada135cfSTrond Norbye    /**
2613b2a836dSTrond Norbye     * Check if this connection is in posession of the requested privilege
262ada135cfSTrond Norbye     *
2633b2a836dSTrond Norbye     * @param privilege the privilege to check for
2643b2a836dSTrond Norbye     * @return Ok - the connection holds the privilege
2653b2a836dSTrond Norbye     *         Fail - the connection is missing the privilege
2663b2a836dSTrond Norbye     *         Stale - the authentication context is stale
267ada135cfSTrond Norbye     */
26880f0b6a0STrond Norbye    cb::rbac::PrivilegeAccess checkPrivilege(cb::rbac::Privilege privilege,
26980f0b6a0STrond Norbye                                             Cookie& cookie);
27043cc30b0STrond Norbye
27174c314eaSTrond Norbye    /**
27274c314eaSTrond Norbye     * Try to drop the specified privilege from the current context
27374c314eaSTrond Norbye     *
27474c314eaSTrond Norbye     * @param privilege the privilege to drop
27574c314eaSTrond Norbye     * @return The appropriate error code to return back to the client
27674c314eaSTrond Norbye     */
27774c314eaSTrond Norbye    cb::engine_errc dropPrivilege(cb::rbac::Privilege privilege);
27874c314eaSTrond Norbye
2798bcac5cdSTrond Norbye    int getBucketIndex() const {
280805557ffSTrond Norbye        return bucketIndex.load(std::memory_order_relaxed);
2818bcac5cdSTrond Norbye    }
2828bcac5cdSTrond Norbye
28388898db5STrond Norbye    void setBucketIndex(int bucketIndex);
284aa513568STrond Norbye
28537950946STrond Norbye    Bucket& getBucket() const;
28637950946STrond Norbye
2878bcac5cdSTrond Norbye    ENGINE_HANDLE_V1* getBucketEngine() const {
2888bcac5cdSTrond Norbye        return bucketEngine;
2898bcac5cdSTrond Norbye    };
2908bcac5cdSTrond Norbye
2917088b018STrond Norbye    ENGINE_HANDLE* getBucketEngineAsV0() const {
2927088b018STrond Norbye        return reinterpret_cast<ENGINE_HANDLE*>(bucketEngine);
2937088b018STrond Norbye    }
2947088b018STrond Norbye
2958bcac5cdSTrond Norbye    void setBucketEngine(ENGINE_HANDLE_V1* bucketEngine) {
2968bcac5cdSTrond Norbye        Connection::bucketEngine = bucketEngine;
2978bcac5cdSTrond Norbye    };
298aa513568STrond Norbye
299668794c4STrond Norbye    void* getEngineStorage() const {
300668794c4STrond Norbye        return engine_storage;
3014ecec1b3SJim Walker    }
302b768bae8STrond Norbye
303668794c4STrond Norbye    void setEngineStorage(void* engine_storage) {
304668794c4STrond Norbye        Connection::engine_storage = engine_storage;
305a21009eeSTrond Norbye    }
306a21009eeSTrond Norbye
30787a8b9a1STrond Norbye    int getClustermapRevno() const {
30887a8b9a1STrond Norbye        return clustermap_revno;
30987a8b9a1STrond Norbye    }
31087a8b9a1STrond Norbye
31187a8b9a1STrond Norbye    void setClustermapRevno(int clustermap_revno) {
31287a8b9a1STrond Norbye        Connection::clustermap_revno = clustermap_revno;
31387a8b9a1STrond Norbye    }
31487a8b9a1STrond Norbye
31507a55912STrond Norbye    /**
31607a55912STrond Norbye     * Restart the authentication (this clears all of the authentication
31707a55912STrond Norbye     * data...)
31807a55912STrond Norbye     */
31907a55912STrond Norbye    void restartAuthentication();
32007a55912STrond Norbye
321cede2f7fSTrond Norbye    bool isXerrorSupport() const {
322cede2f7fSTrond Norbye        return xerror_support;
323cede2f7fSTrond Norbye    }
324cede2f7fSTrond Norbye
325cede2f7fSTrond Norbye    void setXerrorSupport(bool xerror_support) {
326cede2f7fSTrond Norbye        Connection::xerror_support = xerror_support;
327cede2f7fSTrond Norbye    }
328cede2f7fSTrond Norbye
3290fba613eSJim Walker    bool isCollectionsSupported() const {
3300fba613eSJim Walker        return collections_support;
3310fba613eSJim Walker    }
3320fba613eSJim Walker
3330fba613eSJim Walker    void setCollectionsSupported(bool collections_support) {
3340fba613eSJim Walker        Connection::collections_support = collections_support;
3350fba613eSJim Walker    }
3360fba613eSJim Walker
3370fba613eSJim Walker    DocNamespace getDocNamespace() const {
3380fba613eSJim Walker        if (isCollectionsSupported()) {
3390fba613eSJim Walker            return DocNamespace::Collections;
3400fba613eSJim Walker        } else {
3410fba613eSJim Walker            return DocNamespace::DefaultCollection;
3420fba613eSJim Walker        }
3430fba613eSJim Walker    }
3440fba613eSJim Walker
3456014cb78STrond Norbye    bool isDuplexSupported() const {
3466014cb78STrond Norbye        return duplex_support;
3476014cb78STrond Norbye    }
3486014cb78STrond Norbye
3496014cb78STrond Norbye    void setDuplexSupported(bool duplex_support) {
3506014cb78STrond Norbye        Connection::duplex_support = duplex_support;
3516014cb78STrond Norbye    }
3526014cb78STrond Norbye
353c6e3c4afSTrond Norbye    bool isClustermapChangeNotificationSupported() const {
354c6e3c4afSTrond Norbye        return cccp.load(std::memory_order_acquire);
355c6e3c4afSTrond Norbye    }
356c6e3c4afSTrond Norbye
357c6e3c4afSTrond Norbye    void setClustermapChangeNotificationSupported(bool cccp) {
358c6e3c4afSTrond Norbye        Connection::cccp.store(cccp, std::memory_order_release);
359c6e3c4afSTrond Norbye    }
360c6e3c4afSTrond Norbye
361e53770eeSTrond Norbye    bool allowUnorderedExecution() const {
362e53770eeSTrond Norbye        return allow_unordered_execution;
363e53770eeSTrond Norbye    }
364e53770eeSTrond Norbye
365e53770eeSTrond Norbye    void setAllowUnorderedExecution(bool allow_unordered_execution) {
366e53770eeSTrond Norbye        Connection::allow_unordered_execution = allow_unordered_execution;
367e53770eeSTrond Norbye    }
368e53770eeSTrond Norbye
369cede2f7fSTrond Norbye    /**
370cede2f7fSTrond Norbye     * Remap the current error code
371cede2f7fSTrond Norbye     *
372cede2f7fSTrond Norbye     * The method modifies the input code and returns the mapped value
373cede2f7fSTrond Norbye     * (to make the function a bit easier to use).
374cede2f7fSTrond Norbye     *
375cede2f7fSTrond Norbye     * Depending on which features the client have enabled the method
376cede2f7fSTrond Norbye     * may either just return the input value, map it to a different value
377cede2f7fSTrond Norbye     * (like ENGINE_DISCONNECT if the client hasn't enabled the extened
378cede2f7fSTrond Norbye     * error codes).
379cede2f7fSTrond Norbye     *
380cede2f7fSTrond Norbye     * @param code The code to map (will be changed on return)
381cede2f7fSTrond Norbye     * @return the mapped value.
382cede2f7fSTrond Norbye     */
383cede2f7fSTrond Norbye    ENGINE_ERROR_CODE remapErrorCode(ENGINE_ERROR_CODE code) const;
384cede2f7fSTrond Norbye
385bc0aba29STrond Norbye    /**
386bc0aba29STrond Norbye     * Add the specified number of ns to the amount of CPU time this
387bc0aba29STrond Norbye     * connection have used on the CPU (We could alternatively have
388bc0aba29STrond Norbye     * separate "ON_CPU" and "OFF_CPU" events and record all of this
389bc0aba29STrond Norbye     * within the connection object instead, but it seemed easier to
390bc0aba29STrond Norbye     * just wrap it from the method driving the event loop (as we
391bc0aba29STrond Norbye     * also want to record the delta to the thread scheduler histogram
392bc0aba29STrond Norbye     *
393bc0aba29STrond Norbye     * @param ns The number of nanoseconds spent in this iteration.
394bc0aba29STrond Norbye     */
395bc0aba29STrond Norbye    void addCpuTime(std::chrono::nanoseconds ns);
396bc0aba29STrond Norbye
397bf3926e0STrond Norbye    /**
398bf3926e0STrond Norbye     * Enqueue a new server event
399bf3926e0STrond Norbye     *
400bf3926e0STrond Norbye     * @param event
401bf3926e0STrond Norbye     */
402bf3926e0STrond Norbye    void enqueueServerEvent(std::unique_ptr<ServerEvent> event);
403bf3926e0STrond Norbye
404bf74734dSTrond Norbye    /**
405bf74734dSTrond Norbye     * Close the connection. If there is any references to the connection
406bf74734dSTrond Norbye     * or the cookies we'll enter the "pending close" state to wait for
407bf74734dSTrond Norbye     * these operations to complete before changing state to immediate
408bf74734dSTrond Norbye     * close.
409bf74734dSTrond Norbye     */
410bf74734dSTrond Norbye    void close();
411bf74734dSTrond Norbye
412bf74734dSTrond Norbye    /**
413bf74734dSTrond Norbye     * fire ON_DISCONNECT for all of the cookie objects (in case the
414bf74734dSTrond Norbye     * underlying engine keeps track of any of them)
415bf74734dSTrond Norbye     */
416bf74734dSTrond Norbye    void propagateDisconnect() const;
417bf74734dSTrond Norbye
418bf74734dSTrond Norbye    void setState(McbpStateMachine::State next_state);
419bf74734dSTrond Norbye
420bf74734dSTrond Norbye    McbpStateMachine::State getState() const {
421bf74734dSTrond Norbye        return stateMachine.getCurrentState();
422bf74734dSTrond Norbye    }
423bf74734dSTrond Norbye
424bf74734dSTrond Norbye    const char* getStateName() const {
425bf74734dSTrond Norbye        return stateMachine.getCurrentStateName();
426bf74734dSTrond Norbye    }
427bf74734dSTrond Norbye
428bf74734dSTrond Norbye    bool isDCP() const {
429bf74734dSTrond Norbye        return dcp;
430bf74734dSTrond Norbye    }
431bf74734dSTrond Norbye
432bf74734dSTrond Norbye    void setDCP(bool dcp) {
433bf74734dSTrond Norbye        Connection::dcp = dcp;
434bf74734dSTrond Norbye    }
435bf74734dSTrond Norbye
436bf74734dSTrond Norbye    bool isDcpXattrAware() const {
437bf74734dSTrond Norbye        return dcpXattrAware;
438bf74734dSTrond Norbye    }
439bf74734dSTrond Norbye
440bf74734dSTrond Norbye    void setDcpXattrAware(bool dcpXattrAware) {
441bf74734dSTrond Norbye        Connection::dcpXattrAware = dcpXattrAware;
442bf74734dSTrond Norbye    }
443bf74734dSTrond Norbye
444bf74734dSTrond Norbye    bool isDcpCollectionAware() const {
445bf74734dSTrond Norbye        return dcpCollectionAware;
446bf74734dSTrond Norbye    }
447bf74734dSTrond Norbye
448bf74734dSTrond Norbye    void setDcpCollectionAware(bool dcpCollectionAware) {
449bf74734dSTrond Norbye        Connection::dcpCollectionAware = dcpCollectionAware;
450bf74734dSTrond Norbye    }
451bf74734dSTrond Norbye
452bf74734dSTrond Norbye    void setDcpDeleteTimeEnabled(bool dcpDeleteTimeEnabled) {
453bf74734dSTrond Norbye        Connection::dcpDeleteTimeEnabled = dcpDeleteTimeEnabled;
454bf74734dSTrond Norbye    }
455bf74734dSTrond Norbye
456bf74734dSTrond Norbye    bool isDcpDeleteTimeEnabled() const {
457bf74734dSTrond Norbye        return dcpDeleteTimeEnabled;
458bf74734dSTrond Norbye    }
459bf74734dSTrond Norbye
460bf74734dSTrond Norbye    /// returns true if either collections or delete_time is enabled
461bf74734dSTrond Norbye    bool isDcpDeleteV2() const {
462bf74734dSTrond Norbye        return isDcpCollectionAware() || isDcpDeleteTimeEnabled();
463bf74734dSTrond Norbye    }
464bf74734dSTrond Norbye
465bf74734dSTrond Norbye    /**
466bf74734dSTrond Norbye     * Get the DocNamespace for a DcpMessage (mutation/deletion/expiration)
467bf74734dSTrond Norbye     * If the connection is dcp aware and the passed length is not zero, then
468bf74734dSTrond Norbye     * the document belongs to a collection.
469bf74734dSTrond Norbye     * @param collectionLength the length sent by the producer
470bf74734dSTrond Norbye     * @return the DocNamespace (DefaultCollection or Collections)
471bf74734dSTrond Norbye     */
472bf74734dSTrond Norbye    DocNamespace getDocNamespaceForDcpMessage(uint8_t collectionLength) const {
473bf74734dSTrond Norbye        DocNamespace ns = DocNamespace::DefaultCollection;
474bf74734dSTrond Norbye        if (isDcpCollectionAware() && collectionLength != 0) {
475bf74734dSTrond Norbye            // Collection aware DCP sends non-zero collectionLength for
476bf74734dSTrond Norbye            // documents that belong to a collection.
477bf74734dSTrond Norbye            ns = DocNamespace::Collections;
478bf74734dSTrond Norbye        }
479bf74734dSTrond Norbye        return ns;
480bf74734dSTrond Norbye    }
481bf74734dSTrond Norbye
482bf74734dSTrond Norbye    bool isDcpNoValue() const {
483bf74734dSTrond Norbye        return dcpNoValue;
484bf74734dSTrond Norbye    }
485bf74734dSTrond Norbye
486bf74734dSTrond Norbye    void setDcpNoValue(bool dcpNoValue) {
487bf74734dSTrond Norbye        Connection::dcpNoValue = dcpNoValue;
488bf74734dSTrond Norbye    }
489bf74734dSTrond Norbye
490bf74734dSTrond Norbye    /**
491bf74734dSTrond Norbye     * Decrement the number of events to process and return the new value
492bf74734dSTrond Norbye     */
493bf74734dSTrond Norbye    int decrementNumEvents() {
494bf74734dSTrond Norbye        return --numEvents;
495bf74734dSTrond Norbye    }
496bf74734dSTrond Norbye
497bf74734dSTrond Norbye    /**
498bf74734dSTrond Norbye     * Set the number of events to process per timeslice of the worker
499bf74734dSTrond Norbye     * thread before yielding.
500bf74734dSTrond Norbye     */
501bf74734dSTrond Norbye    void setNumEvents(int nevents) {
502bf74734dSTrond Norbye        Connection::numEvents = nevents;
503bf74734dSTrond Norbye    }
504bf74734dSTrond Norbye
505bf74734dSTrond Norbye    /**
506bf74734dSTrond Norbye     * Get the maximum number of events we should process per invocation
507bf74734dSTrond Norbye     * for a connection object (to avoid starvation of other connections)
508bf74734dSTrond Norbye     */
509bf74734dSTrond Norbye    int getMaxReqsPerEvent() const {
510bf74734dSTrond Norbye        return max_reqs_per_event;
511bf74734dSTrond Norbye    }
512bf74734dSTrond Norbye
513bf74734dSTrond Norbye    /**
514bf74734dSTrond Norbye     * Update the settings in libevent for this connection
515bf74734dSTrond Norbye     *
516bf74734dSTrond Norbye     * @param mask the new event mask to get notified about
517bf74734dSTrond Norbye     */
518bf74734dSTrond Norbye    bool updateEvent(const short new_flags);
519bf74734dSTrond Norbye
520bf74734dSTrond Norbye    /**
521bf74734dSTrond Norbye     * Reapply the event mask (in case of a timeout we might want to do
522bf74734dSTrond Norbye     * that)
523bf74734dSTrond Norbye     */
524bf74734dSTrond Norbye    bool reapplyEventmask();
525bf74734dSTrond Norbye
526bf74734dSTrond Norbye    /**
527bf74734dSTrond Norbye     * Unregister the event structure from libevent
528bf74734dSTrond Norbye     * @return true if success, false otherwise
529bf74734dSTrond Norbye     */
530bf74734dSTrond Norbye    bool unregisterEvent();
531bf74734dSTrond Norbye
532bf74734dSTrond Norbye    /**
533bf74734dSTrond Norbye     * Register the event structure in libevent
534bf74734dSTrond Norbye     * @return true if success, false otherwise
535bf74734dSTrond Norbye     */
536bf74734dSTrond Norbye    bool registerEvent();
537bf74734dSTrond Norbye
538bf74734dSTrond Norbye    bool isRegisteredInLibevent() const {
539bf74734dSTrond Norbye        return registered_in_libevent;
540bf74734dSTrond Norbye    }
5416bb87f99STrond Norbye
542bf74734dSTrond Norbye    void setCurrentEvent(short ev) {
543bf74734dSTrond Norbye        currentEvent = ev;
544bf74734dSTrond Norbye    }
545bf74734dSTrond Norbye
546bf74734dSTrond Norbye    /** Is the current event a readevent? */
547bf74734dSTrond Norbye    bool isReadEvent() const {
548bf74734dSTrond Norbye        return currentEvent & EV_READ;
549bf74734dSTrond Norbye    }
550bf74734dSTrond Norbye
551bf74734dSTrond Norbye    /** Is the current event a writeevent? */
552bf74734dSTrond Norbye    bool isWriteEvent() const {
553bf74734dSTrond Norbye        return currentEvent & EV_WRITE;
554bf74734dSTrond Norbye    }
555bf74734dSTrond Norbye
556bf74734dSTrond Norbye    /**
557bf74734dSTrond Norbye     * Shrinks a connection's buffers if they're too big.  This prevents
558bf74734dSTrond Norbye     * periodic large "get" requests from permanently chewing lots of server
559bf74734dSTrond Norbye     * memory.
560bf74734dSTrond Norbye     *
561bf74734dSTrond Norbye     * This should only be called in between requests since it can wipe output
562bf74734dSTrond Norbye     * buffers!
563bf74734dSTrond Norbye     */
564bf74734dSTrond Norbye    void shrinkBuffers();
565bf74734dSTrond Norbye
566bf74734dSTrond Norbye    /**
567bf74734dSTrond Norbye     * Receive data from the socket
568bf74734dSTrond Norbye     *
569bf74734dSTrond Norbye     * @param where to store the result
570bf74734dSTrond Norbye     * @param nbytes the size of the buffer
571bf74734dSTrond Norbye     *
572bf74734dSTrond Norbye     * @return the number of bytes read, or -1 for an error
573bf74734dSTrond Norbye     */
574bf74734dSTrond Norbye    int recv(char* dest, size_t nbytes);
575bf74734dSTrond Norbye