1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #ifndef SRC_EP_ENGINE_H_
19 #define SRC_EP_ENGINE_H_ 1
20 
21 #include "config.h"
22 
23 #include <errno.h>
24 
25 #include <algorithm>
26 #include <cstdio>
27 #include <limits>
28 #include <list>
29 #include <map>
30 #include <sstream>
31 #include <string>
32 
33 #include "configuration.h"
34 #include "ep.h"
35 #include "ep-engine/command_ids.h"
36 #include "item_pager.h"
37 #include "kvstore.h"
38 #include "locks.h"
39 #include "tapconnection.h"
40 #include "workload.h"
41 
42 
43 class DcpConnMap;
44 class TapConnMap;
45 class TapThrottle;
46 
/*
 * Functions declared with C linkage.
 */
extern "C" {
    // Exported (EXPORT_FUNCTION) entry point; reports its outcome as an
    // ENGINE_ERROR_CODE and takes an ENGINE_HANDLE** out-parameter.
    // NOTE(review): presumably this is the factory that constructs the
    // engine (it is declared a friend of EventuallyPersistentEngine, whose
    // constructor is private) -- confirm against the definition.
    EXPORT_FUNCTION
    ENGINE_ERROR_CODE create_instance(uint64_t interface,
                                      GET_SERVER_API get_server_api,
                                      ENGINE_HANDLE **handle);
    // NOTE(review): the meaning of `arg` is not visible in this header;
    // check the definition before relying on it.
    void EvpNotifyPendingConns(void*arg);
}
54 
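/* Function-pointer type, with C++ linkage, matching the signature of the
 * server's notify_io_complete callback. That callback is declared with "C"
 * linkage, which std::ptr_fun does not accept, so code that needs to wrap
 * it with std::ptr_fun casts it to this type first.
 */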
59 typedef void (*NOTIFY_IO_COMPLETE_T)(const void *cookie,
60                                      ENGINE_ERROR_CODE status);
61 
62 
63 // Forward decl
64 class EventuallyPersistentEngine;
65 class TapConnMap;
66 
67 /**
68  * Vbucket visitor that counts active vbuckets.
69  */
70 class VBucketCountVisitor : public VBucketVisitor {
71 public:
VBucketCountVisitor(EventuallyPersistentEngine &e, vbucket_state_t state)72     VBucketCountVisitor(EventuallyPersistentEngine &e,
73                         vbucket_state_t state) :
74         engine(e),
75         desired_state(state), numItems(0),
76         numTempItems(0),nonResident(0),
77         numVbucket(0), htMemory(0),
78         htItemMemory(0), htCacheSize(0),
79         numEjects(0), numExpiredItems(0),
80         metaDataMemory(0), metaDataDisk(0),
81         opsCreate(0),
82         opsUpdate(0), opsDelete(0),
83         opsReject(0), queueSize(0),
84         queueMemory(0), queueAge(0),
85         queueFill(0), queueDrain(0),
86         pendingWrites(0), chkPersistRemaining(0),
87         fileSpaceUsed(0), fileSize(0)
88     { }
89 
90     bool visitBucket(RCPtr<VBucket> &vb);
91 
visit(StoredValue* v)92     void visit(StoredValue* v) {
93         (void)v;
94         cb_assert(false); // this does not happen
95     }
96 
getVBucketState()97     vbucket_state_t getVBucketState() { return desired_state; }
98 
getNumItems()99     size_t getNumItems() { return numItems; }
100 
getNumTempItems()101     size_t getNumTempItems() { return numTempItems; }
102 
getNonResident()103     size_t getNonResident() { return nonResident; }
104 
getVBucketNumber()105     size_t getVBucketNumber() { return numVbucket; }
106 
getMemResidentPer()107     size_t getMemResidentPer() {
108         size_t numResident = numItems - nonResident;
109         return (numItems != 0) ? (size_t) (numResident *100.0) / (numItems) : 100;
110     }
111 
getEjects()112     size_t getEjects() { return numEjects; }
113 
getExpired()114     size_t getExpired() { return numExpiredItems; }
115 
getMetaDataMemory()116     size_t getMetaDataMemory() { return metaDataMemory; }
117 
getMetaDataDisk()118     size_t getMetaDataDisk() { return metaDataDisk; }
119 
getHashtableMemory()120     size_t getHashtableMemory() { return htMemory; }
121 
getItemMemory()122     size_t getItemMemory() { return htItemMemory; }
getCacheSize()123     size_t getCacheSize() { return htCacheSize; }
124 
getOpsCreate()125     size_t getOpsCreate() { return opsCreate; }
getOpsUpdate()126     size_t getOpsUpdate() { return opsUpdate; }
getOpsDelete()127     size_t getOpsDelete() { return opsDelete; }
getOpsReject()128     size_t getOpsReject() { return opsReject; }
129 
getQueueSize()130     size_t getQueueSize() { return queueSize; }
getQueueMemory()131     size_t getQueueMemory() { return queueMemory; }
getQueueFill()132     size_t getQueueFill() { return queueFill; }
getQueueDrain()133     size_t getQueueDrain() { return queueDrain; }
getAge()134     uint64_t getAge() { return queueAge; }
getPendingWrites()135     size_t getPendingWrites() { return pendingWrites; }
getChkPersistRemaining()136     size_t getChkPersistRemaining() { return chkPersistRemaining; }
137 
getFileSpaceUsed()138     size_t getFileSpaceUsed() { return fileSpaceUsed; }
getFileSize()139     size_t getFileSize() { return fileSize; }
140 
141 private:
142     EventuallyPersistentEngine &engine;
143     vbucket_state_t desired_state;
144 
145     size_t numItems;
146     size_t numTempItems;
147     size_t nonResident;
148     size_t numVbucket;
149     size_t htMemory;
150     size_t htItemMemory;
151     size_t htCacheSize;
152     size_t numEjects;
153     size_t numExpiredItems;
154     size_t metaDataMemory;
155     size_t metaDataDisk;
156 
157     size_t opsCreate;
158     size_t opsUpdate;
159     size_t opsDelete;
160     size_t opsReject;
161 
162     size_t queueSize;
163     size_t queueMemory;
164     uint64_t queueAge;
165     size_t queueFill;
166     size_t queueDrain;
167     size_t pendingWrites;
168     size_t chkPersistRemaining;
169 
170     size_t fileSpaceUsed;
171     size_t fileSize;
172 };
173 
174 /**
175  * memcached engine interface to the EventuallyPersistentStore.
176  */
177 class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
178     friend class LookupCallback;
179 public:
180     ENGINE_ERROR_CODE initialize(const char* config);
181     void destroy(bool force);
182 
itemAllocate(const void* cookie, item** itm, const void* key, const size_t nkey, const size_t nbytes, const int flags, const rel_time_t exptime, uint8_t datatype)183     ENGINE_ERROR_CODE itemAllocate(const void* cookie,
184                                    item** itm,
185                                    const void* key,
186                                    const size_t nkey,
187                                    const size_t nbytes,
188                                    const int flags,
189                                    const rel_time_t exptime,
190                                    uint8_t datatype)
191     {
192         (void)cookie;
193         if (nbytes > maxItemSize) {
194             return ENGINE_E2BIG;
195         }
196 
197         time_t expiretime = (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime));
198 
199         uint8_t ext_meta[1];
200         uint8_t ext_len = EXT_META_LEN;
201         *(ext_meta) = datatype;
202         *itm = new Item(key, nkey, nbytes, flags, expiretime, ext_meta,
203                         ext_len);
204         if (*itm == NULL) {
205             return memoryCondition();
206         } else {
207             stats.itemAllocSizeHisto.add(nbytes);
208             return ENGINE_SUCCESS;
209         }
210     }
211 
itemDelete(const void* cookie, const void* key, const size_t nkey, uint64_t* cas, uint16_t vbucket)212     ENGINE_ERROR_CODE itemDelete(const void* cookie,
213                                  const void* key,
214                                  const size_t nkey,
215                                  uint64_t* cas,
216                                  uint16_t vbucket)
217     {
218         std::string k(static_cast<const char*>(key), nkey);
219         return itemDelete(cookie, k, cas, vbucket);
220     }
221 
itemDelete(const void* cookie, const std::string &key, uint64_t* cas, uint16_t vbucket)222     ENGINE_ERROR_CODE itemDelete(const void* cookie,
223                                  const std::string &key,
224                                  uint64_t* cas,
225                                  uint16_t vbucket)
226     {
227         ENGINE_ERROR_CODE ret = epstore->deleteItem(key, cas,
228                                                     vbucket, cookie,
229                                                     false, // not force
230                                                     NULL);
231 
232         if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
233             if (isDegradedMode()) {
234                 return ENGINE_TMPFAIL;
235             }
236         } else if (ret == ENGINE_SUCCESS) {
237             ++stats.numOpsDelete;
238         }
239         return ret;
240     }
241 
242 
itemRelease(const void* cookie, item *itm)243     void itemRelease(const void* cookie, item *itm)
244     {
245         (void)cookie;
246         delete (Item*)itm;
247     }
248 
get(const void* cookie, item** itm, const void* key, const int nkey, uint16_t vbucket, bool track_stat = false)249     ENGINE_ERROR_CODE get(const void* cookie,
250                           item** itm,
251                           const void* key,
252                           const int nkey,
253                           uint16_t vbucket,
254                           bool track_stat = false)
255     {
256         BlockTimer timer(&stats.getCmdHisto);
257         std::string k(static_cast<const char*>(key), nkey);
258 
259         GetValue gv(epstore->get(k, vbucket, cookie, serverApi->core));
260         ENGINE_ERROR_CODE ret = gv.getStatus();
261 
262         if (ret == ENGINE_SUCCESS) {
263             *itm = gv.getValue();
264             if (track_stat) {
265                 ++stats.numOpsGet;
266             }
267         } else if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
268             if (isDegradedMode()) {
269                 return ENGINE_TMPFAIL;
270             }
271         }
272 
273         return ret;
274     }
275 
getName()276     const char* getName() {
277         return name.c_str();
278     }
279 
280     ENGINE_ERROR_CODE getStats(const void* cookie,
281                                const char* stat_key,
282                                int nkey,
283                                ADD_STAT add_stat);
284 
resetStats()285     void resetStats() {
286         stats.reset();
287         if (epstore) {
288             epstore->resetUnderlyingStats();
289         }
290     }
291 
292     ENGINE_ERROR_CODE store(const void *cookie,
293                             item* itm,
294                             uint64_t *cas,
295                             ENGINE_STORE_OPERATION operation,
296                             uint16_t vbucket);
297 
arithmetic(const void* cookie, const void* key, const int nkey, const bool increment, const bool create, const uint64_t delta, const uint64_t initial, const rel_time_t exptime, uint64_t *cas, uint8_t datatype, uint64_t *result, uint16_t vbucket)298     ENGINE_ERROR_CODE arithmetic(const void* cookie,
299                                  const void* key,
300                                  const int nkey,
301                                  const bool increment,
302                                  const bool create,
303                                  const uint64_t delta,
304                                  const uint64_t initial,
305                                  const rel_time_t exptime,
306                                  uint64_t *cas,
307                                  uint8_t datatype,
308                                  uint64_t *result,
309                                  uint16_t vbucket)
310     {
311         BlockTimer timer(&stats.arithCmdHisto);
312         item *it = NULL;
313         uint8_t ext_meta[1];
314         uint8_t ext_len = EXT_META_LEN;
315         *(ext_meta) = datatype;
316 
317         rel_time_t expiretime = (exptime == 0 ||
318                                  exptime == 0xffffffff) ?
319             0 : ep_abs_time(ep_reltime(exptime));
320 
321         ENGINE_ERROR_CODE ret = get(cookie, &it, key, nkey, vbucket);
322         if (ret == ENGINE_SUCCESS) {
323             Item *itm = static_cast<Item*>(it);
324             char *endptr = NULL;
325             char data[24];
326             size_t len = std::min(static_cast<uint32_t>(sizeof(data) - 1),
327                                   itm->getNBytes());
328             data[len] = 0;
329             memcpy(data, itm->getData(), len);
330             uint64_t val = strtoull(data, &endptr, 10);
331             if (itm->getCas() == (uint64_t) -1) {
332                 // item is locked, can't perform arithmetic operation
333                 delete itm;
334                 return ENGINE_TMPFAIL;
335             }
336             if ((errno != ERANGE) && (isspace(*endptr)
337                                       || (*endptr == '\0' && endptr != data))) {
338                 if (increment) {
339                     val += delta;
340                 } else {
341                     if (delta > val) {
342                         val = 0;
343                     } else {
344                         val -= delta;
345                     }
346                 }
347 
348                 std::stringstream vals;
349                 vals << val;
350                 size_t nb = vals.str().length();
351                 *result = val;
352                 Item *nit = new Item(key, (uint16_t)nkey, itm->getFlags(),
353                                      itm->getExptime(), vals.str().c_str(), nb,
354                                      ext_meta, ext_len);
355                 nit->setCas(itm->getCas());
356                 ret = store(cookie, nit, cas, OPERATION_CAS, vbucket);
357                 delete nit;
358             } else {
359                 ret = ENGINE_EINVAL;
360             }
361 
362             delete itm;
363         } else if (ret == ENGINE_NOT_MY_VBUCKET) {
364             return isDegradedMode() ? ENGINE_TMPFAIL: ret;
365         } else if (ret == ENGINE_KEY_ENOENT) {
366             if (isDegradedMode()) {
367                 return ENGINE_TMPFAIL;
368             }
369             if (create) {
370                 std::stringstream vals;
371                 vals << initial;
372                 size_t nb = vals.str().length();
373                 *result = initial;
374                 Item *itm = new Item(key, (uint16_t)nkey, 0, expiretime,
375                                      vals.str().c_str(), nb, ext_meta, ext_len);
376                 ret = store(cookie, itm, cas, OPERATION_ADD, vbucket);
377                 delete itm;
378             }
379         }
380 
381         /* We had a race condition.. just call ourself recursively to retry */
382         if (ret == ENGINE_KEY_EEXISTS) {
383             return arithmetic(cookie, key, nkey, increment, create, delta,
384                               initial, expiretime, cas, datatype, result,
385                               vbucket);
386         } else if (ret == ENGINE_SUCCESS) {
387             ++stats.numOpsStore;
388         }
389 
390         return ret;
391     }
392 
393 
394 
395     ENGINE_ERROR_CODE flush(const void *cookie, time_t when);
396 
397     uint16_t walkTapQueue(const void *cookie, item **itm, void **es,
398                           uint16_t *nes, uint8_t *ttl, uint16_t *flags,
399                           uint32_t *seqno, uint16_t *vbucket);
400 
401     bool createTapQueue(const void *cookie,
402                         std::string &client,
403                         uint32_t flags,
404                         const void *userdata,
405                         size_t nuserdata);
406 
407     ENGINE_ERROR_CODE tapNotify(const void *cookie,
408                                 void *engine_specific,
409                                 uint16_t nengine,
410                                 uint8_t ttl,
411                                 uint16_t tap_flags,
412                                 uint16_t tap_event,
413                                 uint32_t tap_seqno,
414                                 const void *key,
415                                 size_t nkey,
416                                 uint32_t flags,
417                                 uint32_t exptime,
418                                 uint64_t cas,
419                                 uint8_t datatype,
420                                 const void *data,
421                                 size_t ndata,
422                                 uint16_t vbucket);
423 
424     ENGINE_ERROR_CODE dcpOpen(const void* cookie,
425                               uint32_t opaque,
426                               uint32_t seqno,
427                               uint32_t flags,
428                               void *stream_name,
429                               uint16_t nname);
430 
431     ENGINE_ERROR_CODE ConnHandlerCheckPoint(TapConsumer *consumer,
432                                             uint8_t event,
433                                             uint16_t vbucket,
434                                             uint64_t checkpointId);
435 
436     ENGINE_ERROR_CODE touch(const void* cookie,
437                             protocol_binary_request_header *request,
438                             ADD_RESPONSE response);
439 
440     ENGINE_ERROR_CODE getMeta(const void* cookie,
441                               protocol_binary_request_get_meta *request,
442                               ADD_RESPONSE response);
443     ENGINE_ERROR_CODE setWithMeta(const void* cookie,
444                                   protocol_binary_request_set_with_meta *request,
445                                   ADD_RESPONSE response);
446     ENGINE_ERROR_CODE deleteWithMeta(const void* cookie,
447                                      protocol_binary_request_delete_with_meta *request,
448                                      ADD_RESPONSE response);
449 
450     ENGINE_ERROR_CODE returnMeta(const void* cookie,
451                                  protocol_binary_request_return_meta *request,
452                                  ADD_RESPONSE response);
453 
454     ENGINE_ERROR_CODE setClusterConfig(const void* cookie,
455                                 protocol_binary_request_set_cluster_config *request,
456                                 ADD_RESPONSE response);
457 
458     ENGINE_ERROR_CODE getClusterConfig(const void* cookie,
459                                 protocol_binary_request_get_cluster_config *request,
460                                 ADD_RESPONSE response);
461 
462     ENGINE_ERROR_CODE getAllKeys(const void* cookie,
463                                 protocol_binary_request_get_keys *request,
464                                 ADD_RESPONSE response);
465 
    /**
     * Visit the objects and add them to the tap/dcp connection's queue.
     * @todo this code should honor the backfill time!
     */
470     void queueBackfill(const VBucketFilter &backfillVBFilter, Producer *tc);
471 
setDCPPriority(const void* cookie, CONN_PRIORITY priority)472     void setDCPPriority(const void* cookie, CONN_PRIORITY priority) {
473         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
474         serverApi->cookie->set_priority(cookie, priority);
475         ObjectRegistry::onSwitchThread(epe);
476     }
477 
notifyIOComplete(const void *cookie, ENGINE_ERROR_CODE status)478     void notifyIOComplete(const void *cookie, ENGINE_ERROR_CODE status) {
479         if (cookie == NULL) {
480             LOG(EXTENSION_LOG_WARNING, "Tried to signal a NULL cookie!");
481         } else {
482             BlockTimer bt(&stats.notifyIOHisto);
483             EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
484             serverApi->cookie->notify_io_complete(cookie, status);
485             ObjectRegistry::onSwitchThread(epe);
486         }
487     }
488 
489     ENGINE_ERROR_CODE reserveCookie(const void *cookie);
490     ENGINE_ERROR_CODE releaseCookie(const void *cookie);
491 
storeEngineSpecific(const void *cookie, void *engine_data)492     void storeEngineSpecific(const void *cookie, void *engine_data) {
493         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
494         serverApi->cookie->store_engine_specific(cookie, engine_data);
495         ObjectRegistry::onSwitchThread(epe);
496     }
497 
getEngineSpecific(const void *cookie)498     void *getEngineSpecific(const void *cookie) {
499         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
500         void *engine_data = serverApi->cookie->get_engine_specific(cookie);
501         ObjectRegistry::onSwitchThread(epe);
502         return engine_data;
503     }
504 
isDatatypeSupported(const void *cookie)505     bool isDatatypeSupported(const void *cookie) {
506         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
507         bool isSupported = serverApi->cookie->is_datatype_supported(cookie);
508         ObjectRegistry::onSwitchThread(epe);
509         return isSupported;
510     }
511 
getOpcodeIfEwouldblockSet(const void *cookie)512     uint8_t getOpcodeIfEwouldblockSet(const void *cookie) {
513         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
514         uint8_t opcode = serverApi->cookie->get_opcode_if_ewouldblock_set(cookie);
515         ObjectRegistry::onSwitchThread(epe);
516         return opcode;
517     }
518 
validateSessionCas(const uint64_t cas)519     bool validateSessionCas(const uint64_t cas) {
520         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
521         bool ret = serverApi->cookie->validate_session_cas(cas);
522         ObjectRegistry::onSwitchThread(epe);
523         return ret;
524     }
525 
decrementSessionCtr(void)526     void decrementSessionCtr(void) {
527         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
528         serverApi->cookie->decrement_session_ctr();
529         ObjectRegistry::onSwitchThread(epe);
530     }
531 
532     void registerEngineCallback(ENGINE_EVENT_TYPE type,
533                                 EVENT_CALLBACK cb, const void *cb_data);
534 
535     template <typename T>
notifyIOComplete(T cookies, ENGINE_ERROR_CODE status)536     void notifyIOComplete(T cookies, ENGINE_ERROR_CODE status) {
537         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
538         std::for_each(cookies.begin(), cookies.end(),
539                       std::bind2nd(std::ptr_fun((NOTIFY_IO_COMPLETE_T)serverApi->cookie->notify_io_complete),
540                                    status));
541         ObjectRegistry::onSwitchThread(epe);
542     }
543 
544     void handleDisconnect(const void *cookie);
545 
stopFlusher(const char **msg, size_t *msg_size)546     protocol_binary_response_status stopFlusher(const char **msg, size_t *msg_size) {
547         (void) msg_size;
548         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
549         *msg = NULL;
550         if (!epstore->pauseFlusher()) {
551             LOG(EXTENSION_LOG_INFO, "Unable to stop flusher");
552             *msg = "Flusher not running.";
553             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
554         }
555         return rv;
556     }
557 
startFlusher(const char **msg, size_t *msg_size)558     protocol_binary_response_status startFlusher(const char **msg, size_t *msg_size) {
559         (void) msg_size;
560         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
561         *msg = NULL;
562         if (!epstore->resumeFlusher()) {
563             LOG(EXTENSION_LOG_INFO, "Unable to start flusher");
564             *msg = "Flusher not shut down.";
565             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
566         }
567         return rv;
568     }
569 
deleteVBucket(uint16_t vbid, const void* c = NULL)570     ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL) {
571         return epstore->deleteVBucket(vbid, c);
572     }
573 
compactDB(uint16_t vbid, compaction_ctx c, const void *cookie = NULL)574     ENGINE_ERROR_CODE compactDB(uint16_t vbid, compaction_ctx c,
575                                 const void *cookie = NULL) {
576         return epstore->compactDB(vbid, c, cookie);
577     }
578 
resetVBucket(uint16_t vbid)579     bool resetVBucket(uint16_t vbid) {
580         return epstore->resetVBucket(vbid);
581     }
582 
setTapKeepAlive(uint32_t to)583     void setTapKeepAlive(uint32_t to) {
584         configuration.setTapKeepalive((size_t)to);
585     }
586 
setFlushAll(bool enabled)587     void setFlushAll(bool enabled) {
588         flushAllEnabled = enabled;
589     }
590 
evictKey(const std::string &key, uint16_t vbucket, const char **msg, size_t *msg_size)591     protocol_binary_response_status evictKey(const std::string &key,
592                                              uint16_t vbucket,
593                                              const char **msg,
594                                              size_t *msg_size) {
595         return epstore->evictKey(key, vbucket, msg, msg_size);
596     }
597 
getLocked(const std::string &key, uint16_t vbucket, Callback<GetValue> &cb, rel_time_t currentTime, uint32_t lockTimeout, const void *cookie)598     bool getLocked(const std::string &key,
599                    uint16_t vbucket,
600                    Callback<GetValue> &cb,
601                    rel_time_t currentTime,
602                    uint32_t lockTimeout,
603                    const void *cookie) {
604         return epstore->getLocked(key, vbucket, cb, currentTime, lockTimeout, cookie);
605     }
606 
unlockKey(const std::string &key, uint16_t vbucket, uint64_t cas, rel_time_t currentTime)607     ENGINE_ERROR_CODE unlockKey(const std::string &key,
608                                 uint16_t vbucket,
609                                 uint64_t cas,
610                                 rel_time_t currentTime) {
611         return epstore->unlockKey(key, vbucket, cas, currentTime);
612     }
613 
614     ENGINE_ERROR_CODE observe(const void* cookie,
615                               protocol_binary_request_header *request,
616                               ADD_RESPONSE response);
617 
getVBucket(uint16_t vbucket)618     RCPtr<VBucket> getVBucket(uint16_t vbucket) {
619         return epstore->getVBucket(vbucket);
620     }
621 
setVBucketState(uint16_t vbid, vbucket_state_t to, bool transfer)622     ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t to, bool transfer) {
623         return epstore->setVBucketState(vbid, to, transfer);
624     }
625 
626     ~EventuallyPersistentEngine();
627 
getInfo()628     engine_info *getInfo() {
629         return &info.info;
630     }
631 
getEpStats()632     EPStats &getEpStats() {
633         return stats;
634     }
635 
getEpStore()636     EventuallyPersistentStore* getEpStore() { return epstore; }
637 
getTapConnMap()638     TapConnMap &getTapConnMap() { return *tapConnMap; }
639 
getDcpConnMap()640     DcpConnMap &getDcpConnMap() { return *dcpConnMap_; }
641 
getTapConfig()642     TapConfig &getTapConfig() { return *tapConfig; }
643 
getTapThrottle()644     TapThrottle &getTapThrottle() { return *tapThrottle; }
645 
getCheckpointConfig()646     CheckpointConfig &getCheckpointConfig() { return *checkpointConfig; }
647 
getServerApi()648     SERVER_HANDLE_V1* getServerApi() { return serverApi; }
649 
getConfiguration()650     Configuration &getConfiguration() {
651         return configuration;
652     }
653 
654     ENGINE_ERROR_CODE deregisterTapClient(const void* cookie,
655                                           protocol_binary_request_header *request,
656                                           ADD_RESPONSE response);
657 
658     ENGINE_ERROR_CODE handleCheckpointCmds(const void* cookie,
659                                            protocol_binary_request_header *request,
660                                            ADD_RESPONSE response);
661 
662     ENGINE_ERROR_CODE handleSeqnoCmds(const void* cookie,
663                                       protocol_binary_request_header *request,
664                                       ADD_RESPONSE response);
665 
666     ENGINE_ERROR_CODE resetReplicationChain(const void* cookie,
667                                             protocol_binary_request_header *request,
668                                             ADD_RESPONSE response);
669 
670     ENGINE_ERROR_CODE changeTapVBFilter(const void* cookie,
671                                         protocol_binary_request_header *request,
672                                         ADD_RESPONSE response);
673 
674     ENGINE_ERROR_CODE handleTrafficControlCmd(const void* cookie,
675                                               protocol_binary_request_header *request,
676                                               ADD_RESPONSE response);
677 
getGetlDefaultTimeout() const678     size_t getGetlDefaultTimeout() const {
679         return getlDefaultTimeout;
680     }
681 
getGetlMaxTimeout() const682     size_t getGetlMaxTimeout() const {
683         return getlMaxTimeout;
684     }
685 
getMaxFailoverEntries() const686     size_t getMaxFailoverEntries() const {
687         return maxFailoverEntries;
688     }
689 
isDegradedMode() const690     bool isDegradedMode() const {
691         return epstore->isWarmingUp() || !trafficEnabled.load();
692     }
693 
getWorkLoadPolicy(void)694     WorkLoadPolicy &getWorkLoadPolicy(void) {
695         return *workload;
696     }
697 
getWorkloadPriority(void)698     bucket_priority_t getWorkloadPriority(void) {return workloadPriority; }
setWorkloadPriority(bucket_priority_t p)699     void setWorkloadPriority(bucket_priority_t p) { workloadPriority = p; }
700 
701     struct clusterConfig {
clusterConfigEventuallyPersistentEngine::clusterConfig702         clusterConfig() : len(0), config(NULL) {}
703         uint32_t len;
704         uint8_t *config;
705         Mutex lock;
706     } clusterConfig;
707 
708     ENGINE_ERROR_CODE getRandomKey(const void *cookie,
709                                    ADD_RESPONSE response);
710 
711     ConnHandler* getConnHandler(const void *cookie);
712 
713     void addLookupAllKeys(const void *cookie, ENGINE_ERROR_CODE err);
714 
715 protected:
716     friend class EpEngineValueChangeListener;
717 
setMaxItemSize(size_t value)718     void setMaxItemSize(size_t value) {
719         maxItemSize = value;
720     }
721 
setGetlDefaultTimeout(size_t value)722     void setGetlDefaultTimeout(size_t value) {
723         getlDefaultTimeout = value;
724     }
725 
setGetlMaxTimeout(size_t value)726     void setGetlMaxTimeout(size_t value) {
727         getlMaxTimeout = value;
728     }
729 
730 private:
731     EventuallyPersistentEngine(GET_SERVER_API get_server_api);
732     friend ENGINE_ERROR_CODE create_instance(uint64_t interface,
733                                              GET_SERVER_API get_server_api,
734                                              ENGINE_HANDLE **handle);
735     uint16_t doWalkTapQueue(const void *cookie, item **itm, void **es,
736                             uint16_t *nes, uint8_t *ttl, uint16_t *flags,
737                             uint32_t *seqno, uint16_t *vbucket,
738                             TapProducer *c, bool &retry);
739 
740 
741     ENGINE_ERROR_CODE processTapAck(const void *cookie,
742                                     uint32_t seqno,
743                                     uint16_t status,
744                                     const std::string &msg);
745 
746     /**
747      * Report the state of a memory condition when out of memory.
748      *
749      * @return ETMPFAIL if we think we can recover without interaction,
750      *         else ENOMEM
751      */
memoryCondition()752     ENGINE_ERROR_CODE memoryCondition() {
753         // Do we think it's possible we could free something?
754         bool haveEvidenceWeCanFreeMemory(stats.getMaxDataSize() > stats.memOverhead);
755         if (haveEvidenceWeCanFreeMemory) {
756             // Look for more evidence by seeing if we have resident items.
757             VBucketCountVisitor countVisitor(*this, vbucket_state_active);
758             epstore->visit(countVisitor);
759 
760             haveEvidenceWeCanFreeMemory = countVisitor.getNonResident() <
761                 countVisitor.getNumItems();
762         }
763         if (haveEvidenceWeCanFreeMemory) {
764             ++stats.tmp_oom_errors;
765             // Wake up the item pager task as memory usage
766             // seems to have exceeded high water mark
767             if ((getEpStore()->fetchItemPagerTask())->getState() ==
768                                                                 TASK_SNOOZED) {
769                 ExecutorPool::get()->wake(
770                         (getEpStore()->fetchItemPagerTask())->getId());
771             }
772             return ENGINE_TMPFAIL;
773         } else {
774             ++stats.oom_errors;
775             return ENGINE_ENOMEM;
776         }
777     }
778 
779     friend class BGFetchCallback;
780     friend class EventuallyPersistentStore;
781 
enableTraffic(bool enable)782     bool enableTraffic(bool enable) {
783         bool inverse = !enable;
784         return trafficEnabled.compare_exchange_strong(inverse, enable);
785     }
786 
787     ENGINE_ERROR_CODE doEngineStats(const void *cookie, ADD_STAT add_stat);
788     ENGINE_ERROR_CODE doKlogStats(const void *cookie, ADD_STAT add_stat);
789     ENGINE_ERROR_CODE doMemoryStats(const void *cookie, ADD_STAT add_stat);
790     ENGINE_ERROR_CODE doVBucketStats(const void *cookie, ADD_STAT add_stat,
791                                      const char* stat_key,
792                                      int nkey,
793                                      bool prevStateRequested,
794                                      bool details);
795     ENGINE_ERROR_CODE doHashStats(const void *cookie, ADD_STAT add_stat);
796     ENGINE_ERROR_CODE doCheckpointStats(const void *cookie, ADD_STAT add_stat,
797                                         const char* stat_key, int nkey);
798     ENGINE_ERROR_CODE doTapStats(const void *cookie, ADD_STAT add_stat);
799     ENGINE_ERROR_CODE doDcpStats(const void *cookie, ADD_STAT add_stat);
800     ENGINE_ERROR_CODE doConnAggStats(const void *cookie, ADD_STAT add_stat,
801                                      const char *sep, size_t nsep,
802                                      conn_type_t connType);
803     ENGINE_ERROR_CODE doTimingStats(const void *cookie, ADD_STAT add_stat);
804     ENGINE_ERROR_CODE doSchedulerStats(const void *cookie, ADD_STAT add_stat);
805     ENGINE_ERROR_CODE doRunTimeStats(const void *cookie, ADD_STAT add_stat);
806     ENGINE_ERROR_CODE doDispatcherStats(const void *cookie, ADD_STAT add_stat);
807     ENGINE_ERROR_CODE doKeyStats(const void *cookie, ADD_STAT add_stat,
808                                  uint16_t vbid, std::string &key, bool validate=false);
809     ENGINE_ERROR_CODE doTapVbTakeoverStats(const void *cookie,
810                                            ADD_STAT add_stat,
811                                            std::string& key,
812                                            uint16_t vbid);
813 
814     ENGINE_ERROR_CODE doDcpVbTakeoverStats(const void *cookie,
815                                            ADD_STAT add_stat,
816                                            std::string &key,
817                                            uint16_t vbid);
818     ENGINE_ERROR_CODE doVbIdFailoverLogStats(const void *cookie,
819                                              ADD_STAT add_stat,
820                                              uint16_t vbid);
821     ENGINE_ERROR_CODE doAllFailoverLogStats(const void *cookie, ADD_STAT add_stat);
822     ENGINE_ERROR_CODE doWorkloadStats(const void *cookie, ADD_STAT add_stat);
823     ENGINE_ERROR_CODE doSeqnoStats(const void *cookie, ADD_STAT add_stat,
824                                    const char* stat_key, int nkey);
825     ENGINE_ERROR_CODE doDiskStats(const void *cookie, ADD_STAT add_stat,
826                                   const char* stat_key, int nkey);
827 
addLookupResult(const void *cookie, Item *result)828     void addLookupResult(const void *cookie, Item *result) {
829         LockHolder lh(lookupMutex);
830         std::map<const void*, Item*>::iterator it = lookups.find(cookie);
831         if (it != lookups.end()) {
832             if (it->second != NULL) {
833                 LOG(EXTENSION_LOG_DEBUG,
834                     "Cleaning up old lookup result for '%s'",
835                     it->second->getKey().c_str());
836                 delete it->second;
837             } else {
838                 LOG(EXTENSION_LOG_DEBUG, "Cleaning up old null lookup result");
839             }
840             lookups.erase(it);
841         }
842         lookups[cookie] = result;
843     }
844 
fetchLookupResult(const void *cookie, Item **itm)845     bool fetchLookupResult(const void *cookie, Item **itm) {
846         // This will return *and erase* the lookup result for a connection.
847         // You look it up, you own it.
848         LockHolder lh(lookupMutex);
849         std::map<const void*, Item*>::iterator it = lookups.find(cookie);
850         if (it != lookups.end()) {
851             *itm = it->second;
852             lookups.erase(it);
853             return true;
854         } else {
855             return false;
856         }
857     }
858 
859     // Get the current tap connection for this cookie.
860     // If this method returns NULL, you should return TAP_DISCONNECT
861     TapProducer* getTapProducer(const void *cookie);
862 
    // Handle to the server API.
    // NOTE(review): presumably obtained through getServerApiFunc - confirm.
    SERVER_HANDLE_V1 *serverApi;
    // The underlying store; memoryCondition() runs its visitor over it.
    EventuallyPersistentStore *epstore;
    WorkLoadPolicy *workload;
    // Assigned through setWorkloadPriority().
    bucket_priority_t workloadPriority;

    TapThrottle *tapThrottle;
    // Pending lookup results keyed by connection cookie. Written by
    // addLookupResult() and consumed by fetchLookupResult().
    std::map<const void*, Item*> lookups;
    // Status codes keyed by connection cookie.
    // NOTE(review): presumably filled by addLookupAllKeys() - confirm.
    unordered_map<const void*, ENGINE_ERROR_CODE> allKeysLookups;
    // Held by addLookupResult() and fetchLookupResult() while they touch
    // `lookups`.
    Mutex lookupMutex;
    GET_SERVER_API getServerApiFunc;
    // engine_info overlaid with a byte buffer that is larger than
    // engine_info by 10 * sizeof(feature_info).
    // NOTE(review): presumably room for the feature list - confirm.
    union {
        engine_info info;
        char buffer[sizeof(engine_info) + 10 * sizeof(feature_info) ];
    } info;

    DcpConnMap *dcpConnMap_;
    TapConnMap *tapConnMap;
    TapConfig *tapConfig;
    CheckpointConfig *checkpointConfig;
    std::string name;
    // Assigned through setMaxItemSize().
    size_t maxItemSize;
    // Assigned through setGetlDefaultTimeout().
    size_t getlDefaultTimeout;
    // Assigned through setGetlMaxTimeout().
    size_t getlMaxTimeout;
    size_t maxFailoverEntries;
    // Engine statistics; memoryCondition() updates the OOM counters here.
    EPStats stats;
    Configuration configuration;
    // Flipped atomically by enableTraffic().
    AtomicValue<bool> trafficEnabled;

    bool flushAllEnabled;
    // A unique, system-generated token that is initialized each time
    // ep_engine starts up.
    time_t startupTime;
895 
896 };
897 
898 #endif  // SRC_EP_ENGINE_H_
899