1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <string.h>
7 #include <errno.h>
8 #include <ctype.h>
9 #include <unistd.h>
10 #include <stddef.h>
11 #include <inttypes.h>
12
13 #include "default_engine_internal.h"
14 #include "memcached/util.h"
15 #include "memcached/config_parser.h"
16 #include <platform/cb_malloc.h>
17 #include "engines/default_engine.h"
18 #include "engine_manager.h"
19
// The default engine doesn't really use vbucket uuids, but in order
// to run the unit tests and verify that we correctly convert the
// vbucket uuid to network byte order it is nice to have a value
// we may use for testing ;)
24 #define DEFAULT_ENGINE_VBUCKET_UUID 0xdeadbeef
25
26 static ENGINE_ERROR_CODE default_initialize(
27 gsl::not_null<ENGINE_HANDLE*> handle, const char* config_str);
28 static void default_destroy(gsl::not_null<ENGINE_HANDLE*> handle,
29 const bool force);
30 static cb::EngineErrorItemPair default_item_allocate(
31 gsl::not_null<ENGINE_HANDLE*> handle,
32 gsl::not_null<const void*> cookie,
33 const DocKey& key,
34 const size_t nbytes,
35 const int flags,
36 const rel_time_t exptime,
37 uint8_t datatype,
38 uint16_t vbucket);
39 static std::pair<cb::unique_item_ptr, item_info> default_item_allocate_ex(
40 gsl::not_null<ENGINE_HANDLE*> handle,
41 gsl::not_null<const void*> cookie,
42 const DocKey& key,
43 size_t nbytes,
44 size_t priv_nbytes,
45 int flags,
46 rel_time_t exptime,
47 uint8_t datatype,
48 uint16_t vbucket);
49
50 static ENGINE_ERROR_CODE default_item_delete(
51 gsl::not_null<ENGINE_HANDLE*> handle,
52 gsl::not_null<const void*> cookie,
53 const DocKey& key,
54 uint64_t& cas,
55 uint16_t vbucket,
56 mutation_descr_t& mut_info);
57
58 static void default_item_release(gsl::not_null<ENGINE_HANDLE*> handle,
59 gsl::not_null<item*> item);
60 static cb::EngineErrorItemPair default_get(gsl::not_null<ENGINE_HANDLE*> handle,
61 gsl::not_null<const void*> cookie,
62 const DocKey& key,
63 uint16_t vbucket,
64 DocStateFilter);
65
66 static cb::EngineErrorItemPair default_get_if(
67 gsl::not_null<ENGINE_HANDLE*>,
68 gsl::not_null<const void*>,
69 const DocKey&,
70 uint16_t,
71 std::function<bool(const item_info&)>);
72
73 static cb::EngineErrorItemPair default_get_and_touch(
74 gsl::not_null<ENGINE_HANDLE*> handle,
75 gsl::not_null<const void*> cookie,
76 const DocKey& key,
77 uint16_t vbucket,
78 uint32_t expiry_time);
79
80 static cb::EngineErrorItemPair default_get_locked(
81 gsl::not_null<ENGINE_HANDLE*> handle,
82 gsl::not_null<const void*> cookie,
83 const DocKey& key,
84 uint16_t vbucket,
85 uint32_t lock_timeout);
86
87 static cb::EngineErrorMetadataPair default_get_meta(
88 gsl::not_null<ENGINE_HANDLE*> handle,
89 gsl::not_null<const void*> cookie,
90 const DocKey& key,
91 uint16_t vbucket);
92
93 static ENGINE_ERROR_CODE default_unlock(gsl::not_null<ENGINE_HANDLE*> handle,
94 gsl::not_null<const void*> cookie,
95 const DocKey& key,
96 uint16_t vbucket,
97 uint64_t cas);
98 static ENGINE_ERROR_CODE default_get_stats(gsl::not_null<ENGINE_HANDLE*> handle,
99 gsl::not_null<const void*> cookie,
100 cb::const_char_buffer key,
101 ADD_STAT add_stat);
102 static void default_reset_stats(gsl::not_null<ENGINE_HANDLE*> handle,
103 gsl::not_null<const void*> cookie);
104 static ENGINE_ERROR_CODE default_store(gsl::not_null<ENGINE_HANDLE*> handle,
105 gsl::not_null<const void*> cookie,
106 gsl::not_null<item*> item,
107 uint64_t& cas,
108 ENGINE_STORE_OPERATION operation,
109 DocumentState);
110
111 static cb::EngineErrorCasPair default_store_if(
112 gsl::not_null<ENGINE_HANDLE*> handle,
113 gsl::not_null<const void*> cookie,
114 gsl::not_null<item*> item_,
115 uint64_t cas,
116 ENGINE_STORE_OPERATION operation,
117 cb::StoreIfPredicate predicate,
118 DocumentState document_state);
119
120 static void item_set_datatype(gsl::not_null<ENGINE_HANDLE*> handle,
121 gsl::not_null<item*> item,
122 protocol_binary_datatype_t val);
123
124 static ENGINE_ERROR_CODE default_flush(gsl::not_null<ENGINE_HANDLE*> handle,
125 gsl::not_null<const void*> cookie);
126 static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se,
127 const char *cfg_str);
128 static ENGINE_ERROR_CODE default_unknown_command(
129 gsl::not_null<ENGINE_HANDLE*> handle,
130 const void* cookie,
131 gsl::not_null<protocol_binary_request_header*> request,
132 ADD_RESPONSE response,
133 DocNamespace doc_namespace);
134
/**
 * Adapter used by set_vbucket_state() / get_vbucket_state() to view the
 * value stored in default_engine::vbucket_infos[vbid] as a vbucket_info.
 */
union vbucket_info_adapter {
    // Raw value as it is read from / written to the vbucket_infos table
    char c;
    // Structured view; the accessors read and write its `state` member
    struct vbucket_info v;
};
139
set_vbucket_state(struct default_engine * e,uint16_t vbid,vbucket_state_t to)140 static void set_vbucket_state(struct default_engine *e,
141 uint16_t vbid, vbucket_state_t to) {
142 union vbucket_info_adapter vi;
143 vi.c = e->vbucket_infos[vbid];
144 vi.v.state = to;
145 e->vbucket_infos[vbid] = vi.c;
146 }
147
get_vbucket_state(struct default_engine * e,uint16_t vbid)148 static vbucket_state_t get_vbucket_state(struct default_engine *e,
149 uint16_t vbid) {
150 union vbucket_info_adapter vi;
151 vi.c = e->vbucket_infos[vbid];
152 return vbucket_state_t(vi.v.state);
153 }
154
handled_vbucket(struct default_engine * e,uint16_t vbid)155 static bool handled_vbucket(struct default_engine *e, uint16_t vbid) {
156 return e->config.ignore_vbucket
157 || (get_vbucket_state(e, vbid) == vbucket_state_active);
158 }
159
/*
 * Mechanism for handling bad vbucket requests: expands to a statement that
 * returns ENGINE_NOT_MY_VBUCKET from the enclosing function when
 * handled_vbucket(e, v) is false. It can therefore only be used inside
 * functions whose return type accepts ENGINE_NOT_MY_VBUCKET.
 */
#define VBUCKET_GUARD(e, v) if (!handled_vbucket(e, v)) { return ENGINE_NOT_MY_VBUCKET; }
162
163 static bool get_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
164 gsl::not_null<const item*> item,
165 gsl::not_null<item_info*> item_info);
166
167 static bool set_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
168 gsl::not_null<item*> item,
169 gsl::not_null<const item_info*> itm_info);
170
171 static bool is_xattr_supported(gsl::not_null<ENGINE_HANDLE*> handle);
172
173 static BucketCompressionMode get_compression_mode(
174 gsl::not_null<ENGINE_HANDLE*> handle);
175
176 static float get_min_compression_ratio(gsl::not_null<ENGINE_HANDLE*> handle);
177
178 static size_t get_max_item_size(gsl::not_null<ENGINE_HANDLE*> handle);
179
180 /**
181 * Given that default_engine is implemented in C and not C++ we don't have
182 * a constructor for the struct to initialize the members to some sane
183 * default values. Currently they're all being allocated through the
184 * engine manager which keeps a local map of all engines being created.
185 *
186 * Once an object is in that map it may in theory be referenced, so we
187 * need to ensure that the members is initialized before hitting that map.
188 *
189 * @todo refactor default_engine to C++ to avoid this extra hack :)
190 */
default_engine_constructor(struct default_engine * engine,bucket_id_t id)191 void default_engine_constructor(struct default_engine* engine, bucket_id_t id)
192 {
193 memset(engine, 0, sizeof(*engine));
194
195 cb_mutex_initialize(&engine->slabs.lock);
196 cb_mutex_initialize(&engine->items.lock);
197 cb_mutex_initialize(&engine->stats.lock);
198 cb_mutex_initialize(&engine->scrubber.lock);
199
200 engine->bucket_id = id;
201 engine->engine.interface.interface = 1;
202 engine->engine.initialize = default_initialize;
203 engine->engine.destroy = default_destroy;
204 engine->engine.allocate = default_item_allocate;
205 engine->engine.allocate_ex = default_item_allocate_ex;
206 engine->engine.remove = default_item_delete;
207 engine->engine.release = default_item_release;
208 engine->engine.get = default_get;
209 engine->engine.get_if = default_get_if;
210 engine->engine.get_locked = default_get_locked;
211 engine->engine.get_meta = default_get_meta;
212 engine->engine.get_and_touch = default_get_and_touch;
213 engine->engine.unlock = default_unlock;
214 engine->engine.get_stats = default_get_stats;
215 engine->engine.reset_stats = default_reset_stats;
216 engine->engine.store = default_store;
217 engine->engine.store_if = default_store_if;
218 engine->engine.flush = default_flush;
219 engine->engine.unknown_command = default_unknown_command;
220 engine->engine.item_set_cas = item_set_cas;
221 engine->engine.item_set_datatype = item_set_datatype;
222 engine->engine.get_item_info = get_item_info;
223 engine->engine.set_item_info = set_item_info;
224 engine->engine.isXattrEnabled = is_xattr_supported;
225 engine->engine.getCompressionMode = get_compression_mode;
226 engine->engine.getMaxItemSize = get_max_item_size;
227 engine->engine.getMinCompressionRatio = get_min_compression_ratio;
228 engine->config.verbose = 0;
229 engine->config.oldest_live = 0;
230 engine->config.evict_to_free = true;
231 engine->config.maxbytes = 64 * 1024 * 1024;
232 engine->config.preallocate = false;
233 engine->config.factor = 1.25;
234 engine->config.chunk_size = 48;
235 engine->config.item_size_max= 1024 * 1024;
236 engine->config.xattr_enabled = true;
237 engine->config.compression_mode = BucketCompressionMode::Off;
238 engine->config.min_compression_ratio = default_min_compression_ratio;
239 }
240
create_instance(uint64_t interface,GET_SERVER_API get_server_api,ENGINE_HANDLE ** handle)241 extern "C" ENGINE_ERROR_CODE create_instance(uint64_t interface,
242 GET_SERVER_API get_server_api,
243 ENGINE_HANDLE **handle) {
244 SERVER_HANDLE_V1 *api = get_server_api();
245 struct default_engine *engine;
246
247 if (interface != 1 || api == NULL) {
248 return ENGINE_ENOTSUP;
249 }
250
251 if ((engine = engine_manager_create_engine()) == NULL) {
252 return ENGINE_ENOMEM;
253 }
254
255 engine->server = *api;
256 engine->get_server_api = get_server_api;
257 engine->initialized = true;
258 *handle = (ENGINE_HANDLE*)&engine->engine;
259 return ENGINE_SUCCESS;
260 }
261
/**
 * Exported (C linkage) teardown hook: calls engine_manager_shutdown()
 * followed by assoc_destroy().
 */
extern "C" void destroy_engine() {
    engine_manager_shutdown();
    assoc_destroy();
}
266
get_handle(ENGINE_HANDLE * handle)267 static struct default_engine* get_handle(ENGINE_HANDLE* handle) {
268 return (struct default_engine*)handle;
269 }
270
get_real_item(item * item)271 static hash_item* get_real_item(item* item) {
272 return (hash_item*)item;
273 }
274
default_initialize(gsl::not_null<ENGINE_HANDLE * > handle,const char * config_str)275 static ENGINE_ERROR_CODE default_initialize(
276 gsl::not_null<ENGINE_HANDLE*> handle, const char* config_str) {
277 struct default_engine* se = get_handle(handle);
278 ENGINE_ERROR_CODE ret = initalize_configuration(se, config_str);
279 if (ret != ENGINE_SUCCESS) {
280 return ret;
281 }
282
283 ret = assoc_init(se);
284 if (ret != ENGINE_SUCCESS) {
285 return ret;
286 }
287
288 ret = slabs_init(
289 se, se->config.maxbytes, se->config.factor, se->config.preallocate);
290 if (ret != ENGINE_SUCCESS) {
291 return ret;
292 }
293
294 return ENGINE_SUCCESS;
295 }
296
default_destroy(gsl::not_null<ENGINE_HANDLE * > handle,const bool force)297 static void default_destroy(gsl::not_null<ENGINE_HANDLE*> handle,
298 const bool force) {
299 (void)force;
300 engine_manager_delete_engine(get_handle(handle));
301 }
302
destroy_engine_instance(struct default_engine * engine)303 void destroy_engine_instance(struct default_engine* engine) {
304 if (engine->initialized) {
305 /* Destory the slabs cache */
306 slabs_destroy(engine);
307
308 cb_free(engine->config.uuid);
309
310 /* Clean up the mutexes */
311 cb_mutex_destroy(&engine->items.lock);
312 cb_mutex_destroy(&engine->stats.lock);
313 cb_mutex_destroy(&engine->slabs.lock);
314 cb_mutex_destroy(&engine->scrubber.lock);
315
316 engine->initialized = false;
317 }
318 }
319
default_item_allocate(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,const size_t nbytes,const int flags,const rel_time_t exptime,uint8_t datatype,uint16_t vbucket)320 static cb::EngineErrorItemPair default_item_allocate(
321 gsl::not_null<ENGINE_HANDLE*> handle,
322 gsl::not_null<const void*> cookie,
323 const DocKey& key,
324 const size_t nbytes,
325 const int flags,
326 const rel_time_t exptime,
327 uint8_t datatype,
328 uint16_t vbucket) {
329 try {
330 auto pair = default_item_allocate_ex(handle, cookie, key, nbytes,
331 0, // No privileged bytes
332 flags, exptime, datatype, vbucket);
333 return {cb::engine_errc::success, std::move(pair.first)};
334 } catch (const cb::engine_error& error) {
335 return cb::makeEngineErrorItemPair(
336 cb::engine_errc(error.code().value()));
337 }
338 }
339
default_item_allocate_ex(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,size_t nbytes,size_t priv_nbytes,int flags,rel_time_t exptime,uint8_t datatype,uint16_t vbucket)340 static std::pair<cb::unique_item_ptr, item_info> default_item_allocate_ex(
341 gsl::not_null<ENGINE_HANDLE*> handle,
342 gsl::not_null<const void*> cookie,
343 const DocKey& key,
344 size_t nbytes,
345 size_t priv_nbytes,
346 int flags,
347 rel_time_t exptime,
348 uint8_t datatype,
349 uint16_t vbucket) {
350 hash_item *it;
351
352 unsigned int id;
353 struct default_engine* engine = get_handle(handle);
354
355 if (!handled_vbucket(engine, vbucket)) {
356 throw cb::engine_error(cb::engine_errc::not_my_vbucket,
357 "default_item_allocate_ex");
358 }
359
360 size_t ntotal = sizeof(hash_item) + key.size() + nbytes;
361 id = slabs_clsid(engine, ntotal);
362 if (id == 0) {
363 throw cb::engine_error(cb::engine_errc::too_big,
364 "default_item_allocate_ex: no slab class");
365 }
366
367 if ((nbytes - priv_nbytes) > engine->config.item_size_max) {
368 throw cb::engine_error(cb::engine_errc::too_big,
369 "default_item_allocate_ex");
370 }
371
372 it = item_alloc(engine,
373 key.data(),
374 key.size(),
375 flags,
376 engine->server.core->realtime(exptime, cb::NoExpiryLimit),
377 (uint32_t)nbytes,
378 cookie,
379 datatype);
380
381 if (it != NULL) {
382 item_info info;
383 if (!get_item_info(handle, it, &info)) {
384 // This should never happen (unless we provide invalid
385 // arguments)
386 item_release(engine, it);
387 throw cb::engine_error(cb::engine_errc::failed,
388 "default_item_allocate_ex");
389 }
390
391 return std::make_pair(cb::unique_item_ptr(it, cb::ItemDeleter{handle}),
392 info);
393 } else {
394 throw cb::engine_error(cb::engine_errc::no_memory,
395 "default_item_allocate_ex");
396 }
397 }
398
default_item_delete(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint64_t & cas,uint16_t vbucket,mutation_descr_t & mut_info)399 static ENGINE_ERROR_CODE default_item_delete(
400 gsl::not_null<ENGINE_HANDLE*> handle,
401 gsl::not_null<const void*> cookie,
402 const DocKey& key,
403 uint64_t& cas,
404 uint16_t vbucket,
405 mutation_descr_t& mut_info) {
406 struct default_engine* engine = get_handle(handle);
407 hash_item* it;
408 uint64_t cas_in = cas;
409 VBUCKET_GUARD(engine, vbucket);
410
411 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
412 do {
413 it = item_get(engine,
414 cookie,
415 key.data(),
416 key.size(),
417 DocStateFilter::Alive);
418 if (it == nullptr) {
419 return ENGINE_KEY_ENOENT;
420 }
421
422 if (it->locktime != 0 &&
423 it->locktime > engine->server.core->get_current_time()) {
424 if (cas_in != it->cas) {
425 item_release(engine, it);
426 return ENGINE_LOCKED;
427 }
428 }
429
430 auto* deleted = item_alloc(engine,
431 key.data(),
432 key.size(),
433 it->flags,
434 it->exptime,
435 0,
436 cookie,
437 PROTOCOL_BINARY_RAW_BYTES);
438
439 if (deleted == NULL) {
440 item_release(engine, it);
441 return ENGINE_TMPFAIL;
442 }
443
444 if (cas_in == 0) {
445 // If the caller specified the "cas wildcard" we should set
446 // the cas for the item we just fetched and do a cas
447 // replace with that value
448 item_set_cas(handle, deleted, it->cas);
449 } else {
450 // The caller specified a specific CAS value so we should
451 // use that value in our cas replace
452 item_set_cas(handle, deleted, cas_in);
453 }
454
455 ret = store_item(engine,
456 deleted,
457 &cas,
458 OPERATION_CAS,
459 cookie,
460 DocumentState::Deleted);
461
462 item_release(engine, it);
463 item_release(engine, deleted);
464
465 // We should only retry for race conditions if the caller specified
466 // cas wildcard
467 } while (ret == ENGINE_KEY_EEXISTS && cas_in == 0);
468
469 // vbucket UUID / seqno arn't supported by default engine, so just return
470 // a hardcoded vbucket uuid, and zero for the sequence number.
471 mut_info.vbucket_uuid = DEFAULT_ENGINE_VBUCKET_UUID;
472 mut_info.seqno = 0;
473
474 return ret;
475 }
476
default_item_release(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<item * > item)477 static void default_item_release(gsl::not_null<ENGINE_HANDLE*> handle,
478 gsl::not_null<item*> item) {
479 item_release(get_handle(handle), get_real_item(item));
480 }
481
default_get(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,DocStateFilter documentStateFilter)482 static cb::EngineErrorItemPair default_get(gsl::not_null<ENGINE_HANDLE*> handle,
483 gsl::not_null<const void*> cookie,
484 const DocKey& key,
485 uint16_t vbucket,
486 DocStateFilter documentStateFilter) {
487 struct default_engine* engine = get_handle(handle);
488
489 if (!handled_vbucket(engine, vbucket)) {
490 return std::make_pair(
491 cb::engine_errc::not_my_vbucket,
492 cb::unique_item_ptr{nullptr, cb::ItemDeleter{handle}});
493 }
494
495 item* it = item_get(
496 engine, cookie, key.data(), key.size(), documentStateFilter);
497 if (it != nullptr) {
498 return cb::makeEngineErrorItemPair(
499 cb::engine_errc::success, it, handle);
500 } else {
501 return cb::makeEngineErrorItemPair(cb::engine_errc::no_such_key);
502 }
503 }
504
default_get_if(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,std::function<bool (const item_info &)> filter)505 static cb::EngineErrorItemPair default_get_if(
506 gsl::not_null<ENGINE_HANDLE*> handle,
507 gsl::not_null<const void*> cookie,
508 const DocKey& key,
509 uint16_t vbucket,
510 std::function<bool(const item_info&)> filter) {
511 struct default_engine* engine = get_handle(handle);
512
513 if (!handled_vbucket(engine, vbucket)) {
514 return cb::makeEngineErrorItemPair(cb::engine_errc::not_my_vbucket);
515 }
516
517 cb::unique_item_ptr ret(item_get(engine,
518 cookie,
519 key.data(),
520 key.size(),
521 DocStateFilter::Alive),
522 cb::ItemDeleter{handle});
523 if (!ret) {
524 return cb::makeEngineErrorItemPair(cb::engine_errc::no_such_key);
525 }
526
527 item_info info;
528 if (!get_item_info(handle, ret.get(), &info)) {
529 throw cb::engine_error(cb::engine_errc::failed,
530 "default_get_if: get_item_info failed");
531 }
532
533 if (!filter(info)) {
534 ret.reset(nullptr);
535 }
536
537 return cb::makeEngineErrorItemPair(
538 cb::engine_errc::success, ret.release(), handle);
539 }
540
default_get_and_touch(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,uint32_t expiry_time)541 static cb::EngineErrorItemPair default_get_and_touch(
542 gsl::not_null<ENGINE_HANDLE*> handle,
543 gsl::not_null<const void*> cookie,
544 const DocKey& key,
545 uint16_t vbucket,
546 uint32_t expiry_time) {
547 struct default_engine* engine = get_handle(handle);
548
549 if (!handled_vbucket(engine, vbucket)) {
550 return cb::makeEngineErrorItemPair(cb::engine_errc::not_my_vbucket);
551 }
552
553 hash_item* it = nullptr;
554 auto ret = item_get_and_touch(
555 engine,
556 cookie,
557 &it,
558 key.data(),
559 key.size(),
560 engine->server.core->realtime(expiry_time, cb::NoExpiryLimit));
561
562 return cb::makeEngineErrorItemPair(
563 cb::engine_errc(ret), reinterpret_cast<item*>(it), handle);
564 }
565
default_get_locked(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,uint32_t lock_timeout)566 static cb::EngineErrorItemPair default_get_locked(
567 gsl::not_null<ENGINE_HANDLE*> handle,
568 gsl::not_null<const void*> cookie,
569 const DocKey& key,
570 uint16_t vbucket,
571 uint32_t lock_timeout) {
572 auto* engine = get_handle(handle);
573
574 if (!handled_vbucket(engine, vbucket)) {
575 return cb::makeEngineErrorItemPair(cb::engine_errc::not_my_vbucket);
576 }
577
578 // memcached buckets don't offer any way for the user to configure
579 // the lock settings.
580 static const uint32_t default_lock_timeout = 15;
581 static const uint32_t max_lock_timeout = 30;
582
583 if (lock_timeout == 0 || lock_timeout > max_lock_timeout) {
584 lock_timeout = default_lock_timeout;
585 }
586
587 // Convert the lock timeout to an absolute time
588 lock_timeout += engine->server.core->get_current_time();
589
590 hash_item* it = nullptr;
591 auto ret = item_get_locked(engine, cookie, &it, key.data(), key.size(),
592 lock_timeout);
593 return cb::makeEngineErrorItemPair(cb::engine_errc(ret), it, handle);
594 }
595
default_get_meta(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket)596 static cb::EngineErrorMetadataPair default_get_meta(
597 gsl::not_null<ENGINE_HANDLE*> handle,
598 gsl::not_null<const void*> cookie,
599 const DocKey& key,
600 uint16_t vbucket) {
601 auto* engine_handle = get_handle(handle);
602
603 if (!handled_vbucket(engine_handle, vbucket)) {
604 return std::make_pair(cb::engine_errc::not_my_vbucket, item_info());
605 }
606
607 cb::unique_item_ptr item{item_get(engine_handle,
608 cookie,
609 key.data(),
610 key.size(),
611 DocStateFilter::AliveOrDeleted),
612 cb::ItemDeleter(handle)};
613
614 if (!item) {
615 return std::make_pair(cb::engine_errc::no_such_key, item_info());
616 }
617
618 item_info info;
619 if (!get_item_info(handle, item.get(), &info)) {
620 throw cb::engine_error(cb::engine_errc::failed,
621 "default_get_if: get_item_info failed");
622 }
623
624 return std::make_pair(cb::engine_errc::success, info);
625 }
626
default_unlock(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,const DocKey & key,uint16_t vbucket,uint64_t cas)627 static ENGINE_ERROR_CODE default_unlock(gsl::not_null<ENGINE_HANDLE*> handle,
628 gsl::not_null<const void*> cookie,
629 const DocKey& key,
630 uint16_t vbucket,
631 uint64_t cas) {
632 auto* engine = get_handle(handle);
633 VBUCKET_GUARD(engine, vbucket);
634 return item_unlock(engine, cookie, key.data(), key.size(), cas);
635 }
636
default_get_stats(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,cb::const_char_buffer key,ADD_STAT add_stat)637 static ENGINE_ERROR_CODE default_get_stats(gsl::not_null<ENGINE_HANDLE*> handle,
638 gsl::not_null<const void*> cookie,
639 cb::const_char_buffer key,
640 ADD_STAT add_stat) {
641 struct default_engine* engine = get_handle(handle);
642 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
643
644 if (key.empty()) {
645 char val[128];
646 int len;
647
648 cb_mutex_enter(&engine->stats.lock);
649 len = sprintf(val, "%" PRIu64, (uint64_t)engine->stats.evictions);
650 add_stat("evictions", 9, val, len, cookie);
651 len = sprintf(val, "%" PRIu64, (uint64_t)engine->stats.curr_items);
652 add_stat("curr_items", 10, val, len, cookie);
653 len = sprintf(val, "%" PRIu64, (uint64_t)engine->stats.total_items);
654 add_stat("total_items", 11, val, len, cookie);
655 len = sprintf(val, "%" PRIu64, (uint64_t)engine->stats.curr_bytes);
656 add_stat("bytes", 5, val, len, cookie);
657 len = sprintf(val, "%" PRIu64, engine->stats.reclaimed);
658 add_stat("reclaimed", 9, val, len, cookie);
659 len = sprintf(val, "%" PRIu64, (uint64_t)engine->config.maxbytes);
660 add_stat("engine_maxbytes", 15, val, len, cookie);
661 cb_mutex_exit(&engine->stats.lock);
662 } else if (key == "slabs"_ccb) {
663 slabs_stats(engine, add_stat, cookie);
664 } else if (key == "items"_ccb) {
665 item_stats(engine, add_stat, cookie);
666 } else if (key == "sizes"_ccb) {
667 item_stats_sizes(engine, add_stat, cookie);
668 } else if (key == "uuid"_ccb) {
669 if (engine->config.uuid) {
670 add_stat("uuid",
671 4,
672 engine->config.uuid,
673 (uint32_t)strlen(engine->config.uuid),
674 cookie);
675 } else {
676 add_stat("uuid", 4, "", 0, cookie);
677 }
678 } else if (key == "scrub"_ccb) {
679 char val[128];
680 int len;
681
682 cb_mutex_enter(&engine->scrubber.lock);
683 if (engine->scrubber.running) {
684 add_stat("scrubber:status", 15, "running", 7, cookie);
685 } else {
686 add_stat("scrubber:status", 15, "stopped", 7, cookie);
687 }
688
689 if (engine->scrubber.started != 0) {
690 if (engine->scrubber.stopped != 0) {
691 time_t diff =
692 engine->scrubber.started - engine->scrubber.stopped;
693 len = sprintf(val, "%" PRIu64, (uint64_t)diff);
694 add_stat("scrubber:last_run", 17, val, len, cookie);
695 }
696
697 len = sprintf(val, "%" PRIu64, engine->scrubber.visited);
698 add_stat("scrubber:visited", 16, val, len, cookie);
699 len = sprintf(val, "%" PRIu64, engine->scrubber.cleaned);
700 add_stat("scrubber:cleaned", 16, val, len, cookie);
701 }
702 cb_mutex_exit(&engine->scrubber.lock);
703 } else {
704 ret = ENGINE_KEY_ENOENT;
705 }
706
707 return ret;
708 }
709
default_store(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,gsl::not_null<item * > item,uint64_t & cas,ENGINE_STORE_OPERATION operation,DocumentState document_state)710 static ENGINE_ERROR_CODE default_store(gsl::not_null<ENGINE_HANDLE*> handle,
711 gsl::not_null<const void*> cookie,
712 gsl::not_null<item*> item,
713 uint64_t& cas,
714 ENGINE_STORE_OPERATION operation,
715 DocumentState document_state) {
716 auto* engine = get_handle(handle);
717 auto& config = engine->config;
718 auto* it = get_real_item(item);
719
720 if (document_state == DocumentState::Deleted && !config.keep_deleted) {
721 return safe_item_unlink(engine, it);
722 }
723
724 return store_item(engine, it, &cas, operation, cookie, document_state);
725 }
726
default_store_if(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie,gsl::not_null<item * > item,uint64_t cas,ENGINE_STORE_OPERATION operation,cb::StoreIfPredicate predicate,DocumentState document_state)727 static cb::EngineErrorCasPair default_store_if(
728 gsl::not_null<ENGINE_HANDLE*> handle,
729 gsl::not_null<const void*> cookie,
730 gsl::not_null<item*> item,
731 uint64_t cas,
732 ENGINE_STORE_OPERATION operation,
733 cb::StoreIfPredicate predicate,
734 DocumentState document_state) {
735 struct default_engine* engine = get_handle(handle);
736
737 if (predicate) {
738 // Check for an existing item and call the item predicate on it.
739 auto* it = get_real_item(item);
740 auto* key = item_get_key(it);
741 if (!key) {
742 throw cb::engine_error(cb::engine_errc::failed,
743 "default_store_if: item_get_key failed");
744 }
745 cb::unique_item_ptr existing(
746 item_get(engine, cookie, *key, DocStateFilter::Alive),
747 cb::ItemDeleter{handle});
748
749 cb::StoreIfStatus status;
750 if (existing.get()) {
751 item_info info;
752 if (!get_item_info(handle, existing.get(), &info)) {
753 throw cb::engine_error(
754 cb::engine_errc::failed,
755 "default_store_if: get_item_info failed");
756 }
757 status = predicate(info, {true});
758 } else {
759 status = predicate(boost::none, {true});
760 }
761
762 switch (status) {
763 case cb::StoreIfStatus::Fail: {
764 return {cb::engine_errc::predicate_failed, 0};
765 }
766 case cb::StoreIfStatus::Continue:
767 case cb::StoreIfStatus::GetItemInfo: {
768 break;
769 }
770 }
771 }
772
773 auto* it = get_real_item(item);
774 auto status =
775 store_item(engine, it, &cas, operation, cookie, document_state);
776 return {cb::engine_errc(status), cas};
777 }
778
default_flush(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie)779 static ENGINE_ERROR_CODE default_flush(gsl::not_null<ENGINE_HANDLE*> handle,
780 gsl::not_null<const void*> cookie) {
781 item_flush_expired(get_handle(handle));
782
783 return ENGINE_SUCCESS;
784 }
785
default_reset_stats(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const void * > cookie)786 static void default_reset_stats(gsl::not_null<ENGINE_HANDLE*> handle,
787 gsl::not_null<const void*> cookie) {
788 struct default_engine* engine = get_handle(handle);
789 item_stats_reset(engine);
790
791 cb_mutex_enter(&engine->stats.lock);
792 engine->stats.evictions = 0;
793 engine->stats.reclaimed = 0;
794 engine->stats.total_items = 0;
795 cb_mutex_exit(&engine->stats.lock);
796 }
797
initalize_configuration(struct default_engine * se,const char * cfg_str)798 static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se,
799 const char *cfg_str) {
800 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
801
802 se->config.vb0 = true;
803
804 if (cfg_str != NULL) {
805 struct config_item items[13];
806 int ii = 0;
807
808 memset(&items, 0, sizeof(items));
809 items[ii].key = "verbose";
810 items[ii].datatype = DT_SIZE;
811 items[ii].value.dt_size = &se->config.verbose;
812 ++ii;
813
814 items[ii].key = "eviction";
815 items[ii].datatype = DT_BOOL;
816 items[ii].value.dt_bool = &se->config.evict_to_free;
817 ++ii;
818
819 items[ii].key = "cache_size";
820 items[ii].datatype = DT_SIZE;
821 items[ii].value.dt_size = &se->config.maxbytes;
822 ++ii;
823
824 items[ii].key = "preallocate";
825 items[ii].datatype = DT_BOOL;
826 items[ii].value.dt_bool = &se->config.preallocate;
827 ++ii;
828
829 items[ii].key = "factor";
830 items[ii].datatype = DT_FLOAT;
831 items[ii].value.dt_float = &se->config.factor;
832 ++ii;
833
834 items[ii].key = "chunk_size";
835 items[ii].datatype = DT_SIZE;
836 items[ii].value.dt_size = &se->config.chunk_size;
837 ++ii;
838
839 items[ii].key = "item_size_max";
840 items[ii].datatype = DT_SIZE;
841 items[ii].value.dt_size = &se->config.item_size_max;
842 ++ii;
843
844 items[ii].key = "ignore_vbucket";
845 items[ii].datatype = DT_BOOL;
846 items[ii].value.dt_bool = &se->config.ignore_vbucket;
847 ++ii;
848
849 items[ii].key = "vb0";
850 items[ii].datatype = DT_BOOL;
851 items[ii].value.dt_bool = &se->config.vb0;
852 ++ii;
853
854 items[ii].key = "config_file";
855 items[ii].datatype = DT_CONFIGFILE;
856 ++ii;
857
858 items[ii].key = "uuid";
859 items[ii].datatype = DT_STRING;
860 items[ii].value.dt_string = &se->config.uuid;
861 ++ii;
862
863 items[ii].key = "keep_deleted";
864 items[ii].datatype = DT_BOOL;
865 items[ii].value.dt_bool = &se->config.keep_deleted;
866 ++ii;
867
868 items[ii].key = NULL;
869 ++ii;
870 cb_assert(ii == 13);
871 ret = ENGINE_ERROR_CODE(se->server.core->parse_config(cfg_str,
872 items,
873 stderr));
874 }
875
876 if (se->config.vb0) {
877 set_vbucket_state(se, 0, vbucket_state_active);
878 }
879
880 return ret;
881 }
882
set_vbucket(struct default_engine * e,const void * cookie,protocol_binary_request_set_vbucket * req,ADD_RESPONSE response)883 static bool set_vbucket(struct default_engine *e,
884 const void* cookie,
885 protocol_binary_request_set_vbucket *req,
886 ADD_RESPONSE response) {
887 vbucket_state_t state;
888 size_t bodylen = ntohl(req->message.header.request.bodylen)
889 - ntohs(req->message.header.request.keylen);
890 if (bodylen != sizeof(vbucket_state_t)) {
891 const char *msg = "Incorrect packet format";
892 return response(NULL, 0, NULL, 0, msg, (uint32_t)strlen(msg),
893 PROTOCOL_BINARY_RAW_BYTES,
894 PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
895 }
896 memcpy(&state, &req->message.body.state, sizeof(state));
897 state = vbucket_state_t(ntohl(state));
898
899 if (!is_valid_vbucket_state_t(state)) {
900 const char *msg = "Invalid vbucket state";
901 return response(NULL, 0, NULL, 0, msg, (uint32_t)strlen(msg),
902 PROTOCOL_BINARY_RAW_BYTES,
903 PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
904 }
905
906 set_vbucket_state(e, ntohs(req->message.header.request.vbucket), state);
907 return response(NULL, 0, NULL, 0, &state, sizeof(state),
908 PROTOCOL_BINARY_RAW_BYTES,
909 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
910 }
911
get_vbucket(struct default_engine * e,const void * cookie,protocol_binary_request_get_vbucket * req,ADD_RESPONSE response)912 static bool get_vbucket(struct default_engine *e,
913 const void* cookie,
914 protocol_binary_request_get_vbucket *req,
915 ADD_RESPONSE response) {
916 vbucket_state_t state;
917 state = get_vbucket_state(e, ntohs(req->message.header.request.vbucket));
918 state = vbucket_state_t(ntohl(state));
919
920 return response(NULL, 0, NULL, 0, &state, sizeof(state),
921 PROTOCOL_BINARY_RAW_BYTES,
922 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
923 }
924
rm_vbucket(struct default_engine * e,const void * cookie,protocol_binary_request_header * req,ADD_RESPONSE response)925 static bool rm_vbucket(struct default_engine *e,
926 const void *cookie,
927 protocol_binary_request_header *req,
928 ADD_RESPONSE response) {
929 set_vbucket_state(e, ntohs(req->request.vbucket), vbucket_state_dead);
930 return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
931 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
932 }
933
scrub_cmd(struct default_engine * e,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)934 static bool scrub_cmd(struct default_engine *e,
935 const void *cookie,
936 protocol_binary_request_header *request,
937 ADD_RESPONSE response) {
938
939 protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
940 if (!item_start_scrub(e)) {
941 res = PROTOCOL_BINARY_RESPONSE_EBUSY;
942 }
943
944 return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
945 res, 0, cookie);
946 }
947
948 /**
949 * set_param only added to allow per bucket xattr on/off
950 * and toggle between compression modes for testing purposes
951 */
set_param(struct default_engine * e,const void * cookie,protocol_binary_request_set_param * req,ADD_RESPONSE response)952 static bool set_param(struct default_engine* e,
953 const void* cookie,
954 protocol_binary_request_set_param* req,
955 ADD_RESPONSE response) {
956 size_t keylen = ntohs(req->message.header.request.keylen);
957 uint8_t extlen = req->message.header.request.extlen;
958 size_t vallen = ntohl(req->message.header.request.bodylen);
959 protocol_binary_engine_param_t paramtype =
960 static_cast<protocol_binary_engine_param_t>(
961 ntohl(req->message.body.param_type));
962
963 if (keylen == 0 || (vallen - keylen - extlen) == 0) {
964 return false;
965 }
966
967 // Only support protocol_binary_engine_param_flush with xattr_enabled
968 if (paramtype == protocol_binary_engine_param_flush) {
969 const char* keyp =
970 reinterpret_cast<const char*>(req->bytes) + sizeof(req->bytes);
971 const char* valuep = keyp + keylen;
972 vallen -= (keylen + extlen);
973 cb::const_char_buffer key(keyp, keylen);
974 cb::const_char_buffer value(valuep, vallen);
975
976 if (key == "xattr_enabled") {
977 if (value == "true") {
978 e->config.xattr_enabled = true;
979 } else if (value == "false") {
980 e->config.xattr_enabled = false;
981 } else {
982 return false;
983 }
984 } else if (key == "compression_mode") {
985 try {
986 e->config.compression_mode = parseCompressionMode(
987 std::string(value.data(), value.size()));
988 } catch (std::invalid_argument&) {
989 return false;
990 }
991 } else if (key == "min_compression_ratio") {
992 std::string value_str{value.data(), value.size()};
993 float min_comp_ratio;
994 if (!safe_strtof(value_str.c_str(), min_comp_ratio)) {
995 return false;
996 }
997
998 e->config.min_compression_ratio = min_comp_ratio;
999 }
1000
1001 return response(NULL,
1002 0,
1003 NULL,
1004 0,
1005 NULL,
1006 0,
1007 PROTOCOL_BINARY_RAW_BYTES,
1008 PROTOCOL_BINARY_RESPONSE_SUCCESS,
1009 0,
1010 cookie);
1011 }
1012 return false;
1013 }
1014
default_unknown_command(gsl::not_null<ENGINE_HANDLE * > handle,const void * cookie,gsl::not_null<protocol_binary_request_header * > request,ADD_RESPONSE response,DocNamespace doc_namespace)1015 static ENGINE_ERROR_CODE default_unknown_command(
1016 gsl::not_null<ENGINE_HANDLE*> handle,
1017 const void* cookie,
1018 gsl::not_null<protocol_binary_request_header*> request,
1019 ADD_RESPONSE response,
1020 DocNamespace doc_namespace) {
1021 struct default_engine* e = get_handle(handle);
1022 bool sent;
1023
1024 switch(request->request.opcode) {
1025 case PROTOCOL_BINARY_CMD_SCRUB:
1026 sent = scrub_cmd(e, cookie, request, response);
1027 break;
1028 case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1029 sent = rm_vbucket(e, cookie, request, response);
1030 break;
1031 case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1032 sent = set_vbucket(
1033 e,
1034 cookie,
1035 reinterpret_cast<protocol_binary_request_set_vbucket*>(
1036 request.get()),
1037 response);
1038 break;
1039 case PROTOCOL_BINARY_CMD_GET_VBUCKET:
1040 sent = get_vbucket(
1041 e,
1042 cookie,
1043 reinterpret_cast<protocol_binary_request_get_vbucket*>(
1044 request.get()),
1045 response);
1046 break;
1047 case PROTOCOL_BINARY_CMD_SET_PARAM:
1048 sent = set_param(e,
1049 cookie,
1050 reinterpret_cast<protocol_binary_request_set_param*>(
1051 request.get()),
1052 response);
1053 break;
1054 default:
1055 sent = response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
1056 PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, 0, cookie);
1057 break;
1058 }
1059
1060 if (sent) {
1061 return ENGINE_SUCCESS;
1062 } else {
1063 return ENGINE_FAILED;
1064 }
1065 }
1066
item_set_cas(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<item * > item,uint64_t val)1067 void item_set_cas(gsl::not_null<ENGINE_HANDLE*> handle,
1068 gsl::not_null<item*> item,
1069 uint64_t val) {
1070 hash_item* it = get_real_item(item);
1071 it->cas = val;
1072 }
1073
item_set_datatype(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<item * > item,protocol_binary_datatype_t val)1074 static void item_set_datatype(gsl::not_null<ENGINE_HANDLE*> handle,
1075 gsl::not_null<item*> item,
1076 protocol_binary_datatype_t val) {
1077 auto* it = reinterpret_cast<hash_item*>(item.get());
1078 it->datatype = val;
1079 }
1080
item_get_key(const hash_item * item)1081 hash_key* item_get_key(const hash_item* item)
1082 {
1083 const char *ret = reinterpret_cast<const char*>(item + 1);
1084 return (hash_key*)ret;
1085 }
1086
item_get_data(const hash_item * item)1087 char* item_get_data(const hash_item* item)
1088 {
1089 const hash_key* key = item_get_key(item);
1090 return ((char*)key->header.full_key) + hash_key_get_key_len(key);
1091 }
1092
get_item_info(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<const item * > item,gsl::not_null<item_info * > item_info)1093 static bool get_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
1094 gsl::not_null<const item*> item,
1095 gsl::not_null<item_info*> item_info) {
1096 auto* it = reinterpret_cast<const hash_item*>(item.get());
1097 const hash_key* key = item_get_key(it);
1098 auto* engine = get_handle(handle);
1099
1100 // This may potentially open up for a race, but:
1101 // 1) If the item isn't linked anymore we don't need to mask
1102 // the CAS anymore. (if the client tries to use that
1103 // CAS it'll fail with an invalid cas)
1104 // 2) In production the memcached buckets don't use the
1105 // ZOMBIE state (and if we start doing that, it is only
1106 // the owner of the item pointer (the one bumping the
1107 // refcount initially) which would change this. Anyone else
1108 // would create a new item object and set the iflag
1109 // to deleted.
1110 const auto iflag = it->iflag.load(std::memory_order_relaxed);
1111
1112 if ((iflag & ITEM_LINKED) && it->locktime != 0 &&
1113 it->locktime > engine->server.core->get_current_time()) {
1114 // This object is locked. According to docs/Document.md we should
1115 // return -1 in such cases to hide the real CAS for the other clients
1116 // (Note the check on ITEM_LINKED.. for the actual item returned by
1117 // get_locked we return an item which isn't linked (copy of the
1118 // linked item) to allow returning the real CAS.
1119 item_info->cas = uint64_t(-1);
1120 } else {
1121 item_info->cas = it->cas;
1122 }
1123
1124 item_info->vbucket_uuid = DEFAULT_ENGINE_VBUCKET_UUID;
1125 item_info->seqno = 0;
1126 if (it->exptime == 0) {
1127 item_info->exptime = 0;
1128 } else {
1129 item_info->exptime = engine->server.core->abstime(it->exptime);
1130 }
1131 item_info->nbytes = it->nbytes;
1132 item_info->flags = it->flags;
1133 item_info->nkey = hash_key_get_client_key_len(key);
1134 item_info->key = hash_key_get_client_key(key);
1135 item_info->value[0].iov_base = item_get_data(it);
1136 item_info->value[0].iov_len = it->nbytes;
1137 item_info->datatype = it->datatype;
1138 if (iflag & ITEM_ZOMBIE) {
1139 item_info->document_state = DocumentState::Deleted;
1140 } else {
1141 item_info->document_state = DocumentState::Alive;
1142 }
1143 return true;
1144 }
1145
set_item_info(gsl::not_null<ENGINE_HANDLE * > handle,gsl::not_null<item * > item,gsl::not_null<const item_info * > itm_info)1146 static bool set_item_info(gsl::not_null<ENGINE_HANDLE*> handle,
1147 gsl::not_null<item*> item,
1148 gsl::not_null<const item_info*> itm_info) {
1149 auto* it = reinterpret_cast<hash_item*>(item.get());
1150 it->datatype = itm_info->datatype;
1151 return true;
1152 }
1153
is_xattr_supported(gsl::not_null<ENGINE_HANDLE * > handle)1154 static bool is_xattr_supported(gsl::not_null<ENGINE_HANDLE*> handle) {
1155 return get_handle(handle)->config.xattr_enabled;
1156 }
1157
get_compression_mode(gsl::not_null<ENGINE_HANDLE * > handle)1158 static BucketCompressionMode get_compression_mode(
1159 gsl::not_null<ENGINE_HANDLE*> handle) {
1160 return get_handle(handle)->config.compression_mode;
1161 }
1162
get_min_compression_ratio(gsl::not_null<ENGINE_HANDLE * > handle)1163 static float get_min_compression_ratio(
1164 gsl::not_null<ENGINE_HANDLE*> handle) {
1165 return get_handle(handle)->config.min_compression_ratio;
1166 }
1167
get_max_item_size(gsl::not_null<ENGINE_HANDLE * > handle)1168 static size_t get_max_item_size(
1169 gsl::not_null<ENGINE_HANDLE*> handle) {
1170 return get_handle(handle)->config.item_size_max;
1171 }
1172