xref: /6.0.3/kv_engine/daemon/memcached.cc (revision bc9dc2fa)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15 */
16#include "config.h"
17#include "memcached.h"
18#include "alloc_hooks.h"
19#include "buckets.h"
20#include "cmdline.h"
21#include "config_parse.h"
22#include "connections.h"
23#include "debug_helpers.h"
24#include "doc_pre_expiry.h"
25#include "enginemap.h"
26#include "executorpool.h"
27#include "ioctl.h"
28#include "libevent_locking.h"
29#include "logger/logger.h"
30#include "mc_time.h"
31#include "mcaudit.h"
32#include "mcbp.h"
33#include "mcbp_executors.h"
34#include "mcbp_topkeys.h"
35#include "mcbp_validators.h"
36#include "mcbpdestroybuckettask.h"
37#include "memcached/audit_interface.h"
38#include "memcached_openssl.h"
39#include "parent_monitor.h"
40#include "protocol/mcbp/engine_wrapper.h"
41#include "runtime.h"
42#include "server_socket.h"
43#include "session_cas.h"
44#include "settings.h"
45#include "stats.h"
46#include "subdocument.h"
47#include "timings.h"
48#include "topkeys.h"
49#include "tracing.h"
50#include "utilities/engine_loader.h"
51#include "utilities/protocol2text.h"
52#include "utilities/terminate_handler.h"
53
54#include <mcbp/mcbp.h>
55#include <memcached/util.h>
56#include <phosphor/phosphor.h>
57#include <platform/cb_malloc.h>
58#include <platform/dirutils.h>
59#include <platform/make_unique.h>
60#include <platform/socket.h>
61#include <platform/strerror.h>
62#include <platform/sysinfo.h>
63#include <utilities/breakpad.h>
64
65#include <signal.h>
66#include <fcntl.h>
67#include <errno.h>
68#include <stdlib.h>
69#include <stdio.h>
70#include <string.h>
71#include <time.h>
72#include <limits.h>
73#include <ctype.h>
74#include <memcached/rbac.h>
75#include <stdarg.h>
76#include <stddef.h>
77#include <snappy-c.h>
78#include <cJSON.h>
79#include <JSON_checker.h>
80#include <engines/default_engine.h>
81#include <vector>
82#include <algorithm>
83#include <cJSON_utils.h>
84
85// MB-14649: log crashing on windows..
86#include <math.h>
87#include <memcached/audit_interface.h>
88#include <memcached/server_api.h>
89
90#include <gsl/gsl>
91
92#if HAVE_LIBNUMA
93#include <numa.h>
94#endif
95
96static EXTENSION_LOG_LEVEL get_log_level(void);
97
98/**
99 * All of the buckets in Couchbase are stored in this array.
100 */
101static std::mutex buckets_lock;
102std::array<Bucket, COUCHBASE_MAX_NUM_BUCKETS + 1> all_buckets;
103
104static ENGINE_HANDLE* v1_handle_2_handle(ENGINE_HANDLE_V1* v1) {
105    return reinterpret_cast<ENGINE_HANDLE*>(v1);
106}
107
108const char* getBucketName(const Connection* c) {
109    return all_buckets[c->getBucketIndex()].name;
110}
111
112void bucketsForEach(std::function<bool(Bucket&, void*)> fn, void *arg) {
113    std::lock_guard<std::mutex> all_bucket_lock(buckets_lock);
114    for (Bucket& bucket : all_buckets) {
115        bool do_break = false;
116        std::lock_guard<std::mutex> guard(bucket.mutex);
117        if (bucket.state == BucketState::Ready) {
118            if (!fn(bucket, arg)) {
119                do_break = true;
120            }
121        }
122        if (do_break) {
123            break;
124        }
125    }
126}
127
128std::atomic<bool> memcached_shutdown;
129std::atomic<bool> service_online;
130// Should we enable the common ports (all of the ports which aren't tagged as
131// management ports)?
132static std::atomic<bool> enable_common_ports;
133
134std::unique_ptr<ExecutorPool> executorPool;
135
136/* Mutex for global stats */
137std::mutex stats_mutex;
138
139/*
140 * forward declarations
141 */
142static void register_callback(ENGINE_HANDLE *eh,
143                              ENGINE_EVENT_TYPE type,
144                              EVENT_CALLBACK cb, const void *cb_data);
145
146static void create_listen_sockets(bool management);
147
148/* stats */
149static void stats_init(void);
150
151/* defaults */
152static void settings_init(void);
153
154/** exported globals **/
155struct stats stats;
156
157/** file scope variables **/
158std::vector<std::unique_ptr<ServerSocket>> listen_conn;
159static struct event_base *main_base;
160
161static engine_event_handler_array_t engine_event_handlers;
162
163/*
164 * MB-12470 requests an easy way to see when (some of) the statistics
165 * counters were reset. This function grabs the current time and tries
166 * to format it to the current timezone by using ctime_r/s (which adds
167 * a newline at the end for some obscure reason which we'll need to
168 * strip off).
169 *
170 * This function expects that the stats lock is held by the caller to get
171 * a "sane" result (otherwise one thread may see a garbled version), but
172 * no crash will occur since the buffer is big enough and always zero
173 * terminated.
174 */
175char reset_stats_time[80];
176static void set_stats_reset_time(void)
177{
178    time_t now = time(NULL);
179#ifdef WIN32
180    ctime_s(reset_stats_time, sizeof(reset_stats_time), &now);
181#else
182    ctime_r(&now, reset_stats_time);
183#endif
184    char *ptr = strchr(reset_stats_time, '\n');
185    if (ptr) {
186        *ptr = '\0';
187    }
188}
189
190void disassociate_bucket(Connection& connection) {
191    Bucket& b = all_buckets.at(connection.getBucketIndex());
192    std::lock_guard<std::mutex> guard(b.mutex);
193    b.clients--;
194
195    connection.setBucketIndex(0);
196    connection.setBucketEngine(nullptr);
197
198    if (b.clients == 0 && b.state == BucketState::Destroying) {
199        b.cond.notify_one();
200    }
201}
202
203bool associate_bucket(Connection& connection, const char* name) {
204    bool found = false;
205
206    /* leave the current bucket */
207    disassociate_bucket(connection);
208
209    /* Try to associate with the named bucket */
210    /* @todo add auth checks!!! */
211    for (size_t ii = 1; ii < all_buckets.size() && !found; ++ii) {
212        Bucket &b = all_buckets.at(ii);
213        std::lock_guard<std::mutex> guard(b.mutex);
214        if (b.state == BucketState::Ready && strcmp(b.name, name) == 0) {
215            b.clients++;
216            connection.setBucketIndex(gsl::narrow<int>(ii));
217            connection.setBucketEngine(b.engine);
218            audit_bucket_selection(connection);
219            found = true;
220        }
221    }
222
223    if (!found) {
224        /* Bucket not found, connect to the "no-bucket" */
225        Bucket &b = all_buckets.at(0);
226        {
227            std::lock_guard<std::mutex> guard(b.mutex);
228            b.clients++;
229        }
230        connection.setBucketIndex(0);
231        connection.setBucketEngine(b.engine);
232    }
233
234    return found;
235}
236
237void associate_initial_bucket(Connection& connection) {
238    Bucket &b = all_buckets.at(0);
239    {
240        std::lock_guard<std::mutex> guard(b.mutex);
241        b.clients++;
242    }
243
244    connection.setBucketIndex(0);
245    connection.setBucketEngine(b.engine);
246
247    if (is_default_bucket_enabled()) {
248        associate_bucket(connection, "default");
249    }
250}
251
252static void populate_log_level(void*) {
253    // Lock the entire buckets array so that buckets can't be modified while
254    // we notify them (blocking bucket creation/deletion)
255    auto val = get_log_level();
256
257    std::lock_guard<std::mutex> all_bucket_lock(buckets_lock);
258    for (auto& bucket : all_buckets) {
259        std::lock_guard<std::mutex> guard(bucket.mutex);
260        if (bucket.state == BucketState::Ready &&
261            bucket.engine->set_log_level != nullptr) {
262            bucket.engine->set_log_level(reinterpret_cast<ENGINE_HANDLE*>(bucket.engine),
263                                         val);
264        }
265    }
266}
267
268/* Perform all callbacks of a given type for the given connection. */
269void perform_callbacks(ENGINE_EVENT_TYPE type,
270                       const void *data,
271                       const void *void_cookie)
272{
273    cb_thread_t tid;
274
275    switch (type) {
276        /*
277         * The following events operate on a connection which is passed in
278         * as the cookie.
279         */
280    case ON_DISCONNECT: {
281        const auto * cookie = reinterpret_cast<const Cookie *>(void_cookie);
282        if (cookie == nullptr) {
283            throw std::invalid_argument("perform_callbacks: cookie is nullptr");
284        }
285        const auto bucket_idx = cookie->getConnection().getBucketIndex();
286        if (bucket_idx == -1) {
287            throw std::logic_error(
288                    "perform_callbacks: connection (which is " +
289                    std::to_string(cookie->getConnection().getId()) +
290                    ") cannot be "
291                    "disconnected as it is not associated with a bucket");
292        }
293
294        for (auto& handler : all_buckets[bucket_idx].engine_event_handlers[type]) {
295            handler.cb(void_cookie, ON_DISCONNECT, data, handler.cb_data);
296        }
297        break;
298    }
299    case ON_LOG_LEVEL:
300        if (void_cookie != nullptr) {
301            throw std::invalid_argument("perform_callbacks: cookie "
302                "(which is " +
303                std::to_string(reinterpret_cast<uintptr_t>(void_cookie)) +
304                ") should be NULL for ON_LOG_LEVEL");
305        }
306        for (auto& handler : engine_event_handlers[type]) {
307            handler.cb(void_cookie, ON_LOG_LEVEL, data, handler.cb_data);
308        }
309
310        if (service_online) {
311            if (cb_create_thread(&tid, populate_log_level, nullptr, 1) == -1) {
312                LOG_WARNING(
313                        "Failed to create thread to notify engines about "
314                        "changing log level");
315            }
316        }
317        break;
318
319    case ON_DELETE_BUCKET: {
320        /** cookie is the bucket entry */
321        auto* bucket = reinterpret_cast<const Bucket*>(void_cookie);
322        for (auto& handler : bucket->engine_event_handlers[type]) {
323            handler.cb(void_cookie, ON_DELETE_BUCKET, data, handler.cb_data);
324        }
325        break;
326    }
327
328    case ON_INIT_COMPLETE:
329        if ((data != nullptr) || (void_cookie != nullptr)) {
330            throw std::invalid_argument("perform_callbacks: data and cookie"
331                                            " should be nullptr");
332        }
333        enable_common_ports.store(true, std::memory_order_release);
334        notify_dispatcher();
335        break;
336
337    default:
338        throw std::invalid_argument("perform_callbacks: type "
339                "(which is " + std::to_string(type) +
340                "is not a valid ENGINE_EVENT_TYPE");
341    }
342}
343
344static void register_callback(ENGINE_HANDLE *eh,
345                              ENGINE_EVENT_TYPE type,
346                              EVENT_CALLBACK cb,
347                              const void *cb_data)
348{
349    size_t idx;
350    switch (type) {
351    /*
352     * The following events operate on a connection which is passed in
353     * as the cookie.
354     */
355    case ON_DISCONNECT:
356    case ON_DELETE_BUCKET:
357        if (eh == nullptr) {
358            throw std::invalid_argument("register_callback: 'eh' must be non-NULL");
359        }
360        for (idx = 0; idx < all_buckets.size(); ++idx) {
361            if ((void *)eh == (void *)all_buckets[idx].engine) {
362                break;
363            }
364        }
365        if (idx == all_buckets.size()) {
366            throw std::invalid_argument("register_callback: eh (which is " +
367                    std::to_string(reinterpret_cast<uintptr_t>(eh)) +
368                    ") is not a engine associated with a bucket");
369        }
370        all_buckets[idx].engine_event_handlers[type].push_back({cb, cb_data});
371        break;
372
373    case ON_LOG_LEVEL:
374        if (eh != nullptr) {
375            throw std::invalid_argument("register_callback: 'eh' must be NULL");
376        }
377        engine_event_handlers[type].push_back({cb, cb_data});
378        break;
379
380    default:
381        throw std::invalid_argument("register_callback: type (which is " +
382                                    std::to_string(type) +
383                                    ") is not a valid ENGINE_EVENT_TYPE");
384    }
385}
386
387static void free_callbacks() {
388    // free per-bucket callbacks.
389    for (size_t idx = 0; idx < all_buckets.size(); ++idx) {
390        for (auto& type_vec : all_buckets[idx].engine_event_handlers) {
391            type_vec.clear();
392        }
393    }
394
395    // free global callbacks
396    for (auto& type_vec : engine_event_handlers) {
397        type_vec.clear();
398    }
399}
400
401static void stats_init(void) {
402    set_stats_reset_time();
403    stats.conn_structs.reset();
404    stats.total_conns.reset();
405    stats.daemon_conns.reset();
406    stats.rejected_conns.reset();
407    stats.curr_conns.store(0, std::memory_order_relaxed);
408}
409
410struct thread_stats* get_thread_stats(Connection* c) {
411    cb_assert(c->getThread()->index < (settings.getNumWorkerThreads() + 1));
412    auto& independent_stats = all_buckets[c->getBucketIndex()].stats;
413    return &independent_stats.at(c->getThread()->index);
414}
415
416void stats_reset(Cookie& cookie) {
417    {
418        std::lock_guard<std::mutex> guard(stats_mutex);
419        set_stats_reset_time();
420    }
421    stats.total_conns.reset();
422    stats.rejected_conns.reset();
423    threadlocal_stats_reset(cookie.getConnection().getBucket().stats);
424    bucket_reset_stats(cookie);
425}
426
427static size_t get_number_of_worker_threads() {
428    size_t ret;
429    char *override = getenv("MEMCACHED_NUM_CPUS");
430    if (override == NULL) {
431        // No override specified; determine worker thread count based
432        // on the CPU count:
433        //     <= 4 cores: create 4 workers.
434        //      > 4 cores: create #CPUs * 7/8 (but never fewer than 4).
435        ret = Couchbase::get_available_cpu_count();
436
437        if (ret > 4) {
438            ret = (ret * 7) / 8;
439        }
440        if (ret < 4) {
441            ret = 4;
442        }
443    } else {
444        ret = std::stoull(override);
445        if (ret == 0) {
446            ret = 4;
447        }
448    }
449    return ret;
450}
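
// Worked examples of the sizing heuristic implemented above (the figures are
// derived from the code itself, using integer division): the worker count is
// effectively max(4, #CPUs * 7 / 8) unless MEMCACHED_NUM_CPUS overrides it.
//
//     #CPUs  :  1   2   4   5   8   16   24   40   64
//     workers:  4   4   4   4   7   14   21   35   56
//
// MEMCACHED_NUM_CPUS=<n> bypasses the heuristic entirely; a value of 0 falls
// back to 4 workers.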
451
452static void breakpad_changed_listener(const std::string&, Settings &s) {
453    cb::breakpad::initialize(s.getBreakpadSettings());
454}
455
456static void ssl_minimum_protocol_changed_listener(const std::string&, Settings &s) {
457    set_ssl_protocol_mask(s.getSslMinimumProtocol());
458}
459
460static void ssl_cipher_list_changed_listener(const std::string&, Settings &s) {
461    set_ssl_cipher_list(s.getSslCipherList());
462}
463
464static void verbosity_changed_listener(const std::string&, Settings &s) {
465    auto logger = cb::logger::get();
466    if (logger) {
467        logger->set_level(cb::logger::convertToSpdSeverity(get_log_level()));
468    }
469
470    perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
471}
472
473static void saslauthd_socketpath_changed_listener(const std::string&, Settings &s) {
474    cb::sasl::saslauthd::set_socketpath(s.getSaslauthdSocketpath());
475}
476
477static void scramsha_fallback_salt_changed_listener(const std::string&,
478                                                    Settings& s) {
479    cb::sasl::set_scramsha_fallback_salt(s.getScramshaFallbackSalt());
480}
481
482static void opcode_attributes_override_changed_listener(const std::string&,
483                                                        Settings& s) {
484    unique_cJSON_ptr json(cJSON_Parse(s.getOpcodeAttributesOverride().c_str()));
485    if (json) {
486        cb::mcbp::sla::reconfigure(settings.getRoot(), *json);
487    } else {
488        cb::mcbp::sla::reconfigure(settings.getRoot());
489    }
490    LOG_INFO("SLA configuration changed to: {}",
491             to_string(cb::mcbp::sla::to_json(), false));
492}
493
494static void interfaces_changed_listener(const std::string&, Settings &s) {
495    for (const auto& ifc : s.getInterfaces()) {
496        auto* port = get_listening_port_instance(ifc.port);
497        if (port != nullptr) {
498            if (port->maxconns != ifc.maxconn) {
499                port->maxconns = ifc.maxconn;
500            }
501
502            if (port->backlog != ifc.backlog) {
503                port->backlog = ifc.backlog;
504            }
505
506            if (port->tcp_nodelay != ifc.tcp_nodelay) {
507                port->tcp_nodelay = ifc.tcp_nodelay;
508            }
509        }
510    }
511    s.calculateMaxconns();
512}
513
514#ifdef HAVE_LIBNUMA
515/** Configure the NUMA policy for memcached. By default we attempt to set the
516 *  interleaved policy, unless the env var MEMCACHED_NUMA_MEM_POLICY is set to
517 *  'disable'.
518 *  @return A log message describing what action was taken.
519 *
520 */
521static std::string configure_numa_policy() {
522    if (numa_available() != 0) {
523        return "Not available - not setting mem policy.";
524    }
525
526    // Attempt to set the default NUMA memory policy to interleaved,
527    // unless overridden by our env var.
528    const char* mem_policy_env = getenv("MEMCACHED_NUMA_MEM_POLICY");
529    if (mem_policy_env != NULL && strcmp("disable", mem_policy_env) == 0) {
530        return std::string("NOT setting memory allocation policy - disabled "
531                "via MEMCACHED_NUMA_MEM_POLICY='") + mem_policy_env + "'";
532    } else {
533        errno = 0;
534        numa_set_interleave_mask(numa_all_nodes_ptr);
535        if (errno == 0) {
536            return "Set memory allocation policy to 'interleave'";
537        } else {
538            return std::string("NOT setting memory allocation policy to "
539                    "'interleave' - request failed: ") + cb_strerror();
540        }
541    }
542}
543#endif  // HAVE_LIBNUMA
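
// Usage note (derived from configure_numa_policy() above, and only relevant
// for NUMA-enabled builds): exporting MEMCACHED_NUMA_MEM_POLICY=disable in
// the daemon's environment skips the interleave policy and logs
// "NOT setting memory allocation policy ..."; otherwise, when libnuma reports
// the host as NUMA-capable, the default memory policy is set to 'interleave'.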
544
545static void settings_init(void) {
546    // Set up the listener functions
547    settings.addChangeListener("breakpad",
548                               breakpad_changed_listener);
549    settings.addChangeListener("ssl_minimum_protocol",
550                               ssl_minimum_protocol_changed_listener);
551    settings.addChangeListener("ssl_cipher_list",
552                               ssl_cipher_list_changed_listener);
553    settings.addChangeListener("verbosity", verbosity_changed_listener);
554    settings.addChangeListener("interfaces", interfaces_changed_listener);
555    settings.addChangeListener("saslauthd_socketpath",
556                               saslauthd_socketpath_changed_listener);
557    settings.addChangeListener("scramsha_fallback_salt",
558                               scramsha_fallback_salt_changed_listener);
559    NetworkInterface default_interface;
560    settings.addInterface(default_interface);
561
562    settings.setBioDrainBufferSize(8192);
563
564    settings.setVerbose(0);
565    settings.setConnectionIdleTime(0); // Connection idle time disabled
566    settings.setNumWorkerThreads(get_number_of_worker_threads());
567    settings.setDatatypeJsonEnabled(true);
568    settings.setDatatypeSnappyEnabled(true);
569    settings.setRequestsPerEventNotification(50, EventPriority::High);
570    settings.setRequestsPerEventNotification(5, EventPriority::Medium);
571    settings.setRequestsPerEventNotification(1, EventPriority::Low);
572    settings.setRequestsPerEventNotification(20, EventPriority::Default);
573
574    /*
575     * The max object size is 20MB. Let's allow packets up to 30MB to
576     * be handled "properly" by returing E2BIG, but packets bigger
577     * than that will cause the server to disconnect the client
578     */
579    settings.setMaxPacketSize(30 * 1024 * 1024);
580
581    settings.setDedupeNmvbMaps(false);
582
583    char *tmp = getenv("MEMCACHED_TOP_KEYS");
584    settings.setTopkeysSize(20);
585    if (tmp != NULL) {
586        int count;
587        if (safe_strtol(tmp, count)) {
588            settings.setTopkeysSize(count);
589        }
590    }
591
592    {
593        // MB-13642 Allow the user to specify the SSL cipher list
594        //    If someone wants to use SSL we should try to be "secure
595        //    by default", and only allow for using strong ciphers.
596        //    Users that may want to use a less secure cipher list
597        //    should be allowed to do so by setting an environment
598        //    variable (since there is no place in the UI to do
599        //    so currently). Whenever ns_server allows for specifying
600        //    the SSL cipher list in the UI, it will be stored
601        //    in memcached.json and override these settings.
602        const char *env = getenv("COUCHBASE_SSL_CIPHER_LIST");
603        if (env == nullptr) {
604            settings.setSslCipherList("HIGH");
605        } else {
606            settings.setSslCipherList(env);
607        }
608    }
609
610    settings.setSslMinimumProtocol("tlsv1");
611    /*
612     * MB-22586
613     * When connecting over SSL there isn't much point in using SCRAM or
614     * any other CPU-intensive mechanism (requiring multiple packet
615     * exchanges between client and server) just to avoid sending the
616     * password in clear text, as everything going over SSL is encrypted anyway.
617     */
618    if (getenv("COUCHBASE_I_DONT_TRUST_SSL") == nullptr) {
619        settings.setSslSaslMechanisms("PLAIN");
620    }
621
622    if (getenv("COUCHBASE_ENABLE_PRIVILEGE_DEBUG") != nullptr) {
623        settings.setPrivilegeDebug(true);
624    }
625
626    settings.setTopkeysEnabled(true);
627}
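
// For reference, the environment variables consulted by settings_init() above
// (directly, or via get_number_of_worker_threads(); the code remains
// authoritative for the exact behaviour):
//
//     MEMCACHED_NUM_CPUS               - override the worker thread count
//     MEMCACHED_TOP_KEYS               - number of top keys to track
//                                        (default 20)
//     COUCHBASE_SSL_CIPHER_LIST        - SSL cipher list (default "HIGH")
//     COUCHBASE_I_DONT_TRUST_SSL       - if set, do not restrict SSL
//                                        connections to the PLAIN SASL
//                                        mechanism
//     COUCHBASE_ENABLE_PRIVILEGE_DEBUG - if set, enable privilege debug mode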
628
629/**
630 * The config file may have altered some of the default values we're
631 * caching in other variables. This is the place where we'd propagate
632 * such changes.
633 *
634 * This is also the place to initialize any additional files needed by
635 * Memcached.
636 */
637static void update_settings_from_config(void)
638{
639    std::string root(DESTINATION_ROOT);
640
641    if (!settings.getRoot().empty()) {
642        root = settings.getRoot().c_str();
643    }
644
645    if (settings.getErrorMapsDir().empty()) {
646        // Set the error map dir.
647        std::string error_maps_dir(root + "/etc/couchbase/kv/error_maps");
648        cb::io::sanitizePath(error_maps_dir);
649        if (cb::io::isDirectory(error_maps_dir)) {
650            settings.setErrorMapsDir(error_maps_dir);
651        }
652    }
653
654    // If the user didn't set a socket path, use the default
655    if (settings.getSaslauthdSocketpath().empty()) {
656        const char* path = getenv("CBAUTH_SOCKPATH");
657        if (path == nullptr) {
658            path = "/var/run/saslauthd/mux";
659        }
660
661        settings.setSaslauthdSocketpath(path);
662    }
663
664    try {
665        cb::mcbp::sla::reconfigure(root);
666    } catch (const std::exception& e) {
667        FATAL_ERROR(EXIT_FAILURE, e.what());
668    }
669
670    settings.addChangeListener("opcode_attributes_override",
671                               opcode_attributes_override_changed_listener);
672}
673
674struct {
675    std::mutex mutex;
676    bool disabled;
677    ssize_t count;
678    uint64_t num_disable;
679} listen_state;
680
681bool is_listen_disabled(void) {
682    std::lock_guard<std::mutex> guard(listen_state.mutex);
683    return listen_state.disabled;
684}
685
686uint64_t get_listen_disabled_num(void) {
687    std::lock_guard<std::mutex> guard(listen_state.mutex);
688    return listen_state.num_disable;
689}
690
691void disable_listen() {
692    {
693        std::lock_guard<std::mutex> guard(listen_state.mutex);
694        listen_state.disabled = true;
695        listen_state.count = 10;
696        ++listen_state.num_disable;
697    }
698
699    for (auto& connection : listen_conn) {
700        connection->disable();
701    }
702}
703
704void safe_close(SOCKET sfd) {
705    if (sfd != INVALID_SOCKET) {
706        int rval;
707
708        do {
709            rval = evutil_closesocket(sfd);
710        } while (rval == SOCKET_ERROR &&
711                 cb::net::is_interrupted(cb::net::get_socket_error()));
712
713        if (rval == SOCKET_ERROR) {
714            std::string error = cb_strerror();
715            LOG_WARNING("Failed to close socket {} ({})!!", (int)sfd, error);
716        } else {
717            stats.curr_conns.fetch_sub(1, std::memory_order_relaxed);
718            if (is_listen_disabled()) {
719                notify_dispatcher();
720            }
721        }
722    }
723}
724
725bucket_id_t get_bucket_id(gsl::not_null<const void*> void_cookie) {
726    /* @todo fix this. Currently we're using the index as the id,
727     * but this should be changed to be a unique ID that won't be
728     * reused.
729     */
730    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
731    return bucket_id_t(cookie->getConnection().getBucketIndex());
732}
733
734uint64_t get_connection_id(gsl::not_null<const void*> void_cookie) {
735    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
736    return uint64_t(&cookie->getConnection());
737}
738
739std::pair<uint32_t, std::string> cookie_get_log_info(
740        gsl::not_null<const void*> void_cookie) {
741    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
742    return std::make_pair(cookie->getConnection().getId(),
743                          cookie->getConnection().getDescription());
744}
745
746void cookie_set_error_context(gsl::not_null<void*> void_cookie,
747                              cb::const_char_buffer message) {
748    auto* cookie = reinterpret_cast<Cookie*>(void_cookie.get());
749    cookie->setErrorContext(to_string(message));
750}
751
752/**
753 * Check if the cookie holds the privilege
754 *
755 * @param void_cookie this is the cookie passed down to the engine.
756 * @param privilege The privilege to check for
757 * @return whether the privilege is held or not (or if the privilege data is stale)
758 */
759static cb::rbac::PrivilegeAccess check_privilege(
760        gsl::not_null<const void*> void_cookie,
761        const cb::rbac::Privilege privilege) {
762    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
763    return cookie->getConnection().checkPrivilege(privilege,
764                                                  const_cast<Cookie&>(*cookie));
765}
766
767static protocol_binary_response_status engine_error2mcbp(
768        gsl::not_null<const void*> void_cookie, ENGINE_ERROR_CODE code) {
769    const auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
770    auto& connection = cookie->getConnection();
771
772    ENGINE_ERROR_CODE status = connection.remapErrorCode(code);
773    if (status == ENGINE_DISCONNECT) {
774        throw cb::engine_error(
775                cb::engine_errc::disconnect,
776                "engine_error2mcbp: " + std::to_string(connection.getId()) +
777                        ": Disconnect client");
778    }
779
780    return engine_error_2_mcbp_protocol_error(status);
781}
782
783static ENGINE_ERROR_CODE pre_link_document(
784        gsl::not_null<const void*> void_cookie, item_info& info) {
785    // Sanity check that people aren't calling the method with a bogus
786    // cookie
787    auto* cookie =
788            reinterpret_cast<Cookie*>(const_cast<void*>(void_cookie.get()));
789
790    auto* context = cookie->getCommandContext();
791    if (context != nullptr) {
792        return context->pre_link_document(info);
793    }
794
795    return ENGINE_SUCCESS;
796}
797
798static cJSON* get_bucket_details_UNLOCKED(const Bucket& bucket, size_t idx) {
799    if (bucket.state == BucketState::None) {
800        return nullptr;
801    }
802
803    cJSON *root = cJSON_CreateObject();
804    cJSON_AddNumberToObject(root, "index", idx);
805    switch (bucket.state.load()) {
806    case BucketState::None:
807        cJSON_AddStringToObject(root, "state", "none");
808        break;
809    case BucketState::Creating:
810        cJSON_AddStringToObject(root, "state", "creating");
811        break;
812    case BucketState::Initializing:
813        cJSON_AddStringToObject(root, "state", "initializing");
814        break;
815    case BucketState::Ready:
816        cJSON_AddStringToObject(root, "state", "ready");
817        break;
818    case BucketState::Stopping:
819        cJSON_AddStringToObject(root, "state", "stopping");
820        break;
821    case BucketState::Destroying:
822        cJSON_AddStringToObject(root, "state", "destroying");
823        break;
824    }
825
826    cJSON_AddNumberToObject(root, "clients", bucket.clients);
827    cJSON_AddStringToObject(root, "name", bucket.name);
828
829    switch (bucket.type) {
830    case BucketType::Unknown:
831        cJSON_AddStringToObject(root, "type", "<<unknown>>");
832        break;
833    case BucketType::NoBucket:
834        cJSON_AddStringToObject(root, "type", "no bucket");
835        break;
836    case BucketType::Memcached:
837        cJSON_AddStringToObject(root, "type", "memcached");
838        break;
839    case BucketType::Couchstore:
840        cJSON_AddStringToObject(root, "type", "couchstore");
841        break;
842    case BucketType::EWouldBlock:
843        cJSON_AddStringToObject(root, "type", "ewouldblock");
844        break;
845    }
846
847    return root;
848}
849
850cJSON *get_bucket_details(size_t idx)
851{
852    cJSON* ret;
853    Bucket &bucket = all_buckets.at(idx);
854    std::lock_guard<std::mutex> guard(bucket.mutex);
855    ret = get_bucket_details_UNLOCKED(bucket, idx);
856
857    return ret;
858}
859
860/**
861 * Check if the associated bucket is dying or not. There are two reasons
862 * why a bucket could be dying: it is currently being deleted, or
863 * someone initiated a shutdown process.
864 */
865bool is_bucket_dying(Connection& c) {
866    bool disconnect = memcached_shutdown;
867    Bucket& b = all_buckets.at(c.getBucketIndex());
868
869    if (b.state != BucketState::Ready) {
870        disconnect = true;
871    }
872
873    if (disconnect) {
874        LOG_INFO(
875                "{}: The connected bucket is being deleted.. closing "
876                "connection {}",
877                c.getId(),
878                c.getDescription());
879        c.setState(McbpStateMachine::State::closing);
880        return true;
881    }
882
883    return false;
884}
885
886void event_handler(evutil_socket_t fd, short which, void *arg) {
887    auto* c = reinterpret_cast<Connection*>(arg);
888    if (c == nullptr) {
889        LOG_WARNING("event_handler: connection must be non-NULL");
890        return;
891    }
892
893    auto *thr = c->getThread();
894    if (thr == nullptr) {
895        LOG_WARNING(
896                "Internal error - connection without a thread found. - "
897                "ignored");
898        return;
899    }
900
901    // Remove the connection from the list of pending io's (in case the
902    // object was scheduled to run in the dispatcher before the
903    // callback for the worker thread is executed).
904    //
905    {
906        std::lock_guard<std::mutex> lock(thr->pending_io.mutex);
907        thr->pending_io.map.erase(c);
908    }
909
910    // Remove the connection from the notification list if it's there
911    thr->notification.remove(c);
912
913    TRACE_LOCKGUARD_TIMED(thr->mutex,
914                          "mutex",
915                          "event_handler::threadLock",
916                          SlowMutexThreshold);
917
918    if (memcached_shutdown) {
919        // Someone requested memcached to shut down.
920        if (signal_idle_clients(thr, -1, false) == 0) {
921            cb_assert(thr != nullptr);
922            LOG_INFO("Stopping worker thread {}", thr->index);
923            c->eventBaseLoopbreak();
924            return;
925        }
926    }
927
928    /* sanity */
929    cb_assert(fd == c->getSocketDescriptor());
930
931    if ((which & EV_TIMEOUT) == EV_TIMEOUT) {
932        if (c->isInternal() || c->isDCP()) {
933            if (c->isInternal()) {
934                LOG_INFO("{}: Timeout for admin connection. (ignore)",
935                         c->getId());
936            } else if (c->isDCP()) {
937                LOG_INFO("{}: Timeout for DCP connection. (ignore)",
938                         c->getId());
939            }
940            if (!c->reapplyEventmask()) {
941                c->setState(McbpStateMachine::State::closing);
942            }
943        } else {
944            LOG_INFO("{}: Shutting down idle client {}",
945                     c->getId(),
946                     c->getDescription());
947            c->setState(McbpStateMachine::State::closing);
948        }
949    }
950
951    run_event_loop(c, which);
952
953    if (memcached_shutdown) {
954        // Someone requested memcached to shut down. If we don't have
955        // any connections bound to this thread we can just shut down
956        int connected = signal_idle_clients(thr, -1, true);
957        if (connected == 0) {
958            LOG_INFO("Stopping worker thread {}", thr->index);
959            event_base_loopbreak(thr->base);
960        } else {
961            LOG_INFO("Waiting for {} connected clients on worker thread {}",
962                     connected,
963                     thr->index);
964        }
965    }
966}
967
968/**
969 * The listen_event_handler is the callback from libevent when someone is
970 * connecting to one of the server sockets. It runs in the context of the
971 * listen thread
972 */
973void listen_event_handler(evutil_socket_t, short which, void *arg) {
974    auto& c = *reinterpret_cast<ServerSocket*>(arg);
975
976    if (memcached_shutdown) {
977        // Someone requested memcached to shut down. The listen thread should
978        // be stopped immediately to avoid new connections
979        LOG_INFO("Stopping listen thread");
980        event_base_loopbreak(main_base);
981        return;
982    }
983
984    try {
985        c.acceptNewClient();
986    } catch (std::invalid_argument& e) {
987        LOG_WARNING("{}: exception occurred while accepting clients: {}",
988                    c.getSocket(),
989                    e.what());
990    }
991}
992
993static void dispatch_event_handler(evutil_socket_t fd, short, void *) {
994    char buffer[80];
995    ssize_t nr = cb::net::recv(fd, buffer, sizeof(buffer), 0);
996
997    if (enable_common_ports.load()) {
998        enable_common_ports.store(false);
999        create_listen_sockets(false);
1000        LOG_INFO("Initialization complete. Accepting clients.");
1001    }
1002
1003    if (nr != -1 && is_listen_disabled()) {
1004        bool enable = false;
1005        {
1006            std::lock_guard<std::mutex> guard(listen_state.mutex);
1007            listen_state.count -= nr;
1008            if (listen_state.count <= 0) {
1009                enable = true;
1010                listen_state.disabled = false;
1011            }
1012        }
1013        if (enable) {
1014            for (auto& connection : listen_conn) {
1015                connection->enable();
1016            }
1017        }
1018    }
1019}
1020
1021/*
1022 * Sets a socket's send buffer size to the maximum allowed by the system.
1023 */
1024static void maximize_sndbuf(const SOCKET sfd) {
1025    socklen_t intsize = sizeof(int);
1026    int last_good = 0;
1027    int old_size;
1028
1029    /* Start with the default size. */
1030    if (cb::net::getsockopt(sfd,
1031                            SOL_SOCKET,
1032                            SO_SNDBUF,
1033                            reinterpret_cast<void*>(&old_size),
1034                            &intsize) != 0) {
1035        LOG_WARNING("getsockopt(SO_SNDBUF): {}", strerror(errno));
1036        return;
1037    }
1038
1039    /* Binary-search for the real maximum. */
1040    int min = old_size;
1041    int max = MAX_SENDBUF_SIZE;
1042
1043    while (min <= max) {
1044        int avg = ((unsigned int)(min + max)) / 2;
1045        if (cb::net::setsockopt(sfd,
1046                                SOL_SOCKET,
1047                                SO_SNDBUF,
1048                                reinterpret_cast<void*>(&avg),
1049                                intsize) == 0) {
1050            last_good = avg;
1051            min = avg + 1;
1052        } else {
1053            max = avg - 1;
1054        }
1055    }
1056
1057    LOG_DEBUG("<{} send buffer was {}, now {}", sfd, old_size, last_good);
1058}
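
// A worked example of the binary search above, with purely hypothetical
// numbers: suppose the kernel reports a default SO_SNDBUF of 65536 and
// accepts values up to roughly 200KiB, and assume (for this example only)
// that MAX_SENDBUF_SIZE were 262144. The search then proceeds as
//
//     min=65536,  max=262144 -> try 163840 (accepted; last_good=163840)
//     min=163841, max=262144 -> try 212992 (rejected)
//     min=163841, max=212991 -> try 188416 (accepted; last_good=188416)
//     ...
//
// and terminates once min > max, leaving last_good at the largest size the
// kernel accepted.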
1059
1060static SOCKET new_server_socket(struct addrinfo *ai, bool tcp_nodelay) {
1061    SOCKET sfd;
1062
1063    sfd = cb::net::socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
1064    if (sfd == INVALID_SOCKET) {
1065        return INVALID_SOCKET;
1066    }
1067
1068    if (evutil_make_socket_nonblocking(sfd) == -1) {
1069        safe_close(sfd);
1070        return INVALID_SOCKET;
1071    }
1072
1073    maximize_sndbuf(sfd);
1074
1075    const struct linger ling = {0, 0};
1076    const int flags = 1;
1077    int error;
1078
1079#ifdef IPV6_V6ONLY
1080    if (ai->ai_family == AF_INET6) {
1081        error = cb::net::setsockopt(sfd,
1082                                    IPPROTO_IPV6,
1083                                    IPV6_V6ONLY,
1084                                    reinterpret_cast<const void*>(&flags),
1085                                    sizeof(flags));
1086        if (error != 0) {
1087            LOG_WARNING("setsockopt(IPV6_V6ONLY): {}", strerror(errno));
1088            safe_close(sfd);
1089            return INVALID_SOCKET;
1090        }
1091    }
1092#endif
1093
1094    if (cb::net::setsockopt(sfd,
1095                            SOL_SOCKET,
1096                            SO_REUSEADDR,
1097                            reinterpret_cast<const void*>(&flags),
1098                            sizeof(flags)) != 0) {
1099        LOG_WARNING("setsockopt(SO_REUSEADDR): {}",
1100                    cb_strerror(cb::net::get_socket_error()));
1101    }
1102
1103    if (cb::net::setsockopt(sfd,
1104                            SOL_SOCKET,
1105                            SO_KEEPALIVE,
1106                            reinterpret_cast<const void*>(&flags),
1107                            sizeof(flags)) != 0) {
1108        LOG_WARNING("setsockopt(SO_KEEPALIVE): {}",
1109                    cb_strerror(cb::net::get_socket_error()));
1110    }
1111
1112    if (cb::net::setsockopt(sfd,
1113                            SOL_SOCKET,
1114                            SO_LINGER,
1115                            reinterpret_cast<const char*>(&ling),
1116                            sizeof(ling)) != 0) {
1117        LOG_WARNING("setsockopt(SO_LINGER): {}",
1118                    cb_strerror(cb::net::get_socket_error()));
1119    }
1120
1121    if (tcp_nodelay) {
1122        if (cb::net::setsockopt(sfd,
1123                                IPPROTO_TCP,
1124                                TCP_NODELAY,
1125                                reinterpret_cast<const void*>(&flags),
1126                                sizeof(flags)) != 0) {
1127            LOG_WARNING("setsockopt(TCP_NODELAY): {}",
1128                        cb_strerror(cb::net::get_socket_error()));
1129        }
1130    }
1131
1132    return sfd;
1133}
1134
1135/**
1136 * Add a port to the list of interfaces we're listening to.
1137 *
1138 * We're supporting binding to the port number "0" to have the operating
1139 * system pick an available port we may use (and we'll report it back to
1140 * the user through the portnumber file). If we already know about the port,
1141 * update the existing port descriptor (ipv4/ipv6); if not, create a new entry.
1142 *
1143 * @param interf the interface description used to create the port
1144 * @param port the port number in use
1145 * @param family the address family for the port
1146 */
1147static void add_listening_port(const NetworkInterface *interf, in_port_t port, sa_family_t family) {
1148    std::lock_guard<std::mutex> guard(stats_mutex);
1149    auto *descr = get_listening_port_instance(port);
1150
1151    if (descr == nullptr) {
1152        ListeningPort newport(port,
1153                              interf->host,
1154                              interf->tcp_nodelay,
1155                              interf->backlog,
1156                              interf->management);
1157
1158        newport.curr_conns = 1;
1159        newport.maxconns = interf->maxconn;
1160
1161        if (interf->ssl.key.empty() || interf->ssl.cert.empty()) {
1162            newport.ssl.enabled = false;
1163        } else {
1164            newport.ssl.enabled = true;
1165            newport.ssl.key = interf->ssl.key;
1166            newport.ssl.cert = interf->ssl.cert;
1167        }
1168
1169        if (family == AF_INET) {
1170            newport.ipv4 = true;
1171            newport.ipv6 = false;
1172        } else if (family == AF_INET6) {
1173            newport.ipv4 = false;
1174            newport.ipv6 = true;
1175        }
1176
1177        stats.listening_ports.push_back(newport);
1178    } else {
1179        if (family == AF_INET) {
1180            descr->ipv4 = true;
1181        } else if (family == AF_INET6) {
1182            descr->ipv6 = true;
1183        }
1184        ++descr->curr_conns;
1185    }
1186}
1187
1188/**
1189 * Create a socket and bind it to a specific port number
1190 * @param interf the interface to bind to
1191 * @return true if we were able to set up at least one address on the interface,
1192 *         false if we failed to set any addresses on the interface
1193 */
1194static bool server_socket(const NetworkInterface& interf) {
1195    SOCKET sfd;
1196    addrinfo hints = {};
1197
1198    // Set to true when we create an IPv4 interface
1199    bool ipv4 = false;
1200    // Set to true when we create an IPv6 interface
1201    bool ipv6 = false;
1202
1203    hints.ai_flags = AI_PASSIVE;
1204    hints.ai_protocol = IPPROTO_TCP;
1205    hints.ai_socktype = SOCK_STREAM;
1206
1207    if (interf.ipv4 != NetworkInterface::Protocol::Off &&
1208        interf.ipv6 != NetworkInterface::Protocol::Off) {
1209        hints.ai_family = AF_UNSPEC;
1210    } else if (interf.ipv4 != NetworkInterface::Protocol::Off) {
1211        hints.ai_family = AF_INET;
1212    } else if (interf.ipv6 != NetworkInterface::Protocol::Off) {
1213        hints.ai_family = AF_INET6;
1214    } else {
1215        throw std::invalid_argument(
1216                "server_socket: can't create a socket without IPv4 or IPv6");
1217    }
1218
1219    std::string port_buf = std::to_string(interf.port);
1220
1221    const char* host = nullptr;
1222    if (!interf.host.empty() && interf.host != "*") {
1223        host = interf.host.c_str();
1224    }
1225
1226    struct addrinfo *ai;
1227    int error = getaddrinfo(host, port_buf.c_str(), &hints, &ai);
1228    if (error != 0) {
1229#ifdef WIN32
1230        LOG_WARNING("getaddrinfo(): {}", cb_strerror(error));
1231#else
1232        if (error != EAI_SYSTEM) {
1233            LOG_WARNING("getaddrinfo(): {}", gai_strerror(error));
1234        } else {
1235            LOG_WARNING("getaddrinfo(): {}", cb_strerror(error));
1236        }
1237#endif
1238        return false;
1239    }
1240
1241    // getaddrinfo may return multiple entries for a given name/port pair.
1242    // Iterate over all of them and try to set up a listen object.
1243    // We need at least _one_ entry per requested configuration (IPv4/6) in
1244    // order to call it a success.
1245    for (struct addrinfo* next = ai; next; next = next->ai_next) {
1246        if ((sfd = new_server_socket(next, interf.tcp_nodelay)) ==
1247            INVALID_SOCKET) {
1248            // getaddrinfo can return "junk" addresses; just skip them.
1249            continue;
1250        }
1251
1252        in_port_t listenport = 0;
1253        if (bind(sfd, next->ai_addr, (socklen_t)next->ai_addrlen) == SOCKET_ERROR) {
1254            const auto bind_error = cb::net::get_socket_error();
1255            auto name = cb::net::to_string(
1256                    reinterpret_cast<sockaddr_storage*>(next->ai_addr),
1257                    static_cast<socklen_t>(next->ai_addrlen));
1258            LOG_WARNING(
1259                    "Failed to bind to {} - {}", name, cb_strerror(bind_error));
1260            safe_close(sfd);
1261            continue;
1262        }
1263
1264        // We've configured this port.
1265        if (next->ai_addr->sa_family == AF_INET) {
1266            // We have at least one entry
1267            ipv4 = true;
1268        } else if (next->ai_addr->sa_family == AF_INET6) {
1269            // We have at least one entry
1270            ipv6 = true;
1271        }
1272
1273        if (next->ai_addr->sa_family == AF_INET ||
1274             next->ai_addr->sa_family == AF_INET6) {
1275            union {
1276                struct sockaddr_in in;
1277                struct sockaddr_in6 in6;
1278            } my_sockaddr;
1279            socklen_t len = sizeof(my_sockaddr);
1280            if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
1281                if (next->ai_addr->sa_family == AF_INET) {
1282                    listenport = ntohs(my_sockaddr.in.sin_port);
1283                } else {
1284                    listenport = ntohs(my_sockaddr.in6.sin6_port);
1285                }
1286            }
1287        }
1288
1289        listen_conn.emplace_back(std::make_unique<ServerSocket>(
1290                sfd, main_base, listenport, next->ai_addr->sa_family, interf));
1291        stats.daemon_conns++;
1292        stats.curr_conns.fetch_add(1, std::memory_order_relaxed);
1293        add_listening_port(&interf, listenport, next->ai_addr->sa_family);
1294    }
1295
1296    freeaddrinfo(ai);
1297
1298    // Check if we successfully listened on all required protocols.
1299    bool required_proto_missing = false;
1300
1301    // Check if the specified (missing) protocol was requested; if so log a
1302    // message, and if required return true.
1303    auto checkIfProtocolRequired =
1304            [interf](const NetworkInterface::Protocol& protoMode,
1305                     const char* protoName) -> bool {
1306        if (protoMode != NetworkInterface::Protocol::Off) {
1307            // Failed to create a socket for this protocol; and it's not
1308            // disabled
1309            auto level = spdlog::level::level_enum::warn;
1310            if (protoMode == NetworkInterface::Protocol::Required) {
1311                level = spdlog::level::level_enum::critical;
1312            }
1313            CB_LOG_ENTRY(level,
1314                         R"(Failed to create {} {} socket for "{}:{}")",
1315                         to_string(protoMode),
1316                         protoName,
1317                         interf.host.empty() ? "*" : interf.host,
1318                         interf.port);
1319        }
1320        return protoMode == NetworkInterface::Protocol::Required;
1321    };
1322    if (!ipv4) {
1323        required_proto_missing |= checkIfProtocolRequired(interf.ipv4, "IPv4");
1324    }
1325    if (!ipv6) {
1326        required_proto_missing |= checkIfProtocolRequired(interf.ipv6, "IPv6");
1327    }
1328
1329    // Return success as long as we managed to create a listening port
1330    // for all non-optional protocols.
1331    return !required_proto_missing;
1332}
1333
1334static bool server_sockets(bool management) {
1335    bool success = true;
1336
1337    if (management) {
1338        LOG_INFO("Enable management port(s)");
1339    } else {
1340        LOG_INFO("Enable user port(s)");
1341    }
1342
1343    for (auto& interface : settings.getInterfaces()) {
1344        if (management && interface.management) {
1345            if (!server_socket(interface)) {
1346                success = false;
1347            }
1348        } else if (!management && !interface.management) {
1349            if (!server_socket(interface)) {
1350                success = false;
1351            }
1352        }
1353    }
1354
1355    return success;
1356}
1357
1358static void create_listen_sockets(bool management) {
1359    if (!server_sockets(management)) {
1360        FATAL_ERROR(
1361                EX_OSERR,
1362                "Failed to create required listening socket(s). Terminating.");
1363    }
1364
1365    if (management) {
1366        // the client is not expecting us to update the port set at
1367        // a later time, so enable all ports immediately
1368        if (!server_sockets(false)) {
1369            FATAL_ERROR(EX_OSERR,
1370                        "Failed to create required listening socket(s). "
1371                        "Terminating.");
1372        }
1373    }
1374
1375    const char* portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
1376    if (portnumber_filename != nullptr) {
1377        std::string temp_portnumber_filename;
1378        temp_portnumber_filename.assign(portnumber_filename);
1379        temp_portnumber_filename.append(".lck");
1380
1381        FILE* portnumber_file = nullptr;
1382        portnumber_file = fopen(temp_portnumber_filename.c_str(), "a");
1383        if (portnumber_file == nullptr) {
1384            FATAL_ERROR(EX_OSERR,
1385                        R"(Failed to open "{}": {})",
1386                        temp_portnumber_filename,
1387                        strerror(errno));
1388        }
1389
1390        unique_cJSON_ptr array(cJSON_CreateArray());
1391
1392        for (const auto& connection : listen_conn) {
1393            cJSON_AddItemToArray(array.get(),
1394                                 connection->getDetails().release());
1395        }
1396
1397        unique_cJSON_ptr root(cJSON_CreateObject());
1398        cJSON_AddItemToObject(root.get(), "ports", array.release());
1399        fprintf(portnumber_file, "%s\n", to_string(root, true).c_str());
1400        fclose(portnumber_file);
1401        LOG_INFO("Port numbers available in {}", portnumber_filename);
1402        if (rename(temp_portnumber_filename.c_str(), portnumber_filename) == -1) {
1403            FATAL_ERROR(EX_OSERR,
1404                        R"(Failed to rename "{}" to "{}": {})",
1405                        temp_portnumber_filename.c_str(),
1406                        portnumber_filename,
1407                        strerror(errno));
1408        }
1409    }
1410}
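
// Sketch of the file written when MEMCACHED_PORT_FILENAME is set (only the
// top-level shape is shown; the per-socket fields come from
// ServerSocket::getDetails() and are elided here):
//
//     {
//         "ports": [
//             { ...details for the first listening socket... },
//             { ...details for the second listening socket... }
//         ]
//     }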
1411
1412#ifdef WIN32
1413// Unfortunately we don't have signal handlers on windows
1414static bool install_signal_handlers() {
1415    return true;
1416}
1417
1418static void release_signal_handlers() {
1419}
1420#else
1421
1422#ifndef HAVE_SIGIGNORE
1423static int sigignore(int sig) {
1424    struct sigaction sa;
1425    memset(&sa, 0, sizeof(sa));
1426    sa.sa_handler = SIG_IGN;
1427
1428    if (sigemptyset(&sa.sa_mask) == -1 || sigaction(sig, &sa, 0) == -1) {
1429        return -1;
1430    }
1431    return 0;
1432}
1433#endif /* !HAVE_SIGIGNORE */
1434
1435
1436static void sigterm_handler(evutil_socket_t, short, void *) {
1437    shutdown_server();
1438}
1439
1440static struct event* sigusr1_event;
1441static struct event* sigterm_event;
1442static struct event* sigint_event;
1443
1444static bool install_signal_handlers() {
1445    // SIGUSR1 - Used to dump connection stats
1446    sigusr1_event = evsignal_new(main_base, SIGUSR1,
1447                                 dump_connection_stat_signal_handler,
1448                                 nullptr);
1449    if (sigusr1_event == nullptr) {
1450        LOG_WARNING("Failed to allocate SIGUSR1 handler");
1451        return false;
1452    }
1453
1454    if (event_add(sigusr1_event, nullptr) < 0) {
1455        LOG_WARNING("Failed to install SIGUSR1 handler");
1456        return false;
1457
1458    }
1459
1460    // SIGTERM - Used to shut down memcached cleanly
1461    sigterm_event = evsignal_new(main_base, SIGTERM, sigterm_handler, NULL);
1462    if (sigterm_event == NULL) {
1463        LOG_WARNING("Failed to allocate SIGTERM handler");
1464        return false;
1465    }
1466
1467    if (event_add(sigterm_event, NULL) < 0) {
1468        LOG_WARNING("Failed to install SIGTERM handler");
1469        return false;
1470    }
1471
1472    // SIGINT - Used to shut down memcached cleanly
1473    sigint_event = evsignal_new(main_base, SIGINT, sigterm_handler, NULL);
1474    if (sigint_event == NULL) {
1475        LOG_WARNING("Failed to allocate SIGINT handler");
1476        return false;
1477    }
1478
1479    if (event_add(sigint_event, NULL) < 0) {
1480        LOG_WARNING("Failed to install SIGINT handler");
1481        return false;
1482    }
1483
1484    return true;
1485}
1486
1487static void release_signal_handlers() {
1488    event_free(sigusr1_event);
1489    event_free(sigint_event);
1490    event_free(sigterm_event);
1491}
1492#endif
1493
1494const char* get_server_version(void) {
1495    if (strlen(PRODUCT_VERSION) == 0) {
1496        return "unknown";
1497    } else {
1498        return PRODUCT_VERSION;
1499    }
1500}
1501
1502static void store_engine_specific(gsl::not_null<const void*> void_cookie,
1503                                  void* engine_data) {
1504    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1505    cookie->getConnection().setEngineStorage(engine_data);
1506}
1507
1508static void* get_engine_specific(gsl::not_null<const void*> void_cookie) {
1509    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1510    return cookie->getConnection().getEngineStorage();
1511}
1512
1513static bool is_datatype_supported(gsl::not_null<const void*> void_cookie,
1514                                  protocol_binary_datatype_t datatype) {
1515    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1516    return cookie->getConnection().isDatatypeEnabled(datatype);
1517}
1518
1519static bool is_mutation_extras_supported(
1520        gsl::not_null<const void*> void_cookie) {
1521    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1522    return cookie->getConnection().isSupportsMutationExtras();
1523}
1524
1525static bool is_collections_supported(gsl::not_null<const void*> void_cookie) {
1526    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1527    return cookie->getConnection().isCollectionsSupported();
1528}
1529
1530static uint8_t get_opcode_if_ewouldblock_set(
1531        gsl::not_null<const void*> void_cookie) {
1532    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1533
1534    uint8_t opcode = PROTOCOL_BINARY_CMD_INVALID;
1535    if (cookie->isEwouldblock()) {
1536        try {
1537            opcode = cookie->getHeader().getOpcode();
1538        } catch (...) {
1539            // Don't barf out if the header isn't there
1540        }
1541    }
1542    return opcode;
1543}
1544
1545static bool validate_session_cas(const uint64_t cas) {
1546    return session_cas.increment_session_counter(cas);
1547}
1548
1549static void decrement_session_ctr(void) {
1550    session_cas.decrement_session_counter();
1551}
1552
1553static ENGINE_ERROR_CODE reserve_cookie(
1554        gsl::not_null<const void*> void_cookie) {
1555    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1556
1557    cookie->getConnection().incrementRefcount();
1558    return ENGINE_SUCCESS;
1559}
1560
1561static ENGINE_ERROR_CODE release_cookie(
1562        gsl::not_null<const void*> void_cookie) {
1563    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1564
1565    auto* c = &cookie->getConnection();
1566    int notify;
1567    LIBEVENT_THREAD *thr;
1568
1569    thr = c->getThread();
1570    cb_assert(thr);
1571
1572    {
1573        TRACE_LOCKGUARD_TIMED(thr->mutex,
1574                              "mutex",
1575                              "release_cookie::threadLock",
1576                              SlowMutexThreshold);
1577
1578        c->decrementRefcount();
1579
1580        /* Releasing the reference to the object may cause it to change
1581         * state. (NOTE: the release call shall never be called from the
1582         * worker threads.) We should therefore put the connection in the
1583         * pool of pending IO and have the system retry the operation for
1584         * the connection.
1585         */
1586        notify = add_conn_to_pending_io_list(c, ENGINE_SUCCESS);
1587    }
1588
1589    /* kick the thread in the butt */
1590    if (notify) {
1591        notify_thread(*thr);
1592    }
1593
1594    return ENGINE_SUCCESS;
1595}
1596
1597static void cookie_set_priority(gsl::not_null<const void*> void_cookie,
1598                                CONN_PRIORITY priority) {
1599    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1600
1601    auto* c = &cookie->getConnection();
1602    switch (priority) {
1603    case CONN_PRIORITY_HIGH:
1604        c->setPriority(Connection::Priority::High);
1605        return;
1606    case CONN_PRIORITY_MED:
1607        c->setPriority(Connection::Priority::Medium);
1608        return;
1609    case CONN_PRIORITY_LOW:
1610        c->setPriority(Connection::Priority::Low);
1611        return;
1612    }
1613
1614    LOG_WARNING(
1615            "{}: cookie_set_priority: priority (which is {}) is not a "
1616            "valid CONN_PRIORITY - closing connection {}",
1617            c->getId(),
1618            priority,
1619            c->getDescription());
1620    c->setState(McbpStateMachine::State::closing);
1621}
1622
1623static void count_eviction(gsl::not_null<const void*>, const void*, int) {
1624}
1625
1626static size_t get_max_item_iovec_size() {
1627    return 1;
1628}
1629
1630static std::condition_variable shutdown_cv;
1631static std::mutex shutdown_cv_mutex;
1632static bool memcached_can_shutdown = false;
1633void shutdown_server(void) {
1634
1635    std::unique_lock<std::mutex> lk(shutdown_cv_mutex);
1636    if (!memcached_can_shutdown) {
1637        // log and wait until shutdown is enabled
1638        LOG_INFO("shutdown_server waiting for can_shutdown signal");
1639        shutdown_cv.wait(lk, []{return memcached_can_shutdown;});
1640    }
1641    memcached_shutdown = true;
1642    LOG_INFO("Received shutdown request");
1643    event_base_loopbreak(main_base);
1644}
1645
1646void enable_shutdown(void) {
1647    std::unique_lock<std::mutex> lk(shutdown_cv_mutex);
1648    memcached_can_shutdown = true;
1649    shutdown_cv.notify_all();
1650}
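/*
 * Shutdown gating: shutdown_server() blocks on shutdown_cv until
 * enable_shutdown() has been called (done from memcached_main() once all
 * threads are up, see the MB-20034 note below), then sets memcached_shutdown
 * and breaks the main event loop.
 */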
1651
1652static EXTENSION_LOGGER_DESCRIPTOR* get_logger(void)
1653{
1654    return settings.extensions.logger;
1655}
1656
1657static EXTENSION_LOG_LEVEL get_log_level(void)
1658{
1659    EXTENSION_LOG_LEVEL ret;
1660    switch (settings.getVerbose()) {
1661    case 0: ret = EXTENSION_LOG_NOTICE; break;
1662    case 1: ret = EXTENSION_LOG_INFO; break;
1663    default:
1664        ret = EXTENSION_LOG_DEBUG;
1665    }
1666    return ret;
1667}
1668
1669static void set_log_level(EXTENSION_LOG_LEVEL severity)
1670{
1671    switch (severity) {
1672    case EXTENSION_LOG_FATAL:
1673    case EXTENSION_LOG_WARNING:
1674    case EXTENSION_LOG_NOTICE:
1675        settings.setVerbose(0);
1676        break;
1677    case EXTENSION_LOG_INFO:
1678        settings.setVerbose(1);
1679        break;
1680    default:
1681        settings.setVerbose(2);
1682    }
1683}
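/*
 * Mapping implemented by get_log_level()/set_log_level():
 *
 *     verbosity 0  <->  EXTENSION_LOG_NOTICE (and FATAL/WARNING)
 *     verbosity 1  <->  EXTENSION_LOG_INFO
 *     verbosity 2+ <->  EXTENSION_LOG_DEBUG (and finer)
 */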
1684
1685/**
1686 * Callback the engines may call to get the public server interface
1687 * @return pointer to a structure containing the interface. The client should
1688 *         know the layout and perform the proper casts.
1689 */
1690SERVER_HANDLE_V1* get_server_api() {
1691    static int init;
1692    static SERVER_CORE_API core_api;
1693    static SERVER_COOKIE_API server_cookie_api;
1694    static SERVER_STAT_API server_stat_api;
1695    static SERVER_LOG_API server_log_api;
1696    static SERVER_CALLBACK_API callback_api;
1697    static ALLOCATOR_HOOKS_API hooks_api;
1698    static SERVER_HANDLE_V1 rv;
1699    static SERVER_DOCUMENT_API document_api;
1700
1701    if (!init) {
1702        init = 1;
1703        core_api.realtime = mc_time_convert_to_real_time;
1704        core_api.abstime = mc_time_convert_to_abs_time;
1705        core_api.get_current_time = mc_time_get_current_time;
1706        core_api.parse_config = parse_config;
1707        core_api.shutdown = shutdown_server;
1708        core_api.get_max_item_iovec_size = get_max_item_iovec_size;
1709        core_api.trigger_tick = mc_time_clock_tick;
1710
1711        server_cookie_api.store_engine_specific = store_engine_specific;
1712        server_cookie_api.get_engine_specific = get_engine_specific;
1713        server_cookie_api.is_datatype_supported = is_datatype_supported;
1714        server_cookie_api.is_mutation_extras_supported = is_mutation_extras_supported;
1715        server_cookie_api.is_collections_supported = is_collections_supported;
1716        server_cookie_api.get_opcode_if_ewouldblock_set = get_opcode_if_ewouldblock_set;
1717        server_cookie_api.validate_session_cas = validate_session_cas;
1718        server_cookie_api.decrement_session_ctr = decrement_session_ctr;
1719        server_cookie_api.notify_io_complete = notify_io_complete;
1720        server_cookie_api.reserve = reserve_cookie;
1721        server_cookie_api.release = release_cookie;
1722        server_cookie_api.set_priority = cookie_set_priority;
1723        server_cookie_api.get_bucket_id = get_bucket_id;
1724        server_cookie_api.get_connection_id = get_connection_id;
1725        server_cookie_api.check_privilege = check_privilege;
1726        server_cookie_api.engine_error2mcbp = engine_error2mcbp;
1727        server_cookie_api.get_log_info = cookie_get_log_info;
1728        server_cookie_api.set_error_context = cookie_set_error_context;
1729
1730        server_stat_api.evicting = count_eviction;
1731
1732        server_log_api.get_logger = get_logger;
1733        server_log_api.get_level = get_log_level;
1734        server_log_api.set_level = set_log_level;
1735
1736        callback_api.register_callback = register_callback;
1737        callback_api.perform_callbacks = perform_callbacks;
1738
1739        hooks_api.add_new_hook = AllocHooks::add_new_hook;
1740        hooks_api.remove_new_hook = AllocHooks::remove_new_hook;
1741        hooks_api.add_delete_hook = AllocHooks::add_delete_hook;
1742        hooks_api.remove_delete_hook = AllocHooks::remove_delete_hook;
1743        hooks_api.get_extra_stats_size = AllocHooks::get_extra_stats_size;
1744        hooks_api.get_allocator_stats = AllocHooks::get_allocator_stats;
1745        hooks_api.get_allocation_size = AllocHooks::get_allocation_size;
1746        hooks_api.get_detailed_stats = AllocHooks::get_detailed_stats;
1747        hooks_api.release_free_memory = AllocHooks::release_free_memory;
1748        hooks_api.enable_thread_cache = AllocHooks::enable_thread_cache;
1749        hooks_api.get_allocator_property = AllocHooks::get_allocator_property;
1750
1751        document_api.pre_link = pre_link_document;
1752        document_api.pre_expiry = document_pre_expiry;
1753
1754        rv.interface = 1;
1755        rv.core = &core_api;
1756        rv.stat = &server_stat_api;
1757        rv.callback = &callback_api;
1758        rv.log = &server_log_api;
1759        rv.cookie = &server_cookie_api;
1760        rv.alloc_hooks = &hooks_api;
1761        rv.document = &document_api;
1762    }
1763
1764    return &rv;
1765}
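/*
 * Assumed consumer-side sketch: an engine normally receives get_server_api as
 * the GET_SERVER_API function pointer at creation time (see the
 * new_engine_instance() calls below) and would use it roughly like:
 *
 *     SERVER_HANDLE_V1* sapi = get_server_api();
 *     auto now = sapi->core->get_current_time();        // mc_time wrapper
 *     sapi->cookie->store_engine_specific(cookie, my_state);
 *
 * Only the members wired up above are guaranteed by this file; names such as
 * my_state are purely illustrative.
 */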
1766
1767/* BUCKET FUNCTIONS */
1768void CreateBucketThread::create() {
1769    LOG_INFO("{} Create bucket [{}]", connection.getId(), name);
1770
1771    if (!BucketValidator::validateBucketName(name, error)) {
1772        LOG_WARNING("{} Create bucket [{}] failed - Invalid bucket name",
1773                    connection.getId(),
1774                    name);
1775        result = ENGINE_EINVAL;
1776        return;
1777    }
1778
1779    if (!BucketValidator::validateBucketType(type, error)) {
1780        LOG_WARNING("{} Create bucket [{}] failed - Invalid bucket type",
1781                    connection.getId(),
1782                    name);
1783        result = ENGINE_EINVAL;
1784        return;
1785    }
1786
1787    size_t ii;
1788    size_t first_free = all_buckets.size();
1789    bool found = false;
1790
1791    std::unique_lock<std::mutex> all_bucket_lock(buckets_lock);
1792    for (ii = 0; ii < all_buckets.size() && !found; ++ii) {
1793        std::lock_guard<std::mutex> guard(all_buckets[ii].mutex);
1794        if (first_free == all_buckets.size() &&
1795            all_buckets[ii].state == BucketState::None)
1796        {
1797            first_free = ii;
1798        }
1799        if (name == all_buckets[ii].name) {
1800            found = true;
1801        }
1802    }
1803
1804    if (found) {
1805        result = ENGINE_KEY_EEXISTS;
1806        LOG_WARNING("{} Create bucket [{}] failed - Already exists",
1807                    connection.getId(),
1808                    name);
1809    } else if (first_free == all_buckets.size()) {
1810        result = ENGINE_E2BIG;
1811        LOG_WARNING("{} Create bucket [{}] failed - Too many buckets",
1812                    connection.getId(),
1813                    name);
1814    } else {
1815        result = ENGINE_SUCCESS;
1816        ii = first_free;
1817        /*
1818         * Split the creation of the bucket in two phases so that
1819         * we can release the global lock early.
1820         */
1821        std::lock_guard<std::mutex> guard(all_buckets[ii].mutex);
1822        all_buckets[ii].state = BucketState::Creating;
1823        all_buckets[ii].type = type;
1824        strcpy(all_buckets[ii].name, name.c_str());
1825        try {
1826            all_buckets[ii].topkeys = new TopKeys(settings.getTopkeysSize());
1827        } catch (const std::bad_alloc &) {
1828            result = ENGINE_ENOMEM;
1829            LOG_WARNING("{} Create bucket [{}] failed - out of memory",
1830                        connection.getId(),
1831                        name);
1832        }
1833    }
1834    all_bucket_lock.unlock();
1835
1836    if (result != ENGINE_SUCCESS) {
1837        return;
1838    }
1839
1840    auto &bucket = all_buckets[ii];
1841
1842    /* The bucket cannot be used by clients in this state,
1843     * so we can modify it without holding the lock.
1844     */
1845    if (new_engine_instance(type,
1846                            name,
1847                            get_server_api,
1848                            (ENGINE_HANDLE**)&bucket.engine,
1849                            settings.extensions.logger)) {
1850        auto* engine = bucket.engine;
1851        {
1852            std::lock_guard<std::mutex> guard(bucket.mutex);
1853            bucket.state = BucketState::Initializing;
1854        }
1855
1856        try {
1857            result = engine->initialize(v1_handle_2_handle(engine),
1858                                        config.c_str());
1859        } catch (const std::runtime_error& e) {
1860            LOG_WARNING("{} - Failed to create bucket [{}]: {}",
1861                        connection.getId(),
1862                        name,
1863                        e.what());
1864            result = ENGINE_FAILED;
1865        } catch (const std::bad_alloc& e) {
1866            LOG_WARNING("{} - Failed to create bucket [{}]: {}",
1867                        connection.getId(),
1868                        name,
1869                        e.what());
1870            result = ENGINE_ENOMEM;
1871        }
1872
1873        if (result == ENGINE_SUCCESS) {
1874            {
1875                std::lock_guard<std::mutex> guard(bucket.mutex);
1876                bucket.state = BucketState::Ready;
1877            }
1878            LOG_INFO("{} - Bucket [{}] created successfully",
1879                     connection.getId(),
1880                     name);
1881            bucket.max_document_size =
1882                 engine->getMaxItemSize(v1_handle_2_handle(engine));
1883        } else {
1884            {
1885                std::lock_guard<std::mutex> guard(bucket.mutex);
1886                bucket.state = BucketState::Destroying;
1887            }
1888            engine->destroy(v1_handle_2_handle(engine), false);
1889            std::lock_guard<std::mutex> guard(bucket.mutex);
1890            bucket.state = BucketState::None;
1891            bucket.name[0] = '\0';
1892            bucket.engine = nullptr;
1893            delete bucket.topkeys;
1894            bucket.topkeys = nullptr;
1895            bucket.clusterConfiguration.reset();
1896
1897            result = ENGINE_NOT_STORED;
1898        }
1899    } else {
1900        {
1901            std::lock_guard<std::mutex> guard(bucket.mutex);
1902            bucket.state = BucketState::None;
1903            bucket.name[0] = '\0';
1904            bucket.engine = nullptr;
1905            delete bucket.topkeys;
1906            bucket.topkeys = nullptr;
1907            bucket.clusterConfiguration.reset();
1908        }
1909
1910        LOG_WARNING(
1911                "{} - Failed to create bucket [{}]: failed to "
1912                "create a new engine instance",
1913                connection.getId(),
1914                name);
1915        result = ENGINE_FAILED;
1916    }
1917}
1918
1919void CreateBucketThread::run()
1920{
1921    setRunning();
1922    // Perform the task without holding any locks. The task should be
1923    // scheduled in a pending state so the executor won't try to touch
1924    // the object until we tell it that it is runnable.
1925    create();
1926    std::lock_guard<std::mutex> guard(task->getMutex());
1927    task->makeRunnable();
1928}
1929
1930void notify_thread_bucket_deletion(LIBEVENT_THREAD& me) {
1931    for (size_t ii = 0; ii < all_buckets.size(); ++ii) {
1932        bool destroy = false;
1933        {
1934            std::lock_guard<std::mutex> guard(all_buckets[ii].mutex);
1935            if (all_buckets[ii].state == BucketState::Destroying) {
1936                destroy = true;
1937            }
1938        }
1939
1940        if (destroy) {
1941            signal_idle_clients(&me, gsl::narrow<int>(ii), false);
1942        }
1943    }
1944}
1945
1946void DestroyBucketThread::destroy() {
1947    ENGINE_ERROR_CODE ret = ENGINE_KEY_ENOENT;
1948    std::unique_lock<std::mutex> all_bucket_lock(buckets_lock);
1949
1950    Connection* connection = nullptr;
1951    if (cookie != nullptr) {
1952        connection = &cookie->getConnection();
1953    }
1954
1955    /*
1956     * The destroy function will have access to a connection if the
1957     * McbpDestroyBucketTask originated from delete_bucket_executor().
1958     * However, if we are in the process of shutting down and the
1959     * McbpDestroyBucketTask originated from main() then connection
1960     * will be set to nullptr.
1961     */
1962    const std::string connection_id{(connection == nullptr)
1963            ? "<none>"
1964            : std::to_string(connection->getId())};
1965
1966    size_t idx = 0;
1967    for (size_t ii = 0; ii < all_buckets.size(); ++ii) {
1968        std::lock_guard<std::mutex> guard(all_buckets[ii].mutex);
1969        if (name == all_buckets[ii].name) {
1970            idx = ii;
1971            if (all_buckets[ii].state == BucketState::Ready) {
1972                ret = ENGINE_SUCCESS;
1973                all_buckets[ii].state = BucketState::Destroying;
1974            } else {
1975                ret = ENGINE_KEY_EEXISTS;
1976            }
1977        }
1978        if (ret != ENGINE_KEY_ENOENT) {
1979            break;
1980        }
1981    }
1982    all_bucket_lock.unlock();
1983
1984    if (ret != ENGINE_SUCCESS) {
1985        auto code = engine_error_2_mcbp_protocol_error(ret);
1986        LOG_INFO("{} Delete bucket [{}]: {}",
1987                 connection_id,
1988                 name,
1989                 memcached_status_2_text(code));
1990        result = ret;
1991        return;
1992    }
1993
1994    LOG_INFO(
1995            "{} Delete bucket [{}]. Notifying all registered "
1996            "ON_DELETE_BUCKET callbacks",
1997            connection_id,
1998            name);
1999
2000    perform_callbacks(ON_DELETE_BUCKET, nullptr, &all_buckets[idx]);
2001
2002    LOG_INFO("{} Delete bucket [{}]. Wait for clients to disconnect",
2003             connection_id,
2004             name);
2005
2006    /* If the issuing connection is associated with the requested bucket, release it */
2007    if (connection != nullptr && idx == size_t(connection->getBucketIndex())) {
2008        disassociate_bucket(*connection);
2009    }
2010
2011    /* Let all of the worker threads start invalidating connections */
2012    threads_initiate_bucket_deletion();
2013
2014    auto& bucket = all_buckets[idx];
2015
2016    /* Wait until all users have disconnected... */
2017    {
2018        std::unique_lock<std::mutex> guard(bucket.mutex);
2019
2020        while (bucket.clients > 0) {
2021            LOG_INFO(
2022                    "{} Delete bucket [{}]. Still waiting: {} clients "
2023                    "connected",
2024                    connection_id.c_str(),
2025                    name.c_str(),
2026                    bucket.clients);
2027            /* drop the lock and notify the worker threads */
2028            guard.unlock();
2029            threads_notify_bucket_deletion();
2030            guard.lock();
2031
2032            bucket.cond.wait_for(guard,
2033                                 std::chrono::milliseconds(1000),
2034                                 [&bucket] { return bucket.clients == 0; });
2035        }
2036    }
2037
2038    /* Tell the worker threads to stop trying to invalidate connections */
2039    threads_complete_bucket_deletion();
2040
2041    /*
2042     * We cannot call assert_no_assocations(idx) because it iterates
2043     * over all connections and calls c->getBucketIndex().  The problem
2044     * is that a worker thread can call associate_initial_bucket() or
2045     * associate_bucket() at the same time.  This could lead to a call
2046     * to c->setBucketIndex(0) (the "no bucket"), which although safe,
2047     * raises a threadsanitizer warning.
2048     *
2049     * Note, if associate_bucket() attempts to associate a connection
2050     * with a bucket that has been destroyed, or is in the process of
2051     * being destroyed, the association will fail because
2052     * BucketState != Ready.  See associate_bucket() for more details.
2053     */
2054
2055    LOG_INFO(
2056            "{} Delete bucket [{}]. Shut down the bucket", connection_id, name);
2057
2058    bucket.engine->destroy(v1_handle_2_handle(bucket.engine), force);
2059
2060    LOG_INFO("{} Delete bucket [{}]. Clean up allocated resources ",
2061             connection_id,
2062             name);
2063
2064    /* Clean up the stats... */
2065    threadlocal_stats_reset(bucket.stats);
2066
2067    // Clear any registered event handlers
2068    for (auto& handler : bucket.engine_event_handlers) {
2069        handler.clear();
2070    }
2071
2072    {
2073        std::lock_guard<std::mutex> guard(bucket.mutex);
2074        bucket.state = BucketState::None;
2075        bucket.engine = NULL;
2076        bucket.name[0] = '\0';
2077        delete bucket.topkeys;
2078        bucket.responseCounters.fill(0);
2079        bucket.topkeys = nullptr;
2080        bucket.clusterConfiguration.reset();
2081    }
2082    // don't need lock because all timing data uses atomics
2083    bucket.timings.reset();
2084
2085    LOG_INFO("{} Delete bucket [{}] complete", connection_id, name);
2086    result = ENGINE_SUCCESS;
2087}
2088
2089void DestroyBucketThread::run() {
2090    setRunning();
2091    destroy();
2092    std::lock_guard<std::mutex> guard(task->getMutex());
2093    task->makeRunnable();
2094}
2095
2096static void initialize_buckets(void) {
2097    size_t numthread = settings.getNumWorkerThreads() + 1;
2098    for (auto &b : all_buckets) {
2099        b.stats.resize(numthread);
2100    }
2101
2102    // To make life easier for us in the code, index 0
2103    // in the array is reserved for "no bucket"
2104    ENGINE_HANDLE *handle;
2105    cb_assert(new_engine_instance(BucketType::NoBucket,
2106                                  "<internal>",
2107                                  get_server_api,
2108                                  &handle,
2109                                  settings.extensions.logger));
2110
2111    cb_assert(handle != nullptr);
2112    auto &nobucket = all_buckets.at(0);
2113    nobucket.type = BucketType::NoBucket;
2114    nobucket.state = BucketState::Ready;
2115    nobucket.engine = (ENGINE_HANDLE_V1*)handle;
2116}
2117
2118static void cleanup_buckets(void) {
2119    for (auto &bucket : all_buckets) {
2120        bool waiting;
2121
2122        do {
2123            waiting = false;
2124            {
2125                std::lock_guard<std::mutex> guard(bucket.mutex);
2126                switch (bucket.state.load()) {
2127                case BucketState::Stopping:
2128                case BucketState::Destroying:
2129                case BucketState::Creating:
2130                case BucketState::Initializing:
2131                    waiting = true;
2132                    break;
2133                default:
2134                        /* Empty */
2135                        ;
2136                }
2137            }
2138            if (waiting) {
2139                usleep(250);
2140            }
2141        } while (waiting);
2142
2143        if (bucket.state == BucketState::Ready) {
2144            bucket.engine->destroy(v1_handle_2_handle(bucket.engine), false);
2145            delete bucket.topkeys;
2146        }
2147    }
2148}
2149
2150void delete_all_buckets() {
2151    /*
2152     * Delete all of the buckets one by one by using the executor.
2153     * In theory we could schedule all of them in parallel, but they
2154     * probably have dirty items they need to write to disk. Rather
2155     * than having all of the buckets compete for the underlying IO
2156     * in parallel, we delete them sequentially.
2157     */
2158
2159    /**
2160     * Create a specialized task whose only purpose is to hold the
2161     * DestroyBucketThread object.
2162     */
2163    class DestroyBucketTask : public Task {
2164    public:
2165        DestroyBucketTask(const std::string& name_)
2166            : thread(name_, false, nullptr, this)
2167        {
2168            // empty
2169        }
2170
2171        // Start the bucket deletion.
2172        // May throw std::bad_alloc if we fail to start the thread.
2173        void start() {
2174            thread.start();
2175        }
2176
2177        Status execute() override {
2178            return Status::Finished;
2179        }
2180
2181        DestroyBucketThread thread;
2182    };
2183
2184    LOG_INFO("Stop all buckets");
2185    bool done;
2186    do {
2187        done = true;
2188        std::shared_ptr<Task> task;
2189        std::string name;
2190
2191        std::unique_lock<std::mutex> all_bucket_lock(buckets_lock);
2192        /*
2193         * Start at one (not zero) because zero is reserved for "no bucket".
2194         * The "no bucket" has a state of BucketState::Ready but no name.
2195         */
2196        for (size_t ii = 1; ii < all_buckets.size() && done; ++ii) {
2197            std::lock_guard<std::mutex> bucket_guard(all_buckets[ii].mutex);
2198            if (all_buckets[ii].state == BucketState::Ready) {
2199                name.assign(all_buckets[ii].name);
2200                LOG_INFO("Scheduling delete for bucket {}", name);
2201                task = std::make_shared<DestroyBucketTask>(name);
2202                std::lock_guard<std::mutex> guard(task->getMutex());
2203                dynamic_cast<DestroyBucketTask&>(*task).start();
2204                executorPool->schedule(task, false);
2205                done = false;
2206            }
2207        }
2208        all_bucket_lock.unlock();
2209
2210        if (task.get() != nullptr) {
2211            auto& dbt = dynamic_cast<DestroyBucketTask&>(*task);
2212            LOG_INFO("Waiting for delete of {} to complete", name);
2213            dbt.thread.waitForState(Couchbase::ThreadState::Zombie);
2214            LOG_INFO("Bucket {} deleted", name);
2215        }
2216    } while (!done);
2217}
2218
2219static void set_max_filehandles(void) {
2220    const uint64_t maxfiles = settings.getMaxconns() +
2221                            (3 * (settings.getNumWorkerThreads() + 2)) +
2222                            1024;
2223
2224    auto limit = cb::io::maximizeFileDescriptors(maxfiles);
2225
2226    if (limit < maxfiles) {
2227        LOG_WARNING(
2228                "Failed to set the number of file descriptors "
2229                "to {} due to system resource restrictions. "
2230                "This may cause the system to misbehave once you reach a "
2231                "high connection count as the system won't be able open "
2232                "new files on the system. The maximum number of file "
2233                "descriptors is currently set to {}. The system "
2234                "is configured to allow {} number of client connections, "
2235                "and in addition to that the overhead of the worker "
2236                "threads is {}. Finally the backed database needs to "
2237                "open files to persist data.",
2238                int(maxfiles),
2239                int(limit),
2240                settings.getMaxconns(),
2241                (3 * (settings.getNumWorkerThreads() + 2)));
2242    }
2243}
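/*
 * Worked example of the sizing above (illustrative numbers only): with
 * settings.getMaxconns() == 65000 and 4 worker threads, the requested limit
 * is 65000 + 3 * (4 + 2) + 1024 = 66042 file descriptors.
 */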
2244
2245/**
2246 * The log function used from SASL
2247 *
2248 * Try to remap the log levels to our own levels and put in the log
2249 * depending on the severity.
2250 */
2251static void sasl_log_callback(cb::sasl::logging::Level level,
2252                              const std::string& message) {
2253    switch (level) {
2254    case cb::sasl::logging::Level::Error:
2255        LOG_ERROR("{}", message);
2256        break;
2257    case cb::sasl::logging::Level::Warning:
2258        LOG_WARNING("{}", message);
2259        break;
2260    case cb::sasl::logging::Level::Notice:
2261        LOG_INFO("{}", message);
2262        break;
2263    case cb::sasl::logging::Level::Fail:
2264    case cb::sasl::logging::Level::Debug:
2265        LOG_DEBUG("{}", message);
2266        break;
2267    case cb::sasl::logging::Level::Trace:
2268        LOG_TRACE("{}", message);
2269        break;
2270    }
2271}
2272
2273static int sasl_getopt_callback(void*, const char*,
2274                                const char* option,
2275                                const char** result,
2276                                unsigned* len) {
2277    if (option == nullptr || result == nullptr || len == nullptr) {
2278        return CBSASL_BADPARAM;
2279    }
2280
2281    std::string key(option);
2282
2283    if (key == "hmac iteration count") {
2284        // Speed up the test suite by reducing the SHA1 hmac calculations
2285        // from 4k to 10
2286        if (getenv("MEMCACHED_UNIT_TESTS") != nullptr) {
2287            *result = "10";
2288            *len = 2;
2289            return CBSASL_OK;
2290        }
2291    } else if (key == "sasl mechanisms") {
2292        const auto& value = settings.getSaslMechanisms();
2293        if (!value.empty()) {
2294            *result = value.data();
2295            *len = static_cast<unsigned int>(value.size());
2296            return CBSASL_OK;
2297        }
2298    }
2299
2300    return CBSASL_FAIL;
2301}
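/*
 * Options currently answered by sasl_getopt_callback():
 *   - "hmac iteration count": returns "10", but only when the
 *     MEMCACHED_UNIT_TESTS environment variable is set.
 *   - "sasl mechanisms": returns the value configured via
 *     settings.getSaslMechanisms(), if non-empty.
 * Everything else falls through to CBSASL_FAIL.
 */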
2302
2303static void initialize_sasl() {
2304    cb::sasl::logging::set_log_callback(sasl_log_callback);
2305
2306    cbsasl_callback_t sasl_callbacks[2];
2307    int ii = 0;
2308
2309    sasl_callbacks[ii].id = CBSASL_CB_GETOPT;
2310    sasl_callbacks[ii].proc = (int (*)(void))&sasl_getopt_callback;
2311    sasl_callbacks[ii].context = nullptr;
2312    sasl_callbacks[++ii].id = CBSASL_CB_LIST_END;
2313    sasl_callbacks[ii].proc = nullptr;
2314    sasl_callbacks[ii].context = nullptr;
2315
2316    if (cbsasl_server_init(sasl_callbacks, "memcached") != CBSASL_OK) {
2317        FATAL_ERROR(EXIT_FAILURE, "Failed to initialize SASL server");
2318    }
2319
2320    if (cb::sasl::plain::authenticate("default", "") == CBSASL_OK) {
2321        set_default_bucket_enabled(true);
2322    } else {
2323        set_default_bucket_enabled(false);
2324    }
2325}
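/*
 * Note: the default bucket is enabled only if an empty password authenticates
 * the "default" user via the PLAIN mechanism (checked immediately above).
 */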
2326
2327extern "C" int memcached_main(int argc, char **argv) {
2328    // MB-14649 log() crash on Windows on some CPUs
2329#ifdef _WIN64
2330    _set_FMA3_enable (0);
2331#endif
2332
2333#ifdef HAVE_LIBNUMA
2334    // Configure NUMA policy as soon as possible (before any dynamic memory
2335    // allocation).
2336    const std::string numa_status = configure_numa_policy();
2337#endif
2338    std::unique_ptr<ParentMonitor> parent_monitor;
2339
2340    try {
2341        cb::logger::createConsoleLogger();
2342        settings.extensions.logger = &cb::logger::getLoggerDescriptor();
2343    } catch (const std::exception& e) {
2344        std::cerr << "Failed to create logger object: " << e.what()
2345                  << std::endl;
2346        exit(EXIT_FAILURE);
2347    }
2348
2349    // Setup terminate handler as early as possible to catch crashes
2350    // occurring during initialisation.
2351    install_backtrace_terminate_handler();
2352
2353    setup_libevent_locking();
2354
2355    initialize_openssl();
2356
2357    /* Initialize the socket subsystem */
2358    cb_initialize_sockets();
2359
2360    AllocHooks::initialize();
2361
2362    /* init settings */
2363    settings_init();
2364
2365    initialize_mbcp_lookup_map();
2366
2367    /* Parse command line arguments */
2368    try {
2369        parse_arguments(argc, argv);
2370    } catch (const std::exception& exception) {
2371        FATAL_ERROR(EXIT_FAILURE,
2372                    "Failed to initialize server: {}",
2373                    exception.what());
2374    }
2375
2376    update_settings_from_config();
2377
2378    cb::rbac::initialize();
2379
2380    if (getenv("COUCHBASE_FORCE_ENABLE_XATTR") != nullptr) {
2381        settings.setXattrEnabled(true);
2382    }
2383
2384    /* Initialize breakpad crash catcher with our just-parsed settings. */
2385    cb::breakpad::initialize(settings.getBreakpadSettings());
2386
2387    /* Configure file logger, if specified as a settings object */
2388    if (settings.has.logger) {
2389        auto ret = cb::logger::initialize(settings.getLoggerConfig());
2390        if (ret) {
2391            FATAL_ERROR(
2392                    EXIT_FAILURE, "Failed to initialize logger: {}", ret.get());
2393        }
2394    }
2395
2396    /* File-based logging available from this point onwards... */
2397
2398    /* Logging is available now that extensions have been loaded. */
2399    LOG_INFO("Couchbase version {} starting.", get_server_version());
2400
2401    LOG_INFO("Using SLA configuration: {}",
2402             to_string(cb::mcbp::sla::to_json(), false));
2403
2404    if (settings.isStdinListenerEnabled()) {
2405        LOG_INFO("Enable standard input listener");
2406        start_stdin_listener(shutdown_server);
2407    }
2408
2409#ifdef HAVE_LIBNUMA
2410    // Log the NUMA policy selected (now the logger is available).
2411    LOG_INFO("NUMA: {}", numa_status);
2412#endif
2413
2414    if (!settings.has.rbac_file) {
2415        FATAL_ERROR(EXIT_FAILURE, "RBAC file not specified");
2416    }
2417
2418    if (!cb::io::isFile(settings.getRbacFile())) {
2419        FATAL_ERROR(EXIT_FAILURE,
2420                    "RBAC [{}] does not exist",
2421                    settings.getRbacFile());
2422    }
2423
2424    LOG_INFO("Loading RBAC configuration from [{}]", settings.getRbacFile());
2425    cb::rbac::loadPrivilegeDatabase(settings.getRbacFile());
2426
2427    LOG_INFO("Loading error maps from [{}]", settings.getErrorMapsDir());
2428    try {
2429        settings.loadErrorMaps(settings.getErrorMapsDir());
2430    } catch (const std::exception& e) {
2431        FATAL_ERROR(EXIT_FAILURE, "Failed to load error maps: {}", e.what());
2432    }
2433
2434    initialize_audit();
2435
2436    /* inform interested parties of initial verbosity level */
2437    perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
2438
2439    set_max_filehandles();
2440
2441    /* Aggregate the maximum number of connections */
2442    settings.calculateMaxconns();
2443
2444    {
2445        char *errmsg;
2446        if (!initialize_engine_map(&errmsg)) {
2447            FATAL_ERROR(EXIT_FAILURE,
2448                        "Unable to initialize engine "
2449                        "map: {}",
2450                        errmsg);
2451        }
2452    }
2453
2454    /* Initialize bucket engine */
2455    initialize_buckets();
2456
2457    initialize_sasl();
2458
2459    /* initialize main thread libevent instance */
2460    main_base = event_base_new();
2461
2462    /* Initialize signal handlers (requires libevent). */
2463    if (!install_signal_handlers()) {
2464        FATAL_ERROR(EXIT_FAILURE, "Unable to install signal handlers");
2465    }
2466
2467    /* initialize other stuff */
2468    stats_init();
2469
2470#ifndef WIN32
2471    /*
2472     * ignore SIGPIPE signals; we can use errno == EPIPE if we
2473     * need that information
2474     */
2475    if (sigignore(SIGPIPE) == -1) {
2476        FATAL_ERROR(EX_OSERR, "Failed to ignore SIGPIPE; sigaction");
2477    }
2478#endif
2479
2480    /* create the listening socket(s), bind, and init. Note this is done
2481     * before starting worker threads; so _if_ any required sockets fail then
2482     * terminating is simpler as we don't need to shut down the workers.
2483     */
2484    create_listen_sockets(true);
2485
2486    /* start up worker threads if MT mode */
2487    thread_init(settings.getNumWorkerThreads(), main_base, dispatch_event_handler);
2488
2489    executorPool.reset(new ExecutorPool(settings.getNumWorkerThreads()));
2490
2491    initializeTracing();
2492    TRACE_GLOBAL0("memcached", "Started");
2493
2494    /*
2495     * MB-20034.
2496     * Now that all threads have been created, e.g. the audit thread, threads
2497     * associated with extensions and the workers, we can enable shutdown.
2498     */
2499    enable_shutdown();
2500
2501    /* Initialise memcached time keeping */
2502    mc_time_init(main_base);
2503
2504    /* Optional parent monitor */
2505    char *env = getenv("MEMCACHED_PARENT_MONITOR");
2506    if (env != NULL) {
2507        LOG_INFO("Starting parent monitor");
2508        parent_monitor.reset(new ParentMonitor(std::stoi(env)));
2509    }
2510
2511    if (!memcached_shutdown) {
2512        /* enter the event loop */
2513        LOG_INFO("Initialization complete. Accepting clients.");
2514        service_online = true;
2515        event_base_loop(main_base, 0);
2516        service_online = false;
2517    }
2518
2519    LOG_INFO("Initiating graceful shutdown.");
2520    delete_all_buckets();
2521
2522    if (parent_monitor.get() != nullptr) {
2523        LOG_INFO("Shutting down parent monitor");
2524        parent_monitor.reset();
2525    }
2526
2527    LOG_INFO("Shutting down audit daemon");
2528    /* Close down the audit daemon cleanly */
2529    shutdown_auditdaemon(get_audit_handle());
2530
2531    LOG_INFO("Shutting down client worker threads");
2532    threads_shutdown();
2533
2534    LOG_INFO("Releasing server sockets");
2535    listen_conn.clear();
2536
2537    LOG_INFO("Releasing client resources");
2538    close_all_connections();
2539
2540    LOG_INFO("Releasing bucket resources");
2541    cleanup_buckets();
2542
2543    LOG_INFO("Shutting down RBAC subsystem");
2544    cb::rbac::destroy();
2545
2546    LOG_INFO("Releasing thread resources");
2547    threads_cleanup();
2548
2549    LOG_INFO("Shutting down executor pool");
2550    executorPool.reset();
2551
2552    LOG_INFO("Releasing signal handlers");
2553    release_signal_handlers();
2554
2555    LOG_INFO("Shutting down SASL server");
2556    cbsasl_server_term();
2557
2558    LOG_INFO("Releasing connection objects");
2559    destroy_connections();
2560
2561    LOG_INFO("Deinitialising tracing");
2562    deinitializeTracing();
2563
2564    LOG_INFO("Shutting down engine map");
2565    shutdown_engine_map();
2566
2567    LOG_INFO("Removing breakpad");
2568    cb::breakpad::destroy();
2569
2570    LOG_INFO("Releasing callbacks");
2571    free_callbacks();
2572
2573    LOG_INFO("Shutting down OpenSSL");
2574    shutdown_openssl();
2575
2576    LOG_INFO("Shutting down libevent");
2577    event_base_free(main_base);
2578
2579    LOG_INFO("Shutting down logger extension");
2580    // drop my handle to the logger
2581    cb::logger::reset();
2582    cb::logger::shutdown();
2583
2584    return EXIT_SUCCESS;
2585}
2586