1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *  memcached - memory caching daemon
4  *
5  *       http://www.danga.com/memcached/
6  *
7  *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8  *
9  *  Use and distribution licensed under the BSD license.  See
10  *  the LICENSE file for full text.
11  *
12  *  Authors:
13  *      Anatoly Vorobey <mellon@pobox.com>
14  *      Brad Fitzpatrick <brad@danga.com>
15  */
16 #include "config.h"
17 #include "memcached.h"
18 #include "alloc_hooks.h"
19 #include "buckets.h"
20 #include "cmdline.h"
21 #include "config_parse.h"
22 #include "connections.h"
23 #include "debug_helpers.h"
24 #include "doc_pre_expiry.h"
25 #include "enginemap.h"
26 #include "executorpool.h"
27 #include "ioctl.h"
28 #include "libevent_locking.h"
29 #include "logger/logger.h"
30 #include "mc_time.h"
31 #include "mcaudit.h"
32 #include "mcbp.h"
33 #include "mcbp_executors.h"
34 #include "mcbp_topkeys.h"
35 #include "mcbp_validators.h"
36 #include "mcbpdestroybuckettask.h"
37 #include "memcached/audit_interface.h"
38 #include "memcached_openssl.h"
39 #include "parent_monitor.h"
40 #include "protocol/mcbp/engine_wrapper.h"
41 #include "runtime.h"
42 #include "server_socket.h"
43 #include "session_cas.h"
44 #include "settings.h"
45 #include "stats.h"
46 #include "subdocument.h"
47 #include "timings.h"
48 #include "topkeys.h"
49 #include "tracing.h"
50 #include "utilities/engine_loader.h"
51 #include "utilities/protocol2text.h"
52 #include "utilities/terminate_handler.h"
53 
54 #include <mcbp/mcbp.h>
55 #include <memcached/util.h>
56 #include <phosphor/phosphor.h>
57 #include <platform/cb_malloc.h>
58 #include <platform/dirutils.h>
59 #include <platform/make_unique.h>
60 #include <platform/socket.h>
61 #include <platform/strerror.h>
62 #include <platform/sysinfo.h>
63 #include <utilities/breakpad.h>
64 
65 #include <signal.h>
66 #include <fcntl.h>
67 #include <errno.h>
68 #include <stdlib.h>
69 #include <stdio.h>
70 #include <string.h>
71 #include <time.h>
72 #include <limits.h>
73 #include <ctype.h>
74 #include <memcached/rbac.h>
75 #include <stdarg.h>
76 #include <stddef.h>
77 #include <snappy-c.h>
78 #include <cJSON.h>
79 #include <JSON_checker.h>
80 #include <engines/default_engine.h>
81 #include <vector>
82 #include <algorithm>
83 #include <cJSON_utils.h>
84 
85 // MB-14649: log crashing on windows..
86 #include <math.h>
87 #include <memcached/audit_interface.h>
88 #include <memcached/server_api.h>
89 
90 #include <gsl/gsl>
91 
92 #if HAVE_LIBNUMA
93 #include <numa.h>
94 #endif
95 
96 static EXTENSION_LOG_LEVEL get_log_level(void);
97 
98 /**
99  * All of the buckets in couchbase is stored in this array.
100  */
101 static std::mutex buckets_lock;
102 std::array<Bucket, COUCHBASE_MAX_NUM_BUCKETS + 1> all_buckets;
103 
v1_handle_2_handle(ENGINE_HANDLE_V1* v1)104 static ENGINE_HANDLE* v1_handle_2_handle(ENGINE_HANDLE_V1* v1) {
105     return reinterpret_cast<ENGINE_HANDLE*>(v1);
106 }
107 
getBucketName(const Connection* c)108 const char* getBucketName(const Connection* c) {
109     return all_buckets[c->getBucketIndex()].name;
110 }
111 
bucketsForEach(std::function<bool(Bucket&, void*)> fn, void *arg)112 void bucketsForEach(std::function<bool(Bucket&, void*)> fn, void *arg) {
113     std::lock_guard<std::mutex> all_bucket_lock(buckets_lock);
114     for (Bucket& bucket : all_buckets) {
115         bool do_break = false;
116         std::lock_guard<std::mutex> guard(bucket.mutex);
117         if (bucket.state == BucketState::Ready) {
118             if (!fn(bucket, arg)) {
119                 do_break = true;
120             }
121         }
122         if (do_break) {
123             break;
124         }
125     }
126 }
127 
128 std::atomic<bool> memcached_shutdown;
129 std::atomic<bool> service_online;
130 // Should we enable to common ports (all of the ports which arn't tagged as
131 // management ports)
132 static std::atomic<bool> enable_common_ports;
133 
134 std::unique_ptr<ExecutorPool> executorPool;
135 
136 /* Mutex for global stats */
137 std::mutex stats_mutex;
138 
139 /*
140  * forward declarations
141  */
142 static void register_callback(ENGINE_HANDLE *eh,
143                               ENGINE_EVENT_TYPE type,
144                               EVENT_CALLBACK cb, const void *cb_data);
145 
146 static void create_listen_sockets(bool management);
147 
148 /* stats */
149 static void stats_init(void);
150 
151 /* defaults */
152 static void settings_init(void);
153 
154 /** exported globals **/
155 struct stats stats;
156 
157 /** file scope variables **/
158 std::vector<std::unique_ptr<ServerSocket>> listen_conn;
159 static struct event_base *main_base;
160 
161 static engine_event_handler_array_t engine_event_handlers;
162 
163 /*
164  * MB-12470 requests an easy way to see when (some of) the statistics
165  * counters were reset. This functions grabs the current time and tries
166  * to format it to the current timezone by using ctime_r/s (which adds
167  * a newline at the end for some obscure reason which we'll need to
168  * strip off).
169  *
170  * This function expects that the stats lock is held by the caller to get
171  * a "sane" result (otherwise one thread may see a garbled version), but
172  * no crash will occur since the buffer is big enough and always zero
173  * terminated.
174  */
char reset_stats_time[80];
/// Record the current local time (as text) into reset_stats_time.
/// Caller is expected to hold the stats lock for a consistent read.
static void set_stats_reset_time(void)
{
    const time_t now = time(NULL);
#ifdef WIN32
    ctime_s(reset_stats_time, sizeof(reset_stats_time), &now);
#else
    ctime_r(&now, reset_stats_time);
#endif
    // ctime appends a trailing newline; strip it off
    char* newline = strchr(reset_stats_time, '\n');
    if (newline != NULL) {
        *newline = '\0';
    }
}
189 
/**
 * Detach the connection from its current bucket: drop the bucket's
 * client refcount and point the connection back at the "no bucket"
 * (index 0). If this was the last client of a bucket currently being
 * destroyed, wake the thread waiting for the clients to drain.
 */
void disassociate_bucket(Connection& connection) {
    Bucket& b = all_buckets.at(connection.getBucketIndex());
    std::lock_guard<std::mutex> guard(b.mutex);
    b.clients--;

    connection.setBucketIndex(0);
    connection.setBucketEngine(nullptr);

    // The destroy path waits on this condition for clients to reach 0
    if (b.clients == 0 && b.state == BucketState::Destroying) {
        b.cond.notify_one();
    }
}
202 
associate_bucket(Connection& connection, const char* name)203 bool associate_bucket(Connection& connection, const char* name) {
204     bool found = false;
205 
206     /* leave the current bucket */
207     disassociate_bucket(connection);
208 
209     /* Try to associate with the named bucket */
210     /* @todo add auth checks!!! */
211     for (size_t ii = 1; ii < all_buckets.size() && !found; ++ii) {
212         Bucket &b = all_buckets.at(ii);
213         std::lock_guard<std::mutex> guard(b.mutex);
214         if (b.state == BucketState::Ready && strcmp(b.name, name) == 0) {
215             b.clients++;
216             connection.setBucketIndex(gsl::narrow<int>(ii));
217             connection.setBucketEngine(b.engine);
218             audit_bucket_selection(connection);
219             found = true;
220         }
221     }
222 
223     if (!found) {
224         /* Bucket not found, connect to the "no-bucket" */
225         Bucket &b = all_buckets.at(0);
226         {
227             std::lock_guard<std::mutex> guard(b.mutex);
228             b.clients++;
229         }
230         connection.setBucketIndex(0);
231         connection.setBucketEngine(b.engine);
232     }
233 
234     return found;
235 }
236 
associate_initial_bucket(Connection& connection)237 void associate_initial_bucket(Connection& connection) {
238     Bucket &b = all_buckets.at(0);
239     {
240         std::lock_guard<std::mutex> guard(b.mutex);
241         b.clients++;
242     }
243 
244     connection.setBucketIndex(0);
245     connection.setBucketEngine(b.engine);
246 
247     if (is_default_bucket_enabled()) {
248         associate_bucket(connection, "default");
249     }
250 }
251 
/**
 * Push the current log level to the engine of every ready bucket.
 * Spawned in a separate thread from perform_callbacks (ON_LOG_LEVEL)
 * so the engines can be notified without blocking the caller.
 */
static void populate_log_level(void*) {
    // Lock the entire buckets array so that buckets can't be modified while
    // we notify them (blocking bucket creation/deletion)
    auto val = get_log_level();

    std::lock_guard<std::mutex> all_bucket_lock(buckets_lock);
    for (auto& bucket : all_buckets) {
        std::lock_guard<std::mutex> guard(bucket.mutex);
        // Only notify engines which are up and implement set_log_level
        if (bucket.state == BucketState::Ready &&
            bucket.engine->set_log_level != nullptr) {
            bucket.engine->set_log_level(reinterpret_cast<ENGINE_HANDLE*>(bucket.engine),
                                         val);
        }
    }
}
267 
268 /* Perform all callbacks of a given type for the given connection. */
perform_callbacks(ENGINE_EVENT_TYPE type, const void *data, const void *void_cookie)269 void perform_callbacks(ENGINE_EVENT_TYPE type,
270                        const void *data,
271                        const void *void_cookie)
272 {
273     cb_thread_t tid;
274 
275     switch (type) {
276         /*
277          * The following events operates on a connection which is passed in
278          * as the cookie.
279          */
280     case ON_DISCONNECT: {
281         const auto * cookie = reinterpret_cast<const Cookie *>(void_cookie);
282         if (cookie == nullptr) {
283             throw std::invalid_argument("perform_callbacks: cookie is nullptr");
284         }
285         const auto bucket_idx = cookie->getConnection().getBucketIndex();
286         if (bucket_idx == -1) {
287             throw std::logic_error(
288                     "perform_callbacks: connection (which is " +
289                     std::to_string(cookie->getConnection().getId()) +
290                     ") cannot be "
291                     "disconnected as it is not associated with a bucket");
292         }
293 
294         for (auto& handler : all_buckets[bucket_idx].engine_event_handlers[type]) {
295             handler.cb(void_cookie, ON_DISCONNECT, data, handler.cb_data);
296         }
297         break;
298     }
299     case ON_LOG_LEVEL:
300         if (void_cookie != nullptr) {
301             throw std::invalid_argument("perform_callbacks: cookie "
302                 "(which is " +
303                 std::to_string(reinterpret_cast<uintptr_t>(void_cookie)) +
304                 ") should be NULL for ON_LOG_LEVEL");
305         }
306         for (auto& handler : engine_event_handlers[type]) {
307             handler.cb(void_cookie, ON_LOG_LEVEL, data, handler.cb_data);
308         }
309 
310         if (service_online) {
311             if (cb_create_thread(&tid, populate_log_level, nullptr, 1) == -1) {
312                 LOG_WARNING(
313                         "Failed to create thread to notify engines about "
314                         "changing log level");
315             }
316         }
317         break;
318 
319     case ON_DELETE_BUCKET: {
320         /** cookie is the bucket entry */
321         auto* bucket = reinterpret_cast<const Bucket*>(void_cookie);
322         for (auto& handler : bucket->engine_event_handlers[type]) {
323             handler.cb(void_cookie, ON_DELETE_BUCKET, data, handler.cb_data);
324         }
325         break;
326     }
327 
328     case ON_INIT_COMPLETE:
329         if ((data != nullptr) || (void_cookie != nullptr)) {
330             throw std::invalid_argument("perform_callbacks: data and cookie"
331                                             " should be nullptr");
332         }
333         enable_common_ports.store(true, std::memory_order_release);
334         notify_dispatcher();
335         break;
336 
337     default:
338         throw std::invalid_argument("perform_callbacks: type "
339                 "(which is " + std::to_string(type) +
340                 "is not a valid ENGINE_EVENT_TYPE");
341     }
342 }
343 
/**
 * Register an engine event callback.
 *
 * @param eh for per-bucket events (ON_DISCONNECT / ON_DELETE_BUCKET)
 *           this must be the engine of an existing bucket; for global
 *           events (ON_LOG_LEVEL) it must be NULL
 * @param type the event to register for
 * @param cb the callback to invoke when the event fires
 * @param cb_data opaque data passed back to the callback
 * @throws std::invalid_argument on an eh/type mismatch or unknown type
 */
static void register_callback(ENGINE_HANDLE *eh,
                              ENGINE_EVENT_TYPE type,
                              EVENT_CALLBACK cb,
                              const void *cb_data)
{
    size_t idx;
    switch (type) {
    /*
     * The following events operates on a connection which is passed in
     * as the cookie.
     */
    case ON_DISCONNECT:
    case ON_DELETE_BUCKET:
        if (eh == nullptr) {
            throw std::invalid_argument("register_callback: 'eh' must be non-NULL");
        }
        // Locate the bucket owning this engine; the handler is stored
        // per-bucket so it can be dropped when the bucket goes away
        for (idx = 0; idx < all_buckets.size(); ++idx) {
            if ((void *)eh == (void *)all_buckets[idx].engine) {
                break;
            }
        }
        if (idx == all_buckets.size()) {
            throw std::invalid_argument("register_callback: eh (which is " +
                    std::to_string(reinterpret_cast<uintptr_t>(eh)) +
                    ") is not a engine associated with a bucket");
        }
        all_buckets[idx].engine_event_handlers[type].push_back({cb, cb_data});
        break;

    case ON_LOG_LEVEL:
        // Global event: not tied to any engine/bucket
        if (eh != nullptr) {
            throw std::invalid_argument("register_callback: 'eh' must be NULL");
        }
        engine_event_handlers[type].push_back({cb, cb_data});
        break;

    default:
        throw std::invalid_argument("register_callback: type (which is " +
                                    std::to_string(type) +
                                    ") is not a valid ENGINE_EVENT_TYPE");
    }
}
386 
free_callbacks()387 static void free_callbacks() {
388     // free per-bucket callbacks.
389     for (size_t idx = 0; idx < all_buckets.size(); ++idx) {
390         for (auto& type_vec : all_buckets[idx].engine_event_handlers) {
391             type_vec.clear();
392         }
393     }
394 
395     // free global callbacks
396     for (auto& type_vec : engine_event_handlers) {
397         type_vec.clear();
398     }
399 }
400 
/// Initialise the global stats: record the reset timestamp and zero
/// all of the global counters.
static void stats_init(void) {
    set_stats_reset_time();
    stats.conn_structs.reset();
    stats.total_conns.reset();
    stats.daemon_conns.reset();
    stats.rejected_conns.reset();
    stats.curr_conns.store(0, std::memory_order_relaxed);
}
409 
/**
 * Look up the thread-local stats slot for the given connection:
 * each bucket keeps one stats entry per worker thread (plus one
 * extra slot, hence the "+ 1" bound in the assertion).
 */
struct thread_stats* get_thread_stats(Connection* c) {
    // The worker thread's index selects the slot within the bucket
    cb_assert(c->getThread()->index < (settings.getNumWorkerThreads() + 1));
    auto& independent_stats = all_buckets[c->getBucketIndex()].stats;
    return &independent_stats.at(c->getThread()->index);
}
415 
/**
 * Reset the global, thread-local and bucket statistics on behalf of
 * the connection owning the given cookie.
 */
void stats_reset(Cookie& cookie) {
    {
        // set_stats_reset_time expects the stats lock to be held
        std::lock_guard<std::mutex> guard(stats_mutex);
        set_stats_reset_time();
    }
    stats.total_conns.reset();
    stats.rejected_conns.reset();
    threadlocal_stats_reset(cookie.getConnection().getBucket().stats);
    bucket_reset_stats(cookie);
}
426 
get_number_of_worker_threads()427 static size_t get_number_of_worker_threads() {
428     size_t ret;
429     char *override = getenv("MEMCACHED_NUM_CPUS");
430     if (override == NULL) {
431         // No override specified; determine worker thread count based
432         // on the CPU count:
433         //     <5 cores: create 4 workers.
434         //    >5+ cores: create #CPUs * 7/8.
435         ret = Couchbase::get_available_cpu_count();
436 
437         if (ret > 4) {
438             ret = (ret * 7) / 8;
439         }
440         if (ret < 4) {
441             ret = 4;
442         }
443     } else {
444         ret = std::stoull(override);
445         if (ret == 0) {
446             ret = 4;
447         }
448     }
449     return ret;
450 }
451 
/// Settings listener: re-initialise Breakpad when its settings change.
static void breakpad_changed_listener(const std::string&, Settings &s) {
    cb::breakpad::initialize(s.getBreakpadSettings());
}
455 
/// Settings listener: apply a new minimum SSL protocol version.
static void ssl_minimum_protocol_changed_listener(const std::string&, Settings &s) {
    set_ssl_protocol_mask(s.getSslMinimumProtocol());
}
459 
/// Settings listener: apply a new SSL cipher list.
static void ssl_cipher_list_changed_listener(const std::string&, Settings &s) {
    set_ssl_cipher_list(s.getSslCipherList());
}
463 
/**
 * Settings listener for "verbosity": update the logger's severity and
 * fire ON_LOG_LEVEL so the engines get notified of the new level.
 */
static void verbosity_changed_listener(const std::string&, Settings &s) {
    auto logger = cb::logger::get();
    if (logger) {
        logger->set_level(cb::logger::convertToSpdSeverity(get_log_level()));
    }

    perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
}
472 
/// Settings listener: propagate a new saslauthd socket path.
static void saslauthd_socketpath_changed_listener(const std::string&, Settings &s) {
    cb::sasl::saslauthd::set_socketpath(s.getSaslauthdSocketpath());
}
476 
/// Settings listener: propagate a new SCRAM-SHA fallback salt.
static void scramsha_fallback_salt_changed_listener(const std::string&,
                                                    Settings& s) {
    cb::sasl::set_scramsha_fallback_salt(s.getScramshaFallbackSalt());
}
481 
/**
 * Settings listener for "opcode_attributes_override": reconfigure the
 * MCBP SLA from the supplied JSON payload, or reset to the defaults
 * under the configured root if the payload doesn't parse as JSON.
 */
static void opcode_attributes_override_changed_listener(const std::string&,
                                                        Settings& s) {
    unique_cJSON_ptr json(cJSON_Parse(s.getOpcodeAttributesOverride().c_str()));
    if (json) {
        cb::mcbp::sla::reconfigure(settings.getRoot(), *json);
    } else {
        // No (valid) override supplied; restore the default configuration
        cb::mcbp::sla::reconfigure(settings.getRoot());
    }
    LOG_INFO("SLA configuration changed to: {}",
             to_string(cb::mcbp::sla::to_json(), false));
}
493 
interfaces_changed_listener(const std::string&, Settings &s)494 static void interfaces_changed_listener(const std::string&, Settings &s) {
495     for (const auto& ifc : s.getInterfaces()) {
496         auto* port = get_listening_port_instance(ifc.port);
497         if (port != nullptr) {
498             if (port->maxconns != ifc.maxconn) {
499                 port->maxconns = ifc.maxconn;
500             }
501 
502             if (port->backlog != ifc.backlog) {
503                 port->backlog = ifc.backlog;
504             }
505 
506             if (port->tcp_nodelay != ifc.tcp_nodelay) {
507                 port->tcp_nodelay = ifc.tcp_nodelay;
508             }
509         }
510     }
511     s.calculateMaxconns();
512 }
513 
514 #ifdef HAVE_LIBNUMA
515 /** Configure the NUMA policy for memcached. By default will attempt to set to
516  *  interleaved polocy, unless the env var MEMCACHED_NUMA_MEM_POLICY is set to
517  *  'disable'.
518  *  @return A log message describing what action was taken.
519  *
520  */
static std::string configure_numa_policy() {
    // numa_available() returns 0 when NUMA support is usable
    if (numa_available() != 0) {
        return "Not available - not setting mem policy.";
    }

    // Attempt to set the default NUMA memory policy to interleaved,
    // unless overridden by our env var.
    const char* mem_policy_env = getenv("MEMCACHED_NUMA_MEM_POLICY");
    if (mem_policy_env != NULL && strcmp("disable", mem_policy_env) == 0) {
        return std::string("NOT setting memory allocation policy - disabled "
                "via MEMCACHED_NUMA_MEM_POLICY='") + mem_policy_env + "'";
    } else {
        // numa_set_interleave_mask reports failure via errno
        errno = 0;
        numa_set_interleave_mask(numa_all_nodes_ptr);
        if (errno == 0) {
            return "Set memory allocation policy to 'interleave'";
        } else {
            return std::string("NOT setting memory allocation policy to "
                    "'interleave' - request failed: ") + cb_strerror();
        }
    }
}
543 #endif  // HAVE_LIBNUMA
544 
/// Initialise default settings and register all settings change listeners.
static void settings_init(void) {
    // Set up the listener functions
    settings.addChangeListener("breakpad",
                               breakpad_changed_listener);
    settings.addChangeListener("ssl_minimum_protocol",
                               ssl_minimum_protocol_changed_listener);
    settings.addChangeListener("ssl_cipher_list",
                               ssl_cipher_list_changed_listener);
    settings.addChangeListener("verbosity", verbosity_changed_listener);
    settings.addChangeListener("interfaces", interfaces_changed_listener);
    settings.addChangeListener("saslauthd_socketpath",
                               saslauthd_socketpath_changed_listener);
    settings.addChangeListener("scramsha_fallback_salt",
                               scramsha_fallback_salt_changed_listener);

    // Always provide the default network interface
    NetworkInterface default_interface;
    settings.addInterface(default_interface);

    settings.setBioDrainBufferSize(8192);

    settings.setVerbose(0);
    settings.setConnectionIdleTime(0); // Connection idle time disabled
    settings.setNumWorkerThreads(get_number_of_worker_threads());
    settings.setDatatypeJsonEnabled(true);
    settings.setDatatypeSnappyEnabled(true);

    // How many requests to serve per event notification, per priority
    settings.setRequestsPerEventNotification(50, EventPriority::High);
    settings.setRequestsPerEventNotification(5, EventPriority::Medium);
    settings.setRequestsPerEventNotification(1, EventPriority::Low);
    settings.setRequestsPerEventNotification(20, EventPriority::Default);

    /*
     * The max object size is 20MB. Let's allow packets up to 30MB to
     * be handled "properly" by returning E2BIG, but packets bigger
     * than that will cause the server to disconnect the client
     */
    settings.setMaxPacketSize(30 * 1024 * 1024);

    settings.setDedupeNmvbMaps(false);

    // Topkeys size may be overridden through the environment
    char *tmp = getenv("MEMCACHED_TOP_KEYS");
    settings.setTopkeysSize(20);
    if (tmp != NULL) {
        int count;
        if (safe_strtol(tmp, count)) {
            settings.setTopkeysSize(count);
        }
    }

    {
        // MB-13642 Allow the user to specify the SSL cipher list
        //    If someone wants to use SSL we should try to be "secure
        //    by default", and only allow for using strong ciphers.
        //    Users that may want to use a less secure cipher list
        //    should be allowed to do so by setting an environment
        //    variable (since there is no place in the UI to do
        //    so currently). Whenever ns_server allows for specifying
        //    the SSL cipher list in the UI, it will be stored
        //    in memcached.json and override these settings.
        const char *env = getenv("COUCHBASE_SSL_CIPHER_LIST");
        if (env == nullptr) {
            settings.setSslCipherList("HIGH");
        } else {
            settings.setSslCipherList(env);
        }
    }

    settings.setSslMinimumProtocol("tlsv1");
    /*
     * MB-22586
     * When connecting over SSL there isn't much point of using SCRAM or
     * any other CPU intensive (and multiple packets exchange between
     * client and server) to avoid sending the password in clear text as
     * everything going over SSL should be encrypted anyway.
     */
    if (getenv("COUCHBASE_I_DONT_TRUST_SSL") == nullptr) {
        settings.setSslSaslMechanisms("PLAIN");
    }

    if (getenv("COUCHBASE_ENABLE_PRIVILEGE_DEBUG") != nullptr) {
        settings.setPrivilegeDebug(true);
    }

    settings.setTopkeysEnabled(true);
}
628 
629 /**
630  * The config file may have altered some of the default values we're
631  * caching in other variables. This is the place where we'd propagate
632  * such changes.
633  *
634  * This is also the place to initialize any additional files needed by
635  * Memcached.
636  */
update_settings_from_config(void)637 static void update_settings_from_config(void)
638 {
639     std::string root(DESTINATION_ROOT);
640 
641     if (!settings.getRoot().empty()) {
642         root = settings.getRoot().c_str();
643     }
644 
645     if (settings.getErrorMapsDir().empty()) {
646         // Set the error map dir.
647         std::string error_maps_dir(root + "/etc/couchbase/kv/error_maps");
648         cb::io::sanitizePath(error_maps_dir);
649         if (cb::io::isDirectory(error_maps_dir)) {
650             settings.setErrorMapsDir(error_maps_dir);
651         }
652     }
653 
654     // If the user didn't set a socket path, use the default
655     if (settings.getSaslauthdSocketpath().empty()) {
656         const char* path = getenv("CBAUTH_SOCKPATH");
657         if (path == nullptr) {
658             path = "/var/run/saslauthd/mux";
659         }
660 
661         settings.setSaslauthdSocketpath(path);
662     }
663 
664     try {
665         cb::mcbp::sla::reconfigure(root);
666     } catch (const std::exception& e) {
667         FATAL_ERROR(EXIT_FAILURE, e.what());
668     }
669 
670     settings.addChangeListener("opcode_attributes_override",
671                                opcode_attributes_override_changed_listener);
672 }
673 
/**
 * State tracking whether the listening sockets are currently disabled
 * (see disable_listen / is_listen_disabled).
 */
struct {
    std::mutex mutex; // guards all of the fields below
    bool disabled; // are the listening sockets currently disabled?
    ssize_t count; // set to 10 when disabling; exact semantics not
                   // visible in this file -- TODO confirm at usage site
    uint64_t num_disable; // how many times listening has been disabled
} listen_state;
680 
/// @return true if accepting new connections is currently disabled.
bool is_listen_disabled(void) {
    std::lock_guard<std::mutex> guard(listen_state.mutex);
    return listen_state.disabled;
}
685 
/// @return how many times listening has been disabled so far.
uint64_t get_listen_disabled_num(void) {
    std::lock_guard<std::mutex> guard(listen_state.mutex);
    return listen_state.num_disable;
}
690 
disable_listen()691 void disable_listen() {
692     {
693         std::lock_guard<std::mutex> guard(listen_state.mutex);
694         listen_state.disabled = true;
695         listen_state.count = 10;
696         ++listen_state.num_disable;
697     }
698 
699     for (auto& connection : listen_conn) {
700         connection->disable();
701     }
702 }
703 
/**
 * Close a socket, retrying the close while it is interrupted by a
 * signal. On success the global connection counter is decremented and,
 * if accepting is currently disabled, the dispatcher is notified so it
 * may re-enable the listening sockets.
 */
void safe_close(SOCKET sfd) {
    if (sfd != INVALID_SOCKET) {
        int rval;

        // Retry as long as the failure is "interrupted system call"
        do {
            rval = evutil_closesocket(sfd);
        } while (rval == SOCKET_ERROR &&
                 cb::net::is_interrupted(cb::net::get_socket_error()));

        if (rval == SOCKET_ERROR) {
            std::string error = cb_strerror();
            LOG_WARNING("Failed to close socket {} ({})!!", (int)sfd, error);
        } else {
            stats.curr_conns.fetch_sub(1, std::memory_order_relaxed);
            if (is_listen_disabled()) {
                notify_dispatcher();
            }
        }
    }
}
724 
/// Return the id of the bucket the cookie's connection belongs to.
bucket_id_t get_bucket_id(gsl::not_null<const void*> void_cookie) {
    /* @todo fix this. Currently we're using the index as the id,
     * but this should be changed to be a uniqe ID that won't be
     * reused.
     */
    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
    return bucket_id_t(cookie->getConnection().getBucketIndex());
}
733 
get_connection_id(gsl::not_null<const void*> void_cookie)734 uint64_t get_connection_id(gsl::not_null<const void*> void_cookie) {
735     auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
736     return uint64_t(&cookie->getConnection());
737 }
738 
/**
 * Fetch logging info for the cookie's connection.
 *
 * @return the connection id and its textual description
 */
std::pair<uint32_t, std::string> cookie_get_log_info(
        gsl::not_null<const void*> void_cookie) {
    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
    return std::make_pair(cookie->getConnection().getId(),
                          cookie->getConnection().getDescription());
}
745 
/// Attach an error context message to the given cookie.
void cookie_set_error_context(gsl::not_null<void*> void_cookie,
                              cb::const_char_buffer message) {
    auto* cookie = reinterpret_cast<Cookie*>(void_cookie.get());
    cookie->setErrorContext(to_string(message));
}
751 
752 /**
753  * Check if the cookie holds the privilege
754  *
755  * @param void_cookie this is the cookie passed down to the engine.
756  * @param privilege The privilege to check for
757  * @return if the privilege is held or not (or if the privilege data is stale)
758  */
static cb::rbac::PrivilegeAccess check_privilege(
        gsl::not_null<const void*> void_cookie,
        const cb::rbac::Privilege privilege) {
    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
    // checkPrivilege needs a mutable Cookie; the engine API only hands
    // us a const pointer, hence the const_cast
    return cookie->getConnection().checkPrivilege(privilege,
                                                  const_cast<Cookie&>(*cookie));
}
766 
/**
 * Map an engine error code to the MCBP protocol status to send to the
 * client, applying the connection's error code remapping first.
 *
 * @throws cb::engine_error{disconnect} if the remapped code says the
 *         client should be disconnected
 */
static protocol_binary_response_status engine_error2mcbp(
        gsl::not_null<const void*> void_cookie, ENGINE_ERROR_CODE code) {
    const auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
    auto& connection = cookie->getConnection();

    ENGINE_ERROR_CODE status = connection.remapErrorCode(code);
    if (status == ENGINE_DISCONNECT) {
        throw cb::engine_error(
                cb::engine_errc::disconnect,
                "engine_error2mcbp: " + std::to_string(connection.getId()) +
                        ": Disconnect client");
    }

    return engine_error_2_mcbp_protocol_error(status);
}
782 
pre_link_document( gsl::not_null<const void*> void_cookie, item_info& info)783 static ENGINE_ERROR_CODE pre_link_document(
784         gsl::not_null<const void*> void_cookie, item_info& info) {
785     // Sanity check that people aren't calling the method with a bogus
786     // cookie
787     auto* cookie =
788             reinterpret_cast<Cookie*>(const_cast<void*>(void_cookie.get()));
789 
790     auto* context = cookie->getCommandContext();
791     if (context != nullptr) {
792         return context->pre_link_document(info);
793     }
794 
795     return ENGINE_SUCCESS;
796 }
797 
get_bucket_details_UNLOCKED(const Bucket& bucket, size_t idx)798 static cJSON* get_bucket_details_UNLOCKED(const Bucket& bucket, size_t idx) {
799     if (bucket.state == BucketState::None) {
800         return nullptr;
801     }
802 
803     cJSON *root = cJSON_CreateObject();
804     cJSON_AddNumberToObject(root, "index", idx);
805     switch (bucket.state.load()) {
806     case BucketState::None:
807         cJSON_AddStringToObject(root, "state", "none");
808         break;
809     case BucketState::Creating:
810         cJSON_AddStringToObject(root, "state", "creating");
811         break;
812     case BucketState::Initializing:
813         cJSON_AddStringToObject(root, "state", "initializing");
814         break;
815     case BucketState::Ready:
816         cJSON_AddStringToObject(root, "state", "ready");
817         break;
818     case BucketState::Stopping:
819         cJSON_AddStringToObject(root, "state", "stopping");
820         break;
821     case BucketState::Destroying:
822         cJSON_AddStringToObject(root, "state", "destroying");
823         break;
824     }
825 
826     cJSON_AddNumberToObject(root, "clients", bucket.clients);
827     cJSON_AddStringToObject(root, "name", bucket.name);
828 
829     switch (bucket.type) {
830     case BucketType::Unknown:
831         cJSON_AddStringToObject(root, "type", "<<unknown>>");
832         break;
833     case BucketType::NoBucket:
834         cJSON_AddStringToObject(root, "type", "no bucket");
835         break;
836     case BucketType::Memcached:
837         cJSON_AddStringToObject(root, "type", "memcached");
838         break;
839     case BucketType::Couchstore:
840         cJSON_AddStringToObject(root, "type", "couchstore");
841         break;
842     case BucketType::EWouldBlock:
843         cJSON_AddStringToObject(root, "type", "ewouldblock");
844         break;
845     }
846 
847     return root;
848 }
849 
get_bucket_details(size_t idx)850 cJSON *get_bucket_details(size_t idx)
851 {
852     cJSON* ret;
853     Bucket &bucket = all_buckets.at(idx);
854     std::lock_guard<std::mutex> guard(bucket.mutex);
855     ret = get_bucket_details_UNLOCKED(bucket, idx);
856 
857     return ret;
858 }
859 
860 /**
861  * Check if the associated bucket is dying or not. There is two reasons
862  * for why a bucket could be dying: It is currently being deleted, or
863  * someone initiated a shutdown process.
864  */
is_bucket_dying(Connection& c)865 bool is_bucket_dying(Connection& c) {
866     bool disconnect = memcached_shutdown;
867     Bucket& b = all_buckets.at(c.getBucketIndex());
868 
869     if (b.state != BucketState::Ready) {
870         disconnect = true;
871     }
872 
873     if (disconnect) {
874         LOG_INFO(
875                 "{}: The connected bucket is being deleted.. closing "
876                 "connection {}",
877                 c.getId(),
878                 c.getDescription());
879         c.setState(McbpStateMachine::State::closing);
880         return true;
881     }
882 
883     return false;
884 }
885 
void event_handler(evutil_socket_t fd, short which, void *arg) {
    // libevent callback for client connections; runs on a worker
    // thread. `arg` is the Connection the event was registered for.
    auto* c = reinterpret_cast<Connection*>(arg);
    if (c == nullptr) {
        LOG_WARNING("event_handler: connection must be non-NULL");
        return;
    }

    auto *thr = c->getThread();
    if (thr == nullptr) {
        LOG_WARNING(
                "Internal error - connection without a thread found. - "
                "ignored");
        return;
    }

    // Remove the list from the list of pending io's (in case the
    // object was scheduled to run in the dispatcher before the
    // callback for the worker thread is executed.
    //
    {
        std::lock_guard<std::mutex> lock(thr->pending_io.mutex);
        thr->pending_io.map.erase(c);
    }

    // Remove the connection from the notification list if it's there
    thr->notification.remove(c);

    // Hold the worker thread's mutex for the remainder of the handler
    // (timed so slow acquisition shows up in tracing).
    TRACE_LOCKGUARD_TIMED(thr->mutex,
                          "mutex",
                          "event_handler::threadLock",
                          SlowMutexThreshold);

    if (memcached_shutdown) {
        // Someone requested memcached to shut down.
        if (signal_idle_clients(thr, -1, false) == 0) {
            cb_assert(thr != nullptr);
            LOG_INFO("Stopping worker thread {}", thr->index);
            c->eventBaseLoopbreak();
            return;
        }
    }

    /* sanity */
    cb_assert(fd == c->getSocketDescriptor());

    if ((which & EV_TIMEOUT) == EV_TIMEOUT) {
        // Idle timeout fired. Internal (admin) and DCP connections are
        // exempt from idle-kill: log and re-arm the event mask; every
        // other connection is closed.
        if (c->isInternal() || c->isDCP()) {
            if (c->isInternal()) {
                LOG_INFO("{}: Timeout for admin connection. (ignore)",
                         c->getId());
            } else if (c->isDCP()) {
                LOG_INFO("{}: Timeout for DCP connection. (ignore)",
                         c->getId());
            }
            if (!c->reapplyEventmask()) {
                c->setState(McbpStateMachine::State::closing);
            }
        } else {
            LOG_INFO("{}: Shutting down idle client {}",
                     c->getId(),
                     c->getDescription());
            c->setState(McbpStateMachine::State::closing);
        }
    }

    run_event_loop(c, which);

    if (memcached_shutdown) {
        // Someone requested memcached to shut down. If we don't have
        // any connections bound to this thread we can just shut down
        int connected = signal_idle_clients(thr, -1, true);
        if (connected == 0) {
            LOG_INFO("Stopping worker thread {}", thr->index);
            event_base_loopbreak(thr->base);
        } else {
            LOG_INFO("Waiting for {} connected clients on worker thread {}",
                     connected,
                     thr->index);
        }
    }
}
967 
968 /**
969  * The listen_event_handler is the callback from libevent when someone is
970  * connecting to one of the server sockets. It runs in the context of the
971  * listen thread
972  */
listen_event_handler(evutil_socket_t, short which, void *arg)973 void listen_event_handler(evutil_socket_t, short which, void *arg) {
974     auto& c = *reinterpret_cast<ServerSocket*>(arg);
975 
976     if (memcached_shutdown) {
977         // Someone requested memcached to shut down. The listen thread should
978         // be stopped immediately to avoid new connections
979         LOG_INFO("Stopping listen thread");
980         event_base_loopbreak(main_base);
981         return;
982     }
983 
984     try {
985         c.acceptNewClient();
986     } catch (std::invalid_argument& e) {
987         LOG_WARNING("{}: exception occurred while accepting clients: {}",
988                     c.getSocket(),
989                     e.what());
990     }
991 }
992 
static void dispatch_event_handler(evutil_socket_t fd, short, void *) {
    // Callback on the dispatcher's notification socket. The received
    // bytes are wakeup tokens; their content is ignored, only the
    // count matters.
    char buffer[80];
    ssize_t nr = cb::net::recv(fd, buffer, sizeof(buffer), 0);

    if (enable_common_ports.load()) {
        // Deferred creation of the non-management ports was requested
        // (flag set elsewhere); perform it here on the dispatch thread.
        enable_common_ports.store(false);
        create_listen_sockets(false);
        LOG_INFO("Initialization complete. Accepting clients.");
    }

    if (nr != -1 && is_listen_disabled()) {
        bool enable = false;
        {
            std::lock_guard<std::mutex> guard(listen_state.mutex);
            // Consume one token per byte received; once the count is
            // drained, listening may be re-enabled.
            listen_state.count -= nr;
            if (listen_state.count <= 0) {
                enable = true;
                listen_state.disabled = false;
            }
        }
        if (enable) {
            // Re-arm accept events on every listening socket.
            for (auto& connection : listen_conn) {
                connection->enable();
            }
        }
    }
}
1020 
1021 /*
1022  * Sets a socket's send buffer size to the maximum allowed by the system.
1023  */
maximize_sndbuf(const SOCKET sfd)1024 static void maximize_sndbuf(const SOCKET sfd) {
1025     socklen_t intsize = sizeof(int);
1026     int last_good = 0;
1027     int old_size;
1028 
1029     /* Start with the default size. */
1030     if (cb::net::getsockopt(sfd,
1031                             SOL_SOCKET,
1032                             SO_SNDBUF,
1033                             reinterpret_cast<void*>(&old_size),
1034                             &intsize) != 0) {
1035         LOG_WARNING("getsockopt(SO_SNDBUF): {}", strerror(errno));
1036         return;
1037     }
1038 
1039     /* Binary-search for the real maximum. */
1040     int min = old_size;
1041     int max = MAX_SENDBUF_SIZE;
1042 
1043     while (min <= max) {
1044         int avg = ((unsigned int)(min + max)) / 2;
1045         if (cb::net::setsockopt(sfd,
1046                                 SOL_SOCKET,
1047                                 SO_SNDBUF,
1048                                 reinterpret_cast<void*>(&avg),
1049                                 intsize) == 0) {
1050             last_good = avg;
1051             min = avg + 1;
1052         } else {
1053             max = avg - 1;
1054         }
1055     }
1056 
1057     LOG_DEBUG("<{} send buffer was {}, now {}", sfd, old_size, last_good);
1058 }
1059 
static SOCKET new_server_socket(struct addrinfo *ai, bool tcp_nodelay) {
    // Create a non-blocking TCP socket for the given addrinfo entry and
    // apply our standard socket options. Returns INVALID_SOCKET on any
    // hard failure (the partially configured socket is closed first).
    SOCKET sfd;

    sfd = cb::net::socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
    if (sfd == INVALID_SOCKET) {
        return INVALID_SOCKET;
    }

    if (evutil_make_socket_nonblocking(sfd) == -1) {
        safe_close(sfd);
        return INVALID_SOCKET;
    }

    maximize_sndbuf(sfd);

    const struct linger ling = {0, 0};
    const int flags = 1;
    int error;

#ifdef IPV6_V6ONLY
    // Restrict IPv6 sockets to IPv6 traffic only so the matching IPv4
    // address can be bound by a separate socket.
    if (ai->ai_family == AF_INET6) {
        error = cb::net::setsockopt(sfd,
                                    IPPROTO_IPV6,
                                    IPV6_V6ONLY,
                                    reinterpret_cast<const void*>(&flags),
                                    sizeof(flags));
        if (error != 0) {
            LOG_WARNING("setsockopt(IPV6_V6ONLY): {}", strerror(errno));
            safe_close(sfd);
            return INVALID_SOCKET;
        }
    }
#endif

    // The remaining options are best-effort: a failure is logged but
    // does not abort socket creation.
    if (cb::net::setsockopt(sfd,
                            SOL_SOCKET,
                            SO_REUSEADDR,
                            reinterpret_cast<const void*>(&flags),
                            sizeof(flags)) != 0) {
        LOG_WARNING("setsockopt(SO_REUSEADDR): {}",
                    cb_strerror(cb::net::get_socket_error()));
    }

    if (cb::net::setsockopt(sfd,
                            SOL_SOCKET,
                            SO_KEEPALIVE,
                            reinterpret_cast<const void*>(&flags),
                            sizeof(flags)) != 0) {
        LOG_WARNING("setsockopt(SO_KEEPALIVE): {}",
                    cb_strerror(cb::net::get_socket_error()));
    }

    if (cb::net::setsockopt(sfd,
                            SOL_SOCKET,
                            SO_LINGER,
                            reinterpret_cast<const char*>(&ling),
                            sizeof(ling)) != 0) {
        LOG_WARNING("setsockopt(SO_LINGER): {}",
                    cb_strerror(cb::net::get_socket_error()));
    }

    if (tcp_nodelay) {
        if (cb::net::setsockopt(sfd,
                                IPPROTO_TCP,
                                TCP_NODELAY,
                                reinterpret_cast<const void*>(&flags),
                                sizeof(flags)) != 0) {
            LOG_WARNING("setsockopt(TCP_NODELAY): {}",
                        cb_strerror(cb::net::get_socket_error()));
        }
    }

    return sfd;
}
1134 
1135 /**
1136  * Add a port to the list of interfaces we're listening to.
1137  *
1138  * We're supporting binding to the port number "0" to have the operating
1139  * system pick an available port we may use (and we'll report it back to
1140  * the user through the portnumber file.). If we have knowledge of the port,
1141  * update the port descriptor (ip4/ip6), if not go ahead and create a new entry
1142  *
1143  * @param interf the interface description used to create the port
1144  * @param port the port number in use
1145  * @param family the address family for the port
1146  */
add_listening_port(const NetworkInterface *interf, in_port_t port, sa_family_t family)1147 static void add_listening_port(const NetworkInterface *interf, in_port_t port, sa_family_t family) {
1148     std::lock_guard<std::mutex> guard(stats_mutex);
1149     auto *descr = get_listening_port_instance(port);
1150 
1151     if (descr == nullptr) {
1152         ListeningPort newport(port,
1153                               interf->host,
1154                               interf->tcp_nodelay,
1155                               interf->backlog,
1156                               interf->management);
1157 
1158         newport.curr_conns = 1;
1159         newport.maxconns = interf->maxconn;
1160 
1161         if (interf->ssl.key.empty() || interf->ssl.cert.empty()) {
1162             newport.ssl.enabled = false;
1163         } else {
1164             newport.ssl.enabled = true;
1165             newport.ssl.key = interf->ssl.key;
1166             newport.ssl.cert = interf->ssl.cert;
1167         }
1168 
1169         if (family == AF_INET) {
1170             newport.ipv4 = true;
1171             newport.ipv6 = false;
1172         } else if (family == AF_INET6) {
1173             newport.ipv4 = false;
1174             newport.ipv6 = true;
1175         }
1176 
1177         stats.listening_ports.push_back(newport);
1178     } else {
1179         if (family == AF_INET) {
1180             descr->ipv4 = true;
1181         } else if (family == AF_INET6) {
1182             descr->ipv6 = true;
1183         }
1184         ++descr->curr_conns;
1185     }
1186 }
1187 
1188 /**
1189  * Create a socket and bind it to a specific port number
1190  * @param interface the interface to bind to
1191  * @param true if we was able to set up at least one address on the interface
1192  *        false if we failed to set any addresses on the interface
1193  */
static bool server_socket(const NetworkInterface& interf) {
    SOCKET sfd;
    addrinfo hints = {};

    // Set to true when we create an IPv4 interface
    bool ipv4 = false;
    // Set to true when we create an IPv6 interface
    bool ipv6 = false;

    hints.ai_flags = AI_PASSIVE;
    hints.ai_protocol = IPPROTO_TCP;
    hints.ai_socktype = SOCK_STREAM;

    // Derive the address family from which protocols are enabled on
    // this interface; both enabled means "let getaddrinfo decide".
    if (interf.ipv4 != NetworkInterface::Protocol::Off &&
        interf.ipv6 != NetworkInterface::Protocol::Off) {
        hints.ai_family = AF_UNSPEC;
    } else if (interf.ipv4 != NetworkInterface::Protocol::Off) {
        hints.ai_family = AF_INET;
    } else if (interf.ipv6 != NetworkInterface::Protocol::Off) {
        hints.ai_family = AF_INET6;
    } else {
        throw std::invalid_argument(
                "server_socket: can't create a socket without IPv4 or IPv6");
    }

    std::string port_buf = std::to_string(interf.port);

    // An empty host (or "*") means binding the wildcard address.
    const char* host = nullptr;
    if (!interf.host.empty() && interf.host != "*") {
        host = interf.host.c_str();
    }

    struct addrinfo *ai;
    int error = getaddrinfo(host, port_buf.c_str(), &hints, &ai);
    if (error != 0) {
#ifdef WIN32
        LOG_WARNING("getaddrinfo(): {}", cb_strerror(error));
#else
        if (error != EAI_SYSTEM) {
            LOG_WARNING("getaddrinfo(): {}", gai_strerror(error));
        } else {
            LOG_WARNING("getaddrinfo(): {}", cb_strerror(error));
        }
#endif
        return false;
    }

    // getaddrinfo may return multiple entries for a given name/port pair.
    // Iterate over all of them and try to set up a listen object.
    // We need at least _one_ entry per requested configuration (IPv4/6) in
    // order to call it a success.
    for (struct addrinfo* next = ai; next; next = next->ai_next) {
        if ((sfd = new_server_socket(next, interf.tcp_nodelay)) ==
            INVALID_SOCKET) {
            // getaddrinfo can return "junk" addresses,
            continue;
        }

        in_port_t listenport = 0;
        if (bind(sfd, next->ai_addr, (socklen_t)next->ai_addrlen) == SOCKET_ERROR) {
            const auto bind_error = cb::net::get_socket_error();
            auto name = cb::net::to_string(
                    reinterpret_cast<sockaddr_storage*>(next->ai_addr),
                    static_cast<socklen_t>(next->ai_addrlen));
            LOG_WARNING(
                    "Failed to bind to {} - {}", name, cb_strerror(bind_error));
            safe_close(sfd);
            continue;
        }

        // We've configured this port.
        if (next->ai_addr->sa_family == AF_INET) {
            // We have at least one entry
            ipv4 = true;
        } else if (next->ai_addr->sa_family == AF_INET6) {
            // We have at least one entry
            ipv6 = true;
        }

        // When binding to port 0 the kernel picks the actual port; ask
        // the socket which one we got so it can be reported.
        if (next->ai_addr->sa_family == AF_INET ||
             next->ai_addr->sa_family == AF_INET6) {
            union {
                struct sockaddr_in in;
                struct sockaddr_in6 in6;
            } my_sockaddr;
            socklen_t len = sizeof(my_sockaddr);
            if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
                if (next->ai_addr->sa_family == AF_INET) {
                    listenport = ntohs(my_sockaddr.in.sin_port);
                } else {
                    listenport = ntohs(my_sockaddr.in6.sin6_port);
                }
            }
        }

        listen_conn.emplace_back(std::make_unique<ServerSocket>(
                sfd, main_base, listenport, next->ai_addr->sa_family, interf));
        stats.daemon_conns++;
        stats.curr_conns.fetch_add(1, std::memory_order_relaxed);
        add_listening_port(&interf, listenport, next->ai_addr->sa_family);
    }

    freeaddrinfo(ai);

    // Check if we successfully listened on all required protocols.
    bool required_proto_missing = false;

    // Check if the specified (missing) protocol was requested; if so log a
    // message, and if required return true.
    auto checkIfProtocolRequired =
            [interf](const NetworkInterface::Protocol& protoMode,
                     const char* protoName) -> bool {
        if (protoMode != NetworkInterface::Protocol::Off) {
            // Failed to create a socket for this protocol; and it's not
            // disabled
            auto level = spdlog::level::level_enum::warn;
            if (protoMode == NetworkInterface::Protocol::Required) {
                level = spdlog::level::level_enum::critical;
            }
            CB_LOG_ENTRY(level,
                         R"(Failed to create {} {} socket for "{}:{}")",
                         to_string(protoMode),
                         protoName,
                         interf.host.empty() ? "*" : interf.host,
                         interf.port);
        }
        return protoMode == NetworkInterface::Protocol::Required;
    };
    if (!ipv4) {
        required_proto_missing |= checkIfProtocolRequired(interf.ipv4, "IPv4");
    }
    if (!ipv6) {
        required_proto_missing |= checkIfProtocolRequired(interf.ipv6, "IPv6");
    }

    // Return success as long as we managed to create a listening port
    // for all non-optional protocols.
    return !required_proto_missing;
}
1333 
server_sockets(bool management)1334 static bool server_sockets(bool management) {
1335     bool success = true;
1336 
1337     if (management) {
1338         LOG_INFO("Enable management port(s)");
1339     } else {
1340         LOG_INFO("Enable user port(s)");
1341     }
1342 
1343     for (auto& interface : settings.getInterfaces()) {
1344         if (management && interface.management) {
1345             if (!server_socket(interface)) {
1346                 success = false;
1347             }
1348         } else if (!management && !interface.management) {
1349             if (!server_socket(interface)) {
1350                 success = false;
1351             }
1352         }
1353     }
1354 
1355     return success;
1356 }
1357 
static void create_listen_sockets(bool management) {
    if (!server_sockets(management)) {
        FATAL_ERROR(
                EX_OSERR,
                "Failed to create required listening socket(s). Terminating.");
    }

    if (management) {
        // the client is not expecting us to update the port set at
        // later time, so enable all ports immediately
        if (!server_sockets(false)) {
            FATAL_ERROR(EX_OSERR,
                        "Failed to create required listening socket(s). "
                        "Terminating.");
        }
    }

    // If requested through the environment, write a JSON description of
    // the ports we actually listen on (useful when binding to port 0,
    // where the kernel picks the number). The file is written under a
    // ".lck" name and renamed into place so readers never observe a
    // partially written file.
    const char* portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
    if (portnumber_filename != nullptr) {
        std::string temp_portnumber_filename;
        temp_portnumber_filename.assign(portnumber_filename);
        temp_portnumber_filename.append(".lck");

        FILE* portnumber_file = nullptr;
        portnumber_file = fopen(temp_portnumber_filename.c_str(), "a");
        if (portnumber_file == nullptr) {
            FATAL_ERROR(EX_OSERR,
                        R"(Failed to open "{}": {})",
                        temp_portnumber_filename,
                        strerror(errno));
        }

        unique_cJSON_ptr array(cJSON_CreateArray());

        for (const auto& connection : listen_conn) {
            cJSON_AddItemToArray(array.get(),
                                 connection->getDetails().release());
        }

        unique_cJSON_ptr root(cJSON_CreateObject());
        cJSON_AddItemToObject(root.get(), "ports", array.release());
        fprintf(portnumber_file, "%s\n", to_string(root, true).c_str());
        fclose(portnumber_file);
        LOG_INFO("Port numbers available in {}", portnumber_filename);
        if (rename(temp_portnumber_filename.c_str(), portnumber_filename) == -1) {
            FATAL_ERROR(EX_OSERR,
                        R"(Failed to rename "{}" to "{}": {})",
                        temp_portnumber_filename.c_str(),
                        portnumber_filename,
                        strerror(errno));
        }
    }
}
1411 
1412 #ifdef WIN32
1413 // Unfortunately we don't have signal handlers on windows
static bool install_signal_handlers() {
    // No POSIX signal handling on Windows; report success so startup
    // can proceed.
    return true;
}
1417 
static void release_signal_handlers() {
    // Nothing was installed on Windows, so there is nothing to release.
}
1420 #else
1421 
1422 #ifndef HAVE_SIGIGNORE
sigignore(int sig)1423 static int sigignore(int sig) {
1424     struct sigaction sa;
1425     memset(&sa, 0, sizeof(sa));
1426     sa.sa_handler = SIG_IGN;
1427 
1428     if (sigemptyset(&sa.sa_mask) == -1 || sigaction(sig, &sa, 0) == -1) {
1429         return -1;
1430     }
1431     return 0;
1432 }
1433 #endif /* !HAVE_SIGIGNORE */
1434 
1435 
static void sigterm_handler(evutil_socket_t, short, void *) {
    // libevent signal callback (registered below for SIGTERM and
    // SIGINT): request a clean shutdown of the daemon.
    shutdown_server();
}
1439 
// libevent signal events; allocated in install_signal_handlers() and
// freed in release_signal_handlers().
static struct event* sigusr1_event;
static struct event* sigterm_event;
static struct event* sigint_event;
1443 
install_signal_handlers()1444 static bool install_signal_handlers() {
1445     // SIGUSR1 - Used to dump connection stats
1446     sigusr1_event = evsignal_new(main_base, SIGUSR1,
1447                                  dump_connection_stat_signal_handler,
1448                                  nullptr);
1449     if (sigusr1_event == nullptr) {
1450         LOG_WARNING("Failed to allocate SIGUSR1 handler");
1451         return false;
1452     }
1453 
1454     if (event_add(sigusr1_event, nullptr) < 0) {
1455         LOG_WARNING("Failed to install SIGUSR1 handler");
1456         return false;
1457 
1458     }
1459 
1460     // SIGTERM - Used to shut down memcached cleanly
1461     sigterm_event = evsignal_new(main_base, SIGTERM, sigterm_handler, NULL);
1462     if (sigterm_event == NULL) {
1463         LOG_WARNING("Failed to allocate SIGTERM handler");
1464         return false;
1465     }
1466 
1467     if (event_add(sigterm_event, NULL) < 0) {
1468         LOG_WARNING("Failed to install SIGTERM handler");
1469         return false;
1470     }
1471 
1472     // SIGINT - Used to shut down memcached cleanly
1473     sigint_event = evsignal_new(main_base, SIGINT, sigterm_handler, NULL);
1474     if (sigint_event == NULL) {
1475         LOG_WARNING("Failed to allocate SIGINT handler");
1476         return false;
1477     }
1478 
1479     if (event_add(sigint_event, NULL) < 0) {
1480         LOG_WARNING("Failed to install SIGINT handler");
1481         return false;
1482     }
1483 
1484     return true;
1485 }
1486 
static void release_signal_handlers() {
    // Free the libevent signal events allocated by
    // install_signal_handlers().
    event_free(sigusr1_event);
    event_free(sigint_event);
    event_free(sigterm_event);
}
1492 #endif
1493 
get_server_version(void)1494 const char* get_server_version(void) {
1495     if (strlen(PRODUCT_VERSION) == 0) {
1496         return "unknown";
1497     } else {
1498         return PRODUCT_VERSION;
1499     }
1500 }
1501 
store_engine_specific(gsl::not_null<const void*> void_cookie, void* engine_data)1502 static void store_engine_specific(gsl::not_null<const void*> void_cookie,
1503                                   void* engine_data) {
1504     auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1505     cookie->getConnection().setEngineStorage(engine_data);
1506 }
1507 
get_engine_specific(gsl::not_null<const void*> void_cookie)1508 static void* get_engine_specific(gsl::not_null<const void*> void_cookie) {
1509     auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1510     return cookie->getConnection().getEngineStorage();
1511 }
1512 
is_datatype_supported(gsl::not_null<const void*> void_cookie, protocol_binary_datatype_t datatype)1513 static bool is_datatype_supported(gsl::not_null<const void*> void_cookie,
1514                                   protocol_binary_datatype_t datatype) {
1515     auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1516     return cookie->getConnection().isDatatypeEnabled(datatype);
1517 }
1518 
is_mutation_extras_supported( gsl::not_null<const void*> void_cookie)1519 static bool is_mutation_extras_supported(
1520         gsl::not_null<const void*> void_cookie) {
1521     auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1522     return cookie->getConnection().isSupportsMutationExtras();
1523 }
1524 
is_collections_supported(gsl::not_null<const void*> void_cookie)1525 static bool is_collections_supported(gsl::not_null<const void*> void_cookie) {
1526     auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1527     return cookie->getConnection().isCollectionsSupported();
1528 }
1529 
get_opcode_if_ewouldblock_set( gsl::not_null<const void*> void_cookie)1530 static uint8_t get_opcode_if_ewouldblock_set(
1531         gsl::not_null<const void*> void_cookie) {
1532     auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1533 
1534     uint8_t opcode = PROTOCOL_BINARY_CMD_INVALID;
1535     if (cookie->isEwouldblock()) {
1536         try {
1537             opcode = cookie->getHeader().getOpcode();
1538         } catch (...) {
1539             // Don't barf out if the header isn't there
1540         }
1541     }
1542     return opcode;
1543 }
1544 
static bool validate_session_cas(const uint64_t cas) {
    // Validate the provided cas against the global session cas; on
    // success a counter is incremented which the caller must release
    // via decrement_session_ctr().
    return session_cas.increment_session_counter(cas);
}
1548 
static void decrement_session_ctr(void) {
    // Release the counter taken by validate_session_cas().
    session_cas.decrement_session_counter();
}
1552 
reserve_cookie( gsl::not_null<const void*> void_cookie)1553 static ENGINE_ERROR_CODE reserve_cookie(
1554         gsl::not_null<const void*> void_cookie) {
1555     auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());
1556 
1557     cookie->getConnection().incrementRefcount();
1558     return ENGINE_SUCCESS;
1559 }
1560 
static ENGINE_ERROR_CODE release_cookie(
        gsl::not_null<const void*> void_cookie) {
    // Engine callback: drop the reference taken by reserve_cookie() and
    // reschedule the connection on its worker thread.
    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());

    auto* c = &cookie->getConnection();
    int notify;
    LIBEVENT_THREAD *thr;

    thr = c->getThread();
    cb_assert(thr);

    {
        TRACE_LOCKGUARD_TIMED(thr->mutex,
                              "mutex",
                              "release_cookie::threadLock",
                              SlowMutexThreshold);

        c->decrementRefcount();

        /* Releasing the refererence to the object may cause it to change
         * state. (NOTE: the release call shall never be called from the
         * worker threads), so should put the connection in the pool of
         * pending IO and have the system retry the operation for the
         * connection
         */
        notify = add_conn_to_pending_io_list(c, ENGINE_SUCCESS);
    }

    /* kick the thread in the butt */
    if (notify) {
        notify_thread(*thr);
    }

    return ENGINE_SUCCESS;
}
1596 
static void cookie_set_priority(gsl::not_null<const void*> void_cookie,
                                CONN_PRIORITY priority) {
    // Engine callback: map the engine-visible CONN_PRIORITY onto the
    // connection's scheduling priority.
    auto* cookie = reinterpret_cast<const Cookie*>(void_cookie.get());

    auto* c = &cookie->getConnection();
    switch (priority) {
    case CONN_PRIORITY_HIGH:
        c->setPriority(Connection::Priority::High);
        return;
    case CONN_PRIORITY_MED:
        c->setPriority(Connection::Priority::Medium);
        return;
    case CONN_PRIORITY_LOW:
        c->setPriority(Connection::Priority::Low);
        return;
    }

    // Falling through the (exhaustive) switch means the engine passed a
    // value outside the CONN_PRIORITY enum; treat that as a fatal error
    // for this connection.
    LOG_WARNING(
            "{}: cookie_set_priority: priority (which is {}) is not a "
            "valid CONN_PRIORITY - closing connection {}",
            c->getId(),
            priority,
            c->getDescription());
    c->setState(McbpStateMachine::State::closing);
}
1622 
static void count_eviction(gsl::not_null<const void*>, const void*, int) {
    // Intentionally a no-op: the core does not track evictions.
}
1625 
static size_t get_max_item_iovec_size() {
    // Advertised number of io-vector segments per item; currently the
    // core only handles a single contiguous buffer.
    return 1;
}
1629 
// Gate used by shutdown_server(): shutdown requests arriving before
// enable_shutdown() has been called block on this condition variable.
static std::condition_variable shutdown_cv;
static std::mutex shutdown_cv_mutex;
static bool memcached_can_shutdown = false;
void shutdown_server(void) {

    std::unique_lock<std::mutex> lk(shutdown_cv_mutex);
    // Block until enable_shutdown() has granted permission; this stops
    // an early signal from tearing the process down mid-initialization.
    if (!memcached_can_shutdown) {
        // log and proceed to wait shutdown
        LOG_INFO("shutdown_server waiting for can_shutdown signal");
        shutdown_cv.wait(lk, []{return memcached_can_shutdown;});
    }
    memcached_shutdown = true;
    LOG_INFO("Received shutdown request");
    event_base_loopbreak(main_base);
}
1645 
enable_shutdown(void)1646 void enable_shutdown(void) {
1647     std::unique_lock<std::mutex> lk(shutdown_cv_mutex);
1648     memcached_can_shutdown = true;
1649     shutdown_cv.notify_all();
1650 }
1651 
static EXTENSION_LOGGER_DESCRIPTOR* get_logger(void)
{
    // Expose the configured logger extension to the engines.
    return settings.extensions.logger;
}
1656 
get_log_level(void)1657 static EXTENSION_LOG_LEVEL get_log_level(void)
1658 {
1659     EXTENSION_LOG_LEVEL ret;
1660     switch (settings.getVerbose()) {
1661     case 0: ret = EXTENSION_LOG_NOTICE; break;
1662     case 1: ret = EXTENSION_LOG_INFO; break;
1663     default:
1664         ret = EXTENSION_LOG_DEBUG;
1665     }
1666     return ret;
1667 }
1668 
set_log_level(EXTENSION_LOG_LEVEL severity)1669 static void set_log_level(EXTENSION_LOG_LEVEL severity)
1670 {
1671     switch (severity) {
1672     case EXTENSION_LOG_FATAL:
1673     case EXTENSION_LOG_WARNING:
1674     case EXTENSION_LOG_NOTICE:
1675         settings.setVerbose(0);
1676         break;
1677     case EXTENSION_LOG_INFO:
1678         settings.setVerbose(1);
1679         break;
1680     default:
1681         settings.setVerbose(2);
1682     }
1683 }
1684 
1685 /**
1686  * Callback the engines may call to get the public server interface
1687  * @return pointer to a structure containing the interface. The client should
1688  *         know the layout and perform the proper casts.
1689  */
get_server_api()1690 SERVER_HANDLE_V1* get_server_api() {
1691     static int init;
1692     static SERVER_CORE_API core_api;
1693     static SERVER_COOKIE_API server_cookie_api;
1694     static SERVER_STAT_API server_stat_api;
1695     static SERVER_LOG_API server_log_api;
1696     static SERVER_CALLBACK_API callback_api;
1697     static ALLOCATOR_HOOKS_API hooks_api;
1698     static SERVER_HANDLE_V1 rv;
1699     static SERVER_DOCUMENT_API document_api;
1700 
1701     if (!init) {
1702         init = 1;
1703         core_api.realtime = mc_time_convert_to_real_time;
1704         core_api.abstime = mc_time_convert_to_abs_time;
1705         core_api.get_current_time = mc_time_get_current_time;
1706         core_api.parse_config = parse_config;
1707         core_api.shutdown = shutdown_server;
1708         core_api.get_max_item_iovec_size = get_max_item_iovec_size;
1709         core_api.trigger_tick = mc_time_clock_tick;
1710 
1711         server_cookie_api.store_engine_specific = store_engine_specific;
1712         server_cookie_api.get_engine_specific = get_engine_specific;
1713         server_cookie_api.is_datatype_supported = is_datatype_supported;
1714         server_cookie_api.is_mutation_extras_supported = is_mutation_extras_supported;
1715         server_cookie_api.is_collections_supported = is_collections_supported;
1716         server_cookie_api.get_opcode_if_ewouldblock_set = get_opcode_if_ewouldblock_set;
1717         server_cookie_api.validate_session_cas = validate_session_cas;
1718         server_cookie_api.decrement_session_ctr = decrement_session_ctr;
1719         server_cookie_api.notify_io_complete = notify_io_complete;
1720         server_cookie_api.reserve = reserve_cookie;
1721         server_cookie_api.release = release_cookie;
1722         server_cookie_api.set_priority = cookie_set_priority;
1723         server_cookie_api.get_bucket_id = get_bucket_id;
1724         server_cookie_api.get_connection_id = get_connection_id;
1725         server_cookie_api.check_privilege = check_privilege;
1726         server_cookie_api.engine_error2mcbp = engine_error2mcbp;
1727         server_cookie_api.get_log_info = cookie_get_log_info;
1728         server_cookie_api.set_error_context = cookie_set_error_context;
1729 
1730         server_stat_api.evicting = count_eviction;
1731 
1732         server_log_api.get_logger = get_logger;
1733         server_log_api.get_level = get_log_level;
1734         server_log_api.set_level = set_log_level;
1735 
1736         callback_api.register_callback = register_callback;
1737         callback_api.perform_callbacks = perform_callbacks;
1738 
1739         hooks_api.add_new_hook = AllocHooks::add_new_hook;
1740         hooks_api.remove_new_hook = AllocHooks::remove_new_hook;
1741         hooks_api.add_delete_hook = AllocHooks::add_delete_hook;
1742         hooks_api.remove_delete_hook = AllocHooks::remove_delete_hook;
1743         hooks_api.get_extra_stats_size = AllocHooks::get_extra_stats_size;
1744         hooks_api.get_allocator_stats = AllocHooks::get_allocator_stats;
1745         hooks_api.get_allocation_size = AllocHooks::get_allocation_size;
1746         hooks_api.get_detailed_stats = AllocHooks::get_detailed_stats;
1747         hooks_api.release_free_memory = AllocHooks::release_free_memory;
1748         hooks_api.enable_thread_cache = AllocHooks::enable_thread_cache;
1749         hooks_api.get_allocator_property = AllocHooks::get_allocator_property;
1750 
1751         document_api.pre_link = pre_link_document;
1752         document_api.pre_expiry = document_pre_expiry;
1753 
1754         rv.interface = 1;
1755         rv.core = &core_api;
1756         rv.stat = &server_stat_api;
1757         rv.callback = &callback_api;
1758         rv.log = &server_log_api;
1759         rv.cookie = &server_cookie_api;
1760         rv.alloc_hooks = &hooks_api;
1761         rv.document = &document_api;
1762     }
1763 
1764     return &rv;
1765 }
1766 
1767 /* BUCKET FUNCTIONS */
/**
 * Perform the actual bucket creation. Validates name/type, claims a free
 * slot in all_buckets under the global buckets_lock, then instantiates
 * and initializes the engine WITHOUT holding the global lock. The outcome
 * is stored in `result` (and `error` for validation failures).
 */
void CreateBucketThread::create() {
    LOG_INFO("{} Create bucket [{}]", connection.getId(), name);

    // Reject illegal bucket names up front; `error` receives the reason.
    if (!BucketValidator::validateBucketName(name, error)) {
        LOG_WARNING("{} Create bucket [{}] failed - Invalid bucket name",
                    connection.getId(),
                    name);
        result = ENGINE_EINVAL;
        return;
    }

    if (!BucketValidator::validateBucketType(type, error)) {
        LOG_WARNING("{} Create bucket [{}] failed - Invalid bucket type",
                    connection.getId(),
                    name);
        result = ENGINE_EINVAL;
        return;
    }

    size_t ii;
    // Sentinel: all_buckets.size() means "no free slot found yet"
    size_t first_free = all_buckets.size();
    bool found = false;

    // Single pass over the bucket array: remember the first unused slot
    // and simultaneously check for a name clash. Each bucket's own mutex
    // is taken while its state/name is inspected.
    std::unique_lock<std::mutex> all_bucket_lock(buckets_lock);
    for (ii = 0; ii < all_buckets.size() && !found; ++ii) {
        std::lock_guard<std::mutex> guard(all_buckets[ii].mutex);
        if (first_free == all_buckets.size() &&
            all_buckets[ii].state == BucketState::None)
        {
            first_free = ii;
        }
        if (name == all_buckets[ii].name) {
            found = true;
        }
    }

    if (found) {
        result = ENGINE_KEY_EEXISTS;
        LOG_WARNING("{} Create bucket [{}] failed - Already exists",
                    connection.getId(),
                    name);
    } else if (first_free == all_buckets.size()) {
        result = ENGINE_E2BIG;
        LOG_WARNING("{} Create bucket [{}] failed - Too many buckets",
                    connection.getId(),
                    name);
    } else {
        result = ENGINE_SUCCESS;
        ii = first_free;
        /*
         * split the creation of the bucket in two... so
         * we can release the global lock..
         */
        // Phase 1: claim the slot (state Creating) under the locks so no
        // concurrent creation can grab the same slot or name.
        std::lock_guard<std::mutex> guard(all_buckets[ii].mutex);
        all_buckets[ii].state = BucketState::Creating;
        all_buckets[ii].type = type;
        strcpy(all_buckets[ii].name, name.c_str());
        try {
            all_buckets[ii].topkeys = new TopKeys(settings.getTopkeysSize());
        } catch (const std::bad_alloc &) {
            result = ENGINE_ENOMEM;
            LOG_WARNING("{} Create bucket [{}] failed - out of memory",
                        connection.getId(),
                        name);
        }
    }
    all_bucket_lock.unlock();

    if (result != ENGINE_SUCCESS) {
        return;
    }

    auto &bucket = all_buckets[ii];

    /* People aren't allowed to use the engine in this state,
     * so we can do stuff without locking..
     */
    // Phase 2: create and initialize the engine instance with only the
    // slot claimed (global lock released above).
    if (new_engine_instance(type,
                            name,
                            get_server_api,
                            (ENGINE_HANDLE**)&bucket.engine,
                            settings.extensions.logger)) {
        auto* engine = bucket.engine;
        {
            std::lock_guard<std::mutex> guard(bucket.mutex);
            bucket.state = BucketState::Initializing;
        }

        try {
            result = engine->initialize(v1_handle_2_handle(engine),
                                        config.c_str());
        } catch (const std::runtime_error& e) {
            LOG_WARNING("{} - Failed to create bucket [{}]: {}",
                        connection.getId(),
                        name,
                        e.what());
            result = ENGINE_FAILED;
        } catch (const std::bad_alloc& e) {
            LOG_WARNING("{} - Failed to create bucket [{}]: {}",
                        connection.getId(),
                        name,
                        e.what());
            result = ENGINE_ENOMEM;
        }

        if (result == ENGINE_SUCCESS) {
            {
                std::lock_guard<std::mutex> guard(bucket.mutex);
                bucket.state = BucketState::Ready;
            }
            LOG_INFO("{} - Bucket [{}] created successfully",
                     connection.getId(),
                     name);
            bucket.max_document_size =
                 engine->getMaxItemSize(reinterpret_cast<ENGINE_HANDLE*>(engine));
        } else {
            // initialize() failed: destroy the engine and roll the slot
            // back to the unused state so it can be reused.
            {
                std::lock_guard<std::mutex> guard(bucket.mutex);
                bucket.state = BucketState::Destroying;
            }
            engine->destroy(v1_handle_2_handle(engine), false);
            std::lock_guard<std::mutex> guard(bucket.mutex);
            bucket.state = BucketState::None;
            bucket.name[0] = '\0';
            bucket.engine = nullptr;
            delete bucket.topkeys;
            bucket.topkeys = nullptr;
            bucket.clusterConfiguration.reset();

            result = ENGINE_NOT_STORED;
        }
    } else {
        // Engine instantiation itself failed: release the claimed slot.
        {
            std::lock_guard<std::mutex> guard(bucket.mutex);
            bucket.state = BucketState::None;
            bucket.name[0] = '\0';
            bucket.engine = nullptr;
            delete bucket.topkeys;
            bucket.topkeys = nullptr;
            bucket.clusterConfiguration.reset();
        }

        LOG_WARNING(
                "{} - Failed to create bucket [{}]: failed to "
                "create a new engine instance",
                connection.getId(),
                name);
        result = ENGINE_FAILED;
    }
}
1918 
run()1919 void CreateBucketThread::run()
1920 {
1921     setRunning();
1922     // Perform the task without having any locks. The task should be
1923     // scheduled in a pending state so the executor won't try to touch
1924     // the object until we're telling it that it is runnable
1925     create();
1926     std::lock_guard<std::mutex> guard(task->getMutex());
1927     task->makeRunnable();
1928 }
1929 
notify_thread_bucket_deletion(LIBEVENT_THREAD& me)1930 void notify_thread_bucket_deletion(LIBEVENT_THREAD& me) {
1931     for (size_t ii = 0; ii < all_buckets.size(); ++ii) {
1932         bool destroy = false;
1933         {
1934             std::lock_guard<std::mutex> guard(all_buckets[ii].mutex);
1935             if (all_buckets[ii].state == BucketState::Destroying) {
1936                 destroy = true;
1937             }
1938         }
1939 
1940         if (destroy) {
1941             signal_idle_clients(&me, gsl::narrow<int>(ii), false);
1942         }
1943     }
1944 }
1945 
/**
 * Perform the bucket deletion: locate the bucket, flip it to Destroying,
 * wait for every connected client to go away, shut down the engine and
 * release all per-bucket resources. The outcome is stored in `result`.
 */
void DestroyBucketThread::destroy() {
    ENGINE_ERROR_CODE ret = ENGINE_KEY_ENOENT;
    std::unique_lock<std::mutex> all_bucket_lock(buckets_lock);

    Connection* connection = nullptr;
    if (cookie != nullptr) {
        connection = &cookie->getConnection();
    }

    /*
     * The destroy function will have access to a connection if the
     * McbpDestroyBucketTask originated from delete_bucket_executor().
     * However if we are in the process of shuting down and the
     * McbpDestroyBucketTask originated from main() then connection
     * will be set to nullptr.
     */
    const std::string connection_id{(connection == nullptr)
            ? "<none>"
            : std::to_string(connection->getId())};

    // Find the bucket by name. Only a Ready bucket may be destroyed; a
    // name match in any other state yields ENGINE_KEY_EEXISTS (busy,
    // e.g. being created or already being destroyed).
    size_t idx = 0;
    for (size_t ii = 0; ii < all_buckets.size(); ++ii) {
        std::lock_guard<std::mutex> guard(all_buckets[ii].mutex);
        if (name == all_buckets[ii].name) {
            idx = ii;
            if (all_buckets[ii].state == BucketState::Ready) {
                ret = ENGINE_SUCCESS;
                all_buckets[ii].state = BucketState::Destroying;
            } else {
                ret = ENGINE_KEY_EEXISTS;
            }
        }
        if (ret != ENGINE_KEY_ENOENT) {
            break;
        }
    }
    all_bucket_lock.unlock();

    if (ret != ENGINE_SUCCESS) {
        auto code = engine_error_2_mcbp_protocol_error(ret);
        LOG_INFO("{} Delete bucket [{}]: {}",
                 connection_id,
                 name,
                 memcached_status_2_text(code));
        result = ret;
        return;
    }

    LOG_INFO(
            "{} Delete bucket [{}]. Notifying all registered "
            "ON_DELETE_BUCKET callbacks",
            connection_id,
            name);

    perform_callbacks(ON_DELETE_BUCKET, nullptr, &all_buckets[idx]);

    LOG_INFO("{} Delete bucket [{}]. Wait for clients to disconnect",
             connection_id,
             name);

    /* If this thread is connected to the requested bucket... release it */
    if (connection != nullptr && idx == size_t(connection->getBucketIndex())) {
        disassociate_bucket(*connection);
    }

    /* Let all of the worker threads start invalidating connections */
    threads_initiate_bucket_deletion();

    auto& bucket = all_buckets[idx];

    /* Wait until all users disconnected... */
    {
        std::unique_lock<std::mutex> guard(bucket.mutex);

        while (bucket.clients > 0) {
            LOG_INFO(
                    "{} Delete bucket [{}]. Still waiting: {} clients "
                    "connected",
                    connection_id.c_str(),
                    name.c_str(),
                    bucket.clients);
            /* drop the lock and notify the worker threads */
            guard.unlock();
            threads_notify_bucket_deletion();
            guard.lock();

            // Re-check once a second (or when signalled) until the last
            // client has released the bucket.
            bucket.cond.wait_for(guard,
                                 std::chrono::milliseconds(1000),
                                 [&bucket] { return bucket.clients == 0; });
        }
    }

    /* Tell the worker threads to stop trying to invalidating connections */
    threads_complete_bucket_deletion();

    /*
     * We cannot call assert_no_assocations(idx) because it iterates
     * over all connections and calls c->getBucketIndex().  The problem
     * is that a worker thread can call associate_initial_bucket() or
     * associate_bucket() at the same time.  This could lead to a call
     * to c->setBucketIndex(0) (the "no bucket"), which although safe,
     * raises a threadsanitizer warning.

     * Note, if associate_bucket() attempts to associate a connection
     * with a bucket that has been destroyed, or is in the process of
     * being destroyed, the association will fail because
     * BucketState != Ready.  See associate_bucket() for more details.
     */

    LOG_INFO(
            "{} Delete bucket [{}]. Shut down the bucket", connection_id, name);

    bucket.engine->destroy(v1_handle_2_handle(bucket.engine), force);

    LOG_INFO("{} Delete bucket [{}]. Clean up allocated resources ",
             connection_id,
             name);

    /* Clean up the stats... */
    threadlocal_stats_reset(bucket.stats);

    // Clear any registered event handlers
    for (auto& handler : bucket.engine_event_handlers) {
        handler.clear();
    }

    // Return the slot to the unused state so it can be reused for a new
    // bucket.
    {
        std::lock_guard<std::mutex> guard(bucket.mutex);
        bucket.state = BucketState::None;
        bucket.engine = NULL;
        bucket.name[0] = '\0';
        delete bucket.topkeys;
        bucket.responseCounters.fill(0);
        bucket.topkeys = nullptr;
        bucket.clusterConfiguration.reset();
    }
    // don't need lock because all timing data uses atomics
    bucket.timings.reset();

    LOG_INFO("{} Delete bucket [{}] complete", connection_id, name);
    result = ENGINE_SUCCESS;
}
2088 
run()2089 void DestroyBucketThread::run() {
2090     setRunning();
2091     destroy();
2092     std::lock_guard<std::mutex> guard(task->getMutex());
2093     task->makeRunnable();
2094 }
2095 
initialize_buckets(void)2096 static void initialize_buckets(void) {
2097     size_t numthread = settings.getNumWorkerThreads() + 1;
2098     for (auto &b : all_buckets) {
2099         b.stats.resize(numthread);
2100     }
2101 
2102     // To make the life easier for us in the code, index 0
2103     // in the array is "no bucket"
2104     ENGINE_HANDLE *handle;
2105     cb_assert(new_engine_instance(BucketType::NoBucket,
2106                                   "<internal>",
2107                                   get_server_api,
2108                                   &handle,
2109                                   settings.extensions.logger));
2110 
2111     cb_assert(handle != nullptr);
2112     auto &nobucket = all_buckets.at(0);
2113     nobucket.type = BucketType::NoBucket;
2114     nobucket.state = BucketState::Ready;
2115     nobucket.engine = (ENGINE_HANDLE_V1*)handle;
2116 }
2117 
cleanup_buckets(void)2118 static void cleanup_buckets(void) {
2119     for (auto &bucket : all_buckets) {
2120         bool waiting;
2121 
2122         do {
2123             waiting = false;
2124             {
2125                 std::lock_guard<std::mutex> guard(bucket.mutex);
2126                 switch (bucket.state.load()) {
2127                 case BucketState::Stopping:
2128                 case BucketState::Destroying:
2129                 case BucketState::Creating:
2130                 case BucketState::Initializing:
2131                     waiting = true;
2132                     break;
2133                 default:
2134                         /* Empty */
2135                         ;
2136                 }
2137             }
2138             if (waiting) {
2139                 usleep(250);
2140             }
2141         } while (waiting);
2142 
2143         if (bucket.state == BucketState::Ready) {
2144             bucket.engine->destroy(v1_handle_2_handle(bucket.engine), false);
2145             delete bucket.topkeys;
2146         }
2147     }
2148 }
2149 
/**
 * Delete every user bucket (slot 1 upwards) as part of shutdown. Buckets
 * are deleted one at a time; see the comment below for why they are not
 * scheduled in parallel.
 */
void delete_all_buckets() {
    /*
     * Delete all of the buckets one by one by using the executor.
     * We could in theory schedule all of them in parallel, but they
     * probably have some dirty items they need to write to disk so
     * instead of having all of the buckets step on the underlying IO
     * in parallel we'll do them sequentially.
     */

    /**
     * Create a specialized task I may use that just holds the
     * DeleteBucketThread object.
     */
    class DestroyBucketTask : public Task {
    public:
        DestroyBucketTask(const std::string& name_)
            : thread(name_, false, nullptr, this)
        {
            // empty
        }

        // start the bucket deletion
        // May throw std::bad_alloc if we're failing to start the thread
        void start() {
            thread.start();
        }

        Status execute() override {
            return Status::Finished;
        }

        DestroyBucketThread thread;
    };

    LOG_INFO("Stop all buckets");
    bool done;
    do {
        done = true;
        std::shared_ptr<Task> task;
        std::string name;

        // Pick (at most) one Ready bucket per iteration while holding
        // the global lock, then wait for its deletion outside the lock.
        std::unique_lock<std::mutex> all_bucket_lock(buckets_lock);
        /*
         * Start at one (not zero) because zero is reserved for "no bucket".
         * The "no bucket" has a state of BucketState::Ready but no name.
         */
        for (size_t ii = 1; ii < all_buckets.size() && done; ++ii) {
            std::lock_guard<std::mutex> bucket_guard(all_buckets[ii].mutex);
            if (all_buckets[ii].state == BucketState::Ready) {
                name.assign(all_buckets[ii].name);
                LOG_INFO("Scheduling delete for bucket {}", name);
                task = std::make_shared<DestroyBucketTask>(name);
                std::lock_guard<std::mutex> guard(task->getMutex());
                dynamic_cast<DestroyBucketTask&>(*task).start();
                executorPool->schedule(task, false);
                done = false;
            }
        }
        all_bucket_lock.unlock();

        // Block until the scheduled deletion thread has fully finished
        // (reached the Zombie state) before looking for the next bucket.
        if (task.get() != nullptr) {
            auto& dbt = dynamic_cast<DestroyBucketTask&>(*task);
            LOG_INFO("Waiting for delete of {} to complete", name);
            dbt.thread.waitForState(Couchbase::ThreadState::Zombie);
            LOG_INFO("Bucket {} deleted", name);
        }
    } while (!done);
}
2218 
set_max_filehandles(void)2219 static void set_max_filehandles(void) {
2220     const uint64_t maxfiles = settings.getMaxconns() +
2221                             (3 * (settings.getNumWorkerThreads() + 2)) +
2222                             1024;
2223 
2224     auto limit = cb::io::maximizeFileDescriptors(maxfiles);
2225 
2226     if (limit < maxfiles) {
2227         LOG_WARNING(
2228                 "Failed to set the number of file descriptors "
2229                 "to {} due to system resource restrictions. "
2230                 "This may cause the system to misbehave once you reach a "
2231                 "high connection count as the system won't be able open "
2232                 "new files on the system. The maximum number of file "
2233                 "descriptors is currently set to {}. The system "
2234                 "is configured to allow {} number of client connections, "
2235                 "and in addition to that the overhead of the worker "
2236                 "threads is {}. Finally the backed database needs to "
2237                 "open files to persist data.",
2238                 int(maxfiles),
2239                 int(limit),
2240                 settings.getMaxconns(),
2241                 (3 * (settings.getNumWorkerThreads() + 2)));
2242     }
2243 }
2244 
2245 /**
2246  * The log function used from SASL
2247  *
2248  * Try to remap the log levels to our own levels and put in the log
2249  * depending on the severity.
2250  */
sasl_log_callback(cb::sasl::logging::Level level, const std::string& message)2251 static void sasl_log_callback(cb::sasl::logging::Level level,
2252                               const std::string& message) {
2253     switch (level) {
2254     case cb::sasl::logging::Level::Error:
2255         LOG_ERROR("{}", message);
2256         break;
2257     case cb::sasl::logging::Level::Warning:
2258         LOG_WARNING("{}", message);
2259         break;
2260     case cb::sasl::logging::Level::Notice:
2261         LOG_INFO("{}", message);
2262         break;
2263     case cb::sasl::logging::Level::Fail:
2264     case cb::sasl::logging::Level::Debug:
2265         LOG_DEBUG("{}", message);
2266         break;
2267     case cb::sasl::logging::Level::Trace:
2268         LOG_TRACE("{}", message);
2269         break;
2270     }
2271 }
2272 
sasl_getopt_callback(void*, const char*, const char* option, const char** result, unsigned* len)2273 static int sasl_getopt_callback(void*, const char*,
2274                                 const char* option,
2275                                 const char** result,
2276                                 unsigned* len) {
2277     if (option == nullptr || result == nullptr || len == nullptr) {
2278         return CBSASL_BADPARAM;
2279     }
2280 
2281     std::string key(option);
2282 
2283     if (key == "hmac iteration count") {
2284         // Speed up the test suite by reducing the SHA1 hmac calculations
2285         // from 4k to 10
2286         if (getenv("MEMCACHED_UNIT_TESTS") != nullptr) {
2287             *result = "10";
2288             *len = 2;
2289             return CBSASL_OK;
2290         }
2291     } else if (key == "sasl mechanisms") {
2292         const auto& value = settings.getSaslMechanisms();
2293         if (!value.empty()) {
2294             *result = value.data();
2295             *len = static_cast<unsigned int>(value.size());
2296             return CBSASL_OK;
2297         }
2298     }
2299 
2300     return CBSASL_FAIL;
2301 }
2302 
initialize_sasl()2303 static void initialize_sasl() {
2304     cb::sasl::logging::set_log_callback(sasl_log_callback);
2305 
2306     cbsasl_callback_t sasl_callbacks[2];
2307     int ii = 0;
2308 
2309     sasl_callbacks[ii].id = CBSASL_CB_GETOPT;
2310     sasl_callbacks[ii].proc = (int (*)(void))&sasl_getopt_callback;
2311     sasl_callbacks[ii].context = nullptr;
2312     sasl_callbacks[++ii].id = CBSASL_CB_LIST_END;
2313     sasl_callbacks[ii].proc = nullptr;
2314     sasl_callbacks[ii].context = nullptr;
2315 
2316     if (cbsasl_server_init(sasl_callbacks, "memcached") != CBSASL_OK) {
2317         FATAL_ERROR(EXIT_FAILURE, "Failed to initialize SASL server");
2318     }
2319 
2320     if (cb::sasl::plain::authenticate("default", "") == CBSASL_OK) {
2321         set_default_bucket_enabled(true);
2322     } else {
2323         set_default_bucket_enabled(false);
2324     }
2325 }
2326 
memcached_main(int argc, char **argv)2327 extern "C" int memcached_main(int argc, char **argv) {
2328     // MB-14649 log() crash on windows on some CPU's
2329 #ifdef _WIN64
2330     _set_FMA3_enable (0);
2331 #endif
2332 
2333 #ifdef HAVE_LIBNUMA
2334     // Configure NUMA policy as soon as possible (before any dynamic memory
2335     // allocation).
2336     const std::string numa_status = configure_numa_policy();
2337 #endif
2338     std::unique_ptr<ParentMonitor> parent_monitor;
2339 
2340     try {
2341         cb::logger::createConsoleLogger();
2342         settings.extensions.logger = &cb::logger::getLoggerDescriptor();
2343     } catch (const std::exception& e) {
2344         std::cerr << "Failed to create logger object: " << e.what()
2345                   << std::endl;
2346         exit(EXIT_FAILURE);
2347     }
2348 
2349     // Setup terminate handler as early as possible to catch crashes
2350     // occurring during initialisation.
2351     install_backtrace_terminate_handler();
2352 
2353     setup_libevent_locking();
2354 
2355     initialize_openssl();
2356 
2357     /* Initialize the socket subsystem */
2358     cb_initialize_sockets();
2359 
2360     AllocHooks::initialize();
2361 
2362     /* init settings */
2363     settings_init();
2364 
2365     initialize_mbcp_lookup_map();
2366 
2367     /* Parse command line arguments */
2368     try {
2369         parse_arguments(argc, argv);
2370     } catch (const std::exception& exception) {
2371         FATAL_ERROR(EXIT_FAILURE,
2372                     "Failed to initialize server: {}",
2373                     exception.what());
2374     }
2375 
2376     update_settings_from_config();
2377 
2378     cb::rbac::initialize();
2379 
2380     if (getenv("COUCHBASE_FORCE_ENABLE_XATTR") != nullptr) {
2381         settings.setXattrEnabled(true);
2382     }
2383 
2384     /* Initialize breakpad crash catcher with our just-parsed settings. */
2385     cb::breakpad::initialize(settings.getBreakpadSettings());
2386 
2387     /* Configure file logger, if specified as a settings object */
2388     if (settings.has.logger) {
2389         auto ret = cb::logger::initialize(settings.getLoggerConfig());
2390         if (ret) {
2391             FATAL_ERROR(
2392                     EXIT_FAILURE, "Failed to initialize logger: {}", ret.get());
2393         }
2394     }
2395 
2396     /* File-based logging available from this point onwards... */
2397 
2398     /* Logging available now extensions have been loaded. */
2399     LOG_INFO("Couchbase version {} starting.", get_server_version());
2400 
2401     LOG_INFO("Using SLA configuration: {}",
2402              to_string(cb::mcbp::sla::to_json(), false));
2403 
2404     if (settings.isStdinListenerEnabled()) {
2405         LOG_INFO("Enable standard input listener");
2406         start_stdin_listener(shutdown_server);
2407     }
2408 
2409 #ifdef HAVE_LIBNUMA
2410     // Log the NUMA policy selected (now the logger is available).
2411     LOG_INFO("NUMA: {}", numa_status);
2412 #endif
2413 
2414     if (!settings.has.rbac_file) {
2415         FATAL_ERROR(EXIT_FAILURE, "RBAC file not specified");
2416     }
2417 
    // Refuse to start without an RBAC definition file: the privilege
    // database is mandatory, so a missing file is a fatal config error.
    if (!cb::io::isFile(settings.getRbacFile())) {
        FATAL_ERROR(EXIT_FAILURE,
                    "RBAC [{}] does not exist",
                    settings.getRbacFile());
    }

    LOG_INFO("Loading RBAC configuration from [{}]", settings.getRbacFile());
    cb::rbac::loadPrivilegeDatabase(settings.getRbacFile());

    LOG_INFO("Loading error maps from [{}]", settings.getErrorMapsDir());
    try {
        settings.loadErrorMaps(settings.getErrorMapsDir());
    } catch (const std::exception& e) {
        // Any failure (missing dir, malformed map, ...) is fatal; we
        // don't try to run with a partial set of error maps.
        FATAL_ERROR(EXIT_FAILURE, "Failed to load error maps: {}", e.what());
    }

    initialize_audit();

    /* inform interested parties of initial verbosity level */
    perform_callbacks(ON_LOG_LEVEL, NULL, NULL);

    // Raise the per-process file handle limit before we compute how
    // many connections we may accept (next step depends on it --
    // presumably; confirm against set_max_filehandles()).
    set_max_filehandles();

    /* Aggregate the maximum number of connections */
    settings.calculateMaxconns();

    {
        // On failure initialize_engine_map() hands us an error string;
        // it is never freed here because FATAL_ERROR terminates the
        // process anyway.
        char *errmsg;
        if (!initialize_engine_map(&errmsg)) {
            FATAL_ERROR(EXIT_FAILURE,
                        "Unable to initialize engine "
                        "map: {}",
                        errmsg);
        }
    }

    /* Initialize bucket engine */
    initialize_buckets();

    initialize_sasl();

    /* initialize main thread libevent instance */
    main_base = event_base_new();

    /* Initialize signal handlers (requires libevent). */
    if (!install_signal_handlers()) {
        FATAL_ERROR(EXIT_FAILURE, "Unable to install signal handlers");
    }

    /* initialize other stuff */
    stats_init();

#ifndef WIN32
    /*
     * ignore SIGPIPE signals; we can use errno == EPIPE if we
     * need that information
     */
    if (sigignore(SIGPIPE) == -1) {
        FATAL_ERROR(EX_OSERR, "Failed to ignore SIGPIPE; sigaction");
    }
#endif

    /* create the listening socket(s), bind, and init. Note this is done
     * before starting worker threads; so _if_ any required sockets fail then
     * terminating is simpler as we don't need to shutdown the workers.
     */
    create_listen_sockets(true);

    /* start up worker threads if MT mode */
    thread_init(settings.getNumWorkerThreads(), main_base, dispatch_event_handler);

    // Background task executor, sized to match the worker thread count.
    executorPool.reset(new ExecutorPool(settings.getNumWorkerThreads()));

    initializeTracing();
    TRACE_GLOBAL0("memcached", "Started");

    /*
     * MB-20034.
     * Now that all threads have been created, e.g. the audit thread, threads
     * associated with extensions and the workers, we can enable shutdown.
     */
    enable_shutdown();

    /* Initialise memcached time keeping */
    mc_time_init(main_base);

    /* Optional parent monitor */
    // NOTE(review): ParentMonitor is given the parent's pid from the
    // environment -- presumably it terminates us if the parent dies;
    // verify against parent_monitor.h.
    char *env = getenv("MEMCACHED_PARENT_MONITOR");
    if (env != NULL) {
        LOG_INFO("Starting parent monitor");
        parent_monitor.reset(new ParentMonitor(std::stoi(env)));
    }

    // If a shutdown was already requested during initialization, skip the
    // event loop entirely and fall straight through to teardown.
    if (!memcached_shutdown) {
        /* enter the event loop */
        LOG_INFO("Initialization complete. Accepting clients.");
        service_online = true;
        event_base_loop(main_base, 0);
        service_online = false;
    }

    // Teardown runs in (roughly) reverse order of initialization; the
    // ordering of the steps below is deliberate -- do not reorder.
    LOG_INFO("Initiating graceful shutdown.");
    delete_all_buckets();

    if (parent_monitor.get() != nullptr) {
        LOG_INFO("Shutting down parent monitor");
        parent_monitor.reset();
    }

    LOG_INFO("Shutting down audit daemon");
    /* Close down the audit daemon cleanly */
    shutdown_auditdaemon(get_audit_handle());

    LOG_INFO("Shutting down client worker threads");
    threads_shutdown();

    LOG_INFO("Releasing server sockets");
    // Destroys the ServerSocket objects, which stops accepting new
    // connections.
    listen_conn.clear();

    LOG_INFO("Releasing client resources");
    close_all_connections();

    LOG_INFO("Releasing bucket resources");
    cleanup_buckets();

    LOG_INFO("Shutting down RBAC subsystem");
    cb::rbac::destroy();

    LOG_INFO("Releasing thread resources");
    threads_cleanup();

    LOG_INFO("Shutting down executor pool");
    executorPool.reset();

    LOG_INFO("Releasing signal handlers");
    release_signal_handlers();

    LOG_INFO("Shutting down SASL server");
    cbsasl_server_term();

    LOG_INFO("Releasing connection objects");
    destroy_connections();

    LOG_INFO("Deinitialising tracing");
    deinitializeTracing();

    LOG_INFO("Shutting down engine map");
    shutdown_engine_map();

    LOG_INFO("Removing breakpad");
    cb::breakpad::destroy();

    LOG_INFO("Releasing callbacks");
    free_callbacks();

    LOG_INFO("Shutting down OpenSSL");
    shutdown_openssl();

    LOG_INFO("Shutting down libevent");
    event_base_free(main_base);

    LOG_INFO("Shutting down logger extension");
    // drop my handle to the logger
    cb::logger::reset();
    cb::logger::shutdown();

    return EXIT_SUCCESS;
}
2586