xref: /6.0.3/kv_engine/daemon/connection.cc (revision 67a5e264)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17#include "config.h"
18#include "connection.h"
19
20#include "buckets.h"
21#include "connections.h"
22#include "mc_time.h"
23#include "mcaudit.h"
24#include "memcached.h"
25#include "runtime.h"
26#include "server_event.h"
27#include "statemachine_mcbp.h"
28#include "trace.h"
29
30#include <mcbp/mcbp.h>
31#include <mcbp/protocol/header.h>
32#include <phosphor/phosphor.h>
33#include <platform/cb_malloc.h>
34#include <platform/checked_snprintf.h>
35#include <platform/socket.h>
36#include <platform/strerror.h>
37#include <platform/timeutils.h>
38#include <utilities/logtags.h>
39#include <utilities/protocol2text.h>
40#include <cctype>
41#include <exception>
42#include <gsl/gsl>
43
44const char* to_string(const Connection::Priority& priority) {
45    switch (priority) {
46    case Connection::Priority::High:
47        return "High";
48    case Connection::Priority::Medium:
49        return "Medium";
50    case Connection::Priority::Low:
51        return "Low";
52    }
53    throw std::invalid_argument("No such priority: " +
54                                std::to_string(int(priority)));
55}
56
57static cbsasl_conn_t* create_new_cbsasl_server_t() {
58    cbsasl_conn_t *conn;
59    if (cbsasl_server_new("memcached", // service
60                          nullptr, // Server DQDN
61                          nullptr, // user realm
62                          nullptr, // iplocalport
63                          nullptr, // ipremoteport
64                          nullptr, // callbacks
65                          0, // flags
66                          &conn) != CBSASL_OK) {
67        throw std::bad_alloc();
68    }
69    return conn;
70}
71
72void Connection::resolveConnectionName() {
73    if (socketDescriptor == INVALID_SOCKET) {
74        // Our unit tests run without a socket connected, and we don't
75        // want them to flood the console with error messages
76        peername = "[invalid]";
77        sockname = "[invalid]";
78        return;
79    }
80
81    try {
82        peername = cb::net::getpeername(socketDescriptor);
83        sockname = cb::net::getsockname(socketDescriptor);
84        updateDescription();
85    } catch (const std::bad_alloc& e) {
86        LOG_WARNING(
87                "Connection::resolveConnectionName: failed to allocate "
88                "memory: "
89                "{}",
90                e.what());
91    }
92}
93
94bool Connection::setTcpNoDelay(bool enable) {
95    if (socketDescriptor == INVALID_SOCKET) {
96        // Our unit test run without a connected socket (and there is
97        // no point of running setsockopt on an invalid socket and
98        // get the error message from there).. But we don't want them
99        // (the unit tests) to flood the console with error messages
100        // that setsockopt failed
101        return false;
102    }
103
104    const int flags = enable ? 1 : 0;
105    int error = cb::net::setsockopt(socketDescriptor,
106                                    IPPROTO_TCP,
107                                    TCP_NODELAY,
108                                    reinterpret_cast<const void*>(&flags),
109                                    sizeof(flags));
110
111    if (error != 0) {
112        std::string errmsg = cb_strerror(cb::net::get_socket_error());
113        LOG_WARNING("setsockopt(TCP_NODELAY): {}", errmsg);
114        nodelay = false;
115        return false;
116    } else {
117        nodelay = enable;
118    }
119
120    return true;
121}
122
123/**
124 * Get a JSON representation of an event mask
125 *
126 * @param mask the mask to convert to JSON
127 * @return the json representation. Caller is responsible for calling
128 *         cJSON_Delete()
129 */
130static cJSON* event_mask_to_json(const short mask) {
131    cJSON* ret = cJSON_CreateObject();
132    cJSON* array = cJSON_CreateArray();
133
134    cJSON_AddUintPtrToObject(ret, "raw", mask);
135    if (mask & EV_READ) {
136        cJSON_AddItemToArray(array, cJSON_CreateString("read"));
137    }
138    if (mask & EV_WRITE) {
139        cJSON_AddItemToArray(array, cJSON_CreateString("write"));
140    }
141    if (mask & EV_PERSIST) {
142        cJSON_AddItemToArray(array, cJSON_CreateString("persist"));
143    }
144    if (mask & EV_TIMEOUT) {
145        cJSON_AddItemToArray(array, cJSON_CreateString("timeout"));
146    }
147
148    cJSON_AddItemToObject(ret, "decoded", array);
149    return ret;
150}
151
152unique_cJSON_ptr Connection::toJSON() const {
153    unique_cJSON_ptr ret(cJSON_CreateObject());
154    cJSON* obj = ret.get();
155    cJSON_AddUintPtrToObject(obj, "connection", (uintptr_t)this);
156    if (socketDescriptor == INVALID_SOCKET) {
157        cJSON_AddStringToObject(obj, "socket", "disconnected");
158        return ret;
159    }
160
161    cJSON_AddNumberToObject(obj, "socket", socketDescriptor);
162    cJSON_AddNumberToObject(obj, "yields", yields.load());
163    cJSON_AddStringToObject(obj, "protocol", "memcached");
164    cJSON_AddStringToObject(obj, "peername", getPeername().c_str());
165    cJSON_AddStringToObject(obj, "sockname", getSockname().c_str());
166    cJSON_AddNumberToObject(obj, "parent_port", parent_port);
167    cJSON_AddNumberToObject(obj, "bucket_index", getBucketIndex());
168    cJSON_AddBoolToObject(obj, "internal", isInternal());
169    if (authenticated) {
170        if (internal) {
171            // We want to be able to map these connections, and given
172            // that it is internal we don't reveal any user data
173            cJSON_AddStringToObject(obj, "username", username);
174        } else {
175            using cb::logtags::tagUserData;
176            cJSON_AddStringToObject(obj, "username", tagUserData(username));
177        }
178    }
179
180        if (sasl_conn != NULL) {
181            cJSON_AddUintPtrToObject(obj, "sasl_conn",
182                                       (uintptr_t)sasl_conn.get());
183        }
184        cJSON_AddBoolToObject(obj, "nodelay", nodelay);
185        cJSON_AddNumberToObject(obj, "refcount", refcount);
186
187        cJSON* features = cJSON_CreateObject();
188        cJSON_AddBoolToObject(features, "mutation_extras",
189                                isSupportsMutationExtras());
190        cJSON_AddBoolToObject(features, "xerror", isXerrorSupport());
191
192        cJSON_AddItemToObject(obj, "features", features);
193
194        cJSON_AddUintPtrToObject(obj, "engine_storage",
195                                   (uintptr_t)engine_storage);
196        cJSON_AddUintPtrToObject(obj, "thread", (uintptr_t)thread.load(
197            std::memory_order::memory_order_relaxed));
198        cJSON_AddStringToObject(obj, "priority", to_string(priority));
199
200        if (clustermap_revno == -2) {
201            cJSON_AddStringToObject(obj, "clustermap_revno", "unknown");
202        } else {
203            cJSON_AddNumberToObject(obj, "clustermap_revno", clustermap_revno);
204        }
205
206        cJSON_AddStringToObject(obj,
207                                "total_cpu_time",
208                                std::to_string(total_cpu_time.count()).c_str());
209        cJSON_AddStringToObject(obj,
210                                "min_sched_time",
211                                std::to_string(min_sched_time.count()).c_str());
212        cJSON_AddStringToObject(obj,
213                                "max_sched_time",
214                                std::to_string(max_sched_time.count()).c_str());
215
216        unique_cJSON_ptr arr(cJSON_CreateArray());
217        for (const auto& c : cookies) {
218            cJSON_AddItemToArray(arr.get(), c->toJSON().release());
219        }
220        cJSON_AddItemToObject(obj, "cookies", arr.release());
221
222        if (agentName.front() != '\0') {
223            cJSON_AddStringToObject(obj, "agent_name", agentName.data());
224        }
225        if (connectionId.front() != '\0') {
226            cJSON_AddStringToObject(obj, "connection_id", connectionId.data());
227        }
228
229        cJSON_AddBoolToObject(obj, "tracing", tracingEnabled);
230        cJSON_AddBoolToObject(obj, "sasl_enabled", saslAuthEnabled);
231        cJSON_AddBoolToObject(obj, "dcp", isDCP());
232        cJSON_AddBoolToObject(obj, "dcp_xattr_aware", isDcpXattrAware());
233        cJSON_AddBoolToObject(obj, "dcp_no_value", isDcpNoValue());
234        cJSON_AddNumberToObject(obj, "max_reqs_per_event", max_reqs_per_event);
235        cJSON_AddNumberToObject(obj, "nevents", numEvents);
236        cJSON_AddStringToObject(obj, "state", getStateName());
237
238        {
239            cJSON* o = cJSON_CreateObject();
240            cJSON_AddBoolToObject(o, "registered", isRegisteredInLibevent());
241            cJSON_AddItemToObject(o, "ev_flags", event_mask_to_json(ev_flags));
242            cJSON_AddItemToObject(o, "which", event_mask_to_json(currentEvent));
243            cJSON_AddItemToObject(obj, "libevent", o);
244        }
245
246        if (read) {
247            cJSON_AddItemToObject(obj, "read", read->to_json().release());
248        }
249
250        if (write) {
251            cJSON_AddItemToObject(obj, "write", write->to_json().release());
252        }
253
254        cJSON_AddStringToObject(
255                obj, "write_and_go", stateMachine.getStateName(write_and_go));
256
257        {
258            cJSON* iovobj = cJSON_CreateObject();
259            cJSON_AddNumberToObject(iovobj, "size", iov.size());
260            cJSON_AddNumberToObject(iovobj, "used", iovused);
261            cJSON_AddItemToObject(obj, "iov", iovobj);
262        }
263
264        {
265            cJSON* msg = cJSON_CreateObject();
266            cJSON_AddNumberToObject(msg, "used", msglist.size());
267            cJSON_AddNumberToObject(msg, "curr", msgcurr);
268            cJSON_AddNumberToObject(msg, "bytes", msgbytes);
269            cJSON_AddItemToObject(obj, "msglist", msg);
270        }
271        {
272            cJSON* ilist = cJSON_CreateObject();
273            cJSON_AddNumberToObject(ilist, "size", reservedItems.size());
274            cJSON_AddItemToObject(obj, "itemlist", ilist);
275        }
276        {
277            cJSON* talloc = cJSON_CreateObject();
278            cJSON_AddNumberToObject(talloc, "size", temp_alloc.size());
279            cJSON_AddItemToObject(obj, "temp_alloc_list", talloc);
280        }
281
282        /* @todo we should decode the binary header */
283        cJSON_AddNumberToObject(obj, "aiostat", aiostat);
284        cJSON_AddBoolToObject(obj, "ewouldblock", ewouldblock);
285        cJSON_AddItemToObject(obj, "ssl", ssl.toJSON());
286        cJSON_AddNumberToObject(obj, "total_recv", totalRecv);
287        cJSON_AddNumberToObject(obj, "total_send", totalSend);
288        cJSON_AddStringToObject(
289                obj,
290                "datatype",
291                mcbp::datatype::to_string(datatype.getRaw()).c_str());
292        return ret;
293}
294
295void Connection::restartAuthentication() {
296    sasl_conn.reset(create_new_cbsasl_server_t());
297    internal = false;
298    authenticated = false;
299    username = "";
300}
301
302cb::engine_errc Connection::dropPrivilege(cb::rbac::Privilege privilege) {
303    if (privilegeContext.dropPrivilege(privilege)) {
304        return cb::engine_errc::success;
305    }
306
307    return cb::engine_errc::no_access;
308}
309
310cb::rbac::PrivilegeAccess Connection::checkPrivilege(
311        cb::rbac::Privilege privilege, Cookie& cookie) {
312    cb::rbac::PrivilegeAccess ret;
313    unsigned int retries = 0;
314    const unsigned int max_retries = 100;
315
316    while ((ret = privilegeContext.check(privilege)) ==
317                   cb::rbac::PrivilegeAccess::Stale &&
318           retries < max_retries) {
319        ++retries;
320        const auto opcode = cookie.getHeader().getOpcode();
321        const std::string command(memcached_opcode_2_text(opcode));
322
323        // The privilege context we had could have been a dummy entry
324        // (created when the client connected, and used until the
325        // connection authenticates). Let's try to automatically update it,
326        // but let the client deal with whatever happens after
327        // a single update.
328        try {
329            privilegeContext = cb::rbac::createContext(getUsername(),
330                                                       all_buckets[bucketIndex].name);
331        } catch (const cb::rbac::NoSuchBucketException&) {
332            // Remove all access to the bucket
333            privilegeContext = cb::rbac::createContext(getUsername(), "");
334            LOG_INFO(
335                    "{}: RBAC: Connection::checkPrivilege({}) {} No access "
336                    "to "
337                    "bucket [{}]. command: [{}] new privilege set: {}",
338                    getId(),
339                    to_string(privilege),
340                    getDescription(),
341                    all_buckets[bucketIndex].name,
342                    command,
343                    privilegeContext.to_string());
344        } catch (const cb::rbac::Exception& error) {
345            LOG_WARNING(
346                    "{}: RBAC: Connection::checkPrivilege({}) {}: An "
347                    "exception occurred. command: [{}] bucket: [{}] UUID:"
348                    "[{}] message: {}",
349                    getId(),
350                    to_string(privilege),
351                    getDescription(),
352                    command,
353                    all_buckets[bucketIndex].name,
354                    cookie.getEventId(),
355                    error.what());
356            // Add a textual error as well
357            cookie.setErrorContext("An exception occurred. command: [" +
358                                   command + "]");
359            return cb::rbac::PrivilegeAccess::Fail;
360        }
361    }
362
363    if (retries == max_retries) {
364        LOG_INFO(
365                "{}: RBAC: Gave up rebuilding privilege context after {} "
366                "times. Let the client handle the stale authentication "
367                "context",
368                getId(),
369                retries);
370
371    } else if (retries > 1) {
372        LOG_INFO("{}: RBAC: Had to rebuild privilege context {} times",
373                 getId(),
374                 retries);
375    }
376
377    if (ret == cb::rbac::PrivilegeAccess::Fail) {
378        const auto opcode = cookie.getHeader().getOpcode();
379        const std::string command(memcached_opcode_2_text(opcode));
380        const std::string privilege_string = cb::rbac::to_string(privilege);
381        const std::string context = privilegeContext.to_string();
382
383        if (settings.isPrivilegeDebug()) {
384            audit_privilege_debug(this,
385                                  command,
386                                  all_buckets[bucketIndex].name,
387                                  privilege_string,
388                                  context);
389
390            LOG_INFO(
391                    "{}: RBAC privilege debug:{} command:[{}] bucket:[{}] "
392                    "privilege:[{}] context:{}",
393                    getId(),
394                    getDescription(),
395                    command,
396                    all_buckets[bucketIndex].name,
397                    privilege_string,
398                    context);
399
400            return cb::rbac::PrivilegeAccess::Ok;
401        } else {
402            LOG_INFO(
403                    "{} RBAC {} missing privilege {} for {} in bucket:[{}] "
404                    "with context: "
405                    "{} UUID:[{}]",
406                    getId(),
407                    getDescription(),
408                    privilege_string,
409                    command,
410                    all_buckets[bucketIndex].name,
411                    context,
412                    cookie.getEventId());
413            // Add a textual error as well
414            cookie.setErrorContext("Authorization failure: can't execute " +
415                                   command + " operation without the " +
416                                   privilege_string + " privilege");
417        }
418    }
419
420    return ret;
421}
422
423Bucket& Connection::getBucket() const {
424    return all_buckets[getBucketIndex()];
425}
426
427ENGINE_ERROR_CODE Connection::remapErrorCode(ENGINE_ERROR_CODE code) const {
428    if (xerror_support) {
429        return code;
430    }
431
432    // Check our whitelist
433    switch (code) {
434    case ENGINE_SUCCESS: // FALLTHROUGH
435    case ENGINE_KEY_ENOENT: // FALLTHROUGH
436    case ENGINE_KEY_EEXISTS: // FALLTHROUGH
437    case ENGINE_ENOMEM: // FALLTHROUGH
438    case ENGINE_NOT_STORED: // FALLTHROUGH
439    case ENGINE_EINVAL: // FALLTHROUGH
440    case ENGINE_ENOTSUP: // FALLTHROUGH
441    case ENGINE_EWOULDBLOCK: // FALLTHROUGH
442    case ENGINE_E2BIG: // FALLTHROUGH
443    case ENGINE_WANT_MORE: // FALLTHROUGH
444    case ENGINE_DISCONNECT: // FALLTHROUGH
445    case ENGINE_NOT_MY_VBUCKET: // FALLTHROUGH
446    case ENGINE_TMPFAIL: // FALLTHROUGH
447    case ENGINE_ERANGE: // FALLTHROUGH
448    case ENGINE_ROLLBACK: // FALLTHROUGH
449    case ENGINE_EBUSY: // FALLTHROUGH
450    case ENGINE_DELTA_BADVAL: // FALLTHROUGH
451    case ENGINE_PREDICATE_FAILED:
452    case ENGINE_FAILED:
453        return code;
454
455    case ENGINE_LOCKED:
456        return ENGINE_KEY_EEXISTS;
457    case ENGINE_LOCKED_TMPFAIL:
458        return ENGINE_TMPFAIL;
459    case ENGINE_UNKNOWN_COLLECTION:
460        return isCollectionsSupported() ? code : ENGINE_EINVAL;
461
462    case ENGINE_EACCESS:break;
463    case ENGINE_NO_BUCKET:break;
464    case ENGINE_AUTH_STALE:break;
465    }
466
467    // Seems like the rest of the components in our system isn't
468    // prepared to receive access denied or authentincation stale.
469    // For now we should just disconnect them
470    auto errc = cb::make_error_condition(cb::engine_errc(code));
471    LOG_WARNING(
472            "{} - Client {} not aware of extended error code ({}). "
473            "Disconnecting",
474            getId(),
475            getDescription().c_str(),
476            errc.message().c_str());
477
478    return ENGINE_DISCONNECT;
479}
480
481void Connection::resetUsernameCache() {
482    static const char unknown[] = "unknown";
483    const void* unm = unknown;
484
485    if (cbsasl_getprop(sasl_conn.get(),
486                       CBSASL_USERNAME, &unm) != CBSASL_OK) {
487        unm = unknown;
488    }
489
490    username.assign(reinterpret_cast<const char*>(unm));
491
492    domain = cb::sasl::get_domain(sasl_conn.get());
493
494    updateDescription();
495}
496
497void Connection::updateDescription() {
498    description.assign("[ " + getPeername() + " - " + getSockname());
499    if (authenticated) {
500        description += " (";
501        if (isInternal()) {
502            description += "System, ";
503        }
504        description += cb::logtags::tagUserData(getUsername());
505
506        if (domain == cb::sasl::Domain::External) {
507            description += " (LDAP)";
508        }
509        description += ")";
510    } else {
511        description += " (not authenticated)";
512    }
513    description += " ]";
514}
515
516void Connection::setBucketIndex(int bucketIndex) {
517    Connection::bucketIndex.store(bucketIndex, std::memory_order_relaxed);
518
519    if (bucketIndex < 0) {
520        // The connection objects which listens to the ports to accept
521        // use a bucketIndex of -1. Those connection objects should
522        // don't need an entry
523        return;
524    }
525
526    // Update the privilege context. If a problem occurs within the RBAC
527    // module we'll assign an empty privilege context to the connection.
528    try {
529        if (authenticated) {
530            // The user have logged in, so we should create a context
531            // representing the users context in the desired bucket.
532            privilegeContext = cb::rbac::createContext(username,
533                                                       all_buckets[bucketIndex].name);
534        } else if (is_default_bucket_enabled() &&
535                   strcmp("default", all_buckets[bucketIndex].name) == 0) {
536            // We've just connected to the _default_ bucket, _AND_ the client
537            // is unknown.
538            // Personally I think the "default bucket" concept is a really
539            // really bad idea, but we need to be backwards compatible for
540            // a while... lets look up a profile named "default" and
541            // assign that. It should only contain access to the default
542            // bucket.
543            privilegeContext = cb::rbac::createContext("default",
544                                                       all_buckets[bucketIndex].name);
545        } else {
546            // The user has not authenticated, and this isn't for the
547            // "default bucket". Assign an empty profile which won't give
548            // you any privileges.
549            privilegeContext = cb::rbac::PrivilegeContext{};
550        }
551    } catch (const cb::rbac::Exception&) {
552        privilegeContext = cb::rbac::PrivilegeContext{};
553    }
554
555    if (bucketIndex == 0) {
556        // If we're connected to the no bucket we should return
557        // no bucket instead of EACCESS. Lets give the connection all
558        // possible bucket privileges
559        privilegeContext.setBucketPrivileges();
560    }
561}
562
563void Connection::addCpuTime(std::chrono::nanoseconds ns) {
564    total_cpu_time += ns;
565    min_sched_time = std::min(min_sched_time, ns);
566    max_sched_time = std::max(min_sched_time, ns);
567}
568
569void Connection::enqueueServerEvent(std::unique_ptr<ServerEvent> event) {
570    server_events.push(std::move(event));
571}
572
573bool Connection::unregisterEvent() {
574    if (!registered_in_libevent) {
575        LOG_WARNING(
576                "Connection::unregisterEvent: Not registered in libevent - "
577                "ignoring unregister attempt");
578        return false;
579    }
580
581    cb_assert(socketDescriptor != INVALID_SOCKET);
582
583    if (event_del(&event) == -1) {
584        LOG_WARNING("Failed to remove connection to libevent: {}",
585                    cb_strerror());
586        return false;
587    }
588
589    registered_in_libevent = false;
590    return true;
591}
592
593bool Connection::registerEvent() {
594    if (registered_in_libevent) {
595        LOG_WARNING(
596                "Connection::registerEvent: Already registered in"
597                " libevent - ignoring register attempt");
598        return false;
599    }
600
601    if (event_add(&event, nullptr) == -1) {
602        LOG_WARNING("Failed to add connection to libevent: {}", cb_strerror());
603        return false;
604    }
605
606    registered_in_libevent = true;
607    return true;
608}
609
610bool Connection::updateEvent(const short new_flags) {
611    struct event_base* base = event.ev_base;
612
613    if (ssl.isEnabled() && ssl.isConnected() && (new_flags & EV_READ)) {
614        /*
615         * If we want more data and we have SSL, that data might be inside
616         * SSL's internal buffers rather than inside the socket buffer. In
617         * that case signal an EV_READ event without actually polling the
618         * socket.
619         */
620        if (ssl.havePendingInputData()) {
621            // signal a call to the handler
622            event_active(&event, EV_READ, 0);
623            return true;
624        }
625    }
626
627    if (ev_flags == new_flags) {
628        // We do "cache" the current libevent state (using EV_PERSIST) to avoid
629        // having to re-register it when it doesn't change (which it mostly
630        // don't).
631        return true;
632    }
633
634    if (!unregisterEvent()) {
635        LOG_WARNING(
636                "{}: Failed to remove connection from event notification "
637                "library. Shutting down connection {}",
638                getId(),
639                getDescription());
640        return false;
641    }
642
643    if (event_assign(&event,
644                     base,
645                     socketDescriptor,
646                     new_flags,
647                     event_handler,
648                     reinterpret_cast<void*>(this)) == -1) {
649        LOG_WARNING(
650                "{}: Failed to set up event notification. "
651                "Shutting down connection {}",
652                getId(),
653                getDescription());
654        return false;
655    }
656    ev_flags = new_flags;
657
658    if (!registerEvent()) {
659        LOG_WARNING(
660                "{}: Failed to add connection to the event notification "
661                "library. Shutting down connection {}",
662                getId(),
663                getDescription());
664        return false;
665    }
666
667    return true;
668}
669
670bool Connection::reapplyEventmask() {
671    return updateEvent(ev_flags);
672}
673
674bool Connection::initializeEvent() {
675    short event_flags = (EV_READ | EV_PERSIST);
676
677    if (event_assign(&event,
678                     base,
679                     socketDescriptor,
680                     event_flags,
681                     event_handler,
682                     reinterpret_cast<void*>(this)) == -1) {
683        return false;
684    }
685    ev_flags = event_flags;
686
687    return registerEvent();
688}
689
690void Connection::shrinkBuffers() {
691    // We share the buffers with the thread, so we don't need to worry
692    // about the read and write buffer.
693
694    if (msglist.size() > MSG_LIST_HIGHWAT) {
695        try {
696            msglist.resize(MSG_LIST_INITIAL);
697            msglist.shrink_to_fit();
698        } catch (const std::bad_alloc&) {
699            LOG_WARNING("{}: Failed to shrink msglist down to {} elements.",
700                        getId(),
701                        MSG_LIST_INITIAL);
702        }
703    }
704
705    if (iov.size() > IOV_LIST_HIGHWAT) {
706        try {
707            iov.resize(IOV_LIST_INITIAL);
708            iov.shrink_to_fit();
709        } catch (const std::bad_alloc&) {
710            LOG_WARNING("{}: Failed to shrink iov down to {} elements.",
711                        getId(),
712                        IOV_LIST_INITIAL);
713        }
714    }
715}
716
717bool Connection::tryAuthFromSslCert(const std::string& userName) {
718    username.assign(userName);
719    domain = cb::sasl::Domain::Local;
720
721    try {
722        auto context =
723                cb::rbac::createInitialContext(getUsername(), getDomain());
724        setAuthenticated(true);
725        setInternal(context.second);
726        audit_auth_success(this);
727        LOG_INFO(
728                "{}: Client {} authenticated as '{}' via X509 "
729                "certificate",
730                getId(),
731                getPeername(),
732                cb::logtags::tagUserData(getUsername()));
733        // Connections authenticated by using X.509 certificates should not
734        // be able to use SASL to change it's identity.
735        saslAuthEnabled = false;
736    } catch (const cb::rbac::NoSuchUserException& e) {
737        setAuthenticated(false);
738        LOG_WARNING("{}: User [{}] is not defined as a user in Couchbase",
739                    getId(),
740                    cb::logtags::tagUserData(e.what()));
741        return false;
742    }
743    return true;
744}
745
746void Connection::logSslErrorInfo(const std::string& method, int rval) {
747    const int error = ssl.getError(rval);
748    unsigned long code = ERR_peek_error();
749    if (code == 0) {
750        LOG_WARNING("{}: ERROR: {} returned {} with error {}",
751                    getId(),
752                    method,
753                    rval,
754                    error);
755        return;
756    }
757
758    try {
759        std::string errmsg(method + "() returned " +
760                           std::to_string(rval) + " with error " +
761                           std::to_string(error));
762        while ((code = ERR_get_error()) != 0) {
763            std::vector<char> ssl_err(1024);
764            ERR_error_string_n(
765                               code, ssl_err.data(), ssl_err.size());
766            LOG_WARNING("{}: {}: {}", getId(), errmsg, ssl_err.data());
767        }
768    } catch (const std::bad_alloc&) {
769        // unable to print error message; continue.
770        LOG_WARNING("{}: {}() returned {} with error {}",
771                    getId(),
772                    method,
773                    rval,
774                    error);
775    }
776}
777
778int Connection::sslPreConnection() {
779    int r = ssl.accept();
780    if (r == 1) {
781        ssl.drainBioSendPipe(socketDescriptor);
782        ssl.setConnected();
783        auto certResult = ssl.getCertUserName();
784        bool disconnect = false;
785        switch (certResult.first) {
786        case cb::x509::Status::NoMatch:
787        case cb::x509::Status::Error:
788            disconnect = true;
789            break;
790        case cb::x509::Status::NotPresent:
791            if (settings.getClientCertMode() == cb::x509::Mode::Mandatory) {
792                disconnect = true;
793            } else if (is_default_bucket_enabled()) {
794                associate_bucket(*this, "default");
795            }
796            break;
797        case cb::x509::Status::Success:
798            if (!tryAuthFromSslCert(certResult.second)) {
799                disconnect = true;
800                // Don't print an error message... already logged
801                certResult.second.resize(0);
802            }
803        }
804        if (disconnect) {
805            // Set the username to "[unknown]" if we failed to pick
806            // out a username from the certificate to avoid the
807            // audit event being "empty"
808            if (username.empty()) {
809                username.assign("[unknown]");
810            }
811            if (certResult.first == cb::x509::Status::NotPresent) {
812                audit_auth_failure(
813                        this, "Client did not provide an X.509 certificate");
814            } else {
815                audit_auth_failure(
816                        this, "Failed to use client prided X.509 certificate");
817            }
818            if (username == "[unknown]") {
819                username.clear();
820            }
821            cb::net::set_econnreset();
822            if (!certResult.second.empty()) {
823                LOG_WARNING(
824                        "{}: SslPreConnection: disconnection client due to"
825                        " error [{}]",
826                        getId(),
827                        certResult.second);
828            }
829            return -1;
830        }
831    } else {
832        if (ssl.getError(r) == SSL_ERROR_WANT_READ) {
833            ssl.drainBioSendPipe(socketDescriptor);
834            cb::net::set_ewouldblock();
835            return -1;
836        } else {
837            logSslErrorInfo("SSL_accept", r);
838            cb::net::set_econnreset();
839            return -1;
840        }
841    }
842
843    return 0;
844}
845
846int Connection::recv(char* dest, size_t nbytes) {
847    if (nbytes == 0) {
848        throw std::logic_error("Connection::recv: Can't read 0 bytes");
849    }
850
851    int res = -1;
852    if (ssl.isEnabled()) {
853        ssl.drainBioRecvPipe(socketDescriptor);
854
855        if (ssl.hasError()) {
856            cb::net::set_econnreset();
857            return -1;
858        }
859
860        if (!ssl.isConnected()) {
861            res = sslPreConnection();
862            if (res == -1) {
863                return -1;
864            }
865        }
866
867        /* The SSL negotiation might be complete at this time */
868        if (ssl.isConnected()) {
869            res = sslRead(dest, nbytes);
870        }
871    } else {
872        res = (int)::cb::net::recv(socketDescriptor, dest, nbytes, 0);
873        if (res > 0) {
874            totalRecv += res;
875        }
876    }
877
878    return res;
879}
880
881ssize_t Connection::sendmsg(struct msghdr* m) {
882    ssize_t res = 0;
883    if (ssl.isEnabled()) {
884        for (int ii = 0; ii < int(m->msg_iovlen); ++ii) {
885            int n = sslWrite(reinterpret_cast<char*>(m->msg_iov[ii].iov_base),
886                             m->msg_iov[ii].iov_len);
887            if (n > 0) {
888                res += n;
889                if (n != int(m->msg_iov[ii].iov_len)) {
890                    // We didnt' send the entire chunk. return the number
891                    // of bytes sent so far to the caller and let them
892                    // deal with adjusting the pointers and retry
893                    return res;
894                }
895            } else {
896                // We failed to send the data over ssl. it might be
897                // because the underlying socket buffer is full, or
898                // if there is a real error with the socket or inside
899                // OpenSSL. If the error is because we the network
900                // is full we'll return the number of bytes we've sent
901                // so far (so that it may adjust the iov_base and iov_len
902                // fields before it'll try to call us again and the
903                // send will most likely fail again, but this we'll
904                // return -1 and when the caller checks it'll see it is
905                // because the network buffer is full).
906                auto error = cb::net::get_socket_error();
907                if (cb::net::is_blocking(error) && res > 0) {
908                    return res;
909                }
910                return -1;
911            }
912        }
913        ssl.drainBioSendPipe(socketDescriptor);
914        return res;
915    } else {
916        res = cb::net::sendmsg(socketDescriptor, m, 0);
917        if (res > 0) {
918            totalSend += res;
919        }
920    }
921
922    return res;
923}
924
925/**
926 * Adjust the msghdr by "removing" n bytes of data from it.
927 *
928 * @param m the msgheader to update
929 * @param nbytes
930 * @return the number of bytes left in the current iov entry
931 */
932size_t adjust_msghdr(cb::Pipe& pipe, struct msghdr* m, ssize_t nbytes) {
933    auto rbuf = pipe.rdata();
934
935    // We've written some of the data. Remove the completed
936    // iovec entries from the list of pending writes.
937    while (m->msg_iovlen > 0 && nbytes >= ssize_t(m->msg_iov->iov_len)) {
938        if (rbuf.data() == static_cast<const uint8_t*>(m->msg_iov->iov_base)) {
939            pipe.consumed(m->msg_iov->iov_len);
940            rbuf = pipe.rdata();
941        }
942        nbytes -= (ssize_t)m->msg_iov->iov_len;
943        m->msg_iovlen--;
944        m->msg_iov++;
945    }
946
947    // Might have written just part of the last iovec entry;
948    // adjust it so the next write will do the rest.
949    if (nbytes > 0) {
950        if (rbuf.data() == static_cast<const uint8_t*>(m->msg_iov->iov_base)) {
951            pipe.consumed(nbytes);
952        }
953        m->msg_iov->iov_base =
954                (void*)((unsigned char*)m->msg_iov->iov_base + nbytes);
955        m->msg_iov->iov_len -= nbytes;
956    }
957
958    return m->msg_iov->iov_len;
959}
960
961Connection::TransmitResult Connection::transmit() {
962    if (ssl.isEnabled()) {
963        // We use OpenSSL to write data into a buffer before we send it
964        // over the wire... Lets go ahead and drain that BIO pipe before
965        // we may do anything else.
966        ssl.drainBioSendPipe(socketDescriptor);
967        if (ssl.morePendingOutput()) {
968            if (ssl.hasError() || !updateEvent(EV_WRITE | EV_PERSIST)) {
969                setState(McbpStateMachine::State::closing);
970                return TransmitResult::HardError;
971            }
972            return TransmitResult::SoftError;
973        }
974
975        // The output buffer is completely drained (well, put in the kernel
976        // buffer to send to the client). Go ahead and send more data
977    }
978
979    while (msgcurr < msglist.size() && msglist[msgcurr].msg_iovlen == 0) {
980        /* Finished writing the current msg; advance to the next. */
981        msgcurr++;
982    }
983
984    if (msgcurr < msglist.size()) {
985        ssize_t res;
986        struct msghdr* m = &msglist[msgcurr];
987
988        res = sendmsg(m);
989        auto error = cb::net::get_socket_error();
990        if (res > 0) {
991            get_thread_stats(this)->bytes_written += res;
992
993            if (adjust_msghdr(*write, m, res) == 0) {
994                msgcurr++;
995                if (msgcurr == msglist.size()) {
996                    // We sent the final chunk of data.. In our SSL connections
997                    // we might however have data spooled in the SSL buffers
998                    // which needs to be drained before we may consider the
999                    // transmission complete (note that our sendmsg tried
1000                    // to drain the buffers before returning).
1001                    if (ssl.isEnabled() && ssl.morePendingOutput()) {
1002                        if (ssl.hasError() ||
1003                            !updateEvent(EV_WRITE | EV_PERSIST)) {
1004                            setState(McbpStateMachine::State::closing);
1005                            return TransmitResult::HardError;
1006                        }
1007                        return TransmitResult::SoftError;
1008                    }
1009                    return TransmitResult::Complete;
1010                }
1011            }
1012
1013            return TransmitResult::Incomplete;
1014        }
1015
1016        if (res == -1 && cb::net::is_blocking(error)) {
1017            if (!updateEvent(EV_WRITE | EV_PERSIST)) {
1018                setState(McbpStateMachine::State::closing);
1019                return TransmitResult::HardError;
1020            }
1021            return TransmitResult::SoftError;
1022        }
1023
1024        // if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
1025        // we have a real error, on which we close the connection
1026        if (res == -1) {
1027            if (cb::net::is_closed_conn(error)) {
1028                LOG_INFO("{}: Failed to send data; peer closed the connection",
1029                         getId());
1030            } else {
1031                LOG_WARNING("Failed to write, and not due to blocking: {}",
1032                            cb_strerror(error));
1033            }
1034        } else {
1035            // sendmsg should return the number of bytes written, but we
1036            // sent 0 bytes. That shouldn't be possible unless we
1037            // requested to write 0 bytes (otherwise we should have gotten
1038            // -1 with EWOULDBLOCK)
1039            // Log the request buffer so that we can look into this
1040            LOG_WARNING("{} - sendmsg returned 0", socketDescriptor);
1041            for (int ii = 0; ii < int(m->msg_iovlen); ++ii) {
1042                LOG_WARNING(
1043                        "\t{} - {}", socketDescriptor, m->msg_iov[ii].iov_len);
1044            }
1045        }
1046
1047        setState(McbpStateMachine::State::closing);
1048        return TransmitResult::HardError;
1049    } else {
1050        return TransmitResult::Complete;
1051    }
1052}
1053
1054/**
1055 * To protect us from someone flooding a connection with bogus data causing
1056 * the connection to eat up all available memory, break out and start
1057 * looking at the data I've got after a number of reallocs...
1058 */
1059Connection::TryReadResult Connection::tryReadNetwork() {
1060    // When we get here we've either got an empty buffer, or we've got
1061    // a buffer with less than a packet header filled in.
1062    //
1063    // Verify that assumption!!!
1064    if (read->rsize() >= sizeof(cb::mcbp::Request)) {
1065        // The above don't hold true ;)
1066        throw std::logic_error(
1067                "tryReadNetwork: Expected the input buffer to be empty or "
1068                "contain a partial header");
1069    }
1070
1071    // Make sure we can fit the header into the input buffer
1072    try {
1073        read->ensureCapacity(sizeof(cb::mcbp::Request) - read->rsize());
1074    } catch (const std::bad_alloc&) {
1075        return TryReadResult::MemoryError;
1076    }
1077
1078    Connection* c = this;
1079    const auto res = read->produce([c](cb::byte_buffer buffer) -> ssize_t {
1080        return c->recv(reinterpret_cast<char*>(buffer.data()), buffer.size());
1081    });
1082
1083    if (res > 0) {
1084        get_thread_stats(this)->bytes_read += res;
1085        return TryReadResult::DataReceived;
1086    }
1087
1088    if (res == 0) {
1089        LOG_DEBUG(
1090                "{} Closing connection as the other side closed the "
1091                "connection {}",
1092                getId(),
1093                getDescription());
1094        return TryReadResult::SocketClosed;
1095    }
1096
1097    const auto error = cb::net::get_socket_error();
1098    if (cb::net::is_blocking(error)) {
1099        return TryReadResult::NoDataReceived;
1100    }
1101
1102    // There was an error reading from the socket. There isn't much we
1103    // can do about that apart from logging it and close the connection.
1104    // Keep this as INFO as it isn't a problem with the memcached server,
1105    // it is a network issue (or a bad client not closing the connection
1106    // cleanly)
1107    LOG_INFO(
1108            "{} Closing connection {} due to read "
1109            "error: {}",
1110            getId(),
1111            getDescription(),
1112            cb_strerror(error));
1113    return TryReadResult::SocketError;
1114}
1115
1116int Connection::sslRead(char* dest, size_t nbytes) {
1117    int ret = 0;
1118
1119    while (ret < int(nbytes)) {
1120        int n;
1121        ssl.drainBioRecvPipe(socketDescriptor);
1122        if (ssl.hasError()) {
1123            cb::net::set_econnreset();
1124            return -1;
1125        }
1126        n = ssl.read(dest + ret, (int)(nbytes - ret));
1127        if (n > 0) {
1128            ret += n;
1129        } else {
1130            /* n < 0 and n == 0 require a check of SSL error*/
1131            const int error = ssl.getError(n);
1132
1133            switch (error) {
1134            case SSL_ERROR_WANT_READ:
1135                /*
1136                 * Drain the buffers and retry if we've got data in
1137                 * our input buffers
1138                 */
1139                if (ssl.moreInputAvailable()) {
1140                    /* our recv buf has data feed the BIO */
1141                    ssl.drainBioRecvPipe(socketDescriptor);
1142                } else if (ret > 0) {
1143                    /* nothing in our recv buf, return what we have */
1144                    return ret;
1145                } else {
1146                    cb::net::set_ewouldblock();
1147                    return -1;
1148                }
1149                break;
1150
1151            case SSL_ERROR_ZERO_RETURN:
1152                /* The TLS/SSL connection has been closed (cleanly). */
1153                return 0;
1154
1155            default:
1156                logSslErrorInfo("SSL_read", n);
1157                cb::net::set_econnreset();
1158                return -1;
1159            }
1160        }
1161    }
1162
1163    return ret;
1164}
1165
1166int Connection::sslWrite(const char* src, size_t nbytes) {
1167    // Start by trying to send everything we've already got
1168    // buffered in our bio
1169    ssl.drainBioSendPipe(socketDescriptor);
1170    if (ssl.hasError()) {
1171        cb::net::set_econnreset();
1172        return -1;
1173    }
1174
1175    // If the network socket is full there isn't much point
1176    // of trying to add more to SSL
1177    if (ssl.morePendingOutput()) {
1178        cb::net::set_ewouldblock();
1179        return -1;
1180    }
1181
1182    // We've got an empty buffer for SSL to operate on,
1183    // so lets get to it.
1184    do {
1185        const auto n = ssl.write(src, int(nbytes));
1186        if (n > 0) {
1187            // we've successfully sent some bytes to
1188            // SSL. Lets try to flush it to the network
1189            // socket buffers.
1190            ssl.drainBioSendPipe(socketDescriptor);
1191            if (ssl.hasError()) {
1192                cb::net::set_econnreset();
1193                return -1;
1194            }
1195            // Return the number of bytes submitted to SSL
1196            return n;
1197        } else if (n == 0) {
1198            // Closed connection.
1199            cb::net::set_econnreset();
1200            return -1;
1201        } else {
1202            const int error = ssl.getError(n);
1203            if (error == SSL_ERROR_WANT_WRITE) {
1204                ssl.drainBioSendPipe(socketDescriptor);
1205                if (ssl.morePendingOutput()) {
1206                    cb::net::set_ewouldblock();
1207                    return -1;
1208                }
1209                // We've got space in the network buffer.
1210                // retry the operation (and openssl will
1211                // try to fill the network buffer again)
1212            } else {
1213                // We enountered an error we don't have code
1214                // to handle. Reset the connection
1215                logSslErrorInfo("SSL_write", n);
1216                cb::net::set_econnreset();
1217                return -1;
1218            }
1219        }
1220    } while (true);
1221}
1222
1223void Connection::addMsgHdr(bool reset) {
1224    if (reset) {
1225        msgcurr = 0;
1226        msglist.clear();
1227        iovused = 0;
1228    }
1229
1230    msglist.emplace_back();
1231
1232    struct msghdr& msg = msglist.back();
1233
1234    /* this wipes msg_iovlen, msg_control, msg_controllen, and
1235       msg_flags, the last 3 of which aren't defined on solaris: */
1236    memset(&msg, 0, sizeof(struct msghdr));
1237
1238    msg.msg_iov = &iov.data()[iovused];
1239
1240    msgbytes = 0;
1241    STATS_MAX(this, msgused_high_watermark, gsl::narrow<int>(msglist.size()));
1242}
1243
1244void Connection::addIov(const void* buf, size_t len) {
1245    if (len == 0) {
1246        return;
1247    }
1248
1249    struct msghdr* m = &msglist.back();
1250
1251    /* We may need to start a new msghdr if this one is full. */
1252    if (m->msg_iovlen == IOV_MAX) {
1253        addMsgHdr(false);
1254    }
1255
1256    ensureIovSpace();
1257
1258    // Update 'm' as we may have added an additional msghdr
1259    m = &msglist.back();
1260
1261    m->msg_iov[m->msg_iovlen].iov_base = (void*)buf;
1262    m->msg_iov[m->msg_iovlen].iov_len = len;
1263
1264    msgbytes += len;
1265    ++iovused;
1266    STATS_MAX(this, iovused_high_watermark, gsl::narrow<int>(getIovUsed()));
1267    m->msg_iovlen++;
1268}
1269
1270void Connection::ensureIovSpace() {
1271    if (iovused < iov.size()) {
1272        // There is still size in the list
1273        return;
1274    }
1275
1276    // Try to double the size of the array
1277    iov.resize(iov.size() * 2);
1278
1279    /* Point all the msghdr structures at the new list. */
1280    size_t ii;
1281    int iovnum;
1282    for (ii = 0, iovnum = 0; ii < msglist.size(); ii++) {
1283        msglist[ii].msg_iov = &iov[iovnum];
1284        iovnum += msglist[ii].msg_iovlen;
1285    }
1286}
1287
1288Connection::Connection()
1289    : socketDescriptor(INVALID_SOCKET),
1290      base(nullptr),
1291      sasl_conn(create_new_cbsasl_server_t()),
1292      stateMachine(*this) {
1293    MEMCACHED_CONN_CREATE(this);
1294    updateDescription();
1295    cookies.emplace_back(std::unique_ptr<Cookie>{new Cookie(*this)});
1296    setConnectionId(peername.c_str());
1297}
1298
1299Connection::Connection(SOCKET sfd, event_base* b, const ListeningPort& ifc)
1300    : socketDescriptor(sfd),
1301      base(b),
1302      sasl_conn(create_new_cbsasl_server_t()),
1303      parent_port(ifc.port),
1304      stateMachine(*this) {
1305    MEMCACHED_CONN_CREATE(this);
1306    resolveConnectionName();
1307    setTcpNoDelay(ifc.tcp_nodelay);
1308    updateDescription();
1309    cookies.emplace_back(std::unique_ptr<Cookie>{new Cookie(*this)});
1310    msglist.reserve(MSG_LIST_INITIAL);
1311    iov.resize(IOV_LIST_INITIAL);
1312
1313    if (ifc.ssl.enabled) {
1314        if (!enableSSL(ifc.ssl.cert, ifc.ssl.key)) {
1315            throw std::runtime_error(std::to_string(getId()) +
1316                                     " Failed to enable SSL");
1317        }
1318    }
1319
1320    if (!initializeEvent()) {
1321        throw std::runtime_error("Failed to initialize event structure");
1322    }
1323    setConnectionId(peername.c_str());
1324}
1325
1326Connection::~Connection() {
1327    MEMCACHED_CONN_DESTROY(this);
1328    releaseReservedItems();
1329    for (auto* ptr : temp_alloc) {
1330        cb_free(ptr);
1331    }
1332    if (socketDescriptor != INVALID_SOCKET) {
1333        LOG_DEBUG("{} - Closing socket descriptor", getId());
1334        safe_close(socketDescriptor);
1335    }
1336}
1337
1338void Connection::setState(McbpStateMachine::State next_state) {
1339    stateMachine.setCurrentState(next_state);
1340}
1341
1342void Connection::runStateMachinery() {
1343    if (settings.getVerbose() > 1) {
1344        do {
1345            LOG_INFO("{} - Running task: {}",
1346                     getId(),
1347                     stateMachine.getCurrentStateName());
1348        } while (stateMachine.execute());
1349    } else {
1350        while (stateMachine.execute()) {
1351            // empty
1352        }
1353    }
1354}
1355
1356void Connection::setAgentName(cb::const_char_buffer name) {
1357    auto size = std::min(name.size(), agentName.size() - 1);
1358    std::copy(name.begin(), name.begin() + size, agentName.begin());
1359    agentName[size] = '\0';
1360}
1361
1362void Connection::setConnectionId(cb::const_char_buffer uuid) {
1363    auto size = std::min(uuid.size(), connectionId.size() - 1);
1364    std::copy(uuid.begin(), uuid.begin() + size, connectionId.begin());
1365    // the uuid string shall always be zero terminated
1366    connectionId[size] = '\0';
1367}
1368
1369bool Connection::shouldDelete() {
1370    return getState() == McbpStateMachine::State ::destroyed;
1371}
1372
1373size_t Connection::getNumberOfCookies() const {
1374    size_t ret = 0;
1375    for (const auto& cookie : cookies) {
1376        if (cookie) {
1377            ++ret;
1378        }
1379    }
1380
1381    return ret;
1382}
1383
1384bool Connection::processServerEvents() {
1385    if (server_events.empty()) {
1386        return false;
1387    }
1388
1389    const auto before = getState();
1390
1391    // We're waiting for the next command to arrive from the client
1392    // and we've got a server event to process. Let's start
1393    // processing the server events (which might toggle our state)
1394    if (server_events.front()->execute(*this)) {
1395        server_events.pop();
1396    }
1397
1398    return getState() != before;
1399}
1400
1401void Connection::runEventLoop(short which) {
1402    conn_loan_buffers(this);
1403    currentEvent = which;
1404    numEvents = max_reqs_per_event;
1405
1406    try {
1407        runStateMachinery();
1408    } catch (const std::exception& e) {
1409        bool logged = false;
1410        if (getState() == McbpStateMachine::State::execute) {
1411            try {
1412                // Converting the cookie to json -> string could probably
1413                // cause too much memory allcation. We don't want that to
1414                // cause us to crash..
1415                LOG_WARNING(
1416                        "{}: exception occurred in runloop during packet "
1417                        "execution. Cookie info: {} - closing connection: {}",
1418                        getId(),
1419                        to_string(getCookieObject().toJSON(), false),
1420                        e.what());
1421                logged = true;
1422            } catch (const std::bad_alloc&) {
1423                // none
1424            }
1425        }
1426
1427        if (!logged) {
1428            try {
1429                LOG_WARNING(
1430                        "{}: exception occurred in runloop (state: \"{}\") - "
1431                        "closing connection: {}",
1432                        getId(),
1433                        getStateName(),
1434                        e.what());
1435            } catch (const std::bad_alloc&) {
1436                // Ditch logging.. just shut down the connection
1437            }
1438        }
1439
1440        setState(McbpStateMachine::State::closing);
1441        /*
1442         * In addition to setting the state to conn_closing
1443         * we need to move execution foward by executing
1444         * conn_closing() and the subsequent functions
1445         * i.e. conn_pending_close() or conn_immediate_close()
1446         */
1447        try {
1448            runStateMachinery();
1449        } catch (const std::exception& e) {
1450            try {
1451                LOG_WARNING(
1452                        "{}: exception occurred in runloop whilst"
1453                        " attempting to close connection: {}",
1454                        getId(),
1455                        e.what());
1456            } catch (const std::bad_alloc&) {
1457                // Drop logging
1458            }
1459        }
1460    }
1461
1462    conn_return_buffers(this);
1463}
1464
1465void Connection::close() {
1466    bool ewb = false;
1467    for (auto& cookie : cookies) {
1468        if (cookie) {
1469            if (cookie->isEwouldblock()) {
1470                ewb = true;
1471            }
1472            cookie->reset();
1473        }
1474    }
1475
1476    // We don't want any network notifications anymore..
1477    unregisterEvent();
1478    safe_close(socketDescriptor);
1479    socketDescriptor = INVALID_SOCKET;
1480
1481    // Release all reserved items!
1482    releaseReservedItems();
1483
1484    if (refcount > 1 || ewb) {
1485        setState(McbpStateMachine::State::pending_close);
1486    } else {
1487        setState(McbpStateMachine::State::immediate_close);
1488    }
1489}
1490
1491void Connection::propagateDisconnect() const {
1492    for (auto& cookie : cookies) {
1493        if (cookie) {
1494            perform_callbacks(ON_DISCONNECT, nullptr, cookie.get());
1495        }
1496    }
1497}
1498
1499void Connection::signalIfIdle(bool logbusy, size_t workerthread) {
1500    if (!isEwouldblock() && stateMachine.isIdleState()) {
1501        // Raise a 'fake' write event to ensure the connection has an
1502        // event delivered (for example if its sendQ is full).
1503        if (!registered_in_libevent) {
1504            ev_flags = EV_READ | EV_WRITE | EV_PERSIST;
1505            if (!registerEvent()) {
1506                LOG_WARNING(
1507                        "{}: Connection::signalIfIdle: Unable to "
1508                        "registerEvent.  Setting state to conn_closing",
1509                        getId());
1510                setState(McbpStateMachine::State::closing);
1511            }
1512        } else if (!updateEvent(EV_READ | EV_WRITE | EV_PERSIST)) {
1513            LOG_WARNING(
1514                    "{}: Connection::signalIfIdle: Unable to "
1515                    "updateEvent.  Setting state to conn_closing",
1516                    getId());
1517            setState(McbpStateMachine::State::closing);
1518        }
1519        event_active(&event, EV_WRITE, 0);
1520    } else if (logbusy) {
1521        unique_cJSON_ptr json(toJSON());
1522        auto details = to_string(json, false);
1523        LOG_INFO("Worker thread {}: {}", workerthread, details);
1524    }
1525}
1526
1527void Connection::setPriority(const Connection::Priority& priority) {
1528    Connection::priority = priority;
1529    switch (priority) {
1530    case Priority::High:
1531        max_reqs_per_event =
1532                settings.getRequestsPerEventNotification(EventPriority::High);
1533        return;
1534    case Priority::Medium:
1535        max_reqs_per_event =
1536                settings.getRequestsPerEventNotification(EventPriority::Medium);
1537        return;
1538    case Priority::Low:
1539        max_reqs_per_event =
1540                settings.getRequestsPerEventNotification(EventPriority::Low);
1541        return;
1542    }
1543    throw std::invalid_argument("Unkown priority: " +
1544                                std::to_string(int(priority)));
1545}
1546
1547bool Connection::selectedBucketIsXattrEnabled() const {
1548    if (bucketEngine) {
1549        return settings.isXattrEnabled() &&
1550               bucketEngine->isXattrEnabled(getBucketEngineAsV0());
1551    }
1552    return settings.isXattrEnabled();
1553}
1554