xref: /5.5.2/kv_engine/daemon/memcached.cc (revision 610e6d38)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15 */
16#include "config.h"
17#include "memcached.h"
18#include "alloc_hooks.h"
19#include "buckets.h"
20#include "cmdline.h"
21#include "config_parse.h"
22#include "connections.h"
23#include "debug_helpers.h"
24#include "doc_pre_expiry.h"
25#include "enginemap.h"
26#include "executorpool.h"
27#include "ioctl.h"
28#include "libevent_locking.h"
29#include "logger/logger.h"
30#include "mc_time.h"
31#include "mcaudit.h"
32#include "mcbp.h"
33#include "mcbp_executors.h"
34#include "mcbp_topkeys.h"
35#include "mcbp_validators.h"
36#include "mcbpdestroybuckettask.h"
37#include "memcached/audit_interface.h"
38#include "memcached_openssl.h"
39#include "parent_monitor.h"
40#include "protocol/mcbp/engine_wrapper.h"
41#include "runtime.h"
42#include "server_socket.h"
43#include "session_cas.h"
44#include "settings.h"
45#include "stats.h"
46#include "subdocument.h"
47#include "timings.h"
48#include "topkeys.h"
49#include "tracing.h"
50#include "utilities/engine_loader.h"
51#include "utilities/protocol2text.h"
52#include "utilities/terminate_handler.h"
53
54#include <mcbp/mcbp.h>
55#include <memcached/util.h>
56#include <phosphor/phosphor.h>
57#include <platform/cb_malloc.h>
58#include <platform/dirutils.h>
59#include <platform/make_unique.h>
60#include <platform/socket.h>
61#include <platform/strerror.h>
62#include <platform/sysinfo.h>
63#include <utilities/breakpad.h>
64
65#include <signal.h>
66#include <fcntl.h>
67#include <errno.h>
68#include <stdlib.h>
69#include <stdio.h>
70#include <string.h>
71#include <time.h>
72#include <limits.h>
73#include <ctype.h>
74#include <memcached/rbac.h>
75#include <stdarg.h>
76#include <stddef.h>
77#include <snappy-c.h>
78#include <cJSON.h>
79#include <JSON_checker.h>
80#include <engines/default_engine.h>
81#include <vector>
82#include <algorithm>
83#include <cJSON_utils.h>
84
85// MB-14649: log crashing on windows..
86#include <math.h>
87#include <memcached/audit_interface.h>
88#include <memcached/server_api.h>
89
90#include <gsl/gsl>
91
92#if HAVE_LIBNUMA
93#include <numa.h>
94#endif
95
96static EXTENSION_LOG_LEVEL get_log_level(void);
97
98/**
 * All of the buckets in couchbase are stored in this array.
100 */
101static std::mutex buckets_lock;
102std::array<Bucket, COUCHBASE_MAX_NUM_BUCKETS + 1> all_buckets;
103
/**
 * Cast an ENGINE_HANDLE_V1* to the opaque ENGINE_HANDLE* form expected
 * by the engine API entry points.
 */
static ENGINE_HANDLE* v1_handle_2_handle(ENGINE_HANDLE_V1* v1) {
    return reinterpret_cast<ENGINE_HANDLE*>(v1);
}
107
/// Return the name of the bucket the connection is currently bound to.
const char* getBucketName(const Connection* c) {
    return all_buckets[c->getBucketIndex()].name;
}
111
112void bucketsForEach(std::function<bool(Bucket&, void*)> fn, void *arg) {
113    std::lock_guard<std::mutex> all_bucket_lock(buckets_lock);
114    for (Bucket& bucket : all_buckets) {
115        bool do_break = false;
116        std::lock_guard<std::mutex> guard(bucket.mutex);
117        if (bucket.state == BucketState::Ready) {
118            if (!fn(bucket, arg)) {
119                do_break = true;
120            }
121        }
122        if (do_break) {
123            break;
124        }
125    }
126}
127
128std::atomic<bool> memcached_shutdown;
129std::atomic<bool> service_online;
// Should we enable the common ports (all of the ports which aren't tagged as
// management ports)
132static std::atomic<bool> enable_common_ports;
133
134std::unique_ptr<ExecutorPool> executorPool;
135
136/* Mutex for global stats */
137std::mutex stats_mutex;
138
139/*
140 * forward declarations
141 */
142static void register_callback(ENGINE_HANDLE *eh,
143                              ENGINE_EVENT_TYPE type,
144                              EVENT_CALLBACK cb, const void *cb_data);
145
146static void create_listen_sockets(bool management);
147
148/* stats */
149static void stats_init(void);
150
151/* defaults */
152static void settings_init(void);
153
154/** exported globals **/
155struct stats stats;
156
157/** file scope variables **/
158std::vector<std::unique_ptr<ServerSocket>> listen_conn;
159static struct event_base *main_base;
160
161static engine_event_handler_array_t engine_event_handlers;
162
163/*
164 * MB-12470 requests an easy way to see when (some of) the statistics
 * counters were reset. This function grabs the current time and tries
166 * to format it to the current timezone by using ctime_r/s (which adds
167 * a newline at the end for some obscure reason which we'll need to
168 * strip off).
169 *
170 * This function expects that the stats lock is held by the caller to get
171 * a "sane" result (otherwise one thread may see a garbled version), but
172 * no crash will occur since the buffer is big enough and always zero
173 * terminated.
174 */
char reset_stats_time[80];

/**
 * Format the current wall-clock time into reset_stats_time so stats
 * consumers can see when the counters were last reset.
 *
 * ctime_r/ctime_s always append a trailing newline which is stripped
 * here. The buffer (80 bytes) comfortably exceeds the minimum 26 bytes
 * ctime_r requires and stays zero terminated, so a concurrent reader
 * can at worst see garbled text, never an overflow.
 */
static void set_stats_reset_time(void)
{
    time_t now = time(nullptr); // was time(NULL); prefer nullptr
#ifdef WIN32
    ctime_s(reset_stats_time, sizeof(reset_stats_time), &now);
#else
    ctime_r(&now, reset_stats_time);
#endif
    // Strip the newline appended by ctime_r/ctime_s.
    char* ptr = strchr(reset_stats_time, '\n');
    if (ptr != nullptr) {
        *ptr = '\0';
    }
}
189
190void disassociate_bucket(Connection& connection) {
191    Bucket& b = all_buckets.at(connection.getBucketIndex());
192    std::lock_guard<std::mutex> guard(b.mutex);
193    b.clients--;
194
195    connection.setBucketIndex(0);
196    connection.setBucketEngine(nullptr);
197
198    if (b.clients == 0 && b.state == BucketState::Destroying) {
199        b.cond.notify_one();
200    }
201}
202
203bool associate_bucket(Connection& connection, const char* name) {
204    bool found = false;
205
206    /* leave the current bucket */
207    disassociate_bucket(connection);
208
209    /* Try to associate with the named bucket */
210    /* @todo add auth checks!!! */
211    for (size_t ii = 1; ii < all_buckets.size() && !found; ++ii) {
212        Bucket &b = all_buckets.at(ii);
213        std::lock_guard<std::mutex> guard(b.mutex);
214        if (b.state == BucketState::Ready && strcmp(b.name, name) == 0) {
215            b.clients++;
216            connection.setBucketIndex(gsl::narrow<int>(ii));
217            connection.setBucketEngine(b.engine);
218            found = true;
219        }
220    }
221
222    if (!found) {
223        /* Bucket not found, connect to the "no-bucket" */
224        Bucket &b = all_buckets.at(0);
225        {
226            std::lock_guard<std::mutex> guard(b.mutex);
227            b.clients++;
228        }
229        connection.setBucketIndex(0);
230        connection.setBucketEngine(b.engine);
231    }
232
233    return found;
234}
235
236void associate_initial_bucket(Connection& connection) {
237    Bucket &b = all_buckets.at(0);
238    {
239        std::lock_guard<std::mutex> guard(b.mutex);
240        b.clients++;
241    }
242
243    connection.setBucketIndex(0);
244    connection.setBucketEngine(b.engine);
245
246    if (is_default_bucket_enabled()) {
247        associate_bucket(connection, "default");
248    }
249}
250
/**
 * Push the current verbosity level down to every ready bucket's engine.
 * Spawned on its own thread from perform_callbacks(ON_LOG_LEVEL) so it
 * may safely take the global buckets lock.
 */
static void populate_log_level(void*) {
    // Lock the entire buckets array so that buckets can't be modified while
    // we notify them (blocking bucket creation/deletion)
    auto val = get_log_level();

    std::lock_guard<std::mutex> all_bucket_lock(buckets_lock);
    for (auto& bucket : all_buckets) {
        std::lock_guard<std::mutex> guard(bucket.mutex);
        // Only engines implementing set_log_level can be notified.
        if (bucket.state == BucketState::Ready &&
            bucket.engine->set_log_level != nullptr) {
            bucket.engine->set_log_level(reinterpret_cast<ENGINE_HANDLE*>(bucket.engine),
                                         val);
        }
    }
}
266
267/* Perform all callbacks of a given type for the given connection. */
268void perform_callbacks(ENGINE_EVENT_TYPE type,
269                       const void *data,
270                       const void *void_cookie)
271{
272    cb_thread_t tid;
273
274    switch (type) {
275        /*
276         * The following events operates on a connection which is passed in
277         * as the cookie.
278         */
279    case ON_DISCONNECT: {
280        const auto * cookie = reinterpret_cast<const Cookie *>(void_cookie);
281        if (cookie == nullptr) {
282            throw std::invalid_argument("perform_callbacks: cookie is nullptr");
283        }
284        const auto bucket_idx = cookie->getConnection().getBucketIndex();
285        if (bucket_idx == -1) {
286            throw std::logic_error(
287                    "perform_callbacks: connection (which is " +
288                    std::to_string(cookie->getConnection().getId()) +
289                    ") cannot be "
290                    "disconnected as it is not associated with a bucket");
291        }
292
293        for (auto& handler : all_buckets[bucket_idx].engine_event_handlers[type]) {
294            handler.cb(void_cookie, ON_DISCONNECT, data, handler.cb_data);
295        }
296        break;
297    }
298    case ON_LOG_LEVEL:
299        if (void_cookie != nullptr) {
300            throw std::invalid_argument("perform_callbacks: cookie "
301                "(which is " +
302                std::to_string(reinterpret_cast<uintptr_t>(void_cookie)) +
303                ") should be NULL for ON_LOG_LEVEL");
304        }
305        for (auto& handler : engine_event_handlers[type]) {
306            handler.cb(void_cookie, ON_LOG_LEVEL, data, handler.cb_data);
307        }
308
309        if (service_online) {
310            if (cb_create_thread(&tid, populate_log_level, nullptr, 1) == -1) {
311                LOG_WARNING(
312                        "Failed to create thread to notify engines about "
313                        "changing log level");
314            }
315        }
316        break;
317
318    case ON_DELETE_BUCKET: {
319        /** cookie is the bucket entry */
320        auto* bucket = reinterpret_cast<const Bucket*>(void_cookie);
321        for (auto& handler : bucket->engine_event_handlers[type]) {
322            handler.cb(void_cookie, ON_DELETE_BUCKET, data, handler.cb_data);
323        }
324        break;
325    }
326
327    case ON_INIT_COMPLETE:
328        if ((data != nullptr) || (void_cookie != nullptr)) {
329            throw std::invalid_argument("perform_callbacks: data and cookie"
330                                            " should be nullptr");
331        }
332        enable_common_ports.store(true, std::memory_order_release);
333        notify_dispatcher();
334        break;
335
336    default:
337        throw std::invalid_argument("perform_callbacks: type "
338                "(which is " + std::to_string(type) +
339                "is not a valid ENGINE_EVENT_TYPE");
340    }
341}
342
343static void register_callback(ENGINE_HANDLE *eh,
344                              ENGINE_EVENT_TYPE type,
345                              EVENT_CALLBACK cb,
346                              const void *cb_data)
347{
348    size_t idx;
349    switch (type) {
350    /*
351     * The following events operates on a connection which is passed in
352     * as the cookie.
353     */
354    case ON_DISCONNECT:
355    case ON_DELETE_BUCKET:
356        if (eh == nullptr) {
357            throw std::invalid_argument("register_callback: 'eh' must be non-NULL");
358        }
359        for (idx = 0; idx < all_buckets.size(); ++idx) {
360            if ((void *)eh == (void *)all_buckets[idx].engine) {
361                break;
362            }
363        }
364        if (idx == all_buckets.size()) {
365            throw std::invalid_argument("register_callback: eh (which is " +
366                    std::to_string(reinterpret_cast<uintptr_t>(eh)) +
367                    ") is not a engine associated with a bucket");
368        }
369        all_buckets[idx].engine_event_handlers[type].push_back({cb, cb_data});
370        break;
371
372    case ON_LOG_LEVEL:
373        if (eh != nullptr) {
374            throw std::invalid_argument("register_callback: 'eh' must be NULL");
375        }
376        engine_event_handlers[type].push_back({cb, cb_data});
377        break;
378
379    default:
380        throw std::invalid_argument("register_callback: type (which is " +
381                                    std::to_string(type) +
382                                    ") is not a valid ENGINE_EVENT_TYPE");
383    }
384}
385
386static void free_callbacks() {
387    // free per-bucket callbacks.
388    for (size_t idx = 0; idx < all_buckets.size(); ++idx) {
389        for (auto& type_vec : all_buckets[idx].engine_event_handlers) {
390            type_vec.clear();
391        }
392    }
393
394    // free global callbacks
395    for (auto& type_vec : engine_event_handlers) {
396        type_vec.clear();
397    }
398}
399
/// Reset the global connection counters and stamp the reset time.
static void stats_init(void) {
    set_stats_reset_time();
    stats.conn_structs.reset();
    stats.total_conns.reset();
    stats.daemon_conns.reset();
    stats.rejected_conns.reset();
    stats.curr_conns.store(0, std::memory_order_relaxed);
}
408
/**
 * Get the per-worker-thread stats slice for the bucket the connection
 * is currently bound to.
 */
struct thread_stats* get_thread_stats(Connection* c) {
    // The per-bucket stats array has (worker threads + 1) slots; the
    // thread index must fall within it.
    cb_assert(c->getThread()->index < (settings.getNumWorkerThreads() + 1));
    auto& independent_stats = all_buckets[c->getBucketIndex()].stats;
    return &independent_stats.at(c->getThread()->index);
}
414
/**
 * Reset the resettable subset of the global stats, the thread-local
 * stats for the caller's bucket, and the bucket's own stats.
 */
void stats_reset(Cookie& cookie) {
    {
        // The reset timestamp is protected by the global stats mutex
        // (see the comment above set_stats_reset_time).
        std::lock_guard<std::mutex> guard(stats_mutex);
        set_stats_reset_time();
    }
    stats.total_conns.reset();
    stats.rejected_conns.reset();
    threadlocal_stats_reset(cookie.getConnection().getBucket().stats);
    bucket_reset_stats(cookie);
}
425
426static size_t get_number_of_worker_threads() {
427    size_t ret;
428    char *override = getenv("MEMCACHED_NUM_CPUS");
429    if (override == NULL) {
430        // No override specified; determine worker thread count based
431        // on the CPU count:
432        //     <5 cores: create 4 workers.
433        //    >5+ cores: create #CPUs * 7/8.
434        ret = Couchbase::get_available_cpu_count();
435
436        if (ret > 4) {
437            ret = (ret * 7) / 8;
438        }
439        if (ret < 4) {
440            ret = 4;
441        }
442    } else {
443        ret = std::stoull(override);
444        if (ret == 0) {
445            ret = 4;
446        }
447    }
448    return ret;
449}
450
/// Settings listener: re-initialize breakpad when its config changes.
static void breakpad_changed_listener(const std::string&, Settings &s) {
    cb::breakpad::initialize(s.getBreakpadSettings());
}
454
/// Settings listener: apply a changed minimum SSL protocol version.
static void ssl_minimum_protocol_changed_listener(const std::string&, Settings &s) {
    set_ssl_protocol_mask(s.getSslMinimumProtocol());
}
458
/// Settings listener: apply a changed SSL cipher list.
static void ssl_cipher_list_changed_listener(const std::string&, Settings &s) {
    set_ssl_cipher_list(s.getSslCipherList());
}
462
/// Settings listener: propagate a verbosity change to the logger and
/// fire the ON_LOG_LEVEL callbacks so engines get notified too.
static void verbosity_changed_listener(const std::string&, Settings &s) {
    auto logger = cb::logger::get();
    if (logger) {
        logger->set_level(cb::logger::convertToSpdSeverity(get_log_level()));
    }

    perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
}
471
/// Settings listener: point saslauthd at a new socket path.
static void saslauthd_socketpath_changed_listener(const std::string&, Settings &s) {
    cb::sasl::saslauthd::set_socketpath(s.getSaslauthdSocketpath());
}
475
/// Settings listener: update the SCRAM-SHA fallback salt.
static void scramsha_fallback_salt_changed_listener(const std::string&,
                                                    Settings& s) {
    cb::sasl::set_scramsha_fallback_salt(s.getScramshaFallbackSalt());
}
480
/// Settings listener: reconfigure the MCBP SLA thresholds from the
/// opcode attributes override. If the override doesn't parse as JSON,
/// fall back to reconfiguring from the config root alone.
static void opcode_attributes_override_changed_listener(const std::string&,
                                                        Settings& s) {
    unique_cJSON_ptr json(cJSON_Parse(s.getOpcodeAttributesOverride().c_str()));
    if (json) {
        cb::mcbp::sla::reconfigure(settings.getRoot(), *json);
    } else {
        cb::mcbp::sla::reconfigure(settings.getRoot());
    }
    LOG_INFO("SLA configuration changed to: {}",
             to_string(cb::mcbp::sla::to_json(), false));
}
492
/// Settings listener: copy updated per-interface limits (maxconns,
/// backlog, tcp_nodelay) onto the matching live listening ports, then
/// recalculate the aggregate connection limit.
static void interfaces_changed_listener(const std::string&, Settings &s) {
    for (const auto& ifc : s.getInterfaces()) {
        // May be nullptr when no live port matches this interface.
        auto* port = get_listening_port_instance(ifc.port);
        if (port != nullptr) {
            if (port->maxconns != ifc.maxconn) {
                port->maxconns = ifc.maxconn;
            }

            if (port->backlog != ifc.backlog) {
                port->backlog = ifc.backlog;
            }

            if (port->tcp_nodelay != ifc.tcp_nodelay) {
                port->tcp_nodelay = ifc.tcp_nodelay;
            }
        }
    }
    s.calculateMaxconns();
}
512
513#ifdef HAVE_LIBNUMA
514/** Configure the NUMA policy for memcached. By default will attempt to set to
 *  interleaved policy, unless the env var MEMCACHED_NUMA_MEM_POLICY is set to
516 *  'disable'.
517 *  @return A log message describing what action was taken.
518 *
519 */
static std::string configure_numa_policy() {
    // libnuma reports NUMA as unavailable on this host.
    if (numa_available() != 0) {
        return "Not available - not setting mem policy.";
    }

    // Attempt to set the default NUMA memory policy to interleaved,
    // unless overridden by our env var.
    const char* mem_policy_env = getenv("MEMCACHED_NUMA_MEM_POLICY");
    if (mem_policy_env != NULL && strcmp("disable", mem_policy_env) == 0) {
        return std::string("NOT setting memory allocation policy - disabled "
                "via MEMCACHED_NUMA_MEM_POLICY='") + mem_policy_env + "'";
    } else {
        // numa_set_interleave_mask reports failure via errno; clear it
        // first so a stale value isn't mistaken for a failure.
        errno = 0;
        numa_set_interleave_mask(numa_all_nodes_ptr);
        if (errno == 0) {
            return "Set memory allocation policy to 'interleave'";
        } else {
            return std::string("NOT setting memory allocation policy to "
                    "'interleave' - request failed: ") + cb_strerror();
        }
    }
}
542#endif  // HAVE_LIBNUMA
543
/**
 * Initialize the settings object with the compiled-in defaults and
 * register the listeners which react to runtime setting changes.
 */
static void settings_init(void) {
    // Set up the listener functions
    settings.addChangeListener("breakpad",
                               breakpad_changed_listener);
    settings.addChangeListener("ssl_minimum_protocol",
                               ssl_minimum_protocol_changed_listener);
    settings.addChangeListener("ssl_cipher_list",
                               ssl_cipher_list_changed_listener);
    settings.addChangeListener("verbosity", verbosity_changed_listener);
    settings.addChangeListener("interfaces", interfaces_changed_listener);
    settings.addChangeListener("saslauthd_socketpath",
                               saslauthd_socketpath_changed_listener);
    settings.addChangeListener("scramsha_fallback_salt",
                               scramsha_fallback_salt_changed_listener);
    NetworkInterface default_interface;
    settings.addInterface(default_interface);

    settings.setBioDrainBufferSize(8192);

    settings.setVerbose(0);
    settings.setConnectionIdleTime(0); // Connection idle time disabled
    settings.setNumWorkerThreads(get_number_of_worker_threads());
    settings.setDatatypeJsonEnabled(true);
    settings.setDatatypeSnappyEnabled(true);
    settings.setRequestsPerEventNotification(50, EventPriority::High);
    settings.setRequestsPerEventNotification(5, EventPriority::Medium);
    settings.setRequestsPerEventNotification(1, EventPriority::Low);
    settings.setRequestsPerEventNotification(20, EventPriority::Default);

    /*
     * The max object size is 20MB. Let's allow packets up to 30MB to
     * be handled "properly" by returning E2BIG, but packets bigger
     * than that will cause the server to disconnect the client
     */
    settings.setMaxPacketSize(30 * 1024 * 1024);

    settings.setDedupeNmvbMaps(false);

    // The topkeys size may be overridden from the environment.
    char *tmp = getenv("MEMCACHED_TOP_KEYS");
    settings.setTopkeysSize(20);
    if (tmp != NULL) {
        int count;
        if (safe_strtol(tmp, count)) {
            settings.setTopkeysSize(count);
        }
    }

    {
        // MB-13642 Allow the user to specify the SSL cipher list
        //    If someone wants to use SSL we should try to be "secure
        //    by default", and only allow for using strong ciphers.
        //    Users that may want to use a less secure cipher list
        //    should be allowed to do so by setting an environment
        //    variable (since there is no place in the UI to do
        //    so currently). Whenever ns_server allows for specifying
        //    the SSL cipher list in the UI, it will be stored
        //    in memcached.json and override these settings.
        const char *env = getenv("COUCHBASE_SSL_CIPHER_LIST");
        if (env == nullptr) {
            settings.setSslCipherList("HIGH");
        } else {
            settings.setSslCipherList(env);
        }
    }

    settings.setSslMinimumProtocol("tlsv1");
    /*
     * MB-22586
     * When connecting over SSL there isn't much point of using SCRAM or
     * any other CPU intensive (and multiple packets exchange between
     * client and server) to avoid sending the password in clear text as
     * everything going over SSL should be encrypted anyway.
     */
    if (getenv("COUCHBASE_I_DONT_TRUST_SSL") == nullptr) {
        settings.setSslSaslMechanisms("PLAIN");
    }

    if (getenv("COUCHBASE_ENABLE_PRIVILEGE_DEBUG") != nullptr) {
        settings.setPrivilegeDebug(true);
    }

    settings.setTopkeysEnabled(true);
}
627
628/**
629 * The config file may have altered some of the default values we're
630 * caching in other variables. This is the place where we'd propagate
631 * such changes.
632 *
633 * This is also the place to initialize any additional files needed by
634 * Memcached.
635 */
636static void update_settings_from_config(void)
637{
638    std::string root(DESTINATION_ROOT);
639
640    if (!settings.getRoot().empty()) {
641        root = settings.getRoot().c_str();
642    }
643
644    if (settings.getErrorMapsDir().empty()) {
645        // Set the error map dir.
646        std::string error_maps_dir(root + "/etc/couchbase/kv/error_maps");
647        cb::io::sanitizePath(error_maps_dir);
648        if (cb::io::isDirectory(error_maps_dir)) {
649            settings.setErrorMapsDir(error_maps_dir);
650        }
651    }
652
653    // If the user didn't set a socket path, use the default
654    if (settings.getSaslauthdSocketpath().empty()) {
655        const char* path = getenv("CBAUTH_SOCKPATH");
656        if (path == nullptr) {
657            path = "/var/run/saslauthd/mux";
658        }
659
660        settings.setSaslauthdSocketpath(path);
661    }
662
663    try {
664        cb::mcbp::sla::reconfigure(root);
665    } catch (const std::exception& e) {
666        FATAL_ERROR(EXIT_FAILURE, e.what());
667    }
668
669    settings.addChangeListener("opcode_attributes_override",
670                               opcode_attributes_override_changed_listener);
671}
672
/**
 * Bookkeeping for temporarily disabled listening sockets.
 * All fields are protected by `mutex`.
 */
struct {
    std::mutex mutex;
    bool disabled;       // are the listening sockets currently disabled?
    // NOTE(review): set to 10 by disable_listen(); the consumer of this
    // countdown is not visible in this file - confirm before relying on it.
    ssize_t count;
    uint64_t num_disable; // how many times listening has been disabled
} listen_state;
679
/// @return true if the listening sockets are currently disabled
bool is_listen_disabled(void) {
    std::lock_guard<std::mutex> guard(listen_state.mutex);
    return listen_state.disabled;
}
684
/// @return the number of times listening has been disabled so far
uint64_t get_listen_disabled_num(void) {
    std::lock_guard<std::mutex> guard(listen_state.mutex);
    return listen_state.num_disable;
}
689
/// Stop accepting new connections on every listening socket and record
/// the fact in listen_state.
void disable_listen() {
    {
        std::lock_guard<std::mutex> guard(listen_state.mutex);
        listen_state.disabled = true;
        listen_state.count = 10;
        ++listen_state.num_disable;
    }

    // Disable outside the lock; each server socket handles its own state.
    for (auto& connection : listen_conn) {
        connection->disable();
    }
}
702
/**
 * Close a socket, retrying while the close call is interrupted by a
 * signal. On success the current-connections counter is decremented
 * and, if accepting is currently disabled, the dispatcher is notified
 * so it may re-enable the listening sockets.
 */
void safe_close(SOCKET sfd) {
    if (sfd != INVALID_SOCKET) {
        int rval;

        // Retry while the close is interrupted (e.g. EINTR).
        do {
            rval = evutil_closesocket(sfd);
        } while (rval == SOCKET_ERROR &&
                 cb::net::is_interrupted(cb::net::get_socket_error()));

        if (rval == SOCKET_ERROR) {
            std::string error = cb_strerror();
            LOG_WARNING("Failed to close socket {} ({})!!", (int)sfd, error);
        } else {
            stats.curr_conns.fetch_sub(1, std::memory_order_relaxed);
            if (is_listen_disabled()) {
                notify_dispatcher();
            }
        }
    }
}
723
/// Engine API: return the id of the bucket the cookie's connection is
/// associated with.
bucket_id_t get_bucket_id(gsl::not_null<const void*> void_cookie) {
    /* @todo fix this. Currently we're using the index as the id,
     * but this should be changed to be a unique ID that won't be
     * reused.
     */
    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
    return bucket_id_t(cookie->getConnection().getBucketIndex());
}
732
/// Engine API: return an identifier for the cookie's connection (the
/// connection object's address is used as the id).
uint64_t get_connection_id(gsl::not_null<const void*> void_cookie) {
    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
    return uint64_t(&cookie->getConnection());
}
737
738std::pair<uint32_t, std::string> cookie_get_log_info(
739        gsl::not_null<const void*> void_cookie) {
740    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
741    return std::make_pair(cookie->getConnection().getId(),
742                          cookie->getConnection().getDescription());
743}
744
/// Engine API: store an error context string on the cookie.
void cookie_set_error_context(gsl::not_null<void*> void_cookie,
                              cb::const_char_buffer message) {
    auto* cookie = reinterpret_cast<Cookie*>(void_cookie.get());
    cookie->setErrorContext(to_string(message));
}
750
751/**
752 * Check if the cookie holds the privilege
753 *
754 * @param void_cookie this is the cookie passed down to the engine.
755 * @param privilege The privilege to check for
756 * @return if the privilege is held or not (or if the privilege data is stale)
757 */
static cb::rbac::PrivilegeAccess check_privilege(
        gsl::not_null<const void*> void_cookie,
        const cb::rbac::Privilege privilege) {
    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
    // checkPrivilege takes a mutable Cookie&, hence the const_cast; the
    // engine API only hands us a const cookie.
    return cookie->getConnection().checkPrivilege(privilege,
                                                  const_cast<Cookie&>(*cookie));
}
765
/**
 * Map an engine error code to the MCBP status code to send back to the
 * client, after applying any connection-specific error remapping.
 *
 * @throws cb::engine_error (engine_errc::disconnect) when the remapped
 *         code is ENGINE_DISCONNECT, so the caller tears the client down
 */
static protocol_binary_response_status engine_error2mcbp(
        gsl::not_null<const void*> void_cookie, ENGINE_ERROR_CODE code) {
    const auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
    auto& connection = cookie->getConnection();

    ENGINE_ERROR_CODE status = connection.remapErrorCode(code);
    if (status == ENGINE_DISCONNECT) {
        throw cb::engine_error(
                cb::engine_errc::disconnect,
                "engine_error2mcbp: " + std::to_string(connection.getId()) +
                        ": Disconnect client");
    }

    return engine_error_2_mcbp_protocol_error(status);
}
781
782static ENGINE_ERROR_CODE pre_link_document(
783        gsl::not_null<const void*> void_cookie, item_info& info) {
784    // Sanity check that people aren't calling the method with a bogus
785    // cookie
786    auto* cookie =
787            reinterpret_cast<Cookie*>(const_cast<void*>(void_cookie.get()));
788
789    auto* context = cookie->getCommandContext();
790    if (context != nullptr) {
791        return context->pre_link_document(info);
792    }
793
794    return ENGINE_SUCCESS;
795}
796
797static cJSON* get_bucket_details_UNLOCKED(const Bucket& bucket, size_t idx) {
798    if (bucket.state == BucketState::None) {
799        return nullptr;
800    }
801
802    cJSON *root = cJSON_CreateObject();
803    cJSON_AddNumberToObject(root, "index", idx);
804    switch (bucket.state.load()) {
805    case BucketState::None:
806        cJSON_AddStringToObject(root, "state", "none");
807        break;
808    case BucketState::Creating:
809        cJSON_AddStringToObject(root, "state", "creating");
810        break;
811    case BucketState::Initializing:
812        cJSON_AddStringToObject(root, "state", "initializing");
813        break;
814    case BucketState::Ready:
815        cJSON_AddStringToObject(root, "state", "ready");
816        break;
817    case BucketState::Stopping:
818        cJSON_AddStringToObject(root, "state", "stopping");
819        break;
820    case BucketState::Destroying:
821        cJSON_AddStringToObject(root, "state", "destroying");
822        break;
823    }
824
825    cJSON_AddNumberToObject(root, "clients", bucket.clients);
826    cJSON_AddStringToObject(root, "name", bucket.name);
827
828    switch (bucket.type) {
829    case BucketType::Unknown:
830        cJSON_AddStringToObject(root, "type", "<<unknown>>");
831        break;
832    case BucketType::NoBucket:
833        cJSON_AddStringToObject(root, "type", "no bucket");
834        break;
835    case BucketType::Memcached:
836        cJSON_AddStringToObject(root, "type", "memcached");
837        break;
838    case BucketType::Couchstore:
839        cJSON_AddStringToObject(root, "type", "couchstore");
840        break;
841    case BucketType::EWouldBlock:
842        cJSON_AddStringToObject(root, "type", "ewouldblock");
843        break;
844    }
845
846    return root;
847}
848
849cJSON *get_bucket_details(size_t idx)
850{
851    cJSON* ret;
852    Bucket &bucket = all_buckets.at(idx);
853    std::lock_guard<std::mutex> guard(bucket.mutex);
854    ret = get_bucket_details_UNLOCKED(bucket, idx);
855
856    return ret;
857}
858
859/**
860 * Check if the associated bucket is dying or not. There is two reasons
861 * for why a bucket could be dying: It is currently being deleted, or
862 * someone initiated a shutdown process.
863 */
864bool is_bucket_dying(Connection& c) {
865    bool disconnect = memcached_shutdown;
866    Bucket& b = all_buckets.at(c.getBucketIndex());
867
868    if (b.state != BucketState::Ready) {
869        disconnect = true;
870    }
871
872    if (disconnect) {
873        LOG_INFO(
874                "{}: The connected bucket is being deleted.. closing "
875                "connection {}",
876                c.getId(),
877                c.getDescription());
878        c.setState(McbpStateMachine::State::closing);
879        return true;
880    }
881
882    return false;
883}
884
/**
 * Libevent callback for a client connection's socket events. Runs on
 * the worker thread owning the connection: drops any stale pending-io
 * entry, handles shutdown and idle-timeout bookkeeping, and drives the
 * connection's state machine via run_event_loop().
 */
void event_handler(evutil_socket_t fd, short which, void *arg) {
    auto* c = reinterpret_cast<Connection*>(arg);
    if (c == nullptr) {
        LOG_WARNING("event_handler: connection must be non-NULL");
        return;
    }

    auto *thr = c->getThread();
    if (thr == nullptr) {
        LOG_WARNING(
                "Internal error - connection without a thread found. - "
                "ignored");
        return;
    }

    // Remove the list from the list of pending io's (in case the
    // object was scheduled to run in the dispatcher before the
    // callback for the worker thread is executed.
    //
    {
        std::lock_guard<std::mutex> lock(thr->pending_io.mutex);
        thr->pending_io.map.erase(c);
    }

    // Hold the thread mutex (with slow-lock tracing) for the remainder.
    TRACE_LOCKGUARD_TIMED(thr->mutex,
                          "mutex",
                          "event_handler::threadLock",
                          SlowMutexThreshold);

    if (memcached_shutdown) {
        // Someone requested memcached to shut down.
        if (signal_idle_clients(thr, -1, false) == 0) {
            cb_assert(thr != nullptr);
            LOG_INFO("Stopping worker thread {}", thr->index);
            c->eventBaseLoopbreak();
            return;
        }
    }

    /* sanity */
    cb_assert(fd == c->getSocketDescriptor());

    if ((which & EV_TIMEOUT) == EV_TIMEOUT) {
        // Idle timeouts are ignored for admin and DCP connections; all
        // other connections are shut down.
        if (c->isInternal() || c->isDCP()) {
            if (c->isInternal()) {
                LOG_INFO("{}: Timeout for admin connection. (ignore)",
                         c->getId());
            } else if (c->isDCP()) {
                LOG_INFO("{}: Timeout for DCP connection. (ignore)",
                         c->getId());
            }
            if (!c->reapplyEventmask()) {
                c->setState(McbpStateMachine::State::closing);
            }
        } else {
            LOG_INFO("{}: Shutting down idle client {}",
                     c->getId(),
                     c->getDescription());
            c->setState(McbpStateMachine::State::closing);
        }
    }

    run_event_loop(c, which);

    if (memcached_shutdown) {
        // Someone requested memcached to shut down. If we don't have
        // any connections bound to this thread we can just shut down
        int connected = signal_idle_clients(thr, -1, true);
        if (connected == 0) {
            LOG_INFO("Stopping worker thread {}", thr->index);
            event_base_loopbreak(thr->base);
        } else {
            LOG_INFO("Waiting for {} connected clients on worker thread {}",
                     connected,
                     thr->index);
        }
    }
}
963
964/**
965 * The listen_event_handler is the callback from libevent when someone is
966 * connecting to one of the server sockets. It runs in the context of the
967 * listen thread
968 */
969void listen_event_handler(evutil_socket_t, short which, void *arg) {
970    auto& c = *reinterpret_cast<ServerSocket*>(arg);
971
972    if (memcached_shutdown) {
973        // Someone requested memcached to shut down. The listen thread should
974        // be stopped immediately to avoid new connections
975        LOG_INFO("Stopping listen thread");
976        event_base_loopbreak(main_base);
977        return;
978    }
979
980    try {
981        c.acceptNewClient();
982    } catch (std::invalid_argument& e) {
983        LOG_WARNING("{}: exception occurred while accepting clients: {}",
984                    c.getSocket(),
985                    e.what());
986    }
987}
988
989static void dispatch_event_handler(evutil_socket_t fd, short, void *) {
990    char buffer[80];
991    ssize_t nr = cb::net::recv(fd, buffer, sizeof(buffer), 0);
992
993    if (enable_common_ports.load()) {
994        enable_common_ports.store(false);
995        create_listen_sockets(false);
996        LOG_INFO("Initialization complete. Accepting clients.");
997    }
998
999    if (nr != -1 && is_listen_disabled()) {
1000        bool enable = false;
1001        {
1002            std::lock_guard<std::mutex> guard(listen_state.mutex);
1003            listen_state.count -= nr;
1004            if (listen_state.count <= 0) {
1005                enable = true;
1006                listen_state.disabled = false;
1007            }
1008        }
1009        if (enable) {
1010            for (auto& connection : listen_conn) {
1011                connection->enable();
1012            }
1013        }
1014    }
1015}
1016
1017/*
1018 * Sets a socket's send buffer size to the maximum allowed by the system.
1019 */
1020static void maximize_sndbuf(const SOCKET sfd) {
1021    socklen_t intsize = sizeof(int);
1022    int last_good = 0;
1023    int old_size;
1024
1025    /* Start with the default size. */
1026    if (cb::net::getsockopt(sfd,
1027                            SOL_SOCKET,
1028                            SO_SNDBUF,
1029                            reinterpret_cast<void*>(&old_size),
1030                            &intsize) != 0) {
1031        LOG_WARNING("getsockopt(SO_SNDBUF): {}", strerror(errno));
1032        return;
1033    }
1034
1035    /* Binary-search for the real maximum. */
1036    int min = old_size;
1037    int max = MAX_SENDBUF_SIZE;
1038
1039    while (min <= max) {
1040        int avg = ((unsigned int)(min + max)) / 2;
1041        if (cb::net::setsockopt(sfd,
1042                                SOL_SOCKET,
1043                                SO_SNDBUF,
1044                                reinterpret_cast<void*>(&avg),
1045                                intsize) == 0) {
1046            last_good = avg;
1047            min = avg + 1;
1048        } else {
1049            max = avg - 1;
1050        }
1051    }
1052
1053    LOG_DEBUG("<{} send buffer was {}, now {}", sfd, old_size, last_good);
1054}
1055
1056static SOCKET new_server_socket(struct addrinfo *ai, bool tcp_nodelay) {
1057    SOCKET sfd;
1058
1059    sfd = cb::net::socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
1060    if (sfd == INVALID_SOCKET) {
1061        return INVALID_SOCKET;
1062    }
1063
1064    if (evutil_make_socket_nonblocking(sfd) == -1) {
1065        safe_close(sfd);
1066        return INVALID_SOCKET;
1067    }
1068
1069    maximize_sndbuf(sfd);
1070
1071    const struct linger ling = {0, 0};
1072    const int flags = 1;
1073    int error;
1074
1075#ifdef IPV6_V6ONLY
1076    if (ai->ai_family == AF_INET6) {
1077        error = cb::net::setsockopt(sfd,
1078                                    IPPROTO_IPV6,
1079                                    IPV6_V6ONLY,
1080                                    reinterpret_cast<const void*>(&flags),
1081                                    sizeof(flags));
1082        if (error != 0) {
1083            LOG_WARNING("setsockopt(IPV6_V6ONLY): {}", strerror(errno));
1084            safe_close(sfd);
1085            return INVALID_SOCKET;
1086        }
1087    }
1088#endif
1089
1090    if (cb::net::setsockopt(sfd,
1091                            SOL_SOCKET,
1092                            SO_REUSEADDR,
1093                            reinterpret_cast<const void*>(&flags),
1094                            sizeof(flags)) != 0) {
1095        LOG_WARNING("setsockopt(SO_REUSEADDR): {}",
1096                    cb_strerror(cb::net::get_socket_error()));
1097    }
1098
1099    if (cb::net::setsockopt(sfd,
1100                            SOL_SOCKET,
1101                            SO_KEEPALIVE,
1102                            reinterpret_cast<const void*>(&flags),
1103                            sizeof(flags)) != 0) {
1104        LOG_WARNING("setsockopt(SO_KEEPALIVE): {}",
1105                    cb_strerror(cb::net::get_socket_error()));
1106    }
1107
1108    if (cb::net::setsockopt(sfd,
1109                            SOL_SOCKET,
1110                            SO_LINGER,
1111                            reinterpret_cast<const char*>(&ling),
1112                            sizeof(ling)) != 0) {
1113        LOG_WARNING("setsockopt(SO_LINGER): {}",
1114                    cb_strerror(cb::net::get_socket_error()));
1115    }
1116
1117    if (tcp_nodelay) {
1118        if (cb::net::setsockopt(sfd,
1119                                IPPROTO_TCP,
1120                                TCP_NODELAY,
1121                                reinterpret_cast<const void*>(&flags),
1122                                sizeof(flags)) != 0) {
1123            LOG_WARNING("setsockopt(TCP_NODELAY): {}",
1124                        cb_strerror(cb::net::get_socket_error()));
1125        }
1126    }
1127
1128    return sfd;
1129}
1130
1131/**
1132 * Add a port to the list of interfaces we're listening to.
1133 *
1134 * We're supporting binding to the port number "0" to have the operating
1135 * system pick an available port we may use (and we'll report it back to
1136 * the user through the portnumber file.). If we have knowledge of the port,
1137 * update the port descriptor (ip4/ip6), if not go ahead and create a new entry
1138 *
1139 * @param interf the interface description used to create the port
1140 * @param port the port number in use
1141 * @param family the address family for the port
1142 */
1143static void add_listening_port(const NetworkInterface *interf, in_port_t port, sa_family_t family) {
1144    std::lock_guard<std::mutex> guard(stats_mutex);
1145    auto *descr = get_listening_port_instance(port);
1146
1147    if (descr == nullptr) {
1148        ListeningPort newport(port,
1149                              interf->host,
1150                              interf->tcp_nodelay,
1151                              interf->backlog,
1152                              interf->management);
1153
1154        newport.curr_conns = 1;
1155        newport.maxconns = interf->maxconn;
1156
1157        if (interf->ssl.key.empty() || interf->ssl.cert.empty()) {
1158            newport.ssl.enabled = false;
1159        } else {
1160            newport.ssl.enabled = true;
1161            newport.ssl.key = interf->ssl.key;
1162            newport.ssl.cert = interf->ssl.cert;
1163        }
1164
1165        if (family == AF_INET) {
1166            newport.ipv4 = true;
1167            newport.ipv6 = false;
1168        } else if (family == AF_INET6) {
1169            newport.ipv4 = false;
1170            newport.ipv6 = true;
1171        }
1172
1173        stats.listening_ports.push_back(newport);
1174    } else {
1175        if (family == AF_INET) {
1176            descr->ipv4 = true;
1177        } else if (family == AF_INET6) {
1178            descr->ipv6 = true;
1179        }
1180        ++descr->curr_conns;
1181    }
1182}
1183
1184/**
1185 * Create a socket and bind it to a specific port number
1186 * @param interface the interface to bind to
 * @return true if we were able to set up at least one address on the interface
 *         false if we failed to set any addresses on the interface
1189 */
static bool server_socket(const NetworkInterface& interf) {
    SOCKET sfd;
    addrinfo hints = {};

    // Set to true when we create an IPv4 interface
    bool ipv4 = false;
    // Set to true when we create an IPv6 interface
    bool ipv6 = false;

    hints.ai_flags = AI_PASSIVE;
    hints.ai_protocol = IPPROTO_TCP;
    hints.ai_socktype = SOCK_STREAM;

    // Resolve only the address families requested in the configuration.
    if (interf.ipv4 && interf.ipv6) {
        hints.ai_family = AF_UNSPEC;
    } else if (interf.ipv4) {
        hints.ai_family = AF_INET;
    } else if (interf.ipv6) {
        hints.ai_family = AF_INET6;
    } else {
        throw std::invalid_argument(
                "server_socket: can't create a socket without IPv4 or IPv6");
    }

    std::string port_buf = std::to_string(interf.port);

    // An empty host or "*" means "bind to all local addresses".
    const char* host = nullptr;
    if (!interf.host.empty() && interf.host != "*") {
        host = interf.host.c_str();
    }

    struct addrinfo *ai;
    int error = getaddrinfo(host, port_buf.c_str(), &hints, &ai);
    if (error != 0) {
#ifdef WIN32
        LOG_WARNING("getaddrinfo(): {}", cb_strerror(error));
#else
        // EAI_SYSTEM means the real error is in errno; otherwise the
        // EAI_* code must be translated with gai_strerror.
        if (error != EAI_SYSTEM) {
            LOG_WARNING("getaddrinfo(): {}", gai_strerror(error));
        } else {
            LOG_WARNING("getaddrinfo(): {}", cb_strerror(error));
        }
#endif
        return false;
    }

    // getaddrinfo may return multiple entries for a given name/port pair.
    // Iterate over all of them and try to set up a listen object.
    // We need at least _one_ entry per requested configuration (IPv4/6) in
    // order to call it a success.
    for (struct addrinfo* next = ai; next; next = next->ai_next) {
        if ((sfd = new_server_socket(next, interf.tcp_nodelay)) ==
            INVALID_SOCKET) {
            // getaddrinfo can return "junk" addresses,
            continue;
        }

        in_port_t listenport = 0;
        if (bind(sfd, next->ai_addr, (socklen_t)next->ai_addrlen) == SOCKET_ERROR) {
            LOG_WARNING("Failed to bind to address: {}",
                        cb_strerror(cb::net::get_socket_error()));
            safe_close(sfd);
            continue;
        }

        // We've configured this port.
        if (next->ai_addr->sa_family == AF_INET) {
            // We have at least one entry
            ipv4 = true;
        } else if (next->ai_addr->sa_family == AF_INET6) {
            // We have at least one entry
            ipv6 = true;
        }

        // Ask the kernel for the actual port number in case the
        // configuration requested port 0 (auto-assign).
        if (next->ai_addr->sa_family == AF_INET ||
             next->ai_addr->sa_family == AF_INET6) {
            union {
                struct sockaddr_in in;
                struct sockaddr_in6 in6;
            } my_sockaddr;
            socklen_t len = sizeof(my_sockaddr);
            if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
                if (next->ai_addr->sa_family == AF_INET) {
                    listenport = ntohs(my_sockaddr.in.sin_port);
                } else {
                    listenport = ntohs(my_sockaddr.in6.sin6_port);
                }
            }
        }

        listen_conn.emplace_back(std::make_unique<ServerSocket>(
                sfd, main_base, listenport, next->ai_addr->sa_family, interf));
        stats.daemon_conns++;
        stats.curr_conns.fetch_add(1, std::memory_order_relaxed);
        add_listening_port(&interf, listenport, next->ai_addr->sa_family);
    }

    freeaddrinfo(ai);

    if (interf.ipv4 && !ipv4) {
        // Failed to create an IPv4 port
        LOG_CRITICAL(R"(Failed to create IPv4 port for "{}:{}")",
                     interf.host.empty() ? "*" : interf.host,
                     interf.port);
    }

    if (interf.ipv6 && !ipv6) {
        // Failed to create an IPv6 port
        LOG_CRITICAL(R"(Failed to create IPv6 port for "{}:{}")",
                     interf.host.empty() ? "*" : interf.host,
                     interf.port);
    }

    // Return success as long as we managed to create a listening port
    // for at least one protocol.
    return ipv4 || ipv6;
}
1307
1308static bool server_sockets(bool management) {
1309    bool success = true;
1310
1311    if (management) {
1312        LOG_INFO("Enable management port(s)");
1313    } else {
1314        LOG_INFO("Enable user port(s)");
1315    }
1316
1317    for (auto& interface : settings.getInterfaces()) {
1318        if (management && interface.management) {
1319            if (!server_socket(interface)) {
1320                success = false;
1321            }
1322        } else if (!management && !interface.management) {
1323            if (!server_socket(interface)) {
1324                success = false;
1325            }
1326        }
1327    }
1328
1329    return success;
1330}
1331
/**
 * Create the listening sockets. When called with management == true all
 * ports (management and user) are created immediately; with false only
 * the user ports are created.
 *
 * If the environment variable MEMCACHED_PORT_FILENAME is set, the actual
 * port numbers in use are written to that file (atomically, via a ".lck"
 * temporary and rename) so the parent process can pick up auto-assigned
 * ports.
 *
 * Terminates the process (FATAL_ERROR) if socket creation or writing the
 * port number file fails.
 */
static void create_listen_sockets(bool management) {
    if (!server_sockets(management)) {
        FATAL_ERROR(EX_OSERR, "Failed to create listening socket(s)");
    }

    if (management) {
        // the client is not expecting us to update the port set at
        // later time, so enable all ports immediately
        if (!server_sockets(false)) {
            FATAL_ERROR(EX_OSERR, "Failed to create listening socket(s)");
        }
    }

    const char* portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
    if (portnumber_filename != nullptr) {
        // Write to a ".lck" file first and rename it into place so the
        // reader never observes a partially written file.
        std::string temp_portnumber_filename;
        temp_portnumber_filename.assign(portnumber_filename);
        temp_portnumber_filename.append(".lck");

        FILE* portnumber_file = nullptr;
        portnumber_file = fopen(temp_portnumber_filename.c_str(), "a");
        if (portnumber_file == nullptr) {
            FATAL_ERROR(EX_OSERR,
                        R"(Failed to open "{}": {})",
                        temp_portnumber_filename,
                        strerror(errno));
        }

        // Dump one JSON entry per listening socket.
        unique_cJSON_ptr array(cJSON_CreateArray());

        for (const auto& connection : listen_conn) {
            cJSON_AddItemToArray(array.get(),
                                 connection->getDetails().release());
        }

        unique_cJSON_ptr root(cJSON_CreateObject());
        cJSON_AddItemToObject(root.get(), "ports", array.release());
        fprintf(portnumber_file, "%s\n", to_string(root, true).c_str());
        fclose(portnumber_file);
        LOG_INFO("Port numbers available in {}", portnumber_filename);
        if (rename(temp_portnumber_filename.c_str(), portnumber_filename) == -1) {
            FATAL_ERROR(EX_OSERR,
                        R"(Failed to rename "{}" to "{}": {})",
                        temp_portnumber_filename.c_str(),
                        portnumber_filename,
                        strerror(errno));
        }
    }
}
1381
1382#ifdef WIN32
1383// Unfortunately we don't have signal handlers on windows
static bool install_signal_handlers() {
    // No signal support on Windows; nothing to install, always succeed.
    return true;
}
1387
static void release_signal_handlers() {
    // Nothing was installed on Windows, so there is nothing to release.
}
1390#else
1391
1392#ifndef HAVE_SIGIGNORE
1393static int sigignore(int sig) {
1394    struct sigaction sa;
1395    memset(&sa, 0, sizeof(sa));
1396    sa.sa_handler = SIG_IGN;
1397
1398    if (sigemptyset(&sa.sa_mask) == -1 || sigaction(sig, &sa, 0) == -1) {
1399        return -1;
1400    }
1401    return 0;
1402}
1403#endif /* !HAVE_SIGIGNORE */
1404
1405
// Handler for SIGTERM/SIGINT (dispatched through libevent): initiate a
// clean shutdown of the server.
static void sigterm_handler(evutil_socket_t, short, void *) {
    shutdown_server();
}
1409
// Signal events registered with libevent by install_signal_handlers()
// and freed again by release_signal_handlers().
static struct event* sigusr1_event;
static struct event* sigterm_event;
static struct event* sigint_event;
1413
1414static bool install_signal_handlers() {
1415    // SIGUSR1 - Used to dump connection stats
1416    sigusr1_event = evsignal_new(main_base, SIGUSR1,
1417                                 dump_connection_stat_signal_handler,
1418                                 nullptr);
1419    if (sigusr1_event == nullptr) {
1420        LOG_WARNING("Failed to allocate SIGUSR1 handler");
1421        return false;
1422    }
1423
1424    if (event_add(sigusr1_event, nullptr) < 0) {
1425        LOG_WARNING("Failed to install SIGUSR1 handler");
1426        return false;
1427
1428    }
1429
1430    // SIGTERM - Used to shut down memcached cleanly
1431    sigterm_event = evsignal_new(main_base, SIGTERM, sigterm_handler, NULL);
1432    if (sigterm_event == NULL) {
1433        LOG_WARNING("Failed to allocate SIGTERM handler");
1434        return false;
1435    }
1436
1437    if (event_add(sigterm_event, NULL) < 0) {
1438        LOG_WARNING("Failed to install SIGTERM handler");
1439        return false;
1440    }
1441
1442    // SIGINT - Used to shut down memcached cleanly
1443    sigint_event = evsignal_new(main_base, SIGINT, sigterm_handler, NULL);
1444    if (sigint_event == NULL) {
1445        LOG_WARNING("Failed to allocate SIGINT handler");
1446        return false;
1447    }
1448
1449    if (event_add(sigint_event, NULL) < 0) {
1450        LOG_WARNING("Failed to install SIGINT handler");
1451        return false;
1452    }
1453
1454    return true;
1455}
1456
1457static void release_signal_handlers() {
1458    event_free(sigusr1_event);
1459    event_free(sigint_event);
1460    event_free(sigterm_event);
1461}
1462#endif
1463
1464const char* get_server_version(void) {
1465    if (strlen(PRODUCT_VERSION) == 0) {
1466        return "unknown";
1467    } else {
1468        return PRODUCT_VERSION;
1469    }
1470}
1471
1472static void store_engine_specific(gsl::not_null<const void*> void_cookie,
1473                                  void* engine_data) {
1474    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1475    cookie->getConnection().setEngineStorage(engine_data);
1476}
1477
1478static void* get_engine_specific(gsl::not_null<const void*> void_cookie) {
1479    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1480    return cookie->getConnection().getEngineStorage();
1481}
1482
1483static bool is_datatype_supported(gsl::not_null<const void*> void_cookie,
1484                                  protocol_binary_datatype_t datatype) {
1485    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1486    return cookie->getConnection().isDatatypeEnabled(datatype);
1487}
1488
1489static bool is_mutation_extras_supported(
1490        gsl::not_null<const void*> void_cookie) {
1491    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1492    return cookie->getConnection().isSupportsMutationExtras();
1493}
1494
1495static bool is_collections_supported(gsl::not_null<const void*> void_cookie) {
1496    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1497    return cookie->getConnection().isCollectionsSupported();
1498}
1499
1500static uint8_t get_opcode_if_ewouldblock_set(
1501        gsl::not_null<const void*> void_cookie) {
1502    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1503
1504    uint8_t opcode = PROTOCOL_BINARY_CMD_INVALID;
1505    if (cookie->isEwouldblock()) {
1506        try {
1507            opcode = cookie->getHeader().getOpcode();
1508        } catch (...) {
1509            // Don't barf out if the header isn't there
1510        }
1511    }
1512    return opcode;
1513}
1514
// Validate the given cas against the session cas; on acceptance the
// session usage counter is incremented (see session_cas for the exact
// contract) and must later be released via decrement_session_ctr().
static bool validate_session_cas(const uint64_t cas) {
    return session_cas.increment_session_counter(cas);
}
1518
// Release a session-counter reference taken by validate_session_cas().
static void decrement_session_ctr(void) {
    session_cas.decrement_session_counter();
}
1522
1523static ENGINE_ERROR_CODE reserve_cookie(
1524        gsl::not_null<const void*> void_cookie) {
1525    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1526
1527    cookie->getConnection().incrementRefcount();
1528    return ENGINE_SUCCESS;
1529}
1530
/**
 * Release a cookie reservation made through reserve_cookie().
 *
 * The refcount is decremented while holding the owning worker thread's
 * lock, and the connection is put back on the pending-io list so the
 * state machine gets a chance to run for it again.
 */
static ENGINE_ERROR_CODE release_cookie(
        gsl::not_null<const void*> void_cookie) {
    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());

    auto* c = &cookie->getConnection();
    int notify;
    LIBEVENT_THREAD *thr;

    thr = c->getThread();
    cb_assert(thr);

    {
        TRACE_LOCKGUARD_TIMED(thr->mutex,
                              "mutex",
                              "release_cookie::threadLock",
                              SlowMutexThreshold);

        c->decrementRefcount();

        /* Releasing the reference to the object may cause it to change
         * state. (NOTE: the release call shall never be called from the
         * worker threads), so should put the connection in the pool of
         * pending IO and have the system retry the operation for the
         * connection
         */
        notify = add_conn_to_pending_io_list(c, ENGINE_SUCCESS);
    }

    /* kick the thread in the butt */
    if (notify) {
        notify_thread(*thr);
    }

    return ENGINE_SUCCESS;
}
1566
1567static void cookie_set_priority(gsl::not_null<const void*> void_cookie,
1568                                CONN_PRIORITY priority) {
1569    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1570
1571    auto* c = &cookie->getConnection();
1572    switch (priority) {
1573    case CONN_PRIORITY_HIGH:
1574        c->setPriority(Connection::Priority::High);
1575        return;
1576    case CONN_PRIORITY_MED:
1577        c->setPriority(Connection::Priority::Medium);
1578        return;
1579    case CONN_PRIORITY_LOW:
1580        c->setPriority(Connection::Priority::Low);
1581        return;
1582    }
1583
1584    LOG_WARNING(
1585            "{}: cookie_set_priority: priority (which is {}) is not a "
1586            "valid CONN_PRIORITY - closing connection {}",
1587            c->getId(),
1588            priority,
1589            c->getDescription());
1590    c->setState(McbpStateMachine::State::closing);
1591}
1592
// Stats callback invoked when the engine evicts an item; intentionally
// a no-op in this server.
static void count_eviction(gsl::not_null<const void*>, const void*, int) {
}
1595
// Maximum number of io vectors the core supports per item (currently
// a single contiguous buffer).
static size_t get_max_item_iovec_size() {
    return 1;
}
1599
// Gate used to delay a shutdown request until the server has finished
// bootstrapping: enable_shutdown() flips the flag and wakes waiters.
static std::condition_variable shutdown_cv;
static std::mutex shutdown_cv_mutex;
static bool memcached_can_shutdown = false;
/**
 * Initiate server shutdown. Blocks until enable_shutdown() has been
 * called (i.e. the server finished bootstrapping), then raises the
 * global shutdown flag and breaks the main event loop.
 */
void shutdown_server(void) {

    std::unique_lock<std::mutex> lk(shutdown_cv_mutex);
    if (!memcached_can_shutdown) {
        // log and proceed to wait shutdown
        LOG_INFO("shutdown_server waiting for can_shutdown signal");
        shutdown_cv.wait(lk, []{return memcached_can_shutdown;});
    }
    memcached_shutdown = true;
    LOG_INFO("Received shutdown request");
    event_base_loopbreak(main_base);
}
1615
1616void enable_shutdown(void) {
1617    std::unique_lock<std::mutex> lk(shutdown_cv_mutex);
1618    memcached_can_shutdown = true;
1619    shutdown_cv.notify_all();
1620}
1621
// Server log API: return the logger extension registered in settings.
static EXTENSION_LOGGER_DESCRIPTOR* get_logger(void)
{
    return settings.extensions.logger;
}
1626
1627static EXTENSION_LOG_LEVEL get_log_level(void)
1628{
1629    EXTENSION_LOG_LEVEL ret;
1630    switch (settings.getVerbose()) {
1631    case 0: ret = EXTENSION_LOG_NOTICE; break;
1632    case 1: ret = EXTENSION_LOG_INFO; break;
1633    default:
1634        ret = EXTENSION_LOG_DEBUG;
1635    }
1636    return ret;
1637}
1638
1639static void set_log_level(EXTENSION_LOG_LEVEL severity)
1640{
1641    switch (severity) {
1642    case EXTENSION_LOG_FATAL:
1643    case EXTENSION_LOG_WARNING:
1644    case EXTENSION_LOG_NOTICE:
1645        settings.setVerbose(0);
1646        break;
1647    case EXTENSION_LOG_INFO:
1648        settings.setVerbose(1);
1649        break;
1650    default:
1651        settings.setVerbose(2);
1652    }
1653}
1654
1655/**
1656 * Callback the engines may call to get the public server interface
1657 * @return pointer to a structure containing the interface. The client should
1658 *         know the layout and perform the proper casts.
1659 */
SERVER_HANDLE_V1* get_server_api() {
    // NOTE(review): the init flag isn't synchronized; this assumes the
    // first call happens before engines can race on it - confirm.
    static int init;
    static SERVER_CORE_API core_api;
    static SERVER_COOKIE_API server_cookie_api;
    static SERVER_STAT_API server_stat_api;
    static SERVER_LOG_API server_log_api;
    static SERVER_CALLBACK_API callback_api;
    static ALLOCATOR_HOOKS_API hooks_api;
    static SERVER_HANDLE_V1 rv;
    static SERVER_DOCUMENT_API document_api;

    if (!init) {
        init = 1;
        // Core: time conversion, config parsing and shutdown hooks.
        core_api.realtime = mc_time_convert_to_real_time;
        core_api.abstime = mc_time_convert_to_abs_time;
        core_api.get_current_time = mc_time_get_current_time;
        core_api.parse_config = parse_config;
        core_api.shutdown = shutdown_server;
        core_api.get_max_item_iovec_size = get_max_item_iovec_size;
        core_api.trigger_tick = mc_time_clock_tick;

        // Cookie: per-request/connection services exposed to the engine.
        server_cookie_api.store_engine_specific = store_engine_specific;
        server_cookie_api.get_engine_specific = get_engine_specific;
        server_cookie_api.is_datatype_supported = is_datatype_supported;
        server_cookie_api.is_mutation_extras_supported = is_mutation_extras_supported;
        server_cookie_api.is_collections_supported = is_collections_supported;
        server_cookie_api.get_opcode_if_ewouldblock_set = get_opcode_if_ewouldblock_set;
        server_cookie_api.validate_session_cas = validate_session_cas;
        server_cookie_api.decrement_session_ctr = decrement_session_ctr;
        server_cookie_api.notify_io_complete = notify_io_complete;
        server_cookie_api.reserve = reserve_cookie;
        server_cookie_api.release = release_cookie;
        server_cookie_api.set_priority = cookie_set_priority;
        server_cookie_api.get_bucket_id = get_bucket_id;
        server_cookie_api.get_connection_id = get_connection_id;
        server_cookie_api.check_privilege = check_privilege;
        server_cookie_api.engine_error2mcbp = engine_error2mcbp;
        server_cookie_api.get_log_info = cookie_get_log_info;
        server_cookie_api.set_error_context = cookie_set_error_context;

        server_stat_api.evicting = count_eviction;

        server_log_api.get_logger = get_logger;
        server_log_api.get_level = get_log_level;
        server_log_api.set_level = set_log_level;

        callback_api.register_callback = register_callback;
        callback_api.perform_callbacks = perform_callbacks;

        // Allocator hooks: expose memory statistics/controls.
        hooks_api.add_new_hook = AllocHooks::add_new_hook;
        hooks_api.remove_new_hook = AllocHooks::remove_new_hook;
        hooks_api.add_delete_hook = AllocHooks::add_delete_hook;
        hooks_api.remove_delete_hook = AllocHooks::remove_delete_hook;
        hooks_api.get_extra_stats_size = AllocHooks::get_extra_stats_size;
        hooks_api.get_allocator_stats = AllocHooks::get_allocator_stats;
        hooks_api.get_allocation_size = AllocHooks::get_allocation_size;
        hooks_api.get_detailed_stats = AllocHooks::get_detailed_stats;
        hooks_api.release_free_memory = AllocHooks::release_free_memory;
        hooks_api.enable_thread_cache = AllocHooks::enable_thread_cache;
        hooks_api.get_allocator_property = AllocHooks::get_allocator_property;

        document_api.pre_link = pre_link_document;
        document_api.pre_expiry = document_pre_expiry;

        rv.interface = 1;
        rv.core = &core_api;
        rv.stat = &server_stat_api;
        rv.callback = &callback_api;
        rv.log = &server_log_api;
        rv.cookie = &server_cookie_api;
        rv.alloc_hooks = &hooks_api;
        rv.document = &document_api;
    }

    return &rv;
}
1736
1737/* BUCKET FUNCTIONS */
1738void CreateBucketThread::create() {
1739    LOG_INFO("{} Create bucket [{}]", connection.getId(), name);
1740
1741    if (!BucketValidator::validateBucketName(name, error)) {
1742        LOG_WARNING("{} Create bucket [{}] failed - Invalid bucket name",
1743                    connection.getId(),
1744                    name);
1745        result = ENGINE_EINVAL;
1746        return;
1747    }
1748
1749    if (!BucketValidator::validateBucketType(type, error)) {
1750        LOG_WARNING("{} Create bucket [{}] failed - Invalid bucket type",
1751                    connection.getId(),
1752                    name);
1753        result = ENGINE_EINVAL;
1754        return;
1755    }
1756
1757    size_t ii;
1758    size_t first_free = all_buckets.size();
1759    bool found = false;
1760
1761    std::unique_lock<std::mutex> all_bucket_lock(buckets_lock);
1762    for (ii = 0; ii < all_buckets.size() && !found; ++ii) {
1763        std::lock_guard<std::mutex> guard(all_buckets[ii].mutex);
1764        if (first_free == all_buckets.size() &&
1765            all_buckets[ii].state == BucketState::None)
1766        {
1767            first_free = ii;
1768        }
1769        if (name == all_buckets[ii].name) {
1770            found = true;
1771        }
1772    }
1773
1774    if (found) {
1775        result = ENGINE_KEY_EEXISTS;
1776        LOG_WARNING("{} Create bucket [{}] failed - Already exists",
1777                    connection.getId(),
1778                    name);
1779    } else if (first_free == all_buckets.size()) {
1780        result = ENGINE_E2BIG;
1781        LOG_WARNING("{} Create bucket [{}] failed - Too many buckets",
1782                    connection.getId(),
1783                    name);
1784    } else {
1785        result = ENGINE_SUCCESS;
1786        ii = first_free;
1787        /*
1788         * split the creation of the bucket in two... so
1789         * we can release the global lock..
1790         */
1791        std::lock_guard<std::mutex> guard(all_buckets[ii].mutex);
1792        all_buckets[ii].state = BucketState::Creating;
1793        all_buckets[ii].type = type;
1794        strcpy(all_buckets[ii].name, name.c_str());
1795        try {
1796            all_buckets[ii].topkeys = new TopKeys(settings.getTopkeysSize());
1797        } catch (const std::bad_alloc &) {
1798            result = ENGINE_ENOMEM;
1799            LOG_WARNING("{} Create bucket [{}] failed - out of memory",
1800                        connection.getId(),
1801                        name);
1802        }
1803    }
1804    all_bucket_lock.unlock();
1805
1806    if (result != ENGINE_SUCCESS) {
1807        return;
1808    }
1809
1810    auto &bucket = all_buckets[ii];
1811
1812    /* People aren't allowed to use the engine in this state,
1813     * so we can do stuff without locking..
1814     */
1815    if (new_engine_instance(type,
1816                            name,
1817                            get_server_api,
1818                            (ENGINE_HANDLE**)&bucket.engine,
1819                            settings.extensions.logger)) {
1820        auto* engine = bucket.engine;
1821        {
1822            std::lock_guard<std::mutex> guard(bucket.mutex);
1823            bucket.state = BucketState::Initializing;
1824        }
1825
1826        try {
1827            result = engine->initialize(v1_handle_2_handle(engine),
1828                                        config.c_str());
1829        } catch (const std::runtime_error& e) {
1830            LOG_WARNING("{} - Failed to create bucket [{}]: {}",
1831                        connection.getId(),
1832                        name,
1833                        e.what());
1834            result = ENGINE_FAILED;
1835        } catch (const std::bad_alloc& e) {
1836            LOG_WARNING("{} - Failed to create bucket [{}]: {}",
1837                        connection.getId(),
1838                        name,
1839                        e.what());
1840            result = ENGINE_ENOMEM;
1841        }
1842
1843        if (result == ENGINE_SUCCESS) {
1844            {
1845                std::lock_guard<std::mutex> guard(bucket.mutex);
1846                bucket.state = BucketState::Ready;
1847            }
1848            LOG_INFO("{} - Bucket [{}] created successfully",
1849                     connection.getId(),
1850                     name);
1851            bucket.max_document_size =
1852                 engine->getMaxItemSize(reinterpret_cast<ENGINE_HANDLE*>(engine));
1853        } else {
1854            {
1855                std::lock_guard<std::mutex> guard(bucket.mutex);
1856                bucket.state = BucketState::Destroying;
1857            }
1858            engine->destroy(v1_handle_2_handle(engine), false);
1859            std::lock_guard<std::mutex> guard(bucket.mutex);
1860            bucket.state = BucketState::None;
1861            bucket.name[0] = '\0';
1862            bucket.engine = nullptr;
1863            delete bucket.topkeys;
1864            bucket.topkeys = nullptr;
1865
1866            result = ENGINE_NOT_STORED;
1867        }
1868    } else {
1869        {
1870            std::lock_guard<std::mutex> guard(bucket.mutex);
1871            bucket.state = BucketState::None;
1872            bucket.name[0] = '\0';
1873            bucket.engine = nullptr;
1874            delete bucket.topkeys;
1875            bucket.topkeys = nullptr;
1876        }
1877
1878        LOG_WARNING(
1879                "{} - Failed to create bucket [{}]: failed to "
1880                "create a new engine instance",
1881                connection.getId(),
1882                name);
1883        result = ENGINE_FAILED;
1884    }
1885}
1886
1887void CreateBucketThread::run()
1888{
1889    setRunning();
1890    // Perform the task without having any locks. The task should be
1891    // scheduled in a pending state so the executor won't try to touch
1892    // the object until we're telling it that it is runnable
1893    create();
1894    std::lock_guard<std::mutex> guard(task->getMutex());
1895    task->makeRunnable();
1896}
1897
1898void notify_thread_bucket_deletion(LIBEVENT_THREAD& me) {
1899    for (size_t ii = 0; ii < all_buckets.size(); ++ii) {
1900        bool destroy = false;
1901        {
1902            std::lock_guard<std::mutex> guard(all_buckets[ii].mutex);
1903            if (all_buckets[ii].state == BucketState::Destroying) {
1904                destroy = true;
1905            }
1906        }
1907
1908        if (destroy) {
1909            signal_idle_clients(&me, gsl::narrow<int>(ii), false);
1910        }
1911    }
1912}
1913
/**
 * Destroy (delete) the bucket identified by `name`.
 *
 * Phases:
 *   1. Under buckets_lock, locate the bucket by name and - if it is in
 *      BucketState::Ready - move it to BucketState::Destroying.
 *   2. Fire the ON_DELETE_BUCKET callbacks and wait until every client
 *      connected to the bucket has disconnected.
 *   3. Shut down the engine instance and release all resources bound to
 *      the bucket slot so that it may be reused.
 *
 * The outcome is stored in `result` (ENGINE_SUCCESS on success).
 */
void DestroyBucketThread::destroy() {
    ENGINE_ERROR_CODE ret = ENGINE_KEY_ENOENT;
    std::unique_lock<std::mutex> all_bucket_lock(buckets_lock);

    Connection* connection = nullptr;
    if (cookie != nullptr) {
        connection = &cookie->getConnection();
    }

    /*
     * The destroy function will have access to a connection if the
     * McbpDestroyBucketTask originated from delete_bucket_executor().
     * However if we are in the process of shuting down and the
     * McbpDestroyBucketTask originated from main() then connection
     * will be set to nullptr.
     */
    const std::string connection_id{(connection == nullptr)
            ? "<none>"
            : std::to_string(connection->getId())};

    size_t idx = 0;
    for (size_t ii = 0; ii < all_buckets.size(); ++ii) {
        std::lock_guard<std::mutex> guard(all_buckets[ii].mutex);
        if (name == all_buckets[ii].name) {
            idx = ii;
            if (all_buckets[ii].state == BucketState::Ready) {
                ret = ENGINE_SUCCESS;
                all_buckets[ii].state = BucketState::Destroying;
            } else {
                // Bucket exists but is in some transitional state
                // (being created or already being destroyed).
                ret = ENGINE_KEY_EEXISTS;
            }
        }
        if (ret != ENGINE_KEY_ENOENT) {
            break;
        }
    }
    all_bucket_lock.unlock();

    if (ret != ENGINE_SUCCESS) {
        auto code = engine_error_2_mcbp_protocol_error(ret);
        LOG_INFO("{} Delete bucket [{}]: {}",
                 connection_id,
                 name,
                 memcached_status_2_text(code));
        result = ret;
        return;
    }

    LOG_INFO(
            "{} Delete bucket [{}]. Notifying all registered "
            "ON_DELETE_BUCKET callbacks",
            connection_id,
            name);

    perform_callbacks(ON_DELETE_BUCKET, nullptr, &all_buckets[idx]);

    LOG_INFO("{} Delete bucket [{}]. Wait for clients to disconnect",
             connection_id,
             name);

    /* If this thread is connected to the requested bucket... release it */
    if (connection != nullptr && idx == size_t(connection->getBucketIndex())) {
        disassociate_bucket(*connection);
    }

    /* Let all of the worker threads start invalidating connections */
    threads_initiate_bucket_deletion();

    auto& bucket = all_buckets[idx];

    /* Wait until all users disconnected... */
    {
        std::unique_lock<std::mutex> guard(bucket.mutex);

        while (bucket.clients > 0) {
            LOG_INFO(
                    "{} Delete bucket [{}]. Still waiting: {} clients "
                    "connected",
                    connection_id.c_str(),
                    name.c_str(),
                    bucket.clients);
            /* drop the lock and notify the worker threads */
            guard.unlock();
            threads_notify_bucket_deletion();
            guard.lock();

            // Re-check periodically (1s timeout) so we keep re-notifying
            // the workers even if no condvar signal arrives.
            bucket.cond.wait_for(guard,
                                 std::chrono::milliseconds(1000),
                                 [&bucket] { return bucket.clients == 0; });
        }
    }

    /* Tell the worker threads to stop trying to invalidating connections */
    threads_complete_bucket_deletion();

    /*
     * We cannot call assert_no_assocations(idx) because it iterates
     * over all connections and calls c->getBucketIndex().  The problem
     * is that a worker thread can call associate_initial_bucket() or
     * associate_bucket() at the same time.  This could lead to a call
     * to c->setBucketIndex(0) (the "no bucket"), which although safe,
     * raises a threadsanitizer warning.

     * Note, if associate_bucket() attempts to associate a connection
     * with a bucket that has been destroyed, or is in the process of
     * being destroyed, the association will fail because
     * BucketState != Ready.  See associate_bucket() for more details.
     */

    LOG_INFO(
            "{} Delete bucket [{}]. Shut down the bucket", connection_id, name);

    bucket.engine->destroy(v1_handle_2_handle(bucket.engine), force);

    LOG_INFO("{} Delete bucket [{}]. Clean up allocated resources ",
             connection_id,
             name);

    /* Clean up the stats... */
    threadlocal_stats_reset(bucket.stats);

    // Clear any registered event handlers
    for (auto& handler : bucket.engine_event_handlers) {
        handler.clear();
    }

    // Finally release the slot so it may be reused for a new bucket.
    {
        std::lock_guard<std::mutex> guard(bucket.mutex);
        bucket.state = BucketState::None;
        bucket.engine = NULL;
        bucket.name[0] = '\0';
        delete bucket.topkeys;
        bucket.responseCounters.fill(0);
        bucket.topkeys = nullptr;
    }
    // don't need lock because all timing data uses atomics
    bucket.timings.reset();

    LOG_INFO("{} Delete bucket [{}] complete", connection_id, name);
    result = ENGINE_SUCCESS;
}
2055
2056void DestroyBucketThread::run() {
2057    setRunning();
2058    destroy();
2059    std::lock_guard<std::mutex> guard(task->getMutex());
2060    task->makeRunnable();
2061}
2062
2063static void initialize_buckets(void) {
2064    size_t numthread = settings.getNumWorkerThreads() + 1;
2065    for (auto &b : all_buckets) {
2066        b.stats.resize(numthread);
2067    }
2068
2069    // To make the life easier for us in the code, index 0
2070    // in the array is "no bucket"
2071    ENGINE_HANDLE *handle;
2072    cb_assert(new_engine_instance(BucketType::NoBucket,
2073                                  "<internal>",
2074                                  get_server_api,
2075                                  &handle,
2076                                  settings.extensions.logger));
2077
2078    cb_assert(handle != nullptr);
2079    auto &nobucket = all_buckets.at(0);
2080    nobucket.type = BucketType::NoBucket;
2081    nobucket.state = BucketState::Ready;
2082    nobucket.engine = (ENGINE_HANDLE_V1*)handle;
2083}
2084
2085static void cleanup_buckets(void) {
2086    for (auto &bucket : all_buckets) {
2087        bool waiting;
2088
2089        do {
2090            waiting = false;
2091            {
2092                std::lock_guard<std::mutex> guard(bucket.mutex);
2093                switch (bucket.state.load()) {
2094                case BucketState::Stopping:
2095                case BucketState::Destroying:
2096                case BucketState::Creating:
2097                case BucketState::Initializing:
2098                    waiting = true;
2099                    break;
2100                default:
2101                        /* Empty */
2102                        ;
2103                }
2104            }
2105            if (waiting) {
2106                usleep(250);
2107            }
2108        } while (waiting);
2109
2110        if (bucket.state == BucketState::Ready) {
2111            bucket.engine->destroy(v1_handle_2_handle(bucket.engine), false);
2112            delete bucket.topkeys;
2113        }
2114    }
2115}
2116
void delete_all_buckets() {
    /*
     * Delete all of the buckets one by one by using the executor.
     * We could in theory schedule all of them in parallel, but they
     * probably have some dirty items they need to write to disk so
     * instead of having all of the buckets step on the underlying IO
     * in parallel we'll do them sequentially.
     */

    /**
     * Create a specialized task I may use that just holds the
     * DeleteBucketThread object.
     */
    class DestroyBucketTask : public Task {
    public:
        // force = false and no cookie: this deletion is initiated by
        // the server itself (shutdown), not by a client request.
        DestroyBucketTask(const std::string& name_)
            : thread(name_, false, nullptr, this)
        {
            // empty
        }

        // start the bucket deletion
        // May throw std::bad_alloc if we're failing to start the thread
        void start() {
            thread.start();
        }

        Status execute() override {
            return Status::Finished;
        }

        DestroyBucketThread thread;
    };

    LOG_INFO("Stop all buckets");
    bool done;
    do {
        done = true;
        std::shared_ptr<Task> task;
        std::string name;

        std::unique_lock<std::mutex> all_bucket_lock(buckets_lock);
        /*
         * Start at one (not zero) because zero is reserved for "no bucket".
         * The "no bucket" has a state of BucketState::Ready but no name.
         */
        for (size_t ii = 1; ii < all_buckets.size() && done; ++ii) {
            std::lock_guard<std::mutex> bucket_guard(all_buckets[ii].mutex);
            if (all_buckets[ii].state == BucketState::Ready) {
                name.assign(all_buckets[ii].name);
                LOG_INFO("Scheduling delete for bucket {}", name);
                task = std::make_shared<DestroyBucketTask>(name);
                std::lock_guard<std::mutex> guard(task->getMutex());
                dynamic_cast<DestroyBucketTask&>(*task).start();
                executorPool->schedule(task, false);
                done = false;
            }
        }
        all_bucket_lock.unlock();

        // Wait (with the global lock released) for the scheduled
        // deletion to run to completion before looking for the next
        // Ready bucket.
        if (task.get() != nullptr) {
            auto& dbt = dynamic_cast<DestroyBucketTask&>(*task);
            LOG_INFO("Waiting for delete of {} to complete", name);
            dbt.thread.waitForState(Couchbase::ThreadState::Zombie);
            LOG_INFO("Bucket {} deleted", name);
        }
    } while (!done);
}
2185
2186static void set_max_filehandles(void) {
2187    const uint64_t maxfiles = settings.getMaxconns() +
2188                            (3 * (settings.getNumWorkerThreads() + 2)) +
2189                            1024;
2190
2191    auto limit = cb::io::maximizeFileDescriptors(maxfiles);
2192
2193    if (limit < maxfiles) {
2194        LOG_WARNING(
2195                "Failed to set the number of file descriptors "
2196                "to {} due to system resource restrictions. "
2197                "This may cause the system to misbehave once you reach a "
2198                "high connection count as the system won't be able open "
2199                "new files on the system. The maximum number of file "
2200                "descriptors is currently set to {}. The system "
2201                "is configured to allow {} number of client connections, "
2202                "and in addition to that the overhead of the worker "
2203                "threads is {}. Finally the backed database needs to "
2204                "open files to persist data.",
2205                int(maxfiles),
2206                int(limit),
2207                settings.getMaxconns(),
2208                (3 * (settings.getNumWorkerThreads() + 2)));
2209    }
2210}
2211
2212/**
2213 * The log function used from SASL
2214 *
2215 * Try to remap the log levels to our own levels and put in the log
2216 * depending on the severity.
2217 */
2218static void sasl_log_callback(cb::sasl::logging::Level level,
2219                              const std::string& message) {
2220    switch (level) {
2221    case cb::sasl::logging::Level::Error:
2222        LOG_ERROR("{}", message);
2223        break;
2224    case cb::sasl::logging::Level::Warning:
2225        LOG_WARNING("{}", message);
2226        break;
2227    case cb::sasl::logging::Level::Notice:
2228        LOG_INFO("{}", message);
2229        break;
2230    case cb::sasl::logging::Level::Fail:
2231    case cb::sasl::logging::Level::Debug:
2232        LOG_DEBUG("{}", message);
2233        break;
2234    case cb::sasl::logging::Level::Trace:
2235        LOG_TRACE("{}", message);
2236        break;
2237    }
2238}
2239
2240static int sasl_getopt_callback(void*, const char*,
2241                                const char* option,
2242                                const char** result,
2243                                unsigned* len) {
2244    if (option == nullptr || result == nullptr || len == nullptr) {
2245        return CBSASL_BADPARAM;
2246    }
2247
2248    std::string key(option);
2249
2250    if (key == "hmac iteration count") {
2251        // Speed up the test suite by reducing the SHA1 hmac calculations
2252        // from 4k to 10
2253        if (getenv("MEMCACHED_UNIT_TESTS") != nullptr) {
2254            *result = "10";
2255            *len = 2;
2256            return CBSASL_OK;
2257        }
2258    } else if (key == "sasl mechanisms") {
2259        const auto& value = settings.getSaslMechanisms();
2260        if (!value.empty()) {
2261            *result = value.data();
2262            *len = static_cast<unsigned int>(value.size());
2263            return CBSASL_OK;
2264        }
2265    }
2266
2267    return CBSASL_FAIL;
2268}
2269
2270static void initialize_sasl() {
2271    cb::sasl::logging::set_log_callback(sasl_log_callback);
2272
2273    cbsasl_callback_t sasl_callbacks[2];
2274    int ii = 0;
2275
2276    sasl_callbacks[ii].id = CBSASL_CB_GETOPT;
2277    sasl_callbacks[ii].proc = (int (*)(void))&sasl_getopt_callback;
2278    sasl_callbacks[ii].context = nullptr;
2279    sasl_callbacks[++ii].id = CBSASL_CB_LIST_END;
2280    sasl_callbacks[ii].proc = nullptr;
2281    sasl_callbacks[ii].context = nullptr;
2282
2283    if (cbsasl_server_init(sasl_callbacks, "memcached") != CBSASL_OK) {
2284        FATAL_ERROR(EXIT_FAILURE, "Failed to initialize SASL server");
2285    }
2286
2287    if (cb::sasl::plain::authenticate("default", "") == CBSASL_OK) {
2288        set_default_bucket_enabled(true);
2289    } else {
2290        set_default_bucket_enabled(false);
2291    }
2292}
2293
/**
 * Main entry point for the memcached server.
 *
 * Performs all one-time initialization (logging, settings, RBAC, SASL,
 * buckets, worker threads, listening sockets, ...), runs the libevent
 * main loop until shutdown is requested, and then tears everything down
 * again in (roughly) the reverse order of initialization.
 *
 * NOTE: the ordering of the calls below is significant; many steps
 * depend on earlier ones (e.g. signal handlers require libevent, the
 * file logger requires parsed settings).
 *
 * @return EXIT_SUCCESS on a clean shutdown
 */
extern "C" int memcached_main(int argc, char **argv) {
    // MB-14649 log() crash on windows on some CPU's
#ifdef _WIN64
    _set_FMA3_enable (0);
#endif

#ifdef HAVE_LIBNUMA
    // Configure NUMA policy as soon as possible (before any dynamic memory
    // allocation).
    const std::string numa_status = configure_numa_policy();
#endif
    std::unique_ptr<ParentMonitor> parent_monitor;

    // The logger must exist before anything else so that failures can
    // be reported; fall back to stderr if even that fails.
    try {
        cb::logger::createConsoleLogger();
        settings.extensions.logger = &cb::logger::getLoggerDescriptor();
    } catch (const std::exception& e) {
        std::cerr << "Failed to create logger object: " << e.what()
                  << std::endl;
        exit(EXIT_FAILURE);
    }

    // Setup terminate handler as early as possible to catch crashes
    // occurring during initialisation.
    install_backtrace_terminate_handler();

    setup_libevent_locking();

    initialize_openssl();

    /* Initialize the socket subsystem */
    cb_initialize_sockets();

    AllocHooks::initialize();

    /* init settings */
    settings_init();

    initialize_mbcp_lookup_map();

    /* Parse command line arguments */
    try {
        parse_arguments(argc, argv);
    } catch (const std::exception& exception) {
        FATAL_ERROR(EXIT_FAILURE,
                    "Failed to initialize server: {}",
                    exception.what());
    }

    update_settings_from_config();

    cb::rbac::initialize();

    if (getenv("COUCHBASE_FORCE_ENABLE_XATTR") != nullptr) {
        settings.setXattrEnabled(true);
    }

    /* Initialize breakpad crash catcher with our just-parsed settings. */
    cb::breakpad::initialize(settings.getBreakpadSettings());

    /* Configure file logger, if specified as a settings object */
    if (settings.has.logger) {
        auto ret = cb::logger::initialize(settings.getLoggerConfig());
        if (ret) {
            FATAL_ERROR(
                    EXIT_FAILURE, "Failed to initialize logger: {}", ret.get());
        }
    }

    /* File-based logging available from this point onwards... */

    /* Logging available now extensions have been loaded. */
    LOG_INFO("Couchbase version {} starting.", get_server_version());

    LOG_INFO("Using SLA configuration: {}",
             to_string(cb::mcbp::sla::to_json(), false));

    if (settings.isStdinListenerEnabled()) {
        LOG_INFO("Enable standard input listener");
        start_stdin_listener(shutdown_server);
    }

#ifdef HAVE_LIBNUMA
    // Log the NUMA policy selected (now the logger is available).
    LOG_INFO("NUMA: {}", numa_status);
#endif

    // The RBAC configuration is mandatory
    if (!settings.has.rbac_file) {
        FATAL_ERROR(EXIT_FAILURE, "RBAC file not specified");
    }

    if (!cb::io::isFile(settings.getRbacFile())) {
        FATAL_ERROR(EXIT_FAILURE,
                    "RBAC [{}] does not exist",
                    settings.getRbacFile());
    }

    LOG_INFO("Loading RBAC configuration from [{}]", settings.getRbacFile());
    cb::rbac::loadPrivilegeDatabase(settings.getRbacFile());

    LOG_INFO("Loading error maps from [{}]", settings.getErrorMapsDir());
    try {
        settings.loadErrorMaps(settings.getErrorMapsDir());
    } catch (const std::exception& e) {
        FATAL_ERROR(EXIT_FAILURE, "Failed to load error maps: {}", e.what());
    }

    initialize_audit();

    /* inform interested parties of initial verbosity level */
    perform_callbacks(ON_LOG_LEVEL, NULL, NULL);

    set_max_filehandles();

    /* Aggregate the maximum number of connections */
    settings.calculateMaxconns();

    {
        char *errmsg;
        if (!initialize_engine_map(&errmsg)) {
            FATAL_ERROR(EXIT_FAILURE,
                        "Unable to initialize engine "
                        "map: {}",
                        errmsg);
        }
    }

    /* Initialize bucket engine */
    initialize_buckets();

    initialize_sasl();

    /* initialize main thread libevent instance */
    main_base = event_base_new();

    /* Initialize signal handlers (requires libevent). */
    if (!install_signal_handlers()) {
        FATAL_ERROR(EXIT_FAILURE, "Unable to install signal handlers");
    }

    /* initialize other stuff */
    stats_init();

#ifndef WIN32
    /*
     * ignore SIGPIPE signals; we can use errno == EPIPE if we
     * need that information
     */
    if (sigignore(SIGPIPE) == -1) {
        FATAL_ERROR(EX_OSERR, "Failed to ignore SIGPIPE; sigaction");
    }
#endif

    /* start up worker threads if MT mode */
    thread_init(settings.getNumWorkerThreads(), main_base, dispatch_event_handler);

    executorPool.reset(new ExecutorPool(settings.getNumWorkerThreads()));

    initializeTracing();
    TRACE_GLOBAL0("memcached", "Started");

    /*
     * MB-20034.
     * Now that all threads have been created, e.g. the audit thread, threads
     * associated with extensions and the workers, we can enable shutdown.
     */
    enable_shutdown();

    /* Initialise memcached time keeping */
    mc_time_init(main_base);

    /* create the listening socket, bind it, and init */
    create_listen_sockets(true);

    /* Optional parent monitor */
    char *env = getenv("MEMCACHED_PARENT_MONITOR");
    if (env != NULL) {
        LOG_INFO("Starting parent monitor");
        parent_monitor.reset(new ParentMonitor(std::stoi(env)));
    }

    // Skip the event loop entirely if a shutdown was already requested
    // during initialization (e.g. via a signal).
    if (!memcached_shutdown) {
        /* enter the event loop */
        LOG_INFO("Initialization complete. Accepting clients.");
        service_online = true;
        event_base_loop(main_base, 0);
        service_online = false;
    }

    // From here on: shutdown, in (roughly) the reverse order of the
    // initialization above.
    LOG_INFO("Initiating graceful shutdown.");
    delete_all_buckets();

    if (parent_monitor.get() != nullptr) {
        LOG_INFO("Shutting down parent monitor");
        parent_monitor.reset();
    }

    LOG_INFO("Shutting down audit daemon");
    /* Close down the audit daemon cleanly */
    shutdown_auditdaemon(get_audit_handle());

    LOG_INFO("Shutting down client worker threads");
    threads_shutdown();

    LOG_INFO("Releasing server sockets");
    listen_conn.clear();

    LOG_INFO("Releasing client resources");
    close_all_connections();

    LOG_INFO("Releasing bucket resources");
    cleanup_buckets();

    LOG_INFO("Shutting down RBAC subsystem");
    cb::rbac::destroy();

    LOG_INFO("Releasing thread resources");
    threads_cleanup();

    LOG_INFO("Shutting down executor pool");
    executorPool.reset();

    LOG_INFO("Releasing signal handlers");
    release_signal_handlers();

    LOG_INFO("Shutting down SASL server");
    cbsasl_server_term();

    LOG_INFO("Releasing connection objects");
    destroy_connections();

    LOG_INFO("Deinitialising tracing");
    deinitializeTracing();

    LOG_INFO("Shutting down engine map");
    shutdown_engine_map();

    LOG_INFO("Removing breakpad");
    cb::breakpad::destroy();

    LOG_INFO("Releasing callbacks");
    free_callbacks();

    LOG_INFO("Shutting down OpenSSL");
    shutdown_openssl();

    LOG_INFO("Shutting down libevent");
    event_base_free(main_base);

    LOG_INFO("Shutting down logger extension");
    // drop my handle to the logger
    cb::logger::reset();
    cb::logger::shutdown();

    return EXIT_SUCCESS;
}
2550