1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3 
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <string.h>
7 #include <errno.h>
8 #include <ctype.h>
9 #include <unistd.h>
10 #include <stddef.h>
11 #include <inttypes.h>
12 
13 #include "default_engine_internal.h"
14 #include "memcached/util.h"
15 #include "memcached/config_parser.h"
16 #include <platform/cb_malloc.h>
17 #include "engines/default_engine.h"
18 #include "engine_manager.h"
19 
// The default engine doesn't really use vbucket uuids. A fixed, easily
// recognisable value is still reported so that the unit tests can verify
// that the vbucket uuid is correctly converted to network byte order.
#define DEFAULT_ENGINE_VBUCKET_UUID 0xdeadbeef
25 
26 static ENGINE_ERROR_CODE default_initialize(
27         gsl::not_null<ENGINE_HANDLE*> handle, const char* config_str);
28 static void default_destroy(gsl::not_null<ENGINE_HANDLE*> handle,
29                             const bool force);
30 static cb::EngineErrorItemPair default_item_allocate(
31         gsl::not_null<ENGINE_HANDLE*> handle,
32         gsl::not_null<const void*> cookie,
33         const DocKey& key,
34         const size_t nbytes,
35         const int flags,
36         const rel_time_t exptime,
37         uint8_t datatype,
38         uint16_t vbucket);
39 static std::pair<cb::unique_item_ptr, item_info> default_item_allocate_ex(
40         gsl::not_null<ENGINE_HANDLE*> handle,
41         gsl::not_null<const void*> cookie,
42         const DocKey& key,
43         size_t nbytes,
44         size_t priv_nbytes,
45         int flags,
46         rel_time_t exptime,
47         uint8_t datatype,
48         uint16_t vbucket);
49 
50 static ENGINE_ERROR_CODE default_item_delete(
51         gsl::not_null<ENGINE_HANDLE*> handle,
52         gsl::not_null<const void*> cookie,
53         const DocKey& key,
54         uint64_t& cas,
55         uint16_t vbucket,
56         mutation_descr_t& mut_info);
57 
58 static void default_item_release(gsl::not_null<ENGINE_HANDLE*> handle,
59                                  gsl::not_null<item*> item);
60 static cb::EngineErrorItemPair default_get(gsl::not_null<ENGINE_HANDLE*> handle,
61                                            gsl::not_null<const void*> cookie,
62                                            const DocKey& key,
63                                            uint16_t vbucket,
64                                            DocStateFilter);
65 
66 static cb::EngineErrorItemPair default_get_if(
67         gsl::not_null<ENGINE_HANDLE*>,
68         gsl::not_null<const void*>,
69         const DocKey&,
70         uint16_t,
71         std::function<bool(const item_info&)>);
72 
73 static cb::EngineErrorItemPair default_get_and_touch(
74         gsl::not_null<ENGINE_HANDLE*> handle,
75         gsl::not_null<const void*> cookie,
76         const DocKey& key,
77         uint16_t vbucket,
78         uint32_t expiry_time);
79 
80 static cb::EngineErrorItemPair default_get_locked(
81         gsl::not_null<ENGINE_HANDLE*> handle,
82         gsl::not_null<const void*> cookie,
83         const DocKey& key,
84         uint16_t vbucket,
85         uint32_t lock_timeout);
86 
87 static cb::EngineErrorMetadataPair default_get_meta(
88         gsl::not_null<ENGINE_HANDLE*> handle,
89         gsl::not_null<const void*> cookie,
90         const DocKey& key,
91         uint16_t vbucket);
92 
93 static ENGINE_ERROR_CODE default_unlock(gsl::not_null<ENGINE_HANDLE*> handle,
94                                         gsl::not_null<const void*> cookie,
95                                         const DocKey& key,
96                                         uint16_t vbucket,
97                                         uint64_t cas);
98 static ENGINE_ERROR_CODE default_get_stats(gsl::not_null<ENGINE_HANDLE*> handle,
99                                            gsl::not_null<const void*> cookie,
100                                            cb::const_char_buffer key,
101                                            ADD_STAT add_stat);
102 static void default_reset_stats(gsl::not_null<ENGINE_HANDLE*> handle,
103                                 gsl::not_null<const void*> cookie);
104 static ENGINE_ERROR_CODE default_store(gsl::not_null<ENGINE_HANDLE*> handle,
105                                        gsl::not_null<const void*> cookie,
106                                        gsl::not_null<item*> item,
107                                        uint64_t& cas,
108                                        ENGINE_STORE_OPERATION operation,
109                                        DocumentState);
110 
111 static cb::EngineErrorCasPair default_store_if(
112         gsl::not_null<ENGINE_HANDLE*> handle,
113         gsl::not_null<const void*> cookie,
114         gsl::not_null<item*> item_,
115         uint64_t cas,
116         ENGINE_STORE_OPERATION operation,
117         cb::StoreIfPredicate predicate,
118         DocumentState document_state);
119 
120 static void item_set_datatype(gsl::not_null<ENGINE_HANDLE*> handle,
121                               gsl::not_null<item*> item,
122                               protocol_binary_datatype_t val);
123 
124 static ENGINE_ERROR_CODE default_flush(gsl::not_null<ENGINE_HANDLE*> handle,
125                                        gsl::not_null<const void*> cookie);
126 static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se,
127                                                  const char *cfg_str);
128 static ENGINE_ERROR_CODE default_unknown_command(
129         gsl::not_null<ENGINE_HANDLE*> handle,
130         const void* cookie,
131         gsl::not_null<protocol_binary_request_header*> request,
132         ADD_RESPONSE response,
133         DocNamespace doc_namespace);
134 
135 union vbucket_info_adapter {
136     char c;
137     struct vbucket_info v;
138 };
139 
set_vbucket_state(struct default_engine * e,uint16_t vbid,vbucket_state_t to)140 static void set_vbucket_state(struct default_engine *e,
141                               uint16_t vbid, vbucket_state_t to) {
142     union vbucket_info_adapter vi;
143     vi.c = e->vbucket_infos[vbid];
144     vi.v.state = to;
145     e->vbucket_infos[vbid] = vi.c;
146 }
147 
get_vbucket_state(struct default_engine * e,uint16_t vbid)148 static vbucket_state_t get_vbucket_state(struct default_engine *e,
149                                          uint16_t vbid) {
150     union vbucket_info_adapter vi;
151     vi.c = e->vbucket_infos[vbid];
152     return vbucket_state_t(vi.v.state);
153 }
154 
handled_vbucket(struct default_engine * e,uint16_t vbid)155 static bool handled_vbucket(struct default_engine *e, uint16_t vbid) {
156     return e->config.ignore_vbucket
157         || (get_vbucket_state(e, vbid) == vbucket_state_active);
158 }
159 
/* Mechanism for handling bad vbucket requests: makes the calling function
 * return ENGINE_NOT_MY_VBUCKET when engine `e` does not handle vbucket `v`. */
#define VBUCKET_GUARD(e, v)               \
    if (!handled_vbucket((e), (v))) {     \
        return ENGINE_NOT_MY_VBUCKET;     \
    }
162 
163 static bool get_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
164                           gsl::not_null<const item*> item,
165                           gsl::not_null<item_info*> item_info);
166 
167 static bool set_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
168                           gsl::not_null<item*> item,
169                           gsl::not_null<const item_info*> itm_info);
170 
171 static bool is_xattr_supported(gsl::not_null<ENGINE_HANDLE*> handle);
172 
173 static BucketCompressionMode get_compression_mode(
174                                gsl::not_null<ENGINE_HANDLE*> handle);
175 
176 static float get_min_compression_ratio(gsl::not_null<ENGINE_HANDLE*> handle);
177 
178 static size_t get_max_item_size(gsl::not_null<ENGINE_HANDLE*> handle);
179 
180 /**
181  * Given that default_engine is implemented in C and not C++ we don't have
182  * a constructor for the struct to initialize the members to some sane
183  * default values. Currently they're all being allocated through the
184  * engine manager which keeps a local map of all engines being created.
185  *
186  * Once an object is in that map it may in theory be referenced, so we
187  * need to ensure that the members is initialized before hitting that map.
188  *
189  * @todo refactor default_engine to C++ to avoid this extra hack :)
190  */
default_engine_constructor(struct default_engine * engine,bucket_id_t id)191 void default_engine_constructor(struct default_engine* engine, bucket_id_t id)
192 {
193     memset(engine, 0, sizeof(*engine));
194 
195     cb_mutex_initialize(&engine->slabs.lock);
196     cb_mutex_initialize(&engine->items.lock);
197     cb_mutex_initialize(&engine->stats.lock);
198     cb_mutex_initialize(&engine->scrubber.lock);
199 
200     engine->bucket_id = id;
201     engine->engine.interface.interface = 1;
202     engine->engine.initialize = default_initialize;
203     engine->engine.destroy = default_destroy;
204     engine->engine.allocate = default_item_allocate;
205     engine->engine.allocate_ex = default_item_allocate_ex;
206     engine->engine.remove = default_item_delete;
207     engine->engine.release = default_item_release;
208     engine->engine.get = default_get;
209     engine->engine.get_if = default_get_if;
210     engine->engine.get_locked = default_get_locked;
211     engine->engine.get_meta = default_get_meta;
212     engine->engine.get_and_touch = default_get_and_touch;
213     engine->engine.unlock = default_unlock;
214     engine->engine.get_stats = default_get_stats;
215     engine->engine.reset_stats = default_reset_stats;
216     engine->engine.store = default_store;
217     engine->engine.store_if = default_store_if;
218     engine->engine.flush = default_flush;
219     engine->engine.unknown_command = default_unknown_command;
220     engine->engine.item_set_cas = item_set_cas;
221     engine->engine.item_set_datatype = item_set_datatype;
222     engine->engine.get_item_info = get_item_info;
223     engine->engine.set_item_info = set_item_info;
224     engine->engine.isXattrEnabled = is_xattr_supported;
225     engine->engine.getCompressionMode = get_compression_mode;
226     engine->engine.getMaxItemSize = get_max_item_size;
227     engine->engine.getMinCompressionRatio = get_min_compression_ratio;
228     engine->config.verbose = 0;
229     engine->config.oldest_live = 0;
230     engine->config.evict_to_free = true;
231     engine->config.maxbytes = 64 * 1024 * 1024;
232     engine->config.preallocate = false;
233     engine->config.factor = 1.25;
234     engine->config.chunk_size = 48;
235     engine->config.item_size_max= 1024 * 1024;
236     engine->config.xattr_enabled = true;
237     engine->config.compression_mode = BucketCompressionMode::Off;
238     engine->config.min_compression_ratio = default_min_compression_ratio;
239 }
240 
create_instance(uint64_t interface,GET_SERVER_API get_server_api,ENGINE_HANDLE ** handle)241 extern "C" ENGINE_ERROR_CODE create_instance(uint64_t interface,
242                                              GET_SERVER_API get_server_api,
243                                              ENGINE_HANDLE **handle) {
244    SERVER_HANDLE_V1 *api = get_server_api();
245    struct default_engine *engine;
246 
247    if (interface != 1 || api == NULL) {
248       return ENGINE_ENOTSUP;
249    }
250 
251    if ((engine = engine_manager_create_engine()) == NULL) {
252       return ENGINE_ENOMEM;
253    }
254 
255    engine->server = *api;
256    engine->get_server_api = get_server_api;
257    engine->initialized = true;
258    *handle = (ENGINE_HANDLE*)&engine->engine;
259    return ENGINE_SUCCESS;
260 }
261 
destroy_engine()262 extern "C" void destroy_engine() {
263     engine_manager_shutdown();
264     assoc_destroy();
265 }
266 
get_handle(ENGINE_HANDLE * handle)267 static struct default_engine* get_handle(ENGINE_HANDLE* handle) {
268    return (struct default_engine*)handle;
269 }
270 
get_real_item(item * item)271 static hash_item* get_real_item(item* item) {
272     return (hash_item*)item;
273 }
274 
default_initialize(gsl::not_null<ENGINE_HANDLE * > handle,const char * config_str)275 static ENGINE_ERROR_CODE default_initialize(
276         gsl::not_null<ENGINE_HANDLE*> handle, const char* config_str) {
277     struct default_engine* se = get_handle(handle);
278     ENGINE_ERROR_CODE ret = initalize_configuration(se, config_str);
279     if (ret != ENGINE_SUCCESS) {
280         return ret;
281     }
282 
283     ret = assoc_init(se);
284     if (ret != ENGINE_SUCCESS) {
285         return ret;
286     }
287 
288     ret = slabs_init(
289             se, se->config.maxbytes, se->config.factor, se->config.preallocate);
290     if (ret != ENGINE_SUCCESS) {
291         return ret;
292     }
293 
294     return ENGINE_SUCCESS;
295 }
296 
default_destroy(gsl::not_null<ENGINE_HANDLE * > handle,const bool force)297 static void default_destroy(gsl::not_null<ENGINE_HANDLE*> handle,
298                             const bool force) {
299     (void)force;
300     engine_manager_delete_engine(get_handle(handle));
301 }
302 
destroy_engine_instance(struct default_engine * engine)303 void destroy_engine_instance(struct default_engine* engine) {
304     if (engine->initialized) {
305         /* Destory the slabs cache */
306         slabs_destroy(engine);
307 
308         cb_free(engine->config.uuid);
309 
310         /* Clean up the mutexes */
311         cb_mutex_destroy(&engine->items.lock);
312         cb_mutex_destroy(&engine->stats.lock);
313         cb_mutex_destroy(&engine->slabs.lock);
314         cb_mutex_destroy(&engine->scrubber.lock);
315 
316         engine->initialized = false;
317     }
318 }
319 
default_item_allocate(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,const size_t nbytes,const int flags,const rel_time_t exptime,uint8_t datatype,uint16_t vbucket)320 static cb::EngineErrorItemPair default_item_allocate(
321         gsl::not_null<ENGINE_HANDLE*> handle,
322         gsl::not_null<const void*> cookie,
323         const DocKey& key,
324         const size_t nbytes,
325         const int flags,
326         const rel_time_t exptime,
327         uint8_t datatype,
328         uint16_t vbucket) {
329     try {
330         auto pair = default_item_allocate_ex(handle, cookie, key, nbytes,
331                                              0, // No privileged bytes
332                                              flags, exptime, datatype, vbucket);
333         return {cb::engine_errc::success, std::move(pair.first)};
334     } catch (const cb::engine_error& error) {
335         return cb::makeEngineErrorItemPair(
336                 cb::engine_errc(error.code().value()));
337     }
338 }
339 
default_item_allocate_ex(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,size_t nbytes,size_t priv_nbytes,int flags,rel_time_t exptime,uint8_t datatype,uint16_t vbucket)340 static std::pair<cb::unique_item_ptr, item_info> default_item_allocate_ex(
341         gsl::not_null<ENGINE_HANDLE*> handle,
342         gsl::not_null<const void*> cookie,
343         const DocKey& key,
344         size_t nbytes,
345         size_t priv_nbytes,
346         int flags,
347         rel_time_t exptime,
348         uint8_t datatype,
349         uint16_t vbucket) {
350     hash_item *it;
351 
352     unsigned int id;
353     struct default_engine* engine = get_handle(handle);
354 
355     if (!handled_vbucket(engine, vbucket)) {
356         throw cb::engine_error(cb::engine_errc::not_my_vbucket,
357                                "default_item_allocate_ex");
358     }
359 
360     size_t ntotal = sizeof(hash_item) + key.size() + nbytes;
361     id = slabs_clsid(engine, ntotal);
362     if (id == 0) {
363         throw cb::engine_error(cb::engine_errc::too_big,
364                                "default_item_allocate_ex: no slab class");
365     }
366 
367     if ((nbytes - priv_nbytes) > engine->config.item_size_max) {
368         throw cb::engine_error(cb::engine_errc::too_big,
369                                "default_item_allocate_ex");
370     }
371 
372     it = item_alloc(engine,
373                     key.data(),
374                     key.size(),
375                     flags,
376                     engine->server.core->realtime(exptime, cb::NoExpiryLimit),
377                     (uint32_t)nbytes,
378                     cookie,
379                     datatype);
380 
381     if (it != NULL) {
382         item_info info;
383         if (!get_item_info(handle, it, &info)) {
384             // This should never happen (unless we provide invalid
385             // arguments)
386             item_release(engine, it);
387             throw cb::engine_error(cb::engine_errc::failed,
388                                    "default_item_allocate_ex");
389         }
390 
391         return std::make_pair(cb::unique_item_ptr(it, cb::ItemDeleter{handle}),
392                               info);
393     } else {
394         throw cb::engine_error(cb::engine_errc::no_memory,
395                                "default_item_allocate_ex");
396     }
397 }
398 
default_item_delete(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint64_t & cas,uint16_t vbucket,mutation_descr_t & mut_info)399 static ENGINE_ERROR_CODE default_item_delete(
400         gsl::not_null<ENGINE_HANDLE*> handle,
401         gsl::not_null<const void*> cookie,
402         const DocKey& key,
403         uint64_t& cas,
404         uint16_t vbucket,
405         mutation_descr_t& mut_info) {
406     struct default_engine* engine = get_handle(handle);
407     hash_item* it;
408     uint64_t cas_in = cas;
409     VBUCKET_GUARD(engine, vbucket);
410 
411     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
412     do {
413         it = item_get(engine,
414                       cookie,
415                       key.data(),
416                       key.size(),
417                       DocStateFilter::Alive);
418         if (it == nullptr) {
419             return ENGINE_KEY_ENOENT;
420         }
421 
422         if (it->locktime != 0 &&
423             it->locktime > engine->server.core->get_current_time()) {
424             if (cas_in != it->cas) {
425                 item_release(engine, it);
426                 return ENGINE_LOCKED;
427             }
428         }
429 
430         auto* deleted = item_alloc(engine,
431                                    key.data(),
432                                    key.size(),
433                                    it->flags,
434                                    it->exptime,
435                                    0,
436                                    cookie,
437                                    PROTOCOL_BINARY_RAW_BYTES);
438 
439         if (deleted == NULL) {
440             item_release(engine, it);
441             return ENGINE_TMPFAIL;
442         }
443 
444         if (cas_in == 0) {
445             // If the caller specified the "cas wildcard" we should set
446             // the cas for the item we just fetched and do a cas
447             // replace with that value
448             item_set_cas(handle, deleted, it->cas);
449         } else {
450             // The caller specified a specific CAS value so we should
451             // use that value in our cas replace
452             item_set_cas(handle, deleted, cas_in);
453         }
454 
455         ret = store_item(engine,
456                          deleted,
457                          &cas,
458                          OPERATION_CAS,
459                          cookie,
460                          DocumentState::Deleted);
461 
462         item_release(engine, it);
463         item_release(engine, deleted);
464 
465         // We should only retry for race conditions if the caller specified
466         // cas wildcard
467     } while (ret == ENGINE_KEY_EEXISTS && cas_in == 0);
468 
469     // vbucket UUID / seqno arn't supported by default engine, so just return
470     // a hardcoded vbucket uuid, and zero for the sequence number.
471     mut_info.vbucket_uuid = DEFAULT_ENGINE_VBUCKET_UUID;
472     mut_info.seqno = 0;
473 
474     return ret;
475 }
476 
default_item_release(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<item * > item)477 static void default_item_release(gsl::not_null<ENGINE_HANDLE*> handle,
478                                  gsl::not_null<item*> item) {
479     item_release(get_handle(handle), get_real_item(item));
480 }
481 
default_get(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,DocStateFilter documentStateFilter)482 static cb::EngineErrorItemPair default_get(gsl::not_null<ENGINE_HANDLE*> handle,
483                                            gsl::not_null<const void*> cookie,
484                                            const DocKey& key,
485                                            uint16_t vbucket,
486                                            DocStateFilter documentStateFilter) {
487     struct default_engine* engine = get_handle(handle);
488 
489     if (!handled_vbucket(engine, vbucket)) {
490         return std::make_pair(
491                 cb::engine_errc::not_my_vbucket,
492                 cb::unique_item_ptr{nullptr, cb::ItemDeleter{handle}});
493     }
494 
495     item* it = item_get(
496             engine, cookie, key.data(), key.size(), documentStateFilter);
497     if (it != nullptr) {
498         return cb::makeEngineErrorItemPair(
499                 cb::engine_errc::success, it, handle);
500     } else {
501         return cb::makeEngineErrorItemPair(cb::engine_errc::no_such_key);
502     }
503 }
504 
default_get_if(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,std::function<bool (const item_info &)> filter)505 static cb::EngineErrorItemPair default_get_if(
506         gsl::not_null<ENGINE_HANDLE*> handle,
507         gsl::not_null<const void*> cookie,
508         const DocKey& key,
509         uint16_t vbucket,
510         std::function<bool(const item_info&)> filter) {
511     struct default_engine* engine = get_handle(handle);
512 
513     if (!handled_vbucket(engine, vbucket)) {
514         return cb::makeEngineErrorItemPair(cb::engine_errc::not_my_vbucket);
515     }
516 
517     cb::unique_item_ptr ret(item_get(engine,
518                                      cookie,
519                                      key.data(),
520                                      key.size(),
521                                      DocStateFilter::Alive),
522                             cb::ItemDeleter{handle});
523     if (!ret) {
524         return cb::makeEngineErrorItemPair(cb::engine_errc::no_such_key);
525     }
526 
527     item_info info;
528     if (!get_item_info(handle, ret.get(), &info)) {
529         throw cb::engine_error(cb::engine_errc::failed,
530                                "default_get_if: get_item_info failed");
531     }
532 
533     if (!filter(info)) {
534         ret.reset(nullptr);
535     }
536 
537     return cb::makeEngineErrorItemPair(
538             cb::engine_errc::success, ret.release(), handle);
539 }
540 
default_get_and_touch(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,uint32_t expiry_time)541 static cb::EngineErrorItemPair default_get_and_touch(
542         gsl::not_null<ENGINE_HANDLE*> handle,
543         gsl::not_null<const void*> cookie,
544         const DocKey& key,
545         uint16_t vbucket,
546         uint32_t expiry_time) {
547     struct default_engine* engine = get_handle(handle);
548 
549     if (!handled_vbucket(engine, vbucket)) {
550         return cb::makeEngineErrorItemPair(cb::engine_errc::not_my_vbucket);
551     }
552 
553     hash_item* it = nullptr;
554     auto ret = item_get_and_touch(
555             engine,
556             cookie,
557             &it,
558             key.data(),
559             key.size(),
560             engine->server.core->realtime(expiry_time, cb::NoExpiryLimit));
561 
562     return cb::makeEngineErrorItemPair(
563             cb::engine_errc(ret), reinterpret_cast<item*>(it), handle);
564 }
565 
default_get_locked(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,uint32_t lock_timeout)566 static cb::EngineErrorItemPair default_get_locked(
567         gsl::not_null<ENGINE_HANDLE*> handle,
568         gsl::not_null<const void*> cookie,
569         const DocKey& key,
570         uint16_t vbucket,
571         uint32_t lock_timeout) {
572     auto* engine = get_handle(handle);
573 
574     if (!handled_vbucket(engine, vbucket)) {
575         return cb::makeEngineErrorItemPair(cb::engine_errc::not_my_vbucket);
576     }
577 
578     // memcached buckets don't offer any way for the user to configure
579     // the lock settings.
580     static const uint32_t default_lock_timeout = 15;
581     static const uint32_t max_lock_timeout = 30;
582 
583     if (lock_timeout == 0 || lock_timeout > max_lock_timeout) {
584         lock_timeout = default_lock_timeout;
585     }
586 
587     // Convert the lock timeout to an absolute time
588     lock_timeout += engine->server.core->get_current_time();
589 
590     hash_item* it = nullptr;
591     auto ret = item_get_locked(engine, cookie, &it, key.data(), key.size(),
592                                lock_timeout);
593     return cb::makeEngineErrorItemPair(cb::engine_errc(ret), it, handle);
594 }
595 
default_get_meta(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket)596 static cb::EngineErrorMetadataPair default_get_meta(
597         gsl::not_null<ENGINE_HANDLE*> handle,
598         gsl::not_null<const void*> cookie,
599         const DocKey& key,
600         uint16_t vbucket) {
601     auto* engine_handle = get_handle(handle);
602 
603     if (!handled_vbucket(engine_handle, vbucket)) {
604         return std::make_pair(cb::engine_errc::not_my_vbucket, item_info());
605     }
606 
607     cb::unique_item_ptr item{item_get(engine_handle,
608                                       cookie,
609                                       key.data(),
610                                       key.size(),
611                                       DocStateFilter::AliveOrDeleted),
612                              cb::ItemDeleter(handle)};
613 
614     if (!item) {
615         return std::make_pair(cb::engine_errc::no_such_key, item_info());
616     }
617 
618     item_info info;
619     if (!get_item_info(handle, item.get(), &info)) {
620         throw cb::engine_error(cb::engine_errc::failed,
621                                "default_get_if: get_item_info failed");
622     }
623 
624     return std::make_pair(cb::engine_errc::success, info);
625 }
626 
default_unlock(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,uint64_t cas)627 static ENGINE_ERROR_CODE default_unlock(gsl::not_null<ENGINE_HANDLE*> handle,
628                                         gsl::not_null<const void*> cookie,
629                                         const DocKey& key,
630                                         uint16_t vbucket,
631                                         uint64_t cas) {
632     auto* engine = get_handle(handle);
633     VBUCKET_GUARD(engine, vbucket);
634     return item_unlock(engine, cookie, key.data(), key.size(), cas);
635 }
636 
default_get_stats(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,cb::const_char_buffer key,ADD_STAT add_stat)637 static ENGINE_ERROR_CODE default_get_stats(gsl::not_null<ENGINE_HANDLE*> handle,
638                                            gsl::not_null<const void*> cookie,
639                                            cb::const_char_buffer key,
640                                            ADD_STAT add_stat) {
641     struct default_engine* engine = get_handle(handle);
642     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
643 
644     if (key.empty()) {
645         char val[128];
646         int len;
647 
648         cb_mutex_enter(&engine->stats.lock);
649         len = sprintf(val, "%" PRIu64, (uint64_t)engine->stats.evictions);
650         add_stat("evictions", 9, val, len, cookie);
651         len = sprintf(val, "%" PRIu64, (uint64_t)engine->stats.curr_items);
652         add_stat("curr_items", 10, val, len, cookie);
653         len = sprintf(val, "%" PRIu64, (uint64_t)engine->stats.total_items);
654         add_stat("total_items", 11, val, len, cookie);
655         len = sprintf(val, "%" PRIu64, (uint64_t)engine->stats.curr_bytes);
656         add_stat("bytes", 5, val, len, cookie);
657         len = sprintf(val, "%" PRIu64, engine->stats.reclaimed);
658         add_stat("reclaimed", 9, val, len, cookie);
659         len = sprintf(val, "%" PRIu64, (uint64_t)engine->config.maxbytes);
660         add_stat("engine_maxbytes", 15, val, len, cookie);
661         cb_mutex_exit(&engine->stats.lock);
662     } else if (key == "slabs"_ccb) {
663         slabs_stats(engine, add_stat, cookie);
664     } else if (key == "items"_ccb) {
665         item_stats(engine, add_stat, cookie);
666     } else if (key == "sizes"_ccb) {
667         item_stats_sizes(engine, add_stat, cookie);
668     } else if (key == "uuid"_ccb) {
669         if (engine->config.uuid) {
670             add_stat("uuid",
671                      4,
672                      engine->config.uuid,
673                      (uint32_t)strlen(engine->config.uuid),
674                      cookie);
675         } else {
676             add_stat("uuid", 4, "", 0, cookie);
677         }
678     } else if (key == "scrub"_ccb) {
679         char val[128];
680         int len;
681 
682         cb_mutex_enter(&engine->scrubber.lock);
683         if (engine->scrubber.running) {
684             add_stat("scrubber:status", 15, "running", 7, cookie);
685         } else {
686             add_stat("scrubber:status", 15, "stopped", 7, cookie);
687         }
688 
689         if (engine->scrubber.started != 0) {
690             if (engine->scrubber.stopped != 0) {
691                 time_t diff =
692                         engine->scrubber.started - engine->scrubber.stopped;
693                 len = sprintf(val, "%" PRIu64, (uint64_t)diff);
694                 add_stat("scrubber:last_run", 17, val, len, cookie);
695             }
696 
697             len = sprintf(val, "%" PRIu64, engine->scrubber.visited);
698             add_stat("scrubber:visited", 16, val, len, cookie);
699             len = sprintf(val, "%" PRIu64, engine->scrubber.cleaned);
700             add_stat("scrubber:cleaned", 16, val, len, cookie);
701         }
702         cb_mutex_exit(&engine->scrubber.lock);
703     } else {
704         ret = ENGINE_KEY_ENOENT;
705     }
706 
707     return ret;
708 }
709 
default_store(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,gsl::not_null<item * > item,uint64_t & cas,ENGINE_STORE_OPERATION operation,DocumentState document_state)710 static ENGINE_ERROR_CODE default_store(gsl::not_null<ENGINE_HANDLE*> handle,
711                                        gsl::not_null<const void*> cookie,
712                                        gsl::not_null<item*> item,
713                                        uint64_t& cas,
714                                        ENGINE_STORE_OPERATION operation,
715                                        DocumentState document_state) {
716     auto* engine = get_handle(handle);
717     auto& config = engine->config;
718     auto* it = get_real_item(item);
719 
720     if (document_state == DocumentState::Deleted && !config.keep_deleted) {
721         return safe_item_unlink(engine, it);
722     }
723 
724     return store_item(engine, it, &cas, operation, cookie, document_state);
725 }
726 
default_store_if(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,gsl::not_null<item * > item,uint64_t cas,ENGINE_STORE_OPERATION operation,cb::StoreIfPredicate predicate,DocumentState document_state)727 static cb::EngineErrorCasPair default_store_if(
728         gsl::not_null<ENGINE_HANDLE*> handle,
729         gsl::not_null<const void*> cookie,
730         gsl::not_null<item*> item,
731         uint64_t cas,
732         ENGINE_STORE_OPERATION operation,
733         cb::StoreIfPredicate predicate,
734         DocumentState document_state) {
735     struct default_engine* engine = get_handle(handle);
736 
737     if (predicate) {
738         // Check for an existing item and call the item predicate on it.
739         auto* it = get_real_item(item);
740         auto* key = item_get_key(it);
741         if (!key) {
742             throw cb::engine_error(cb::engine_errc::failed,
743                                    "default_store_if: item_get_key failed");
744         }
745         cb::unique_item_ptr existing(
746                 item_get(engine, cookie, *key, DocStateFilter::Alive),
747                 cb::ItemDeleter{handle});
748 
749         cb::StoreIfStatus status;
750         if (existing.get()) {
751             item_info info;
752             if (!get_item_info(handle, existing.get(), &info)) {
753                 throw cb::engine_error(
754                         cb::engine_errc::failed,
755                         "default_store_if: get_item_info failed");
756             }
757             status = predicate(info, {true});
758         } else {
759             status = predicate(boost::none, {true});
760         }
761 
762         switch (status) {
763         case cb::StoreIfStatus::Fail: {
764             return {cb::engine_errc::predicate_failed, 0};
765         }
766         case cb::StoreIfStatus::Continue:
767         case cb::StoreIfStatus::GetItemInfo: {
768             break;
769         }
770         }
771     }
772 
773     auto* it = get_real_item(item);
774     auto status =
775             store_item(engine, it, &cas, operation, cookie, document_state);
776     return {cb::engine_errc(status), cas};
777 }
778 
default_flush(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie)779 static ENGINE_ERROR_CODE default_flush(gsl::not_null<ENGINE_HANDLE*> handle,
780                                        gsl::not_null<const void*> cookie) {
781     item_flush_expired(get_handle(handle));
782 
783     return ENGINE_SUCCESS;
784 }
785 
default_reset_stats(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie)786 static void default_reset_stats(gsl::not_null<ENGINE_HANDLE*> handle,
787                                 gsl::not_null<const void*> cookie) {
788     struct default_engine* engine = get_handle(handle);
789     item_stats_reset(engine);
790 
791     cb_mutex_enter(&engine->stats.lock);
792     engine->stats.evictions = 0;
793     engine->stats.reclaimed = 0;
794     engine->stats.total_items = 0;
795     cb_mutex_exit(&engine->stats.lock);
796 }
797 
initalize_configuration(struct default_engine * se,const char * cfg_str)798 static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se,
799                                                  const char *cfg_str) {
800    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
801 
802    se->config.vb0 = true;
803 
804    if (cfg_str != NULL) {
805        struct config_item items[13];
806        int ii = 0;
807 
808        memset(&items, 0, sizeof(items));
809        items[ii].key = "verbose";
810        items[ii].datatype = DT_SIZE;
811        items[ii].value.dt_size = &se->config.verbose;
812        ++ii;
813 
814        items[ii].key = "eviction";
815        items[ii].datatype = DT_BOOL;
816        items[ii].value.dt_bool = &se->config.evict_to_free;
817        ++ii;
818 
819        items[ii].key = "cache_size";
820        items[ii].datatype = DT_SIZE;
821        items[ii].value.dt_size = &se->config.maxbytes;
822        ++ii;
823 
824        items[ii].key = "preallocate";
825        items[ii].datatype = DT_BOOL;
826        items[ii].value.dt_bool = &se->config.preallocate;
827        ++ii;
828 
829        items[ii].key = "factor";
830        items[ii].datatype = DT_FLOAT;
831        items[ii].value.dt_float = &se->config.factor;
832        ++ii;
833 
834        items[ii].key = "chunk_size";
835        items[ii].datatype = DT_SIZE;
836        items[ii].value.dt_size = &se->config.chunk_size;
837        ++ii;
838 
839        items[ii].key = "item_size_max";
840        items[ii].datatype = DT_SIZE;
841        items[ii].value.dt_size = &se->config.item_size_max;
842        ++ii;
843 
844        items[ii].key = "ignore_vbucket";
845        items[ii].datatype = DT_BOOL;
846        items[ii].value.dt_bool = &se->config.ignore_vbucket;
847        ++ii;
848 
849        items[ii].key = "vb0";
850        items[ii].datatype = DT_BOOL;
851        items[ii].value.dt_bool = &se->config.vb0;
852        ++ii;
853 
854        items[ii].key = "config_file";
855        items[ii].datatype = DT_CONFIGFILE;
856        ++ii;
857 
858        items[ii].key = "uuid";
859        items[ii].datatype = DT_STRING;
860        items[ii].value.dt_string = &se->config.uuid;
861        ++ii;
862 
863        items[ii].key = "keep_deleted";
864        items[ii].datatype = DT_BOOL;
865        items[ii].value.dt_bool = &se->config.keep_deleted;
866        ++ii;
867 
868        items[ii].key = NULL;
869        ++ii;
870        cb_assert(ii == 13);
871        ret = ENGINE_ERROR_CODE(se->server.core->parse_config(cfg_str,
872                                                              items,
873                                                              stderr));
874    }
875 
876    if (se->config.vb0) {
877        set_vbucket_state(se, 0, vbucket_state_active);
878    }
879 
880    return ret;
881 }
882 
set_vbucket(struct default_engine * e,const void * cookie,protocol_binary_request_set_vbucket * req,ADD_RESPONSE response)883 static bool set_vbucket(struct default_engine *e,
884                         const void* cookie,
885                         protocol_binary_request_set_vbucket *req,
886                         ADD_RESPONSE response) {
887     vbucket_state_t state;
888     size_t bodylen = ntohl(req->message.header.request.bodylen)
889         - ntohs(req->message.header.request.keylen);
890     if (bodylen != sizeof(vbucket_state_t)) {
891         const char *msg = "Incorrect packet format";
892         return response(NULL, 0, NULL, 0, msg, (uint32_t)strlen(msg),
893                         PROTOCOL_BINARY_RAW_BYTES,
894                         PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
895     }
896     memcpy(&state, &req->message.body.state, sizeof(state));
897     state = vbucket_state_t(ntohl(state));
898 
899     if (!is_valid_vbucket_state_t(state)) {
900         const char *msg = "Invalid vbucket state";
901         return response(NULL, 0, NULL, 0, msg, (uint32_t)strlen(msg),
902                         PROTOCOL_BINARY_RAW_BYTES,
903                         PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
904     }
905 
906     set_vbucket_state(e, ntohs(req->message.header.request.vbucket), state);
907     return response(NULL, 0, NULL, 0, &state, sizeof(state),
908                     PROTOCOL_BINARY_RAW_BYTES,
909                     PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
910 }
911 
get_vbucket(struct default_engine * e,const void * cookie,protocol_binary_request_get_vbucket * req,ADD_RESPONSE response)912 static bool get_vbucket(struct default_engine *e,
913                         const void* cookie,
914                         protocol_binary_request_get_vbucket *req,
915                         ADD_RESPONSE response) {
916     vbucket_state_t state;
917     state = get_vbucket_state(e, ntohs(req->message.header.request.vbucket));
918     state = vbucket_state_t(ntohl(state));
919 
920     return response(NULL, 0, NULL, 0, &state, sizeof(state),
921                     PROTOCOL_BINARY_RAW_BYTES,
922                     PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
923 }
924 
rm_vbucket(struct default_engine * e,const void * cookie,protocol_binary_request_header * req,ADD_RESPONSE response)925 static bool rm_vbucket(struct default_engine *e,
926                        const void *cookie,
927                        protocol_binary_request_header *req,
928                        ADD_RESPONSE response) {
929     set_vbucket_state(e, ntohs(req->request.vbucket), vbucket_state_dead);
930     return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
931                     PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
932 }
933 
scrub_cmd(struct default_engine * e,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)934 static bool scrub_cmd(struct default_engine *e,
935                       const void *cookie,
936                       protocol_binary_request_header *request,
937                       ADD_RESPONSE response) {
938 
939     protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
940     if (!item_start_scrub(e)) {
941         res = PROTOCOL_BINARY_RESPONSE_EBUSY;
942     }
943 
944     return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
945                     res, 0, cookie);
946 }
947 
948 /**
949  * set_param only added to allow per bucket xattr on/off
950  * and toggle between compression modes for testing purposes
951  */
set_param(struct default_engine * e,const void * cookie,protocol_binary_request_set_param * req,ADD_RESPONSE response)952 static bool set_param(struct default_engine* e,
953                       const void* cookie,
954                       protocol_binary_request_set_param* req,
955                       ADD_RESPONSE response) {
956     size_t keylen = ntohs(req->message.header.request.keylen);
957     uint8_t extlen = req->message.header.request.extlen;
958     size_t vallen = ntohl(req->message.header.request.bodylen);
959     protocol_binary_engine_param_t paramtype =
960             static_cast<protocol_binary_engine_param_t>(
961                     ntohl(req->message.body.param_type));
962 
963     if (keylen == 0 || (vallen - keylen - extlen) == 0) {
964         return false;
965     }
966 
967     // Only support protocol_binary_engine_param_flush with xattr_enabled
968     if (paramtype == protocol_binary_engine_param_flush) {
969         const char* keyp =
970                 reinterpret_cast<const char*>(req->bytes) + sizeof(req->bytes);
971         const char* valuep = keyp + keylen;
972         vallen -= (keylen + extlen);
973         cb::const_char_buffer key(keyp, keylen);
974         cb::const_char_buffer value(valuep, vallen);
975 
976         if (key == "xattr_enabled") {
977             if (value == "true") {
978                 e->config.xattr_enabled = true;
979             } else if (value == "false") {
980                 e->config.xattr_enabled = false;
981             } else {
982                 return false;
983             }
984         } else if (key == "compression_mode") {
985             try {
986                 e->config.compression_mode = parseCompressionMode(
987                         std::string(value.data(), value.size()));
988             } catch (std::invalid_argument&) {
989                 return false;
990             }
991         } else if (key == "min_compression_ratio") {
992             std::string value_str{value.data(), value.size()};
993             float min_comp_ratio;
994             if (!safe_strtof(value_str.c_str(), min_comp_ratio)) {
995                 return false;
996             }
997 
998             e->config.min_compression_ratio = min_comp_ratio;
999         }
1000 
1001         return response(NULL,
1002                         0,
1003                         NULL,
1004                         0,
1005                         NULL,
1006                         0,
1007                         PROTOCOL_BINARY_RAW_BYTES,
1008                         PROTOCOL_BINARY_RESPONSE_SUCCESS,
1009                         0,
1010                         cookie);
1011     }
1012     return false;
1013 }
1014 
default_unknown_command(gsl::not_null<ENGINE_HANDLE * > handle,const void * cookie,gsl::not_null<protocol_binary_request_header * > request,ADD_RESPONSE response,DocNamespace doc_namespace)1015 static ENGINE_ERROR_CODE default_unknown_command(
1016         gsl::not_null<ENGINE_HANDLE*> handle,
1017         const void* cookie,
1018         gsl::not_null<protocol_binary_request_header*> request,
1019         ADD_RESPONSE response,
1020         DocNamespace doc_namespace) {
1021     struct default_engine* e = get_handle(handle);
1022     bool sent;
1023 
1024     switch(request->request.opcode) {
1025     case PROTOCOL_BINARY_CMD_SCRUB:
1026         sent = scrub_cmd(e, cookie, request, response);
1027         break;
1028     case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1029         sent = rm_vbucket(e, cookie, request, response);
1030         break;
1031     case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1032         sent = set_vbucket(
1033                 e,
1034                 cookie,
1035                 reinterpret_cast<protocol_binary_request_set_vbucket*>(
1036                         request.get()),
1037                 response);
1038         break;
1039     case PROTOCOL_BINARY_CMD_GET_VBUCKET:
1040         sent = get_vbucket(
1041                 e,
1042                 cookie,
1043                 reinterpret_cast<protocol_binary_request_get_vbucket*>(
1044                         request.get()),
1045                 response);
1046         break;
1047     case PROTOCOL_BINARY_CMD_SET_PARAM:
1048         sent = set_param(e,
1049                          cookie,
1050                          reinterpret_cast<protocol_binary_request_set_param*>(
1051                                  request.get()),
1052                          response);
1053         break;
1054     default:
1055         sent = response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
1056                         PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, 0, cookie);
1057         break;
1058     }
1059 
1060     if (sent) {
1061         return ENGINE_SUCCESS;
1062     } else {
1063         return ENGINE_FAILED;
1064     }
1065 }
1066 
item_set_cas(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<item * > item,uint64_t val)1067 void item_set_cas(gsl::not_null<ENGINE_HANDLE*> handle,
1068                   gsl::not_null<item*> item,
1069                   uint64_t val) {
1070     hash_item* it = get_real_item(item);
1071     it->cas = val;
1072 }
1073 
item_set_datatype(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<item * > item,protocol_binary_datatype_t val)1074 static void item_set_datatype(gsl::not_null<ENGINE_HANDLE*> handle,
1075                               gsl::not_null<item*> item,
1076                               protocol_binary_datatype_t val) {
1077     auto* it = reinterpret_cast<hash_item*>(item.get());
1078     it->datatype = val;
1079 }
1080 
item_get_key(const hash_item * item)1081 hash_key* item_get_key(const hash_item* item)
1082 {
1083     const char *ret = reinterpret_cast<const char*>(item + 1);
1084     return (hash_key*)ret;
1085 }
1086 
item_get_data(const hash_item * item)1087 char* item_get_data(const hash_item* item)
1088 {
1089     const hash_key* key = item_get_key(item);
1090     return ((char*)key->header.full_key) + hash_key_get_key_len(key);
1091 }
1092 
get_item_info(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const item * > item,gsl::not_null<item_info * > item_info)1093 static bool get_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
1094                           gsl::not_null<const item*> item,
1095                           gsl::not_null<item_info*> item_info) {
1096     auto* it = reinterpret_cast<const hash_item*>(item.get());
1097     const hash_key* key = item_get_key(it);
1098     auto* engine = get_handle(handle);
1099 
1100     // This may potentially open up for a race, but:
1101     // 1) If the item isn't linked anymore we don't need to mask
1102     //    the CAS anymore. (if the client tries to use that
1103     //    CAS it'll fail with an invalid cas)
1104     // 2) In production the memcached buckets don't use the
1105     //    ZOMBIE state (and if we start doing that, it is only
1106     //    the owner of the item pointer (the one bumping the
1107     //    refcount initially) which would change this. Anyone else
1108     //    would create a new item object and set the iflag
1109     //    to deleted.
1110     const auto iflag = it->iflag.load(std::memory_order_relaxed);
1111 
1112     if ((iflag & ITEM_LINKED) && it->locktime != 0 &&
1113         it->locktime > engine->server.core->get_current_time()) {
1114         // This object is locked. According to docs/Document.md we should
1115         // return -1 in such cases to hide the real CAS for the other clients
1116         // (Note the check on ITEM_LINKED.. for the actual item returned by
1117         // get_locked we return an item which isn't linked (copy of the
1118         // linked item) to allow returning the real CAS.
1119         item_info->cas = uint64_t(-1);
1120     } else {
1121         item_info->cas = it->cas;
1122     }
1123 
1124     item_info->vbucket_uuid = DEFAULT_ENGINE_VBUCKET_UUID;
1125     item_info->seqno = 0;
1126     if (it->exptime == 0) {
1127         item_info->exptime = 0;
1128     } else {
1129         item_info->exptime = engine->server.core->abstime(it->exptime);
1130     }
1131     item_info->nbytes = it->nbytes;
1132     item_info->flags = it->flags;
1133     item_info->nkey = hash_key_get_client_key_len(key);
1134     item_info->key = hash_key_get_client_key(key);
1135     item_info->value[0].iov_base = item_get_data(it);
1136     item_info->value[0].iov_len = it->nbytes;
1137     item_info->datatype = it->datatype;
1138     if (iflag & ITEM_ZOMBIE) {
1139         item_info->document_state = DocumentState::Deleted;
1140     } else {
1141         item_info->document_state = DocumentState::Alive;
1142     }
1143     return true;
1144 }
1145 
set_item_info(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<item * > item,gsl::not_null<const item_info * > itm_info)1146 static bool set_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
1147                           gsl::not_null<item*> item,
1148                           gsl::not_null<const item_info*> itm_info) {
1149     auto* it = reinterpret_cast<hash_item*>(item.get());
1150     it->datatype = itm_info->datatype;
1151     return true;
1152 }
1153 
is_xattr_supported(gsl::not_null<ENGINE_HANDLE * > handle)1154 static bool is_xattr_supported(gsl::not_null<ENGINE_HANDLE*> handle) {
1155     return get_handle(handle)->config.xattr_enabled;
1156 }
1157 
get_compression_mode(gsl::not_null<ENGINE_HANDLE * > handle)1158 static BucketCompressionMode get_compression_mode(
1159                               gsl::not_null<ENGINE_HANDLE*> handle) {
1160     return get_handle(handle)->config.compression_mode;
1161 }
1162 
get_min_compression_ratio(gsl::not_null<ENGINE_HANDLE * > handle)1163 static float get_min_compression_ratio(
1164                              gsl::not_null<ENGINE_HANDLE*> handle) {
1165     return get_handle(handle)->config.min_compression_ratio;
1166 }
1167 
get_max_item_size(gsl::not_null<ENGINE_HANDLE * > handle)1168 static size_t get_max_item_size(
1169                              gsl::not_null<ENGINE_HANDLE*> handle) {
1170     return get_handle(handle)->config.item_size_max;
1171 }
1172