1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #ifndef SRC_EP_ENGINE_H_
19 #define SRC_EP_ENGINE_H_ 1
20 
21 #include "config.h"
22 
23 #include <errno.h>
24 
#include <algorithm>
#include <cstdio>
#include <limits>
#include <list>
#include <map>
#include <new>
#include <sstream>
#include <string>
32 
33 #include "configuration.h"
34 #include "ep.h"
35 #include "ep-engine/command_ids.h"
36 #include "ext_meta_parser.h"
37 #include "item_pager.h"
38 #include "kvstore.h"
39 #include "locks.h"
40 #include "tapconnection.h"
41 #include "workload.h"
42 
43 #include <JSON_checker.h>
44 
45 class DcpConnMap;
46 class TapConnMap;
47 class TapThrottle;
48 
49 extern "C" {
50     EXPORT_FUNCTION
51     ENGINE_ERROR_CODE create_instance(uint64_t interface,
52                                       GET_SERVER_API get_server_api,
53                                       ENGINE_HANDLE **handle);
54     void EvpNotifyPendingConns(void*arg);
55 }
56 
/* Function-pointer type matching the server's notify_io_complete callback,
 * declared with C++ linkage. The callback itself has "C" linkage, which
 * C++ function adaptors such as std::ptr_fun do not accept; casting it to
 * this typedef strips that linkage.
 */
61 typedef void (*NOTIFY_IO_COMPLETE_T)(const void *cookie,
62                                      ENGINE_ERROR_CODE status);
63 
64 // Forward decl
65 class EventuallyPersistentEngine;
66 class TapConnMap;
67 
68 /**
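 * Vbucket visitor that accumulates statistics (item counts, memory, queue
 * and disk usage) across vbuckets; it is constructed with the vbucket state
 * it is interested in.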
70  */
71 class VBucketCountVisitor : public VBucketVisitor {
72 public:
VBucketCountVisitor(EventuallyPersistentEngine &e, vbucket_state_t state)73     VBucketCountVisitor(EventuallyPersistentEngine &e,
74                         vbucket_state_t state) :
75         engine(e),
76         desired_state(state), numItems(0),
77         numTempItems(0),nonResident(0),
78         numVbucket(0), htMemory(0),
79         htItemMemory(0), htCacheSize(0),
80         numEjects(0), numExpiredItems(0),
81         metaDataMemory(0), metaDataDisk(0),
82         opsCreate(0),
83         opsUpdate(0), opsDelete(0),
84         opsReject(0), queueSize(0),
85         queueMemory(0), queueAge(0),
86         queueFill(0), queueDrain(0),
87         pendingWrites(0), chkPersistRemaining(0),
88         fileSpaceUsed(0), fileSize(0)
89     { }
90 
91     bool visitBucket(RCPtr<VBucket> &vb);
92 
visit(StoredValue* v)93     void visit(StoredValue* v) {
94         (void)v;
95         cb_assert(false); // this does not happen
96     }
97 
getVBucketState()98     vbucket_state_t getVBucketState() { return desired_state; }
99 
getNumItems()100     size_t getNumItems() { return numItems; }
101 
getNumTempItems()102     size_t getNumTempItems() { return numTempItems; }
103 
getNonResident()104     size_t getNonResident() { return nonResident; }
105 
getVBucketNumber()106     size_t getVBucketNumber() { return numVbucket; }
107 
getMemResidentPer()108     size_t getMemResidentPer() {
109         size_t numResident = numItems - nonResident;
110         return (numItems != 0) ? (size_t) (numResident *100.0) / (numItems) : 100;
111     }
112 
getEjects()113     size_t getEjects() { return numEjects; }
114 
getExpired()115     size_t getExpired() { return numExpiredItems; }
116 
getMetaDataMemory()117     size_t getMetaDataMemory() { return metaDataMemory; }
118 
getMetaDataDisk()119     size_t getMetaDataDisk() { return metaDataDisk; }
120 
getHashtableMemory()121     size_t getHashtableMemory() { return htMemory; }
122 
getItemMemory()123     size_t getItemMemory() { return htItemMemory; }
getCacheSize()124     size_t getCacheSize() { return htCacheSize; }
125 
getOpsCreate()126     size_t getOpsCreate() { return opsCreate; }
getOpsUpdate()127     size_t getOpsUpdate() { return opsUpdate; }
getOpsDelete()128     size_t getOpsDelete() { return opsDelete; }
getOpsReject()129     size_t getOpsReject() { return opsReject; }
130 
getQueueSize()131     size_t getQueueSize() { return queueSize; }
getQueueMemory()132     size_t getQueueMemory() { return queueMemory; }
getQueueFill()133     size_t getQueueFill() { return queueFill; }
getQueueDrain()134     size_t getQueueDrain() { return queueDrain; }
getAge()135     uint64_t getAge() { return queueAge; }
getPendingWrites()136     size_t getPendingWrites() { return pendingWrites; }
getChkPersistRemaining()137     size_t getChkPersistRemaining() { return chkPersistRemaining; }
138 
getFileSpaceUsed()139     size_t getFileSpaceUsed() { return fileSpaceUsed; }
getFileSize()140     size_t getFileSize() { return fileSize; }
141 
142 private:
143     EventuallyPersistentEngine &engine;
144     vbucket_state_t desired_state;
145 
146     size_t numItems;
147     size_t numTempItems;
148     size_t nonResident;
149     size_t numVbucket;
150     size_t htMemory;
151     size_t htItemMemory;
152     size_t htCacheSize;
153     size_t numEjects;
154     size_t numExpiredItems;
155     size_t metaDataMemory;
156     size_t metaDataDisk;
157 
158     size_t opsCreate;
159     size_t opsUpdate;
160     size_t opsDelete;
161     size_t opsReject;
162 
163     size_t queueSize;
164     size_t queueMemory;
165     uint64_t queueAge;
166     size_t queueFill;
167     size_t queueDrain;
168     size_t pendingWrites;
169     size_t chkPersistRemaining;
170 
171     size_t fileSpaceUsed;
172     size_t fileSize;
173 };
174 
175 /**
176  * memcached engine interface to the EventuallyPersistentStore.
177  */
178 class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
179     friend class LookupCallback;
180 public:
181     ENGINE_ERROR_CODE initialize(const char* config);
182     void destroy(bool force);
183 
itemAllocate(const void* cookie, item** itm, const void* key, const size_t nkey, const size_t nbytes, const int flags, const rel_time_t exptime, uint8_t datatype)184     ENGINE_ERROR_CODE itemAllocate(const void* cookie,
185                                    item** itm,
186                                    const void* key,
187                                    const size_t nkey,
188                                    const size_t nbytes,
189                                    const int flags,
190                                    const rel_time_t exptime,
191                                    uint8_t datatype)
192     {
193         (void)cookie;
194         if (nbytes > maxItemSize) {
195             return ENGINE_E2BIG;
196         }
197 
198         if(!hasAvailableSpace(sizeof(Item) + sizeof(Blob) + nkey + nbytes)) {
199             return memoryCondition();
200         }
201 
202         time_t expiretime = (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime));
203 
204         uint8_t ext_meta[1];
205         uint8_t ext_len = EXT_META_LEN;
206         *(ext_meta) = datatype;
207         *itm = new Item(key, nkey, flags, expiretime, NULL, nbytes, ext_meta,
208                         ext_len);
209         if (*itm == NULL) {
210             return memoryCondition();
211         } else {
212             stats.itemAllocSizeHisto.add(nbytes);
213             return ENGINE_SUCCESS;
214         }
215     }
216 
itemDelete(const void* cookie, const void* key, const size_t nkey, uint64_t* cas, uint16_t vbucket, mutation_descr_t *mut_info)217     ENGINE_ERROR_CODE itemDelete(const void* cookie,
218                                  const void* key,
219                                  const size_t nkey,
220                                  uint64_t* cas,
221                                  uint16_t vbucket,
222                                  mutation_descr_t *mut_info)
223     {
224         std::string k(static_cast<const char*>(key), nkey);
225         return itemDelete(cookie, k, cas, vbucket, mut_info);
226     }
227 
itemDelete(const void* cookie, const std::string &key, uint64_t* cas, uint16_t vbucket, mutation_descr_t *mut_info)228     ENGINE_ERROR_CODE itemDelete(const void* cookie,
229                                  const std::string &key,
230                                  uint64_t* cas,
231                                  uint16_t vbucket,
232                                  mutation_descr_t *mut_info)
233     {
234         ENGINE_ERROR_CODE ret = epstore->deleteItem(key, cas,
235                                                     vbucket, cookie,
236                                                     false, // not force
237                                                     NULL, mut_info);
238 
239         if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
240             if (isDegradedMode()) {
241                 return ENGINE_TMPFAIL;
242             }
243         } else if (ret == ENGINE_SUCCESS) {
244             ++stats.numOpsDelete;
245         }
246         return ret;
247     }
248 
249 
itemRelease(const void* cookie, item *itm)250     void itemRelease(const void* cookie, item *itm)
251     {
252         (void)cookie;
253         delete (Item*)itm;
254     }
255 
get(const void* cookie, item** itm, const void* key, const int nkey, uint16_t vbucket, bool track_stat = false)256     ENGINE_ERROR_CODE get(const void* cookie,
257                           item** itm,
258                           const void* key,
259                           const int nkey,
260                           uint16_t vbucket,
261                           bool track_stat = false)
262     {
263         BlockTimer timer(&stats.getCmdHisto);
264         std::string k(static_cast<const char*>(key), nkey);
265 
266         GetValue gv(epstore->get(k, vbucket, cookie, serverApi->core));
267         ENGINE_ERROR_CODE ret = gv.getStatus();
268 
269         if (ret == ENGINE_SUCCESS) {
270             *itm = gv.getValue();
271             if (track_stat) {
272                 ++stats.numOpsGet;
273             }
274         } else if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
275             if (isDegradedMode()) {
276                 return ENGINE_TMPFAIL;
277             }
278         }
279 
280         return ret;
281     }
282 
getName()283     const char* getName() {
284         return name.c_str();
285     }
286 
287     ENGINE_ERROR_CODE getStats(const void* cookie,
288                                const char* stat_key,
289                                int nkey,
290                                ADD_STAT add_stat);
291 
resetStats()292     void resetStats() {
293         stats.reset();
294         if (epstore) {
295             epstore->resetUnderlyingStats();
296         }
297     }
298 
299     ENGINE_ERROR_CODE store(const void *cookie,
300                             item* itm,
301                             uint64_t *cas,
302                             ENGINE_STORE_OPERATION operation,
303                             uint16_t vbucket);
304 
arithmetic(const void* cookie, const void* key, const int nkey, const bool increment, const bool create, const uint64_t delta, const uint64_t initial, const rel_time_t exptime, item **ret_itm, uint8_t datatype, uint64_t *result, uint16_t vbucket)305     ENGINE_ERROR_CODE arithmetic(const void* cookie,
306                                  const void* key,
307                                  const int nkey,
308                                  const bool increment,
309                                  const bool create,
310                                  const uint64_t delta,
311                                  const uint64_t initial,
312                                  const rel_time_t exptime,
313                                  item **ret_itm,
314                                  uint8_t datatype,
315                                  uint64_t *result,
316                                  uint16_t vbucket)
317     {
318         BlockTimer timer(&stats.arithCmdHisto);
319         item *it = NULL;
320         Item *nit = NULL;
321         uint64_t cas = 0;
322 
323         uint8_t ext_meta[1];
324         uint8_t ext_len = EXT_META_LEN;
325         //Datatype of arithmetic values is always JSON
326         *(ext_meta) = PROTOCOL_BINARY_DATATYPE_JSON;
327 
328         rel_time_t expiretime = (exptime == 0 ||
329                                  exptime == 0xffffffff) ?
330             0 : ep_abs_time(ep_reltime(exptime));
331 
332         ENGINE_ERROR_CODE ret = get(cookie, &it, key, nkey, vbucket);
333         if (ret == ENGINE_SUCCESS) {
334             Item *itm = static_cast<Item*>(it);
335             char *endptr = NULL;
336             char data[24];
337             size_t len = std::min(static_cast<uint32_t>(sizeof(data) - 1),
338                                   itm->getNBytes());
339             data[len] = 0;
340             memcpy(data, itm->getData(), len);
341             uint64_t val = strtoull(data, &endptr, 10);
342             if (itm->getCas() == (uint64_t) -1) {
343                 // item is locked, can't perform arithmetic operation
344                 delete itm;
345                 return ENGINE_TMPFAIL;
346             }
347             cb_assert(itm->getCas());
348             if ((errno != ERANGE) && (isspace(*endptr)
349                                       || (*endptr == '\0' && endptr != data))) {
350                 if (increment) {
351                     val += delta;
352                 } else {
353                     if (delta > val) {
354                         val = 0;
355                     } else {
356                         val -= delta;
357                     }
358                 }
359 
360                 std::stringstream vals;
361                 vals << val;
362                 size_t nb = vals.str().length();
363                 *result = val;
364 
365                 nit = new Item(key, (uint16_t)nkey, itm->getFlags(),
366                                      itm->getExptime(), vals.str().c_str(), nb,
367                                      ext_meta, ext_len);
368                 nit->setCas(itm->getCas());
369                 ret = store(cookie, nit, &cas, OPERATION_CAS, vbucket);
370             } else {
371                 ret = ENGINE_EINVAL;
372             }
373 
374             delete itm;
375         } else if (ret == ENGINE_NOT_MY_VBUCKET) {
376             return isDegradedMode() ? ENGINE_TMPFAIL: ret;
377         } else if (ret == ENGINE_KEY_ENOENT) {
378             if (isDegradedMode()) {
379                 return ENGINE_TMPFAIL;
380             }
381             if (create) {
382                 std::stringstream vals;
383                 vals << initial;
384                 size_t nb = vals.str().length();
385                 *result = initial;
386 
387                 nit = new Item(key, (uint16_t)nkey, 0, expiretime,
388                                      vals.str().c_str(), nb, ext_meta, ext_len);
389                 ret = store(cookie, nit, &cas, OPERATION_ADD, vbucket);
390             }
391         }
392 
393         /* We had a race condition.. just call ourself recursively to retry */
394         if ((ret == ENGINE_KEY_EEXISTS) || (ret == ENGINE_NOT_STORED)) {
395             delete nit;
396             return arithmetic(cookie, key, nkey, increment, create, delta,
397                               initial, expiretime, ret_itm, datatype, result,
398                               vbucket);
399         } else if (ret == ENGINE_SUCCESS) {
400             *ret_itm = nit;
401             ++stats.numOpsStore;
402         } else {
403             delete nit;
404         }
405 
406         return ret;
407     }
408 
409 
410     ENGINE_ERROR_CODE flush(const void *cookie, time_t when);
411 
412     uint16_t walkTapQueue(const void *cookie, item **itm, void **es,
413                           uint16_t *nes, uint8_t *ttl, uint16_t *flags,
414                           uint32_t *seqno, uint16_t *vbucket);
415 
416     bool createTapQueue(const void *cookie,
417                         std::string &client,
418                         uint32_t flags,
419                         const void *userdata,
420                         size_t nuserdata);
421 
422     ENGINE_ERROR_CODE tapNotify(const void *cookie,
423                                 void *engine_specific,
424                                 uint16_t nengine,
425                                 uint8_t ttl,
426                                 uint16_t tap_flags,
427                                 uint16_t tap_event,
428                                 uint32_t tap_seqno,
429                                 const void *key,
430                                 size_t nkey,
431                                 uint32_t flags,
432                                 uint32_t exptime,
433                                 uint64_t cas,
434                                 uint8_t datatype,
435                                 const void *data,
436                                 size_t ndata,
437                                 uint16_t vbucket);
438 
439     ENGINE_ERROR_CODE dcpOpen(const void* cookie,
440                               uint32_t opaque,
441                               uint32_t seqno,
442                               uint32_t flags,
443                               void *stream_name,
444                               uint16_t nname);
445 
446     ENGINE_ERROR_CODE dcpAddStream(const void* cookie,
447                                    uint32_t opaque,
448                                    uint16_t vbucket,
449                                    uint32_t flags);
450 
451     ENGINE_ERROR_CODE ConnHandlerCheckPoint(TapConsumer *consumer,
452                                             uint8_t event,
453                                             uint16_t vbucket,
454                                             uint64_t checkpointId);
455 
456     ENGINE_ERROR_CODE touch(const void* cookie,
457                             protocol_binary_request_header *request,
458                             ADD_RESPONSE response);
459 
460     ENGINE_ERROR_CODE getMeta(const void* cookie,
461                               protocol_binary_request_get_meta *request,
462                               ADD_RESPONSE response);
463 
464     ENGINE_ERROR_CODE setWithMeta(const void* cookie,
465                                  protocol_binary_request_set_with_meta *request,
466                                  ADD_RESPONSE response);
467 
468     ENGINE_ERROR_CODE deleteWithMeta(const void* cookie,
469                               protocol_binary_request_delete_with_meta *request,
470                               ADD_RESPONSE response);
471 
472     ENGINE_ERROR_CODE returnMeta(const void* cookie,
473                                  protocol_binary_request_return_meta *request,
474                                  ADD_RESPONSE response);
475 
476     ENGINE_ERROR_CODE setClusterConfig(const void* cookie,
477                             protocol_binary_request_set_cluster_config *request,
478                             ADD_RESPONSE response);
479 
480     ENGINE_ERROR_CODE getClusterConfig(const void* cookie,
481                             protocol_binary_request_get_cluster_config *request,
482                             ADD_RESPONSE response);
483 
484     ENGINE_ERROR_CODE getAllKeys(const void* cookie,
485                                 protocol_binary_request_get_keys *request,
486                                 ADD_RESPONSE response);
487 
488     ENGINE_ERROR_CODE getAdjustedTime(const void* cookie,
489                              protocol_binary_request_get_adjusted_time *request,
490                              ADD_RESPONSE response);
491 
492     ENGINE_ERROR_CODE setDriftCounterState(const void* cookie,
493                        protocol_binary_request_set_drift_counter_state *request,
494                        ADD_RESPONSE response);
495 
496     /**
     * Visit the objects and add them to the tap/dcp connection's queue.
498      * @todo this code should honor the backfill time!
499      */
500     void queueBackfill(const VBucketFilter &backfillVBFilter, Producer *tc);
501 
setDCPPriority(const void* cookie, CONN_PRIORITY priority)502     void setDCPPriority(const void* cookie, CONN_PRIORITY priority) {
503         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
504         serverApi->cookie->set_priority(cookie, priority);
505         ObjectRegistry::onSwitchThread(epe);
506     }
507 
notifyIOComplete(const void *cookie, ENGINE_ERROR_CODE status)508     void notifyIOComplete(const void *cookie, ENGINE_ERROR_CODE status) {
509         if (cookie == NULL) {
510             LOG(EXTENSION_LOG_WARNING, "Tried to signal a NULL cookie!");
511         } else {
512             BlockTimer bt(&stats.notifyIOHisto);
513             EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
514             serverApi->cookie->notify_io_complete(cookie, status);
515             ObjectRegistry::onSwitchThread(epe);
516         }
517     }
518 
519     ENGINE_ERROR_CODE reserveCookie(const void *cookie);
520     ENGINE_ERROR_CODE releaseCookie(const void *cookie);
521 
storeEngineSpecific(const void *cookie, void *engine_data)522     void storeEngineSpecific(const void *cookie, void *engine_data) {
523         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
524         serverApi->cookie->store_engine_specific(cookie, engine_data);
525         ObjectRegistry::onSwitchThread(epe);
526     }
527 
getEngineSpecific(const void *cookie)528     void *getEngineSpecific(const void *cookie) {
529         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
530         void *engine_data = serverApi->cookie->get_engine_specific(cookie);
531         ObjectRegistry::onSwitchThread(epe);
532         return engine_data;
533     }
534 
isDatatypeSupported(const void *cookie)535     bool isDatatypeSupported(const void *cookie) {
536         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
537         bool isSupported = serverApi->cookie->is_datatype_supported(cookie);
538         ObjectRegistry::onSwitchThread(epe);
539         return isSupported;
540     }
541 
isMutationExtrasSupported(const void *cookie)542     bool isMutationExtrasSupported(const void *cookie) {
543         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
544         bool isSupported = serverApi->cookie->is_mutation_extras_supported(cookie);
545         ObjectRegistry::onSwitchThread(epe);
546         return isSupported;
547     }
548 
getOpcodeIfEwouldblockSet(const void *cookie)549     uint8_t getOpcodeIfEwouldblockSet(const void *cookie) {
550         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
551         uint8_t opcode = serverApi->cookie->get_opcode_if_ewouldblock_set(cookie);
552         ObjectRegistry::onSwitchThread(epe);
553         return opcode;
554     }
555 
validateSessionCas(const uint64_t cas)556     bool validateSessionCas(const uint64_t cas) {
557         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
558         bool ret = serverApi->cookie->validate_session_cas(cas);
559         ObjectRegistry::onSwitchThread(epe);
560         return ret;
561     }
562 
decrementSessionCtr(void)563     void decrementSessionCtr(void) {
564         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
565         serverApi->cookie->decrement_session_ctr();
566         ObjectRegistry::onSwitchThread(epe);
567     }
568 
569     void registerEngineCallback(ENGINE_EVENT_TYPE type,
570                                 EVENT_CALLBACK cb, const void *cb_data);
571 
572     template <typename T>
notifyIOComplete(T cookies, ENGINE_ERROR_CODE status)573     void notifyIOComplete(T cookies, ENGINE_ERROR_CODE status) {
574         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
575         std::for_each(cookies.begin(), cookies.end(),
576                       std::bind2nd(std::ptr_fun((NOTIFY_IO_COMPLETE_T)serverApi->cookie->notify_io_complete),
577                                    status));
578         ObjectRegistry::onSwitchThread(epe);
579     }
580 
581     void handleDisconnect(const void *cookie);
582 
stopFlusher(const char **msg, size_t *msg_size)583     protocol_binary_response_status stopFlusher(const char **msg, size_t *msg_size) {
584         (void) msg_size;
585         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
586         *msg = NULL;
587         if (!epstore->pauseFlusher()) {
588             LOG(EXTENSION_LOG_INFO, "Unable to stop flusher");
589             *msg = "Flusher not running.";
590             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
591         }
592         return rv;
593     }
594 
startFlusher(const char **msg, size_t *msg_size)595     protocol_binary_response_status startFlusher(const char **msg, size_t *msg_size) {
596         (void) msg_size;
597         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
598         *msg = NULL;
599         if (!epstore->resumeFlusher()) {
600             LOG(EXTENSION_LOG_INFO, "Unable to start flusher");
601             *msg = "Flusher not shut down.";
602             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
603         }
604         return rv;
605     }
606 
deleteVBucket(uint16_t vbid, const void* c = NULL)607     ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL) {
608         return epstore->deleteVBucket(vbid, c);
609     }
610 
compactDB(uint16_t vbid, compaction_ctx c, const void *cookie = NULL)611     ENGINE_ERROR_CODE compactDB(uint16_t vbid, compaction_ctx c,
612                                 const void *cookie = NULL) {
613         return epstore->compactDB(vbid, c, cookie);
614     }
615 
resetVBucket(uint16_t vbid)616     bool resetVBucket(uint16_t vbid) {
617         return epstore->resetVBucket(vbid);
618     }
619 
setTapKeepAlive(uint32_t to)620     void setTapKeepAlive(uint32_t to) {
621         configuration.setTapKeepalive((size_t)to);
622     }
623 
setFlushAll(bool enabled)624     void setFlushAll(bool enabled) {
625         flushAllEnabled = enabled;
626     }
627 
evictKey(const std::string &key, uint16_t vbucket, const char **msg, size_t *msg_size)628     protocol_binary_response_status evictKey(const std::string &key,
629                                              uint16_t vbucket,
630                                              const char **msg,
631                                              size_t *msg_size) {
632         return epstore->evictKey(key, vbucket, msg, msg_size);
633     }
634 
getLocked(const std::string &key, uint16_t vbucket, Callback<GetValue> &cb, rel_time_t currentTime, uint32_t lockTimeout, const void *cookie)635     bool getLocked(const std::string &key,
636                    uint16_t vbucket,
637                    Callback<GetValue> &cb,
638                    rel_time_t currentTime,
639                    uint32_t lockTimeout,
640                    const void *cookie) {
641         return epstore->getLocked(key, vbucket, cb, currentTime, lockTimeout, cookie);
642     }
643 
unlockKey(const std::string &key, uint16_t vbucket, uint64_t cas, rel_time_t currentTime)644     ENGINE_ERROR_CODE unlockKey(const std::string &key,
645                                 uint16_t vbucket,
646                                 uint64_t cas,
647                                 rel_time_t currentTime) {
648         return epstore->unlockKey(key, vbucket, cas, currentTime);
649     }
650 
651     ENGINE_ERROR_CODE observe(const void* cookie,
652                               protocol_binary_request_header *request,
653                               ADD_RESPONSE response);
654 
655     ENGINE_ERROR_CODE observe_seqno(const void* cookie,
656                                     protocol_binary_request_header *request,
657                                     ADD_RESPONSE response);
658 
getVBucket(uint16_t vbucket)659     RCPtr<VBucket> getVBucket(uint16_t vbucket) {
660         return epstore->getVBucket(vbucket);
661     }
662 
setVBucketState(uint16_t vbid, vbucket_state_t to, bool transfer)663     ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t to, bool transfer) {
664         return epstore->setVBucketState(vbid, to, transfer);
665     }
666 
667     ~EventuallyPersistentEngine();
668 
getInfo()669     engine_info *getInfo() {
670         return &info.info;
671     }
672 
getEpStats()673     EPStats &getEpStats() {
674         return stats;
675     }
676 
getEpStore()677     EventuallyPersistentStore* getEpStore() { return epstore; }
678 
getTapConnMap()679     TapConnMap &getTapConnMap() { return *tapConnMap; }
680 
getDcpConnMap()681     DcpConnMap &getDcpConnMap() { return *dcpConnMap_; }
682 
getTapConfig()683     TapConfig &getTapConfig() { return *tapConfig; }
684 
getTapThrottle()685     TapThrottle &getTapThrottle() { return *tapThrottle; }
686 
getCheckpointConfig()687     CheckpointConfig &getCheckpointConfig() { return *checkpointConfig; }
688 
getServerApi()689     SERVER_HANDLE_V1* getServerApi() { return serverApi; }
690 
getConfiguration()691     Configuration &getConfiguration() {
692         return configuration;
693     }
694 
695     ENGINE_ERROR_CODE deregisterTapClient(const void* cookie,
696                                           protocol_binary_request_header *request,
697                                           ADD_RESPONSE response);
698 
699     ENGINE_ERROR_CODE handleCheckpointCmds(const void* cookie,
700                                            protocol_binary_request_header *request,
701                                            ADD_RESPONSE response);
702 
703     ENGINE_ERROR_CODE handleSeqnoCmds(const void* cookie,
704                                       protocol_binary_request_header *request,
705                                       ADD_RESPONSE response);
706 
707     ENGINE_ERROR_CODE resetReplicationChain(const void* cookie,
708                                             protocol_binary_request_header *request,
709                                             ADD_RESPONSE response);
710 
711     ENGINE_ERROR_CODE changeTapVBFilter(const void* cookie,
712                                         protocol_binary_request_header *request,
713                                         ADD_RESPONSE response);
714 
715     ENGINE_ERROR_CODE handleTrafficControlCmd(const void* cookie,
716                                               protocol_binary_request_header *request,
717                                               ADD_RESPONSE response);
718 
getGetlDefaultTimeout() const719     size_t getGetlDefaultTimeout() const {
720         return getlDefaultTimeout;
721     }
722 
getGetlMaxTimeout() const723     size_t getGetlMaxTimeout() const {
724         return getlMaxTimeout;
725     }
726 
getMaxFailoverEntries() const727     size_t getMaxFailoverEntries() const {
728         return maxFailoverEntries;
729     }
730 
isDegradedMode() const731     bool isDegradedMode() const {
732         return epstore->isWarmingUp() || !trafficEnabled.load();
733     }
734 
getWorkLoadPolicy(void)735     WorkLoadPolicy &getWorkLoadPolicy(void) {
736         return *workload;
737     }
738 
getWorkloadPriority(void)739     bucket_priority_t getWorkloadPriority(void) {return workloadPriority; }
setWorkloadPriority(bucket_priority_t p)740     void setWorkloadPriority(bucket_priority_t p) { workloadPriority = p; }
741 
    // Engine-held cluster configuration storage. The nested type and the
    // member instance declared after it share the name `clusterConfig`.
    struct clusterConfig {
        // Starts out empty: zero length and no buffer.
        clusterConfig() : len(0), config(NULL) {}
        // Presumably the number of bytes pointed to by `config` — TODO confirm.
        uint32_t len;
        // Raw configuration bytes. Allocation/freeing is not visible here
        // (NOTE(review): confirm which code owns and releases this buffer).
        uint8_t *config;
        // Presumably serializes access to len/config — confirm against the
        // code that reads and updates this struct.
        Mutex lock;
    } clusterConfig;
748 
749     ENGINE_ERROR_CODE getRandomKey(const void *cookie,
750                                    ADD_RESPONSE response);
751 
752     ConnHandler* getConnHandler(const void *cookie);
753 
754     void addLookupAllKeys(const void *cookie, ENGINE_ERROR_CODE err);
755 
756     /*
757      * Explicitly trigger the defragmenter task. Provided to facilitate
758      * testing.
759      */
760     void runDefragmenterTask(void);
761 
762     /**
763      * Get a (sloppy) list of the sequence numbers for all of the vbuckets
764      * on this server. It is not to be treated as a consistent set of seqence,
765      * but rather a list of "at least" numbers. The way the list is generated
766      * is that we're starting for vbucket 0 and record the current number,
767      * then look at the next vbucket and record its number. That means that
768      * at the time we get the number for vbucket X all of the previous
769      * numbers could have been incremented. If the client just needs a list
770      * of where we are for each vbucket this method may be more optimal than
771      * requesting one by one.
772      *
773      * @param cookie The cookie representing the connection to requesting
774      *               list
775      * @param add_response The method used to format the output buffer
776      * @return ENGINE_SUCCESS upon success
777      */
778     ENGINE_ERROR_CODE getAllVBucketSequenceNumbers(
779                                         const void *cookie,
780                                         protocol_binary_request_header *request,
781                                         ADD_RESPONSE response);
782 
783 protected:
784     friend class EpEngineValueChangeListener;
785 
setMaxItemSize(size_t value)786     void setMaxItemSize(size_t value) {
787         maxItemSize = value;
788     }
789 
setGetlDefaultTimeout(size_t value)790     void setGetlDefaultTimeout(size_t value) {
791         getlDefaultTimeout = value;
792     }
793 
setGetlMaxTimeout(size_t value)794     void setGetlMaxTimeout(size_t value) {
795         getlMaxTimeout = value;
796     }
797 
798 private:
799     EventuallyPersistentEngine(GET_SERVER_API get_server_api);
800     friend ENGINE_ERROR_CODE create_instance(uint64_t interface,
801                                              GET_SERVER_API get_server_api,
802                                              ENGINE_HANDLE **handle);
803     uint16_t doWalkTapQueue(const void *cookie, item **itm, void **es,
804                             uint16_t *nes, uint8_t *ttl, uint16_t *flags,
805                             uint32_t *seqno, uint16_t *vbucket,
806                             TapProducer *c, bool &retry);
807 
808 
809     ENGINE_ERROR_CODE processTapAck(const void *cookie,
810                                     uint32_t seqno,
811                                     uint16_t status,
812                                     const std::string &msg);
813 
814     /**
815      * Report the state of a memory condition when out of memory.
816      *
817      * @return ETMPFAIL if we think we can recover without interaction,
818      *         else ENOMEM
819      */
memoryCondition()820     ENGINE_ERROR_CODE memoryCondition() {
821         // Do we think it's possible we could free something?
822         bool haveEvidenceWeCanFreeMemory(stats.getMaxDataSize() > stats.memOverhead);
823         if (haveEvidenceWeCanFreeMemory) {
824             // Look for more evidence by seeing if we have resident items.
825             VBucketCountVisitor countVisitor(*this, vbucket_state_active);
826             epstore->visit(countVisitor);
827 
828             haveEvidenceWeCanFreeMemory = countVisitor.getNonResident() <
829                 countVisitor.getNumItems();
830         }
831         if (haveEvidenceWeCanFreeMemory) {
832             ++stats.tmp_oom_errors;
833             // Wake up the item pager task as memory usage
834             // seems to have exceeded high water mark
835             if ((getEpStore()->fetchItemPagerTask())->getState() ==
836                                                                 TASK_SNOOZED) {
837                 ExecutorPool::get()->wake(
838                         (getEpStore()->fetchItemPagerTask())->getId());
839             }
840             return ENGINE_TMPFAIL;
841         } else {
842             ++stats.oom_errors;
843             return ENGINE_ENOMEM;
844         }
845     }
846 
847     /**
848      * Check if there is any available memory space to allocate an Item
849      * instance with a given size.
850      */
hasAvailableSpace(uint32_t nBytes)851     bool hasAvailableSpace(uint32_t nBytes) {
852         return (stats.getTotalMemoryUsed() + nBytes) <= stats.getMaxDataSize();
853     }
854 
855     friend class BGFetchCallback;
856     friend class EventuallyPersistentStore;
857 
enableTraffic(bool enable)858     bool enableTraffic(bool enable) {
859         bool inverse = !enable;
860         return trafficEnabled.compare_exchange_strong(inverse, enable);
861     }
862 
863     ENGINE_ERROR_CODE doEngineStats(const void *cookie, ADD_STAT add_stat);
864     ENGINE_ERROR_CODE doKlogStats(const void *cookie, ADD_STAT add_stat);
865     ENGINE_ERROR_CODE doMemoryStats(const void *cookie, ADD_STAT add_stat);
866     ENGINE_ERROR_CODE doVBucketStats(const void *cookie, ADD_STAT add_stat,
867                                      const char* stat_key,
868                                      int nkey,
869                                      bool prevStateRequested,
870                                      bool details);
871     ENGINE_ERROR_CODE doHashStats(const void *cookie, ADD_STAT add_stat);
872     ENGINE_ERROR_CODE doCheckpointStats(const void *cookie, ADD_STAT add_stat,
873                                         const char* stat_key, int nkey);
874     ENGINE_ERROR_CODE doTapStats(const void *cookie, ADD_STAT add_stat);
875     ENGINE_ERROR_CODE doDcpStats(const void *cookie, ADD_STAT add_stat);
876     ENGINE_ERROR_CODE doConnAggStats(const void *cookie, ADD_STAT add_stat,
877                                      const char *sep, size_t nsep,
878                                      conn_type_t connType);
879     ENGINE_ERROR_CODE doTimingStats(const void *cookie, ADD_STAT add_stat);
880     ENGINE_ERROR_CODE doSchedulerStats(const void *cookie, ADD_STAT add_stat);
881     ENGINE_ERROR_CODE doRunTimeStats(const void *cookie, ADD_STAT add_stat);
882     ENGINE_ERROR_CODE doDispatcherStats(const void *cookie, ADD_STAT add_stat);
883     ENGINE_ERROR_CODE doKeyStats(const void *cookie, ADD_STAT add_stat,
884                                  uint16_t vbid, std::string &key, bool validate=false);
885     ENGINE_ERROR_CODE doTapVbTakeoverStats(const void *cookie,
886                                            ADD_STAT add_stat,
887                                            std::string& key,
888                                            uint16_t vbid);
889 
890     ENGINE_ERROR_CODE doDcpVbTakeoverStats(const void *cookie,
891                                            ADD_STAT add_stat,
892                                            std::string &key,
893                                            uint16_t vbid);
894     ENGINE_ERROR_CODE doVbIdFailoverLogStats(const void *cookie,
895                                              ADD_STAT add_stat,
896                                              uint16_t vbid);
897     ENGINE_ERROR_CODE doAllFailoverLogStats(const void *cookie, ADD_STAT add_stat);
898     ENGINE_ERROR_CODE doWorkloadStats(const void *cookie, ADD_STAT add_stat);
899     ENGINE_ERROR_CODE doSeqnoStats(const void *cookie, ADD_STAT add_stat,
900                                    const char* stat_key, int nkey);
901     ENGINE_ERROR_CODE doDiskStats(const void *cookie, ADD_STAT add_stat,
902                                   const char* stat_key, int nkey);
903 
addLookupResult(const void *cookie, Item *result)904     void addLookupResult(const void *cookie, Item *result) {
905         LockHolder lh(lookupMutex);
906         std::map<const void*, Item*>::iterator it = lookups.find(cookie);
907         if (it != lookups.end()) {
908             if (it->second != NULL) {
909                 LOG(EXTENSION_LOG_DEBUG,
910                     "Cleaning up old lookup result for '%s'",
911                     it->second->getKey().c_str());
912                 delete it->second;
913             } else {
914                 LOG(EXTENSION_LOG_DEBUG, "Cleaning up old null lookup result");
915             }
916             lookups.erase(it);
917         }
918         lookups[cookie] = result;
919     }
920 
fetchLookupResult(const void *cookie, Item **itm)921     bool fetchLookupResult(const void *cookie, Item **itm) {
922         // This will return *and erase* the lookup result for a connection.
923         // You look it up, you own it.
924         LockHolder lh(lookupMutex);
925         std::map<const void*, Item*>::iterator it = lookups.find(cookie);
926         if (it != lookups.end()) {
927             *itm = it->second;
928             lookups.erase(it);
929             return true;
930         } else {
931             return false;
932         }
933     }
934 
935     // Get the current tap connection for this cookie.
936     // If this method returns NULL, you should return TAP_DISCONNECT
937     TapProducer* getTapProducer(const void *cookie);
938 
939     SERVER_HANDLE_V1 *serverApi;
940     EventuallyPersistentStore *epstore;
941     WorkLoadPolicy *workload;
942     bucket_priority_t workloadPriority;
943 
944     TapThrottle *tapThrottle;
945     std::map<const void*, Item*> lookups;
946     unordered_map<const void*, ENGINE_ERROR_CODE> allKeysLookups;
947     Mutex lookupMutex;
948     GET_SERVER_API getServerApiFunc;
949     union {
950         engine_info info;
951         char buffer[sizeof(engine_info) + 10 * sizeof(feature_info) ];
952     } info;
953 
954     DcpConnMap *dcpConnMap_;
955     TapConnMap *tapConnMap;
956     TapConfig *tapConfig;
957     CheckpointConfig *checkpointConfig;
958     std::string name;
959     size_t maxItemSize;
960     size_t getlDefaultTimeout;
961     size_t getlMaxTimeout;
962     size_t maxFailoverEntries;
963     EPStats stats;
964     Configuration configuration;
965     AtomicValue<bool> trafficEnabled;
966 
967     bool flushAllEnabled;
    // A unique system-generated token, initialized each time
    // ep_engine starts up.
970     time_t startupTime;
971 
972 };
973 
974 #endif  // SRC_EP_ENGINE_H_
975