xref: /3.0.3-GA/ep-engine/src/ep_engine.h (revision 729d202d)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2013 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#ifndef SRC_EP_ENGINE_H_
19#define SRC_EP_ENGINE_H_ 1
20
21#include "config.h"
22
23#include <errno.h>
24
25#include <algorithm>
26#include <cstdio>
27#include <limits>
28#include <list>
29#include <map>
30#include <sstream>
31#include <string>
32
33#include "configuration.h"
34#include "ep.h"
35#include "ep-engine/command_ids.h"
36#include "item_pager.h"
37#include "kvstore.h"
38#include "locks.h"
39#include "tapconnection.h"
40#include "workload.h"
41
42
43class UprConnMap;
44class TapConnMap;
45class TapThrottle;
46
47extern "C" {
48    EXPORT_FUNCTION
49    ENGINE_ERROR_CODE create_instance(uint64_t interface,
50                                      GET_SERVER_API get_server_api,
51                                      ENGINE_HANDLE **handle);
52    void EvpNotifyPendingConns(void*arg);
53}
54
55/* We're using notify_io_complete from ptr_fun, but that func
56 * got a "C" linkage that ptr_fun doesn't like... just
57 * cast it away with this typedef ;)
58 */
59typedef void (*NOTIFY_IO_COMPLETE_T)(const void *cookie,
60                                     ENGINE_ERROR_CODE status);
61
62
63// Forward decl
64class EventuallyPersistentEngine;
65class TapConnMap;
66
67/**
68 * Vbucket visitor that counts active vbuckets.
69 */
70class VBucketCountVisitor : public VBucketVisitor {
71public:
72    VBucketCountVisitor(EventuallyPersistentEngine &e,
73                        vbucket_state_t state) :
74        engine(e),
75        desired_state(state), numItems(0),
76        numTempItems(0),nonResident(0),
77        numVbucket(0), htMemory(0),
78        htItemMemory(0), htCacheSize(0),
79        numEjects(0), numExpiredItems(0),
80        metaDataMemory(0), opsCreate(0),
81        opsUpdate(0), opsDelete(0),
82        opsReject(0), queueSize(0),
83        queueMemory(0), queueAge(0),
84        queueFill(0), queueDrain(0),
85        pendingWrites(0), chkPersistRemaining(0),
86        fileSpaceUsed(0), fileSize(0)
87    { }
88
89    bool visitBucket(RCPtr<VBucket> &vb);
90
91    void visit(StoredValue* v) {
92        (void)v;
93        cb_assert(false); // this does not happen
94    }
95
96    vbucket_state_t getVBucketState() { return desired_state; }
97
98    size_t getNumItems() { return numItems; }
99
100    size_t getNumTempItems() { return numTempItems; }
101
102    size_t getNonResident() { return nonResident; }
103
104    size_t getVBucketNumber() { return numVbucket; }
105
106    size_t getMemResidentPer() {
107        size_t numResident = numItems - nonResident;
108        return (numItems != 0) ? (size_t) (numResident *100.0) / (numItems) : 0;
109    }
110
111    size_t getEjects() { return numEjects; }
112
113    size_t getExpired() { return numExpiredItems; }
114
115    size_t getMetaDataMemory() { return metaDataMemory; }
116
117    size_t getHashtableMemory() { return htMemory; }
118
119    size_t getItemMemory() { return htItemMemory; }
120    size_t getCacheSize() { return htCacheSize; }
121
122    size_t getOpsCreate() { return opsCreate; }
123    size_t getOpsUpdate() { return opsUpdate; }
124    size_t getOpsDelete() { return opsDelete; }
125    size_t getOpsReject() { return opsReject; }
126
127    size_t getQueueSize() { return queueSize; }
128    size_t getQueueMemory() { return queueMemory; }
129    size_t getQueueFill() { return queueFill; }
130    size_t getQueueDrain() { return queueDrain; }
131    uint64_t getAge() { return queueAge; }
132    size_t getPendingWrites() { return pendingWrites; }
133    size_t getChkPersistRemaining() { return chkPersistRemaining; }
134
135    size_t getFileSpaceUsed() { return fileSpaceUsed; }
136    size_t getFileSize() { return fileSize; }
137
138private:
139    EventuallyPersistentEngine &engine;
140    vbucket_state_t desired_state;
141
142    size_t numItems;
143    size_t numTempItems;
144    size_t nonResident;
145    size_t numVbucket;
146    size_t htMemory;
147    size_t htItemMemory;
148    size_t htCacheSize;
149    size_t numEjects;
150    size_t numExpiredItems;
151    size_t metaDataMemory;
152
153    size_t opsCreate;
154    size_t opsUpdate;
155    size_t opsDelete;
156    size_t opsReject;
157
158    size_t queueSize;
159    size_t queueMemory;
160    uint64_t queueAge;
161    size_t queueFill;
162    size_t queueDrain;
163    size_t pendingWrites;
164    size_t chkPersistRemaining;
165
166    size_t fileSpaceUsed;
167    size_t fileSize;
168};
169
170/**
171 * memcached engine interface to the EventuallyPersistentStore.
172 */
173class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
174    friend class LookupCallback;
175public:
176    ENGINE_ERROR_CODE initialize(const char* config);
177    void destroy(bool force);
178
179    ENGINE_ERROR_CODE itemAllocate(const void* cookie,
180                                   item** itm,
181                                   const void* key,
182                                   const size_t nkey,
183                                   const size_t nbytes,
184                                   const int flags,
185                                   const rel_time_t exptime,
186                                   uint8_t datatype)
187    {
188        (void)cookie;
189        if (nbytes > maxItemSize) {
190            return ENGINE_E2BIG;
191        }
192
193        time_t expiretime = (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime));
194
195        uint8_t ext_meta[1];
196        uint8_t ext_len = EXT_META_LEN;
197        *(ext_meta) = datatype;
198        *itm = new Item(key, nkey, nbytes, flags, expiretime, ext_meta,
199                        ext_len);
200        if (*itm == NULL) {
201            return memoryCondition();
202        } else {
203            stats.itemAllocSizeHisto.add(nbytes);
204            return ENGINE_SUCCESS;
205        }
206    }
207
208    ENGINE_ERROR_CODE itemDelete(const void* cookie,
209                                 const void* key,
210                                 const size_t nkey,
211                                 uint64_t* cas,
212                                 uint16_t vbucket)
213    {
214        std::string k(static_cast<const char*>(key), nkey);
215        return itemDelete(cookie, k, cas, vbucket);
216    }
217
218    ENGINE_ERROR_CODE itemDelete(const void* cookie,
219                                 const std::string &key,
220                                 uint64_t* cas,
221                                 uint16_t vbucket)
222    {
223        ENGINE_ERROR_CODE ret = epstore->deleteItem(key, cas,
224                                                    vbucket, cookie,
225                                                    false, // not force
226                                                    NULL);
227
228        if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
229            if (isDegradedMode()) {
230                return ENGINE_TMPFAIL;
231            }
232        }
233        return ret;
234    }
235
236
237    void itemRelease(const void* cookie, item *itm)
238    {
239        (void)cookie;
240        delete (Item*)itm;
241    }
242
243    ENGINE_ERROR_CODE get(const void* cookie,
244                          item** itm,
245                          const void* key,
246                          const int nkey,
247                          uint16_t vbucket)
248    {
249        BlockTimer timer(&stats.getCmdHisto);
250        std::string k(static_cast<const char*>(key), nkey);
251
252        GetValue gv(epstore->get(k, vbucket, cookie, serverApi->core));
253        ENGINE_ERROR_CODE ret = gv.getStatus();
254
255        if (ret == ENGINE_SUCCESS) {
256            *itm = gv.getValue();
257        } else if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
258            if (isDegradedMode()) {
259                return ENGINE_TMPFAIL;
260            }
261        }
262
263        return ret;
264    }
265
266    const char* getName() {
267        return name.c_str();
268    }
269
270    ENGINE_ERROR_CODE getStats(const void* cookie,
271                               const char* stat_key,
272                               int nkey,
273                               ADD_STAT add_stat);
274
275    void resetStats() {
276        stats.reset();
277        if (epstore) {
278            epstore->resetUnderlyingStats();
279        }
280    }
281
282    ENGINE_ERROR_CODE store(const void *cookie,
283                            item* itm,
284                            uint64_t *cas,
285                            ENGINE_STORE_OPERATION operation,
286                            uint16_t vbucket);
287
288    ENGINE_ERROR_CODE arithmetic(const void* cookie,
289                                 const void* key,
290                                 const int nkey,
291                                 const bool increment,
292                                 const bool create,
293                                 const uint64_t delta,
294                                 const uint64_t initial,
295                                 const rel_time_t exptime,
296                                 uint64_t *cas,
297                                 uint8_t datatype,
298                                 uint64_t *result,
299                                 uint16_t vbucket)
300    {
301        BlockTimer timer(&stats.arithCmdHisto);
302        item *it = NULL;
303        uint8_t ext_meta[1];
304        uint8_t ext_len = EXT_META_LEN;
305        *(ext_meta) = datatype;
306
307        rel_time_t expiretime = (exptime == 0 ||
308                                 exptime == 0xffffffff) ?
309            0 : ep_abs_time(ep_reltime(exptime));
310
311        ENGINE_ERROR_CODE ret = get(cookie, &it, key, nkey, vbucket);
312        if (ret == ENGINE_SUCCESS) {
313            Item *itm = static_cast<Item*>(it);
314            char *endptr = NULL;
315            char data[24];
316            size_t len = std::min(static_cast<uint32_t>(sizeof(data) - 1),
317                                  itm->getNBytes());
318            data[len] = 0;
319            memcpy(data, itm->getData(), len);
320            uint64_t val = strtoull(data, &endptr, 10);
321            if (itm->getCas() == (uint64_t) -1) {
322                // item is locked, can't perform arithmetic operation
323                delete itm;
324                return ENGINE_TMPFAIL;
325            }
326            if ((errno != ERANGE) && (isspace(*endptr)
327                                      || (*endptr == '\0' && endptr != data))) {
328                if (increment) {
329                    val += delta;
330                } else {
331                    if (delta > val) {
332                        val = 0;
333                    } else {
334                        val -= delta;
335                    }
336                }
337
338                std::stringstream vals;
339                vals << val;
340                size_t nb = vals.str().length();
341                *result = val;
342                Item *nit = new Item(key, (uint16_t)nkey, itm->getFlags(),
343                                     itm->getExptime(), vals.str().c_str(), nb,
344                                     ext_meta, ext_len);
345                nit->setCas(itm->getCas());
346                ret = store(cookie, nit, cas, OPERATION_CAS, vbucket);
347                delete nit;
348            } else {
349                ret = ENGINE_EINVAL;
350            }
351
352            delete itm;
353        } else if (ret == ENGINE_NOT_MY_VBUCKET) {
354            return isDegradedMode() ? ENGINE_TMPFAIL: ret;
355        } else if (ret == ENGINE_KEY_ENOENT) {
356            if (isDegradedMode()) {
357                return ENGINE_TMPFAIL;
358            }
359            if (create) {
360                std::stringstream vals;
361                vals << initial;
362                size_t nb = vals.str().length();
363                *result = initial;
364                Item *itm = new Item(key, (uint16_t)nkey, 0, expiretime,
365                                     vals.str().c_str(), nb, ext_meta, ext_len);
366                ret = store(cookie, itm, cas, OPERATION_ADD, vbucket);
367                delete itm;
368            }
369        }
370
371        /* We had a race condition.. just call ourself recursively to retry */
372        if (ret == ENGINE_KEY_EEXISTS) {
373            return arithmetic(cookie, key, nkey, increment, create, delta,
374                              initial, expiretime, cas, datatype, result,
375                              vbucket);
376        }
377
378        return ret;
379    }
380
381
382
383    ENGINE_ERROR_CODE flush(const void *cookie, time_t when);
384
385    uint16_t walkTapQueue(const void *cookie, item **itm, void **es,
386                          uint16_t *nes, uint8_t *ttl, uint16_t *flags,
387                          uint32_t *seqno, uint16_t *vbucket);
388
389    bool createTapQueue(const void *cookie,
390                        std::string &client,
391                        uint32_t flags,
392                        const void *userdata,
393                        size_t nuserdata);
394
395    ENGINE_ERROR_CODE tapNotify(const void *cookie,
396                                void *engine_specific,
397                                uint16_t nengine,
398                                uint8_t ttl,
399                                uint16_t tap_flags,
400                                uint16_t tap_event,
401                                uint32_t tap_seqno,
402                                const void *key,
403                                size_t nkey,
404                                uint32_t flags,
405                                uint32_t exptime,
406                                uint64_t cas,
407                                uint8_t datatype,
408                                const void *data,
409                                size_t ndata,
410                                uint16_t vbucket);
411
412    ENGINE_ERROR_CODE uprOpen(const void* cookie,
413                              uint32_t opaque,
414                              uint32_t seqno,
415                              uint32_t flags,
416                              void *stream_name,
417                              uint16_t nname);
418
419    ENGINE_ERROR_CODE ConnHandlerCheckPoint(TapConsumer *consumer,
420                                            uint8_t event,
421                                            uint16_t vbucket,
422                                            uint64_t checkpointId);
423
424    ENGINE_ERROR_CODE touch(const void* cookie,
425                            protocol_binary_request_header *request,
426                            ADD_RESPONSE response);
427
428    ENGINE_ERROR_CODE getMeta(const void* cookie,
429                              protocol_binary_request_get_meta *request,
430                              ADD_RESPONSE response);
431    ENGINE_ERROR_CODE setWithMeta(const void* cookie,
432                                  protocol_binary_request_set_with_meta *request,
433                                  ADD_RESPONSE response);
434    ENGINE_ERROR_CODE deleteWithMeta(const void* cookie,
435                                     protocol_binary_request_delete_with_meta *request,
436                                     ADD_RESPONSE response);
437
438    ENGINE_ERROR_CODE returnMeta(const void* cookie,
439                                 protocol_binary_request_return_meta *request,
440                                 ADD_RESPONSE response);
441
442    ENGINE_ERROR_CODE setClusterConfig(const void* cookie,
443                                protocol_binary_request_set_cluster_config *request,
444                                ADD_RESPONSE response);
445
446    ENGINE_ERROR_CODE getClusterConfig(const void* cookie,
447                                protocol_binary_request_get_cluster_config *request,
448                                ADD_RESPONSE response);
449
450    ENGINE_ERROR_CODE getAllKeys(const void* cookie,
451                                protocol_binary_request_get_keys *request,
452                                ADD_RESPONSE response);
453
454    /**
455     * Visit the objects and add them to the tap/upr connecitons queue.
456     * @todo this code should honor the backfill time!
457     */
458    void queueBackfill(const VBucketFilter &backfillVBFilter, Producer *tc);
459
460    void notifyIOComplete(const void *cookie, ENGINE_ERROR_CODE status) {
461        if (cookie == NULL) {
462            LOG(EXTENSION_LOG_WARNING, "Tried to signal a NULL cookie!");
463        } else {
464            BlockTimer bt(&stats.notifyIOHisto);
465            EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
466            serverApi->cookie->notify_io_complete(cookie, status);
467            ObjectRegistry::onSwitchThread(epe);
468        }
469    }
470
471    ENGINE_ERROR_CODE reserveCookie(const void *cookie);
472    ENGINE_ERROR_CODE releaseCookie(const void *cookie);
473
474    void storeEngineSpecific(const void *cookie, void *engine_data) {
475        EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
476        serverApi->cookie->store_engine_specific(cookie, engine_data);
477        ObjectRegistry::onSwitchThread(epe);
478    }
479
480    void *getEngineSpecific(const void *cookie) {
481        EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
482        void *engine_data = serverApi->cookie->get_engine_specific(cookie);
483        ObjectRegistry::onSwitchThread(epe);
484        return engine_data;
485    }
486
487    bool isDatatypeSupported(const void *cookie) {
488        EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
489        bool isSupported = serverApi->cookie->is_datatype_supported(cookie);
490        ObjectRegistry::onSwitchThread(epe);
491        return isSupported;
492    }
493
494    uint8_t getOpcodeIfEwouldblockSet(const void *cookie) {
495        EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
496        uint8_t opcode = serverApi->cookie->get_opcode_if_ewouldblock_set(cookie);
497        ObjectRegistry::onSwitchThread(epe);
498        return opcode;
499    }
500
501    bool validateSessionCas(const uint64_t cas) {
502        EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
503        bool ret = serverApi->cookie->validate_session_cas(cas);
504        ObjectRegistry::onSwitchThread(epe);
505        return ret;
506    }
507
508    void decrementSessionCtr(void) {
509        EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
510        serverApi->cookie->decrement_session_ctr();
511        ObjectRegistry::onSwitchThread(epe);
512    }
513
514    void registerEngineCallback(ENGINE_EVENT_TYPE type,
515                                EVENT_CALLBACK cb, const void *cb_data);
516
517    template <typename T>
518    void notifyIOComplete(T cookies, ENGINE_ERROR_CODE status) {
519        EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
520        std::for_each(cookies.begin(), cookies.end(),
521                      std::bind2nd(std::ptr_fun((NOTIFY_IO_COMPLETE_T)serverApi->cookie->notify_io_complete),
522                                   status));
523        ObjectRegistry::onSwitchThread(epe);
524    }
525
526    void handleDisconnect(const void *cookie);
527
528    protocol_binary_response_status stopFlusher(const char **msg, size_t *msg_size) {
529        (void) msg_size;
530        protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
531        *msg = NULL;
532        if (!epstore->pauseFlusher()) {
533            LOG(EXTENSION_LOG_INFO, "Unable to stop flusher");
534            *msg = "Flusher not running.";
535            rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
536        }
537        return rv;
538    }
539
540    protocol_binary_response_status startFlusher(const char **msg, size_t *msg_size) {
541        (void) msg_size;
542        protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
543        *msg = NULL;
544        if (!epstore->resumeFlusher()) {
545            LOG(EXTENSION_LOG_INFO, "Unable to start flusher");
546            *msg = "Flusher not shut down.";
547            rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
548        }
549        return rv;
550    }
551
552    ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL) {
553        return epstore->deleteVBucket(vbid, c);
554    }
555
556    ENGINE_ERROR_CODE compactDB(uint16_t vbid, compaction_ctx c,
557                                const void *cookie = NULL) {
558        return epstore->compactDB(vbid, c, cookie);
559    }
560
561    bool resetVBucket(uint16_t vbid) {
562        return epstore->resetVBucket(vbid);
563    }
564
565    void setTapKeepAlive(uint32_t to) {
566        configuration.setTapKeepalive((size_t)to);
567    }
568
569    void setFlushAll(bool enabled) {
570        flushAllEnabled = enabled;
571    }
572
573    protocol_binary_response_status evictKey(const std::string &key,
574                                             uint16_t vbucket,
575                                             const char **msg,
576                                             size_t *msg_size) {
577        return epstore->evictKey(key, vbucket, msg, msg_size);
578    }
579
580    bool getLocked(const std::string &key,
581                   uint16_t vbucket,
582                   Callback<GetValue> &cb,
583                   rel_time_t currentTime,
584                   uint32_t lockTimeout,
585                   const void *cookie) {
586        return epstore->getLocked(key, vbucket, cb, currentTime, lockTimeout, cookie);
587    }
588
589    ENGINE_ERROR_CODE unlockKey(const std::string &key,
590                                uint16_t vbucket,
591                                uint64_t cas,
592                                rel_time_t currentTime) {
593        return epstore->unlockKey(key, vbucket, cas, currentTime);
594    }
595
596    ENGINE_ERROR_CODE observe(const void* cookie,
597                              protocol_binary_request_header *request,
598                              ADD_RESPONSE response);
599
600    RCPtr<VBucket> getVBucket(uint16_t vbucket) {
601        return epstore->getVBucket(vbucket);
602    }
603
604    ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t to, bool transfer) {
605        return epstore->setVBucketState(vbid, to, transfer);
606    }
607
608    ~EventuallyPersistentEngine();
609
610    engine_info *getInfo() {
611        return &info.info;
612    }
613
614    EPStats &getEpStats() {
615        return stats;
616    }
617
618    EventuallyPersistentStore* getEpStore() { return epstore; }
619
620    TapConnMap &getTapConnMap() { return *tapConnMap; }
621
622    UprConnMap &getUprConnMap() { return *uprConnMap_; }
623
624    TapConfig &getTapConfig() { return *tapConfig; }
625
626    TapThrottle &getTapThrottle() { return *tapThrottle; }
627
628    CheckpointConfig &getCheckpointConfig() { return *checkpointConfig; }
629
630    SERVER_HANDLE_V1* getServerApi() { return serverApi; }
631
632    Configuration &getConfiguration() {
633        return configuration;
634    }
635
636    ENGINE_ERROR_CODE deregisterTapClient(const void* cookie,
637                                          protocol_binary_request_header *request,
638                                          ADD_RESPONSE response);
639
640    ENGINE_ERROR_CODE handleCheckpointCmds(const void* cookie,
641                                           protocol_binary_request_header *request,
642                                           ADD_RESPONSE response);
643
644    ENGINE_ERROR_CODE handleSeqnoCmds(const void* cookie,
645                                      protocol_binary_request_header *request,
646                                      ADD_RESPONSE response);
647
648    ENGINE_ERROR_CODE resetReplicationChain(const void* cookie,
649                                            protocol_binary_request_header *request,
650                                            ADD_RESPONSE response);
651
652    ENGINE_ERROR_CODE changeTapVBFilter(const void* cookie,
653                                        protocol_binary_request_header *request,
654                                        ADD_RESPONSE response);
655
656    ENGINE_ERROR_CODE handleTrafficControlCmd(const void* cookie,
657                                              protocol_binary_request_header *request,
658                                              ADD_RESPONSE response);
659
660    size_t getGetlDefaultTimeout() const {
661        return getlDefaultTimeout;
662    }
663
664    size_t getGetlMaxTimeout() const {
665        return getlMaxTimeout;
666    }
667
668    size_t getMaxFailoverEntries() const {
669        return maxFailoverEntries;
670    }
671
672    bool isDegradedMode() const {
673        return epstore->isWarmingUp() || !trafficEnabled.load();
674    }
675
676    WorkLoadPolicy &getWorkLoadPolicy(void) {
677        return *workload;
678    }
679
680    bucket_priority_t getWorkloadPriority(void) {return workloadPriority; }
681    void setWorkloadPriority(bucket_priority_t p) { workloadPriority = p; }
682
683    struct clusterConfig {
684        clusterConfig() : len(0), config(NULL) {}
685        uint32_t len;
686        uint8_t *config;
687        Mutex lock;
688    } clusterConfig;
689
690    ENGINE_ERROR_CODE getRandomKey(const void *cookie,
691                                   ADD_RESPONSE response);
692
693    ConnHandler* getConnHandler(const void *cookie);
694
695    void addLookupAllKeys(const void *cookie, ENGINE_ERROR_CODE err);
696
697protected:
698    friend class EpEngineValueChangeListener;
699
700    void setMaxItemSize(size_t value) {
701        maxItemSize = value;
702    }
703
704    void setGetlDefaultTimeout(size_t value) {
705        getlDefaultTimeout = value;
706    }
707
708    void setGetlMaxTimeout(size_t value) {
709        getlMaxTimeout = value;
710    }
711
712private:
713    EventuallyPersistentEngine(GET_SERVER_API get_server_api);
714    friend ENGINE_ERROR_CODE create_instance(uint64_t interface,
715                                             GET_SERVER_API get_server_api,
716                                             ENGINE_HANDLE **handle);
717    uint16_t doWalkTapQueue(const void *cookie, item **itm, void **es,
718                            uint16_t *nes, uint8_t *ttl, uint16_t *flags,
719                            uint32_t *seqno, uint16_t *vbucket,
720                            TapProducer *c, bool &retry);
721
722
723    ENGINE_ERROR_CODE processTapAck(const void *cookie,
724                                    uint32_t seqno,
725                                    uint16_t status,
726                                    const std::string &msg);
727
728    /**
729     * Report the state of a memory condition when out of memory.
730     *
731     * @return ETMPFAIL if we think we can recover without interaction,
732     *         else ENOMEM
733     */
734    ENGINE_ERROR_CODE memoryCondition() {
735        // Do we think it's possible we could free something?
736        bool haveEvidenceWeCanFreeMemory(stats.getMaxDataSize() > stats.memOverhead);
737        if (haveEvidenceWeCanFreeMemory) {
738            // Look for more evidence by seeing if we have resident items.
739            VBucketCountVisitor countVisitor(*this, vbucket_state_active);
740            epstore->visit(countVisitor);
741
742            haveEvidenceWeCanFreeMemory = countVisitor.getNonResident() <
743                countVisitor.getNumItems();
744        }
745        if (haveEvidenceWeCanFreeMemory) {
746            ++stats.tmp_oom_errors;
747            return ENGINE_TMPFAIL;
748        } else {
749            ++stats.oom_errors;
750            return ENGINE_ENOMEM;
751        }
752    }
753
754    friend class BGFetchCallback;
755    friend class EventuallyPersistentStore;
756
757    bool enableTraffic(bool enable) {
758        bool inverse = !enable;
759        return trafficEnabled.compare_exchange_strong(inverse, enable);
760    }
761
762    ENGINE_ERROR_CODE doEngineStats(const void *cookie, ADD_STAT add_stat);
763    ENGINE_ERROR_CODE doKlogStats(const void *cookie, ADD_STAT add_stat);
764    ENGINE_ERROR_CODE doMemoryStats(const void *cookie, ADD_STAT add_stat);
765    ENGINE_ERROR_CODE doVBucketStats(const void *cookie, ADD_STAT add_stat,
766                                     const char* stat_key,
767                                     int nkey,
768                                     bool prevStateRequested,
769                                     bool details);
770    ENGINE_ERROR_CODE doHashStats(const void *cookie, ADD_STAT add_stat);
771    ENGINE_ERROR_CODE doCheckpointStats(const void *cookie, ADD_STAT add_stat,
772                                        const char* stat_key, int nkey);
773    ENGINE_ERROR_CODE doTapStats(const void *cookie, ADD_STAT add_stat);
774    ENGINE_ERROR_CODE doUprStats(const void *cookie, ADD_STAT add_stat);
775    ENGINE_ERROR_CODE doConnAggStats(const void *cookie, ADD_STAT add_stat,
776                                     const char *sep, size_t nsep,
777                                     conn_type_t connType);
778    ENGINE_ERROR_CODE doTimingStats(const void *cookie, ADD_STAT add_stat);
779    ENGINE_ERROR_CODE doDispatcherStats(const void *cookie, ADD_STAT add_stat);
780    ENGINE_ERROR_CODE doKeyStats(const void *cookie, ADD_STAT add_stat,
781                                 uint16_t vbid, std::string &key, bool validate=false);
782    ENGINE_ERROR_CODE doTapVbTakeoverStats(const void *cookie,
783                                           ADD_STAT add_stat,
784                                           std::string& key,
785                                           uint16_t vbid);
786
787    ENGINE_ERROR_CODE doUprVbTakeoverStats(const void *cookie,
788                                           ADD_STAT add_stat,
789                                           std::string &key,
790                                           uint16_t vbid);
791    ENGINE_ERROR_CODE doVbIdFailoverLogStats(const void *cookie,
792                                             ADD_STAT add_stat,
793                                             uint16_t vbid);
794    ENGINE_ERROR_CODE doAllFailoverLogStats(const void *cookie, ADD_STAT add_stat);
795    ENGINE_ERROR_CODE doWorkloadStats(const void *cookie, ADD_STAT add_stat);
796    ENGINE_ERROR_CODE doSeqnoStats(const void *cookie, ADD_STAT add_stat,
797                                   const char* stat_key, int nkey);
798    ENGINE_ERROR_CODE doDiskStats(const void *cookie, ADD_STAT add_stat,
799                                  const char* stat_key, int nkey);
800
801    void addLookupResult(const void *cookie, Item *result) {
802        LockHolder lh(lookupMutex);
803        std::map<const void*, Item*>::iterator it = lookups.find(cookie);
804        if (it != lookups.end()) {
805            if (it->second != NULL) {
806                LOG(EXTENSION_LOG_DEBUG,
807                    "Cleaning up old lookup result for '%s'",
808                    it->second->getKey().c_str());
809                delete it->second;
810            } else {
811                LOG(EXTENSION_LOG_DEBUG, "Cleaning up old null lookup result");
812            }
813            lookups.erase(it);
814        }
815        lookups[cookie] = result;
816    }
817
818    bool fetchLookupResult(const void *cookie, Item **itm) {
819        // This will return *and erase* the lookup result for a connection.
820        // You look it up, you own it.
821        LockHolder lh(lookupMutex);
822        std::map<const void*, Item*>::iterator it = lookups.find(cookie);
823        if (it != lookups.end()) {
824            *itm = it->second;
825            lookups.erase(it);
826            return true;
827        } else {
828            return false;
829        }
830    }
831
832    // Get the current tap connection for this cookie.
833    // If this method returns NULL, you should return TAP_DISCONNECT
834    TapProducer* getTapProducer(const void *cookie);
835
836    SERVER_HANDLE_V1 *serverApi;
837    EventuallyPersistentStore *epstore;
838    WorkLoadPolicy *workload;
839    bucket_priority_t workloadPriority;
840
841    TapThrottle *tapThrottle;
842    std::map<const void*, Item*> lookups;
843    unordered_map<const void*, ENGINE_ERROR_CODE> allKeysLookups;
844    Mutex lookupMutex;
845    GET_SERVER_API getServerApiFunc;
846    union {
847        engine_info info;
848        char buffer[sizeof(engine_info) + 10 * sizeof(feature_info) ];
849    } info;
850
851    UprConnMap *uprConnMap_;
852    TapConnMap *tapConnMap;
853    TapConfig *tapConfig;
854    CheckpointConfig *checkpointConfig;
855    std::string name;
856    size_t maxItemSize;
857    size_t getlDefaultTimeout;
858    size_t getlMaxTimeout;
859    size_t maxFailoverEntries;
860    EPStats stats;
861    Configuration configuration;
862    AtomicValue<bool> trafficEnabled;
863
864    bool flushAllEnabled;
865    // a unique system generated token initialized at each time
866    // ep_engine starts up.
867    time_t startupTime;
868
869};
870
871#endif  // SRC_EP_ENGINE_H_
872