1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3
4#include <stdlib.h>
5#include <stdio.h>
6#include <string.h>
7#include <errno.h>
8#include <ctype.h>
9#include <unistd.h>
10#include <stddef.h>
11#include <inttypes.h>
12
13#include "default_engine_internal.h"
14#include "memcached/util.h"
15#include "memcached/config_parser.h"
16#include <platform/cb_malloc.h>
17#include "engines/default_engine.h"
18#include "engine_manager.h"
19
// The default engine doesn't really use vbucket uuids, but in order
// to run the unit tests and verify that we correctly convert the
// vbucket uuid to network byte order it is nice to have a value
// we may use for testing ;)
24#define DEFAULT_ENGINE_VBUCKET_UUID 0xdeadbeef
25
26static ENGINE_ERROR_CODE default_initialize(
27        gsl::not_null<ENGINE_HANDLE*> handle, const char* config_str);
28static void default_destroy(gsl::not_null<ENGINE_HANDLE*> handle,
29                            const bool force);
30static cb::EngineErrorItemPair default_item_allocate(
31        gsl::not_null<ENGINE_HANDLE*> handle,
32        gsl::not_null<const void*> cookie,
33        const DocKey& key,
34        const size_t nbytes,
35        const int flags,
36        const rel_time_t exptime,
37        uint8_t datatype,
38        uint16_t vbucket);
39static std::pair<cb::unique_item_ptr, item_info> default_item_allocate_ex(
40        gsl::not_null<ENGINE_HANDLE*> handle,
41        gsl::not_null<const void*> cookie,
42        const DocKey& key,
43        size_t nbytes,
44        size_t priv_nbytes,
45        int flags,
46        rel_time_t exptime,
47        uint8_t datatype,
48        uint16_t vbucket);
49
50static ENGINE_ERROR_CODE default_item_delete(
51        gsl::not_null<ENGINE_HANDLE*> handle,
52        gsl::not_null<const void*> cookie,
53        const DocKey& key,
54        uint64_t& cas,
55        uint16_t vbucket,
56        mutation_descr_t& mut_info);
57
58static void default_item_release(gsl::not_null<ENGINE_HANDLE*> handle,
59                                 gsl::not_null<item*> item);
60static cb::EngineErrorItemPair default_get(gsl::not_null<ENGINE_HANDLE*> handle,
61                                           gsl::not_null<const void*> cookie,
62                                           const DocKey& key,
63                                           uint16_t vbucket,
64                                           DocStateFilter);
65
66static cb::EngineErrorItemPair default_get_if(
67        gsl::not_null<ENGINE_HANDLE*>,
68        gsl::not_null<const void*>,
69        const DocKey&,
70        uint16_t,
71        std::function<bool(const item_info&)>);
72
73static cb::EngineErrorItemPair default_get_and_touch(
74        gsl::not_null<ENGINE_HANDLE*> handle,
75        gsl::not_null<const void*> cookie,
76        const DocKey& key,
77        uint16_t vbucket,
78        uint32_t expiry_time);
79
80static cb::EngineErrorItemPair default_get_locked(
81        gsl::not_null<ENGINE_HANDLE*> handle,
82        gsl::not_null<const void*> cookie,
83        const DocKey& key,
84        uint16_t vbucket,
85        uint32_t lock_timeout);
86
87static cb::EngineErrorMetadataPair default_get_meta(
88        gsl::not_null<ENGINE_HANDLE*> handle,
89        gsl::not_null<const void*> cookie,
90        const DocKey& key,
91        uint16_t vbucket);
92
93static ENGINE_ERROR_CODE default_unlock(gsl::not_null<ENGINE_HANDLE*> handle,
94                                        gsl::not_null<const void*> cookie,
95                                        const DocKey& key,
96                                        uint16_t vbucket,
97                                        uint64_t cas);
98static ENGINE_ERROR_CODE default_get_stats(gsl::not_null<ENGINE_HANDLE*> handle,
99                                           gsl::not_null<const void*> cookie,
100                                           cb::const_char_buffer key,
101                                           ADD_STAT add_stat);
102static void default_reset_stats(gsl::not_null<ENGINE_HANDLE*> handle,
103                                gsl::not_null<const void*> cookie);
104static ENGINE_ERROR_CODE default_store(gsl::not_null<ENGINE_HANDLE*> handle,
105                                       gsl::not_null<const void*> cookie,
106                                       gsl::not_null<item*> item,
107                                       uint64_t& cas,
108                                       ENGINE_STORE_OPERATION operation,
109                                       DocumentState);
110
111static cb::EngineErrorCasPair default_store_if(
112        gsl::not_null<ENGINE_HANDLE*> handle,
113        gsl::not_null<const void*> cookie,
114        gsl::not_null<item*> item_,
115        uint64_t cas,
116        ENGINE_STORE_OPERATION operation,
117        cb::StoreIfPredicate predicate,
118        DocumentState document_state);
119
120static void item_set_datatype(gsl::not_null<ENGINE_HANDLE*> handle,
121                              gsl::not_null<item*> item,
122                              protocol_binary_datatype_t val);
123
124static ENGINE_ERROR_CODE default_flush(gsl::not_null<ENGINE_HANDLE*> handle,
125                                       gsl::not_null<const void*> cookie);
126static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se,
127                                                 const char *cfg_str);
128static ENGINE_ERROR_CODE default_unknown_command(
129        gsl::not_null<ENGINE_HANDLE*> handle,
130        const void* cookie,
131        gsl::not_null<protocol_binary_request_header*> request,
132        ADD_RESPONSE response,
133        DocNamespace doc_namespace);
134
135union vbucket_info_adapter {
136    char c;
137    struct vbucket_info v;
138};
139
140static void set_vbucket_state(struct default_engine *e,
141                              uint16_t vbid, vbucket_state_t to) {
142    union vbucket_info_adapter vi;
143    vi.c = e->vbucket_infos[vbid];
144    vi.v.state = to;
145    e->vbucket_infos[vbid] = vi.c;
146}
147
148static vbucket_state_t get_vbucket_state(struct default_engine *e,
149                                         uint16_t vbid) {
150    union vbucket_info_adapter vi;
151    vi.c = e->vbucket_infos[vbid];
152    return vbucket_state_t(vi.v.state);
153}
154
155static bool handled_vbucket(struct default_engine *e, uint16_t vbid) {
156    return e->config.ignore_vbucket
157        || (get_vbucket_state(e, vbid) == vbucket_state_active);
158}
159
/*
 * Mechanism for handling bad vbucket requests: makes the enclosing function
 * return ENGINE_NOT_MY_VBUCKET unless handled_vbucket(e, v) is true.
 *
 * The expansion is wrapped in do { ... } while (0) so that the macro behaves
 * as exactly one statement. Invoke it with a trailing semicolon; it is then
 * safe inside an unbraced if/else and cannot capture a following `else`.
 * The arguments are parenthesized so arbitrary expressions may be passed.
 */
#define VBUCKET_GUARD(e, v)                   \
    do {                                      \
        if (!handled_vbucket((e), (v))) {     \
            return ENGINE_NOT_MY_VBUCKET;     \
        }                                     \
    } while (0)
162
163static bool get_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
164                          gsl::not_null<const item*> item,
165                          gsl::not_null<item_info*> item_info);
166
167static bool set_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
168                          gsl::not_null<item*> item,
169                          gsl::not_null<const item_info*> itm_info);
170
171static bool is_xattr_supported(gsl::not_null<ENGINE_HANDLE*> handle);
172
173static BucketCompressionMode get_compression_mode(
174                               gsl::not_null<ENGINE_HANDLE*> handle);
175
176static float get_min_compression_ratio(gsl::not_null<ENGINE_HANDLE*> handle);
177
178static size_t get_max_item_size(gsl::not_null<ENGINE_HANDLE*> handle);
179
180/**
181 * Given that default_engine is implemented in C and not C++ we don't have
182 * a constructor for the struct to initialize the members to some sane
183 * default values. Currently they're all being allocated through the
184 * engine manager which keeps a local map of all engines being created.
185 *
 * Once an object is in that map it may in theory be referenced, so we
 * need to ensure that the members are initialized before hitting that map.
188 *
189 * @todo refactor default_engine to C++ to avoid this extra hack :)
190 */
191void default_engine_constructor(struct default_engine* engine, bucket_id_t id)
192{
193    memset(engine, 0, sizeof(*engine));
194
195    cb_mutex_initialize(&engine->slabs.lock);
196    cb_mutex_initialize(&engine->items.lock);
197    cb_mutex_initialize(&engine->stats.lock);
198    cb_mutex_initialize(&engine->scrubber.lock);
199
200    engine->bucket_id = id;
201    engine->engine.interface.interface = 1;
202    engine->engine.initialize = default_initialize;
203    engine->engine.destroy = default_destroy;
204    engine->engine.allocate = default_item_allocate;
205    engine->engine.allocate_ex = default_item_allocate_ex;
206    engine->engine.remove = default_item_delete;
207    engine->engine.release = default_item_release;
208    engine->engine.get = default_get;
209    engine->engine.get_if = default_get_if;
210    engine->engine.get_locked = default_get_locked;
211    engine->engine.get_meta = default_get_meta;
212    engine->engine.get_and_touch = default_get_and_touch;
213    engine->engine.unlock = default_unlock;
214    engine->engine.get_stats = default_get_stats;
215    engine->engine.reset_stats = default_reset_stats;
216    engine->engine.store = default_store;
217    engine->engine.store_if = default_store_if;
218    engine->engine.flush = default_flush;
219    engine->engine.unknown_command = default_unknown_command;
220    engine->engine.item_set_cas = item_set_cas;
221    engine->engine.item_set_datatype = item_set_datatype;
222    engine->engine.get_item_info = get_item_info;
223    engine->engine.set_item_info = set_item_info;
224    engine->engine.isXattrEnabled = is_xattr_supported;
225    engine->engine.getCompressionMode = get_compression_mode;
226    engine->engine.getMaxItemSize = get_max_item_size;
227    engine->engine.getMinCompressionRatio = get_min_compression_ratio;
228    engine->config.verbose = 0;
229    engine->config.oldest_live = 0;
230    engine->config.evict_to_free = true;
231    engine->config.maxbytes = 64 * 1024 * 1024;
232    engine->config.preallocate = false;
233    engine->config.factor = 1.25;
234    engine->config.chunk_size = 48;
235    engine->config.item_size_max= 1024 * 1024;
236    engine->config.xattr_enabled = true;
237    engine->config.compression_mode = BucketCompressionMode::Off;
238    engine->config.min_compression_ratio = default_min_compression_ratio;
239}
240
241extern "C" ENGINE_ERROR_CODE create_instance(uint64_t interface,
242                                             GET_SERVER_API get_server_api,
243                                             ENGINE_HANDLE **handle) {
244   SERVER_HANDLE_V1 *api = get_server_api();
245   struct default_engine *engine;
246
247   if (interface != 1 || api == NULL) {
248      return ENGINE_ENOTSUP;
249   }
250
251   if ((engine = engine_manager_create_engine()) == NULL) {
252      return ENGINE_ENOMEM;
253   }
254
255   engine->server = *api;
256   engine->get_server_api = get_server_api;
257   engine->initialized = true;
258   *handle = (ENGINE_HANDLE*)&engine->engine;
259   return ENGINE_SUCCESS;
260}
261
262extern "C" void destroy_engine() {
263    engine_manager_shutdown();
264    assoc_destroy();
265}
266
267static struct default_engine* get_handle(ENGINE_HANDLE* handle) {
268   return (struct default_engine*)handle;
269}
270
271static hash_item* get_real_item(item* item) {
272    return (hash_item*)item;
273}
274
275static ENGINE_ERROR_CODE default_initialize(
276        gsl::not_null<ENGINE_HANDLE*> handle, const char* config_str) {
277    struct default_engine* se = get_handle(handle);
278    ENGINE_ERROR_CODE ret = initalize_configuration(se, config_str);
279    if (ret != ENGINE_SUCCESS) {
280        return ret;
281    }
282
283    ret = assoc_init(se);
284    if (ret != ENGINE_SUCCESS) {
285        return ret;
286    }
287
288    ret = slabs_init(
289            se, se->config.maxbytes, se->config.factor, se->config.preallocate);
290    if (ret != ENGINE_SUCCESS) {
291        return ret;
292    }
293
294    return ENGINE_SUCCESS;
295}
296
297static void default_destroy(gsl::not_null<ENGINE_HANDLE*> handle,
298                            const bool force) {
299    (void)force;
300    engine_manager_delete_engine(get_handle(handle));
301}
302
303void destroy_engine_instance(struct default_engine* engine) {
304    if (engine->initialized) {
305        /* Destory the slabs cache */
306        slabs_destroy(engine);
307
308        cb_free(engine->config.uuid);
309
310        /* Clean up the mutexes */
311        cb_mutex_destroy(&engine->items.lock);
312        cb_mutex_destroy(&engine->stats.lock);
313        cb_mutex_destroy(&engine->slabs.lock);
314        cb_mutex_destroy(&engine->scrubber.lock);
315
316        engine->initialized = false;
317    }
318}
319
320static cb::EngineErrorItemPair default_item_allocate(
321        gsl::not_null<ENGINE_HANDLE*> handle,
322        gsl::not_null<const void*> cookie,
323        const DocKey& key,
324        const size_t nbytes,
325        const int flags,
326        const rel_time_t exptime,
327        uint8_t datatype,
328        uint16_t vbucket) {
329    try {
330        auto pair = default_item_allocate_ex(handle, cookie, key, nbytes,
331                                             0, // No privileged bytes
332                                             flags, exptime, datatype, vbucket);
333        return {cb::engine_errc::success, std::move(pair.first)};
334    } catch (const cb::engine_error& error) {
335        return cb::makeEngineErrorItemPair(
336                cb::engine_errc(error.code().value()));
337    }
338}
339
340static std::pair<cb::unique_item_ptr, item_info> default_item_allocate_ex(
341        gsl::not_null<ENGINE_HANDLE*> handle,
342        gsl::not_null<const void*> cookie,
343        const DocKey& key,
344        size_t nbytes,
345        size_t priv_nbytes,
346        int flags,
347        rel_time_t exptime,
348        uint8_t datatype,
349        uint16_t vbucket) {
350    hash_item *it;
351
352    unsigned int id;
353    struct default_engine* engine = get_handle(handle);
354
355    if (!handled_vbucket(engine, vbucket)) {
356        throw cb::engine_error(cb::engine_errc::not_my_vbucket,
357                               "default_item_allocate_ex");
358    }
359
360    size_t ntotal = sizeof(hash_item) + key.size() + nbytes;
361    id = slabs_clsid(engine, ntotal);
362    if (id == 0) {
363        throw cb::engine_error(cb::engine_errc::too_big,
364                               "default_item_allocate_ex: no slab class");
365    }
366
367    if ((nbytes - priv_nbytes) > engine->config.item_size_max) {
368        throw cb::engine_error(cb::engine_errc::too_big,
369                               "default_item_allocate_ex");
370    }
371
372    it = item_alloc(engine,
373                    key.data(),
374                    key.size(),
375                    flags,
376                    engine->server.core->realtime(exptime, cb::NoExpiryLimit),
377                    (uint32_t)nbytes,
378                    cookie,
379                    datatype);
380
381    if (it != NULL) {
382        item_info info;
383        if (!get_item_info(handle, it, &info)) {
384            // This should never happen (unless we provide invalid
385            // arguments)
386            item_release(engine, it);
387            throw cb::engine_error(cb::engine_errc::failed,
388                                   "default_item_allocate_ex");
389        }
390
391        return std::make_pair(cb::unique_item_ptr(it, cb::ItemDeleter{handle}),
392                              info);
393    } else {
394        throw cb::engine_error(cb::engine_errc::no_memory,
395                               "default_item_allocate_ex");
396    }
397}
398
399static ENGINE_ERROR_CODE default_item_delete(
400        gsl::not_null<ENGINE_HANDLE*> handle,
401        gsl::not_null<const void*> cookie,
402        const DocKey& key,
403        uint64_t& cas,
404        uint16_t vbucket,
405        mutation_descr_t& mut_info) {
406    struct default_engine* engine = get_handle(handle);
407    hash_item* it;
408    uint64_t cas_in = cas;
409    VBUCKET_GUARD(engine, vbucket);
410
411    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
412    do {
413        it = item_get(engine,
414                      cookie,
415                      key.data(),
416                      key.size(),
417                      DocStateFilter::Alive);
418        if (it == nullptr) {
419            return ENGINE_KEY_ENOENT;
420        }
421
422        if (it->locktime != 0 &&
423            it->locktime > engine->server.core->get_current_time()) {
424            if (cas_in != it->cas) {
425                item_release(engine, it);
426                return ENGINE_LOCKED;
427            }
428        }
429
430        auto* deleted = item_alloc(engine,
431                                   key.data(),
432                                   key.size(),
433                                   it->flags,
434                                   it->exptime,
435                                   0,
436                                   cookie,
437                                   PROTOCOL_BINARY_RAW_BYTES);
438
439        if (deleted == NULL) {
440            item_release(engine, it);
441            return ENGINE_TMPFAIL;
442        }
443
444        if (cas_in == 0) {
445            // If the caller specified the "cas wildcard" we should set
446            // the cas for the item we just fetched and do a cas
447            // replace with that value
448            item_set_cas(handle, deleted, it->cas);
449        } else {
450            // The caller specified a specific CAS value so we should
451            // use that value in our cas replace
452            item_set_cas(handle, deleted, cas_in);
453        }
454
455        ret = store_item(engine,
456                         deleted,
457                         &cas,
458                         OPERATION_CAS,
459                         cookie,
460                         DocumentState::Deleted);
461
462        item_release(engine, it);
463        item_release(engine, deleted);
464
465        // We should only retry for race conditions if the caller specified
466        // cas wildcard
467    } while (ret == ENGINE_KEY_EEXISTS && cas_in == 0);
468
469    // vbucket UUID / seqno arn't supported by default engine, so just return
470    // a hardcoded vbucket uuid, and zero for the sequence number.
471    mut_info.vbucket_uuid = DEFAULT_ENGINE_VBUCKET_UUID;
472    mut_info.seqno = 0;
473
474    return ret;
475}
476
477static void default_item_release(gsl::not_null<ENGINE_HANDLE*> handle,
478                                 gsl::not_null<item*> item) {
479    item_release(get_handle(handle), get_real_item(item));
480}
481
482static cb::EngineErrorItemPair default_get(gsl::not_null<ENGINE_HANDLE*> handle,
483                                           gsl::not_null<const void*> cookie,
484                                           const DocKey& key,
485                                           uint16_t vbucket,
486                                           DocStateFilter documentStateFilter) {
487    struct default_engine* engine = get_handle(handle);
488
489    if (!handled_vbucket(engine, vbucket)) {
490        return std::make_pair(
491                cb::engine_errc::not_my_vbucket,
492                cb::unique_item_ptr{nullptr, cb::ItemDeleter{handle}});
493    }
494
495    item* it = item_get(
496            engine, cookie, key.data(), key.size(), documentStateFilter);
497    if (it != nullptr) {
498        return cb::makeEngineErrorItemPair(
499                cb::engine_errc::success, it, handle);
500    } else {
501        return cb::makeEngineErrorItemPair(cb::engine_errc::no_such_key);
502    }
503}
504
505static cb::EngineErrorItemPair default_get_if(
506        gsl::not_null<ENGINE_HANDLE*> handle,
507        gsl::not_null<const void*> cookie,
508        const DocKey& key,
509        uint16_t vbucket,
510        std::function<bool(const item_info&)> filter) {
511    struct default_engine* engine = get_handle(handle);
512
513    if (!handled_vbucket(engine, vbucket)) {
514        return cb::makeEngineErrorItemPair(cb::engine_errc::not_my_vbucket);
515    }
516
517    cb::unique_item_ptr ret(item_get(engine,
518                                     cookie,
519                                     key.data(),
520                                     key.size(),
521                                     DocStateFilter::Alive),
522                            cb::ItemDeleter{handle});
523    if (!ret) {
524        return cb::makeEngineErrorItemPair(cb::engine_errc::no_such_key);
525    }
526
527    item_info info;
528    if (!get_item_info(handle, ret.get(), &info)) {
529        throw cb::engine_error(cb::engine_errc::failed,
530                               "default_get_if: get_item_info failed");
531    }
532
533    if (!filter(info)) {
534        ret.reset(nullptr);
535    }
536
537    return cb::makeEngineErrorItemPair(
538            cb::engine_errc::success, ret.release(), handle);
539}
540
541static cb::EngineErrorItemPair default_get_and_touch(
542        gsl::not_null<ENGINE_HANDLE*> handle,
543        gsl::not_null<const void*> cookie,
544        const DocKey& key,
545        uint16_t vbucket,
546        uint32_t expiry_time) {
547    struct default_engine* engine = get_handle(handle);
548
549    if (!handled_vbucket(engine, vbucket)) {
550        return cb::makeEngineErrorItemPair(cb::engine_errc::not_my_vbucket);
551    }
552
553    hash_item* it = nullptr;
554    auto ret = item_get_and_touch(
555            engine,
556            cookie,
557            &it,
558            key.data(),
559            key.size(),
560            engine->server.core->realtime(expiry_time, cb::NoExpiryLimit));
561
562    return cb::makeEngineErrorItemPair(
563            cb::engine_errc(ret), reinterpret_cast<item*>(it), handle);
564}
565
566static cb::EngineErrorItemPair default_get_locked(
567        gsl::not_null<ENGINE_HANDLE*> handle,
568        gsl::not_null<const void*> cookie,
569        const DocKey& key,
570        uint16_t vbucket,
571        uint32_t lock_timeout) {
572    auto* engine = get_handle(handle);
573
574    if (!handled_vbucket(engine, vbucket)) {
575        return cb::makeEngineErrorItemPair(cb::engine_errc::not_my_vbucket);
576    }
577
578    // memcached buckets don't offer any way for the user to configure
579    // the lock settings.
580    static const uint32_t default_lock_timeout = 15;
581    static const uint32_t max_lock_timeout = 30;
582
583    if (lock_timeout == 0 || lock_timeout > max_lock_timeout) {
584        lock_timeout = default_lock_timeout;
585    }
586
587    // Convert the lock timeout to an absolute time
588    lock_timeout += engine->server.core->get_current_time();
589
590    hash_item* it = nullptr;
591    auto ret = item_get_locked(engine, cookie, &it, key.data(), key.size(),
592                               lock_timeout);
593    return cb::makeEngineErrorItemPair(cb::engine_errc(ret), it, handle);
594}
595
596static cb::EngineErrorMetadataPair default_get_meta(
597        gsl::not_null<ENGINE_HANDLE*> handle,
598        gsl::not_null<const void*> cookie,
599        const DocKey& key,
600        uint16_t vbucket) {
601    auto* engine_handle = get_handle(handle);
602
603    if (!handled_vbucket(engine_handle, vbucket)) {
604        return std::make_pair(cb::engine_errc::not_my_vbucket, item_info());
605    }
606
607    cb::unique_item_ptr item{item_get(engine_handle,
608                                      cookie,
609                                      key.data(),
610                                      key.size(),
611                                      DocStateFilter::AliveOrDeleted),
612                             cb::ItemDeleter(handle)};
613
614    if (!item) {
615        return std::make_pair(cb::engine_errc::no_such_key, item_info());
616    }
617
618    item_info info;
619    if (!get_item_info(handle, item.get(), &info)) {
620        throw cb::engine_error(cb::engine_errc::failed,
621                               "default_get_if: get_item_info failed");
622    }
623
624    return std::make_pair(cb::engine_errc::success, info);
625}
626
627static ENGINE_ERROR_CODE default_unlock(gsl::not_null<ENGINE_HANDLE*> handle,
628                                        gsl::not_null<const void*> cookie,
629                                        const DocKey& key,
630                                        uint16_t vbucket,
631                                        uint64_t cas) {
632    auto* engine = get_handle(handle);
633    VBUCKET_GUARD(engine, vbucket);
634    return item_unlock(engine, cookie, key.data(), key.size(), cas);
635}
636
637static ENGINE_ERROR_CODE default_get_stats(gsl::not_null<ENGINE_HANDLE*> handle,
638                                           gsl::not_null<const void*> cookie,
639                                           cb::const_char_buffer key,
640                                           ADD_STAT add_stat) {
641    struct default_engine* engine = get_handle(handle);
642    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
643
644    if (key.empty()) {
645        char val[128];
646        int len;
647
648        cb_mutex_enter(&engine->stats.lock);
649        len = sprintf(val, "%" PRIu64, (uint64_t)engine->stats.evictions);
650        add_stat("evictions", 9, val, len, cookie);
651        len = sprintf(val, "%" PRIu64, (uint64_t)engine->stats.curr_items);
652        add_stat("curr_items", 10, val, len, cookie);
653        len = sprintf(val, "%" PRIu64, (uint64_t)engine->stats.total_items);
654        add_stat("total_items", 11, val, len, cookie);
655        len = sprintf(val, "%" PRIu64, (uint64_t)engine->stats.curr_bytes);
656        add_stat("bytes", 5, val, len, cookie);
657        len = sprintf(val, "%" PRIu64, engine->stats.reclaimed);
658        add_stat("reclaimed", 9, val, len, cookie);
659        len = sprintf(val, "%" PRIu64, (uint64_t)engine->config.maxbytes);
660        add_stat("engine_maxbytes", 15, val, len, cookie);
661        cb_mutex_exit(&engine->stats.lock);
662    } else if (key == "slabs"_ccb) {
663        slabs_stats(engine, add_stat, cookie);
664    } else if (key == "items"_ccb) {
665        item_stats(engine, add_stat, cookie);
666    } else if (key == "sizes"_ccb) {
667        item_stats_sizes(engine, add_stat, cookie);
668    } else if (key == "uuid"_ccb) {
669        if (engine->config.uuid) {
670            add_stat("uuid",
671                     4,
672                     engine->config.uuid,
673                     (uint32_t)strlen(engine->config.uuid),
674                     cookie);
675        } else {
676            add_stat("uuid", 4, "", 0, cookie);
677        }
678    } else if (key == "scrub"_ccb) {
679        char val[128];
680        int len;
681
682        cb_mutex_enter(&engine->scrubber.lock);
683        if (engine->scrubber.running) {
684            add_stat("scrubber:status", 15, "running", 7, cookie);
685        } else {
686            add_stat("scrubber:status", 15, "stopped", 7, cookie);
687        }
688
689        if (engine->scrubber.started != 0) {
690            if (engine->scrubber.stopped != 0) {
691                time_t diff =
692                        engine->scrubber.started - engine->scrubber.stopped;
693                len = sprintf(val, "%" PRIu64, (uint64_t)diff);
694                add_stat("scrubber:last_run", 17, val, len, cookie);
695            }
696
697            len = sprintf(val, "%" PRIu64, engine->scrubber.visited);
698            add_stat("scrubber:visited", 16, val, len, cookie);
699            len = sprintf(val, "%" PRIu64, engine->scrubber.cleaned);
700            add_stat("scrubber:cleaned", 16, val, len, cookie);
701        }
702        cb_mutex_exit(&engine->scrubber.lock);
703    } else {
704        ret = ENGINE_KEY_ENOENT;
705    }
706
707    return ret;
708}
709
710static ENGINE_ERROR_CODE default_store(gsl::not_null<ENGINE_HANDLE*> handle,
711                                       gsl::not_null<const void*> cookie,
712                                       gsl::not_null<item*> item,
713                                       uint64_t& cas,
714                                       ENGINE_STORE_OPERATION operation,
715                                       DocumentState document_state) {
716    auto* engine = get_handle(handle);
717    auto& config = engine->config;
718    auto* it = get_real_item(item);
719
720    if (document_state == DocumentState::Deleted && !config.keep_deleted) {
721        return safe_item_unlink(engine, it);
722    }
723
724    return store_item(engine, it, &cas, operation, cookie, document_state);
725}
726
727static cb::EngineErrorCasPair default_store_if(
728        gsl::not_null<ENGINE_HANDLE*> handle,
729        gsl::not_null<const void*> cookie,
730        gsl::not_null<item*> item,
731        uint64_t cas,
732        ENGINE_STORE_OPERATION operation,
733        cb::StoreIfPredicate predicate,
734        DocumentState document_state) {
735    struct default_engine* engine = get_handle(handle);
736
737    if (predicate) {
738        // Check for an existing item and call the item predicate on it.
739        auto* it = get_real_item(item);
740        auto* key = item_get_key(it);
741        if (!key) {
742            throw cb::engine_error(cb::engine_errc::failed,
743                                   "default_store_if: item_get_key failed");
744        }
745        cb::unique_item_ptr existing(
746                item_get(engine, cookie, *key, DocStateFilter::Alive),
747                cb::ItemDeleter{handle});
748
749        cb::StoreIfStatus status;
750        if (existing.get()) {
751            item_info info;
752            if (!get_item_info(handle, existing.get(), &info)) {
753                throw cb::engine_error(
754                        cb::engine_errc::failed,
755                        "default_store_if: get_item_info failed");
756            }
757            status = predicate(info, {true});
758        } else {
759            status = predicate(boost::none, {true});
760        }
761
762        switch (status) {
763        case cb::StoreIfStatus::Fail: {
764            return {cb::engine_errc::predicate_failed, 0};
765        }
766        case cb::StoreIfStatus::Continue:
767        case cb::StoreIfStatus::GetItemInfo: {
768            break;
769        }
770        }
771    }
772
773    auto* it = get_real_item(item);
774    auto status =
775            store_item(engine, it, &cas, operation, cookie, document_state);
776    return {cb::engine_errc(status), cas};
777}
778
779static ENGINE_ERROR_CODE default_flush(gsl::not_null<ENGINE_HANDLE*> handle,
780                                       gsl::not_null<const void*> cookie) {
781    item_flush_expired(get_handle(handle));
782
783    return ENGINE_SUCCESS;
784}
785
786static void default_reset_stats(gsl::not_null<ENGINE_HANDLE*> handle,
787                                gsl::not_null<const void*> cookie) {
788    struct default_engine* engine = get_handle(handle);
789    item_stats_reset(engine);
790
791    cb_mutex_enter(&engine->stats.lock);
792    engine->stats.evictions = 0;
793    engine->stats.reclaimed = 0;
794    engine->stats.total_items = 0;
795    cb_mutex_exit(&engine->stats.lock);
796}
797
798static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se,
799                                                 const char *cfg_str) {
800   ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
801
802   se->config.vb0 = true;
803
804   if (cfg_str != NULL) {
805       struct config_item items[13];
806       int ii = 0;
807
808       memset(&items, 0, sizeof(items));
809       items[ii].key = "verbose";
810       items[ii].datatype = DT_SIZE;
811       items[ii].value.dt_size = &se->config.verbose;
812       ++ii;
813
814       items[ii].key = "eviction";
815       items[ii].datatype = DT_BOOL;
816       items[ii].value.dt_bool = &se->config.evict_to_free;
817       ++ii;
818
819       items[ii].key = "cache_size";
820       items[ii].datatype = DT_SIZE;
821       items[ii].value.dt_size = &se->config.maxbytes;
822       ++ii;
823
824       items[ii].key = "preallocate";
825       items[ii].datatype = DT_BOOL;
826       items[ii].value.dt_bool = &se->config.preallocate;
827       ++ii;
828
829       items[ii].key = "factor";
830       items[ii].datatype = DT_FLOAT;
831       items[ii].value.dt_float = &se->config.factor;
832       ++ii;
833
834       items[ii].key = "chunk_size";
835       items[ii].datatype = DT_SIZE;
836       items[ii].value.dt_size = &se->config.chunk_size;
837       ++ii;
838
839       items[ii].key = "item_size_max";
840       items[ii].datatype = DT_SIZE;
841       items[ii].value.dt_size = &se->config.item_size_max;
842       ++ii;
843
844       items[ii].key = "ignore_vbucket";
845       items[ii].datatype = DT_BOOL;
846       items[ii].value.dt_bool = &se->config.ignore_vbucket;
847       ++ii;
848
849       items[ii].key = "vb0";
850       items[ii].datatype = DT_BOOL;
851       items[ii].value.dt_bool = &se->config.vb0;
852       ++ii;
853
854       items[ii].key = "config_file";
855       items[ii].datatype = DT_CONFIGFILE;
856       ++ii;
857
858       items[ii].key = "uuid";
859       items[ii].datatype = DT_STRING;
860       items[ii].value.dt_string = &se->config.uuid;
861       ++ii;
862
863       items[ii].key = "keep_deleted";
864       items[ii].datatype = DT_BOOL;
865       items[ii].value.dt_bool = &se->config.keep_deleted;
866       ++ii;
867
868       items[ii].key = NULL;
869       ++ii;
870       cb_assert(ii == 13);
871       ret = ENGINE_ERROR_CODE(se->server.core->parse_config(cfg_str,
872                                                             items,
873                                                             stderr));
874   }
875
876   if (se->config.vb0) {
877       set_vbucket_state(se, 0, vbucket_state_active);
878   }
879
880   return ret;
881}
882
883static bool set_vbucket(struct default_engine *e,
884                        const void* cookie,
885                        protocol_binary_request_set_vbucket *req,
886                        ADD_RESPONSE response) {
887    vbucket_state_t state;
888    size_t bodylen = ntohl(req->message.header.request.bodylen)
889        - ntohs(req->message.header.request.keylen);
890    if (bodylen != sizeof(vbucket_state_t)) {
891        const char *msg = "Incorrect packet format";
892        return response(NULL, 0, NULL, 0, msg, (uint32_t)strlen(msg),
893                        PROTOCOL_BINARY_RAW_BYTES,
894                        PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
895    }
896    memcpy(&state, &req->message.body.state, sizeof(state));
897    state = vbucket_state_t(ntohl(state));
898
899    if (!is_valid_vbucket_state_t(state)) {
900        const char *msg = "Invalid vbucket state";
901        return response(NULL, 0, NULL, 0, msg, (uint32_t)strlen(msg),
902                        PROTOCOL_BINARY_RAW_BYTES,
903                        PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
904    }
905
906    set_vbucket_state(e, ntohs(req->message.header.request.vbucket), state);
907    return response(NULL, 0, NULL, 0, &state, sizeof(state),
908                    PROTOCOL_BINARY_RAW_BYTES,
909                    PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
910}
911
912static bool get_vbucket(struct default_engine *e,
913                        const void* cookie,
914                        protocol_binary_request_get_vbucket *req,
915                        ADD_RESPONSE response) {
916    vbucket_state_t state;
917    state = get_vbucket_state(e, ntohs(req->message.header.request.vbucket));
918    state = vbucket_state_t(ntohl(state));
919
920    return response(NULL, 0, NULL, 0, &state, sizeof(state),
921                    PROTOCOL_BINARY_RAW_BYTES,
922                    PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
923}
924
925static bool rm_vbucket(struct default_engine *e,
926                       const void *cookie,
927                       protocol_binary_request_header *req,
928                       ADD_RESPONSE response) {
929    set_vbucket_state(e, ntohs(req->request.vbucket), vbucket_state_dead);
930    return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
931                    PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
932}
933
934static bool scrub_cmd(struct default_engine *e,
935                      const void *cookie,
936                      protocol_binary_request_header *request,
937                      ADD_RESPONSE response) {
938
939    protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
940    if (!item_start_scrub(e)) {
941        res = PROTOCOL_BINARY_RESPONSE_EBUSY;
942    }
943
944    return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
945                    res, 0, cookie);
946}
947
948/**
949 * set_param only added to allow per bucket xattr on/off
950 * and toggle between compression modes for testing purposes
951 */
952static bool set_param(struct default_engine* e,
953                      const void* cookie,
954                      protocol_binary_request_set_param* req,
955                      ADD_RESPONSE response) {
956    size_t keylen = ntohs(req->message.header.request.keylen);
957    uint8_t extlen = req->message.header.request.extlen;
958    size_t vallen = ntohl(req->message.header.request.bodylen);
959    protocol_binary_engine_param_t paramtype =
960            static_cast<protocol_binary_engine_param_t>(
961                    ntohl(req->message.body.param_type));
962
963    if (keylen == 0 || (vallen - keylen - extlen) == 0) {
964        return false;
965    }
966
967    // Only support protocol_binary_engine_param_flush with xattr_enabled
968    if (paramtype == protocol_binary_engine_param_flush) {
969        const char* keyp =
970                reinterpret_cast<const char*>(req->bytes) + sizeof(req->bytes);
971        const char* valuep = keyp + keylen;
972        vallen -= (keylen + extlen);
973        cb::const_char_buffer key(keyp, keylen);
974        cb::const_char_buffer value(valuep, vallen);
975
976        if (key == "xattr_enabled") {
977            if (value == "true") {
978                e->config.xattr_enabled = true;
979            } else if (value == "false") {
980                e->config.xattr_enabled = false;
981            } else {
982                return false;
983            }
984        } else if (key == "compression_mode") {
985            try {
986                e->config.compression_mode = parseCompressionMode(
987                        std::string(value.data(), value.size()));
988            } catch (std::invalid_argument&) {
989                return false;
990            }
991        } else if (key == "min_compression_ratio") {
992            std::string value_str{value.data(), value.size()};
993            float min_comp_ratio;
994            if (!safe_strtof(value_str.c_str(), min_comp_ratio)) {
995                return false;
996            }
997
998            e->config.min_compression_ratio = min_comp_ratio;
999        }
1000
1001        return response(NULL,
1002                        0,
1003                        NULL,
1004                        0,
1005                        NULL,
1006                        0,
1007                        PROTOCOL_BINARY_RAW_BYTES,
1008                        PROTOCOL_BINARY_RESPONSE_SUCCESS,
1009                        0,
1010                        cookie);
1011    }
1012    return false;
1013}
1014
1015static ENGINE_ERROR_CODE default_unknown_command(
1016        gsl::not_null<ENGINE_HANDLE*> handle,
1017        const void* cookie,
1018        gsl::not_null<protocol_binary_request_header*> request,
1019        ADD_RESPONSE response,
1020        DocNamespace doc_namespace) {
1021    struct default_engine* e = get_handle(handle);
1022    bool sent;
1023
1024    switch(request->request.opcode) {
1025    case PROTOCOL_BINARY_CMD_SCRUB:
1026        sent = scrub_cmd(e, cookie, request, response);
1027        break;
1028    case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1029        sent = rm_vbucket(e, cookie, request, response);
1030        break;
1031    case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1032        sent = set_vbucket(
1033                e,
1034                cookie,
1035                reinterpret_cast<protocol_binary_request_set_vbucket*>(
1036                        request.get()),
1037                response);
1038        break;
1039    case PROTOCOL_BINARY_CMD_GET_VBUCKET:
1040        sent = get_vbucket(
1041                e,
1042                cookie,
1043                reinterpret_cast<protocol_binary_request_get_vbucket*>(
1044                        request.get()),
1045                response);
1046        break;
1047    case PROTOCOL_BINARY_CMD_SET_PARAM:
1048        sent = set_param(e,
1049                         cookie,
1050                         reinterpret_cast<protocol_binary_request_set_param*>(
1051                                 request.get()),
1052                         response);
1053        break;
1054    default:
1055        sent = response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
1056                        PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, 0, cookie);
1057        break;
1058    }
1059
1060    if (sent) {
1061        return ENGINE_SUCCESS;
1062    } else {
1063        return ENGINE_FAILED;
1064    }
1065}
1066
1067void item_set_cas(gsl::not_null<ENGINE_HANDLE*> handle,
1068                  gsl::not_null<item*> item,
1069                  uint64_t val) {
1070    hash_item* it = get_real_item(item);
1071    it->cas = val;
1072}
1073
1074static void item_set_datatype(gsl::not_null<ENGINE_HANDLE*> handle,
1075                              gsl::not_null<item*> item,
1076                              protocol_binary_datatype_t val) {
1077    auto* it = reinterpret_cast<hash_item*>(item.get());
1078    it->datatype = val;
1079}
1080
1081hash_key* item_get_key(const hash_item* item)
1082{
1083    const char *ret = reinterpret_cast<const char*>(item + 1);
1084    return (hash_key*)ret;
1085}
1086
1087char* item_get_data(const hash_item* item)
1088{
1089    const hash_key* key = item_get_key(item);
1090    return ((char*)key->header.full_key) + hash_key_get_key_len(key);
1091}
1092
1093static bool get_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
1094                          gsl::not_null<const item*> item,
1095                          gsl::not_null<item_info*> item_info) {
1096    auto* it = reinterpret_cast<const hash_item*>(item.get());
1097    const hash_key* key = item_get_key(it);
1098    auto* engine = get_handle(handle);
1099
1100    // This may potentially open up for a race, but:
1101    // 1) If the item isn't linked anymore we don't need to mask
1102    //    the CAS anymore. (if the client tries to use that
1103    //    CAS it'll fail with an invalid cas)
1104    // 2) In production the memcached buckets don't use the
1105    //    ZOMBIE state (and if we start doing that, it is only
1106    //    the owner of the item pointer (the one bumping the
1107    //    refcount initially) which would change this. Anyone else
1108    //    would create a new item object and set the iflag
1109    //    to deleted.
1110    const auto iflag = it->iflag.load(std::memory_order_relaxed);
1111
1112    if ((iflag & ITEM_LINKED) && it->locktime != 0 &&
1113        it->locktime > engine->server.core->get_current_time()) {
1114        // This object is locked. According to docs/Document.md we should
1115        // return -1 in such cases to hide the real CAS for the other clients
1116        // (Note the check on ITEM_LINKED.. for the actual item returned by
1117        // get_locked we return an item which isn't linked (copy of the
1118        // linked item) to allow returning the real CAS.
1119        item_info->cas = uint64_t(-1);
1120    } else {
1121        item_info->cas = it->cas;
1122    }
1123
1124    item_info->vbucket_uuid = DEFAULT_ENGINE_VBUCKET_UUID;
1125    item_info->seqno = 0;
1126    if (it->exptime == 0) {
1127        item_info->exptime = 0;
1128    } else {
1129        item_info->exptime = engine->server.core->abstime(it->exptime);
1130    }
1131    item_info->nbytes = it->nbytes;
1132    item_info->flags = it->flags;
1133    item_info->nkey = hash_key_get_client_key_len(key);
1134    item_info->key = hash_key_get_client_key(key);
1135    item_info->value[0].iov_base = item_get_data(it);
1136    item_info->value[0].iov_len = it->nbytes;
1137    item_info->datatype = it->datatype;
1138    if (iflag & ITEM_ZOMBIE) {
1139        item_info->document_state = DocumentState::Deleted;
1140    } else {
1141        item_info->document_state = DocumentState::Alive;
1142    }
1143    return true;
1144}
1145
1146static bool set_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
1147                          gsl::not_null<item*> item,
1148                          gsl::not_null<const item_info*> itm_info) {
1149    auto* it = reinterpret_cast<hash_item*>(item.get());
1150    it->datatype = itm_info->datatype;
1151    return true;
1152}
1153
1154static bool is_xattr_supported(gsl::not_null<ENGINE_HANDLE*> handle) {
1155    return get_handle(handle)->config.xattr_enabled;
1156}
1157
1158static BucketCompressionMode get_compression_mode(
1159                              gsl::not_null<ENGINE_HANDLE*> handle) {
1160    return get_handle(handle)->config.compression_mode;
1161}
1162
1163static float get_min_compression_ratio(
1164                             gsl::not_null<ENGINE_HANDLE*> handle) {
1165    return get_handle(handle)->config.min_compression_ratio;
1166}
1167
1168static size_t get_max_item_size(
1169                             gsl::not_null<ENGINE_HANDLE*> handle) {
1170    return get_handle(handle)->config.item_size_max;
1171}
1172