1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include <string.h>
21 #include <time.h>
22 
23 #include <fstream>
24 #include <functional>
25 #include <iostream>
26 #include <map>
27 #include <sstream>
28 #include <string>
29 #include <utility>
30 #include <vector>
31 
32 #include "access_scanner.h"
33 #include "checkpoint_remover.h"
34 #include "conflict_resolution.h"
35 #include "ep.h"
36 #include "ep_engine.h"
37 #include "failover-table.h"
38 #include "flusher.h"
39 #include "htresizer.h"
40 #include "kvshard.h"
41 #include "kvstore.h"
42 #include "locks.h"
43 #include "mutation_log.h"
44 #include "warmup.h"
45 #include "connmap.h"
46 #include "tapthrottle.h"
47 
48 class StatsValueChangeListener : public ValueChangedListener {
49 public:
StatsValueChangeListener(EPStats &st)50     StatsValueChangeListener(EPStats &st) : stats(st) {
51         // EMPTY
52     }
53 
sizeValueChanged(const std::string &key, size_t value)54     virtual void sizeValueChanged(const std::string &key, size_t value) {
55         if (key.compare("max_size") == 0) {
56             stats.setMaxDataSize(value);
57             size_t low_wat = static_cast<size_t>
58                              (static_cast<double>(value) * 0.6);
59             size_t high_wat = static_cast<size_t>(
60                               static_cast<double>(value) * 0.75);
61             stats.mem_low_wat.store(low_wat);
62             stats.mem_high_wat.store(high_wat);
63         } else if (key.compare("mem_low_wat") == 0) {
64             stats.mem_low_wat.store(value);
65         } else if (key.compare("mem_high_wat") == 0) {
66             stats.mem_high_wat.store(value);
67         } else if (key.compare("tap_throttle_threshold") == 0) {
68             stats.tapThrottleThreshold.store(
69                                           static_cast<double>(value) / 100.0);
70         } else if (key.compare("warmup_min_memory_threshold") == 0) {
71             stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
72         } else if (key.compare("warmup_min_items_threshold") == 0) {
73             stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
74         } else {
75             LOG(EXTENSION_LOG_WARNING,
76                 "Failed to change value for unknown variable, %s\n",
77                 key.c_str());
78         }
79     }
80 
81 private:
82     EPStats &stats;
83 };
84 
85 /**
86  * A configuration value changed listener that responds to ep-engine
87  * parameter changes by invoking engine-specific methods on
88  * configuration change events.
89  */
90 class EPStoreValueChangeListener : public ValueChangedListener {
91 public:
EPStoreValueChangeListener(EventuallyPersistentStore &st)92     EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
93     }
94 
sizeValueChanged(const std::string &key, size_t value)95     virtual void sizeValueChanged(const std::string &key, size_t value) {
96         if (key.compare("bg_fetch_delay") == 0) {
97             store.setBGFetchDelay(static_cast<uint32_t>(value));
98         } else if (key.compare("compaction_write_queue_cap") == 0) {
99             store.setCompactionWriteQueueCap(value);
100         } else if (key.compare("exp_pager_stime") == 0) {
101             store.setExpiryPagerSleeptime(value);
102         } else if (key.compare("alog_sleep_time") == 0) {
103             store.setAccessScannerSleeptime(value);
104         } else if (key.compare("alog_task_time") == 0) {
105             store.resetAccessScannerStartTime();
106         } else if (key.compare("mutation_mem_threshold") == 0) {
107             double mem_threshold = static_cast<double>(value) / 100;
108             StoredValue::setMutationMemoryThreshold(mem_threshold);
109         } else if (key.compare("backfill_mem_threshold") == 0) {
110             double backfill_threshold = static_cast<double>(value) / 100;
111             store.setBackfillMemoryThreshold(backfill_threshold);
112         } else if (key.compare("compaction_exp_mem_threshold") == 0) {
113             store.setCompactionExpMemThreshold(value);
114         } else if (key.compare("tap_throttle_queue_cap") == 0) {
115             store.getEPEngine().getTapThrottle().setQueueCap(value);
116         } else if (key.compare("tap_throttle_cap_pcnt") == 0) {
117             store.getEPEngine().getTapThrottle().setCapPercent(value);
118         } else {
119             LOG(EXTENSION_LOG_WARNING,
120                 "Failed to change value for unknown variable, %s\n",
121                 key.c_str());
122         }
123     }
124 
booleanValueChanged(const std::string &key, bool value)125     virtual void booleanValueChanged(const std::string &key, bool value) {
126         if (key.compare("access_scanner_enabled") == 0) {
127             if (value) {
128                 store.enableAccessScannerTask();
129             } else {
130                 store.disableAccessScannerTask();
131             }
132         }
133     }
134 
135 private:
136     EventuallyPersistentStore &store;
137 };
138 
139 class VBucketMemoryDeletionTask : public GlobalTask {
140 public:
VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng, RCPtr<VBucket> &vb, double delay)141     VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
142                               RCPtr<VBucket> &vb, double delay) :
143                               GlobalTask(&eng,
144                               Priority::VBMemoryDeletionPriority, delay,false),
145                               e(eng), vbucket(vb), vbid(vb->getId()) { }
146 
getDescription()147     std::string getDescription() {
148         std::stringstream ss;
149         ss << "Removing (dead) vbucket " << vbid << " from memory";
150         return ss.str();
151     }
152 
run(void)153     bool run(void) {
154         vbucket->notifyAllPendingConnsFailed(e);
155         vbucket->ht.clear();
156         vbucket.reset();
157         return false;
158     }
159 
160 private:
161     EventuallyPersistentEngine &e;
162     RCPtr<VBucket> vbucket;
163     uint16_t vbid;
164 };
165 
166 class PendingOpsNotification : public GlobalTask {
167 public:
PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb)168     PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
169         GlobalTask(&e, Priority::VBMemoryDeletionPriority, 0, false),
170         engine(e), vbucket(vb) { }
171 
getDescription()172     std::string getDescription() {
173         std::stringstream ss;
174         ss << "Notify pending operations for vbucket " << vbucket->getId();
175         return ss.str();
176     }
177 
run(void)178     bool run(void) {
179         vbucket->fireAllOps(engine);
180         return false;
181     }
182 
183 private:
184     EventuallyPersistentEngine &engine;
185     RCPtr<VBucket> vbucket;
186 };
187 
EventuallyPersistentStore( EventuallyPersistentEngine &theEngine)188 EventuallyPersistentStore::EventuallyPersistentStore(
189     EventuallyPersistentEngine &theEngine) :
190     engine(theEngine), stats(engine.getEpStats()),
191     vbMap(theEngine.getConfiguration(), *this),
192     bgFetchQueue(0),
193     diskFlushAll(false), bgFetchDelay(0), backfillMemoryThreshold(0.95),
194     statsSnapshotTaskId(0), lastTransTimePerItem(0)
195 {
196     cachedResidentRatio.activeRatio.store(0);
197     cachedResidentRatio.replicaRatio.store(0);
198 
199     Configuration &config = engine.getConfiguration();
200     MutationLog *shardlog;
201     for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
202         std::stringstream s;
203         s << i;
204         shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
205                                  "." + s.str(),
206                                  engine.getConfiguration().getAlogBlockSize());
207         accessLog.push_back(shardlog);
208     }
209 
210     storageProperties = new StorageProperties(true, true, true, true);
211 
212     stats.schedulingHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
213     stats.taskRuntimeHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
214 
215     for (size_t i = 0; i < MAX_TYPE_ID; i++) {
216         stats.schedulingHisto[i].reset();
217         stats.taskRuntimeHisto[i].reset();
218     }
219 
220     ExecutorPool::get()->registerBucket(ObjectRegistry::getCurrentEngine());
221 
222     size_t num_vbs = config.getMaxVbuckets();
223     vb_mutexes = new Mutex[num_vbs];
224     schedule_vbstate_persist = new AtomicValue<bool>[num_vbs];
225     for (size_t i = 0; i < num_vbs; ++i) {
226         schedule_vbstate_persist[i] = false;
227     }
228 
229     stats.memOverhead = sizeof(EventuallyPersistentStore);
230 
231     if (config.getConflictResolutionType().compare("seqno") == 0) {
232         conflictResolver = new SeqBasedResolution();
233     }
234 
235     stats.setMaxDataSize(config.getMaxSize());
236     config.addValueChangedListener("max_size",
237                                    new StatsValueChangeListener(stats));
238 
239     stats.mem_low_wat.store(config.getMemLowWat());
240     config.addValueChangedListener("mem_low_wat",
241                                    new StatsValueChangeListener(stats));
242 
243     stats.mem_high_wat.store(config.getMemHighWat());
244     config.addValueChangedListener("mem_high_wat",
245                                    new StatsValueChangeListener(stats));
246 
247     stats.tapThrottleThreshold.store(static_cast<double>
248                                     (config.getTapThrottleThreshold())
249                                      / 100.0);
250     config.addValueChangedListener("tap_throttle_threshold",
251                                    new StatsValueChangeListener(stats));
252 
253     stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
254     config.addValueChangedListener("tap_throttle_queue_cap",
255                                    new EPStoreValueChangeListener(*this));
256     config.addValueChangedListener("tap_throttle_cap_pcnt",
257                                    new EPStoreValueChangeListener(*this));
258 
259     setBGFetchDelay(config.getBgFetchDelay());
260     config.addValueChangedListener("bg_fetch_delay",
261                                    new EPStoreValueChangeListener(*this));
262 
263     stats.warmupMemUsedCap.store(static_cast<double>
264                                (config.getWarmupMinMemoryThreshold()) / 100.0);
265     config.addValueChangedListener("warmup_min_memory_threshold",
266                                    new StatsValueChangeListener(stats));
267     stats.warmupNumReadCap.store(static_cast<double>
268                                 (config.getWarmupMinItemsThreshold()) / 100.0);
269     config.addValueChangedListener("warmup_min_items_threshold",
270                                    new StatsValueChangeListener(stats));
271 
272     double mem_threshold = static_cast<double>
273                                       (config.getMutationMemThreshold()) / 100;
274     StoredValue::setMutationMemoryThreshold(mem_threshold);
275     config.addValueChangedListener("mutation_mem_threshold",
276                                    new EPStoreValueChangeListener(*this));
277 
278     double backfill_threshold = static_cast<double>
279                                       (config.getBackfillMemThreshold()) / 100;
280     setBackfillMemoryThreshold(backfill_threshold);
281     config.addValueChangedListener("backfill_mem_threshold",
282                                    new EPStoreValueChangeListener(*this));
283 
284     compactionExpMemThreshold = config.getCompactionExpMemThreshold();
285     config.addValueChangedListener("compaction_exp_mem_threshold",
286                                    new EPStoreValueChangeListener(*this));
287 
288     compactionWriteQueueCap = config.getCompactionWriteQueueCap();
289     config.addValueChangedListener("compaction_write_queue_cap",
290                                    new EPStoreValueChangeListener(*this));
291 
292     const std::string &policy = config.getItemEvictionPolicy();
293     if (policy.compare("value_only") == 0) {
294         eviction_policy = VALUE_ONLY;
295     } else {
296         eviction_policy = FULL_EVICTION;
297     }
298 
299     warmupTask = new Warmup(this);
300 }
301 
initialize()302 bool EventuallyPersistentStore::initialize() {
303     // We should nuke everything unless we want warmup
304     Configuration &config = engine.getConfiguration();
305     if (!config.isWarmup()) {
306         reset();
307     }
308 
309     if (!startFlusher()) {
310         LOG(EXTENSION_LOG_WARNING,
311             "FATAL: Failed to create and start flushers");
312         return false;
313     }
314     if (!startBgFetcher()) {
315         LOG(EXTENSION_LOG_WARNING,
316            "FATAL: Failed to create and start bgfetchers");
317         return false;
318     }
319 
320     warmupTask->start();
321 
322     if (config.isFailpartialwarmup() && stats.warmOOM > 0) {
323         LOG(EXTENSION_LOG_WARNING,
324             "Warmup failed to load %d records due to OOM, exiting.\n",
325             static_cast<unsigned int>(stats.warmOOM));
326         return false;
327     }
328 
329     itmpTask = new ItemPager(&engine, stats);
330     ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
331 
332     size_t expiryPagerSleeptime = config.getExpPagerStime();
333     setExpiryPagerSleeptime(expiryPagerSleeptime);
334     config.addValueChangedListener("exp_pager_stime",
335                                    new EPStoreValueChangeListener(*this));
336 
337     ExTask htrTask = new HashtableResizerTask(this, 10);
338     ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
339 
340     size_t checkpointRemoverInterval = config.getChkRemoverStime();
341     chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
342                                                    checkpointRemoverInterval);
343     ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
344 
345     ExTask vbSnapshotTask = new DaemonVBSnapshotTask(&engine);
346     ExecutorPool::get()->schedule(vbSnapshotTask, WRITER_TASK_IDX);
347 
348     ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
349     ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
350 
351     return true;
352 }
353 
~EventuallyPersistentStore()354 EventuallyPersistentStore::~EventuallyPersistentStore() {
355     stopWarmup();
356     stopBgFetcher();
357     ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX);
358 
359     ExecutorPool::get()->cancel(statsSnapshotTaskId);
360     LockHolder lh(accessScanner.mutex);
361     ExecutorPool::get()->cancel(accessScanner.task);
362     lh.unlock();
363 
364     stopFlusher();
365     ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine());
366 
367     delete [] vb_mutexes;
368     delete [] schedule_vbstate_persist;
369     delete [] stats.schedulingHisto;
370     delete [] stats.taskRuntimeHisto;
371     delete conflictResolver;
372     delete warmupTask;
373     delete storageProperties;
374 
375     std::vector<MutationLog*>::iterator it;
376     for (it = accessLog.begin(); it != accessLog.end(); it++) {
377         delete *it;
378     }
379 }
380 
getFlusher(uint16_t shardId)381 const Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
382     return vbMap.getShard(shardId)->getFlusher();
383 }
384 
getWarmup(void) const385 Warmup* EventuallyPersistentStore::getWarmup(void) const {
386     return warmupTask;
387 }
388 
startFlusher()389 bool EventuallyPersistentStore::startFlusher() {
390     for (uint16_t i = 0; i < vbMap.numShards; ++i) {
391         Flusher *flusher = vbMap.shards[i]->getFlusher();
392         flusher->start();
393     }
394     return true;
395 }
396 
stopFlusher()397 void EventuallyPersistentStore::stopFlusher() {
398     for (uint16_t i = 0; i < vbMap.numShards; i++) {
399         Flusher *flusher = vbMap.shards[i]->getFlusher();
400         bool rv = flusher->stop(stats.forceShutdown);
401         if (rv && !stats.forceShutdown) {
402             flusher->wait();
403         }
404     }
405 }
406 
pauseFlusher()407 bool EventuallyPersistentStore::pauseFlusher() {
408     bool rv = true;
409     for (uint16_t i = 0; i < vbMap.numShards; i++) {
410         Flusher *flusher = vbMap.shards[i]->getFlusher();
411         if (!flusher->pause()) {
412             LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
413                 "[%s], shard = %d", flusher->stateName(), i);
414             rv = false;
415         }
416     }
417     return rv;
418 }
419 
resumeFlusher()420 bool EventuallyPersistentStore::resumeFlusher() {
421     bool rv = true;
422     for (uint16_t i = 0; i < vbMap.numShards; i++) {
423         Flusher *flusher = vbMap.shards[i]->getFlusher();
424         if (!flusher->resume()) {
425             LOG(EXTENSION_LOG_WARNING,
426                     "Warning: attempted to resume flusher in state [%s], "
427                     "shard = %d", flusher->stateName(), i);
428             rv = false;
429         }
430     }
431     return rv;
432 }
433 
wakeUpFlusher()434 void EventuallyPersistentStore::wakeUpFlusher() {
435     if (stats.diskQueueSize.load() == 0) {
436         for (uint16_t i = 0; i < vbMap.numShards; i++) {
437             Flusher *flusher = vbMap.shards[i]->getFlusher();
438             flusher->wake();
439         }
440     }
441 }
442 
startBgFetcher()443 bool EventuallyPersistentStore::startBgFetcher() {
444     for (uint16_t i = 0; i < vbMap.numShards; i++) {
445         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
446         if (bgfetcher == NULL) {
447             LOG(EXTENSION_LOG_WARNING,
448                 "Falied to start bg fetcher for shard %d", i);
449             return false;
450         }
451         bgfetcher->start();
452     }
453     return true;
454 }
455 
stopBgFetcher()456 void EventuallyPersistentStore::stopBgFetcher() {
457     for (uint16_t i = 0; i < vbMap.numShards; i++) {
458         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
459         if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
460             LOG(EXTENSION_LOG_WARNING,
461                 "Shutting down engine while there are still pending data "
462                 "read for shard %d from database storage", i);
463         }
464         LOG(EXTENSION_LOG_INFO, "Stopping bg fetcher for underlying storage");
465         bgfetcher->stop();
466     }
467 }
468 
getVBucket(uint16_t vbid, vbucket_state_t wanted_state)469 RCPtr<VBucket> EventuallyPersistentStore::getVBucket(uint16_t vbid,
470                                                 vbucket_state_t wanted_state) {
471     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
472     vbucket_state_t found_state(vb ? vb->getState() : vbucket_state_dead);
473     if (found_state == wanted_state) {
474         return vb;
475     } else {
476         RCPtr<VBucket> rv;
477         return rv;
478     }
479 }
480 
481 void
deleteExpiredItem(uint16_t vbid, std::string &key, time_t startTime, uint64_t revSeqno)482 EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
483                                              time_t startTime,
484                                              uint64_t revSeqno) {
485     RCPtr<VBucket> vb = getVBucket(vbid);
486     if (vb) {
487         int bucket_num(0);
488         incExpirationStat(vb);
489         LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
490         StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
491         if (v) {
492             if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
493                 // This is a temporary item whose background fetch for metadata
494                 // has completed.
495                 bool deleted = vb->ht.unlocked_del(key, bucket_num);
496                 cb_assert(deleted);
497             } else if (v->isExpired(startTime) && !v->isDeleted()) {
498                 vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
499                 queueDirty(vb, v, &lh, false);
500             }
501         } else {
502             if (eviction_policy == FULL_EVICTION) {
503                 // Create a temp item and delete and push it
504                 // into the checkpoint queue.
505                 add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
506                                                             eviction_policy);
507                 if (rv == ADD_NOMEM) {
508                     return;
509                 }
510                 v = vb->ht.unlocked_find(key, bucket_num, true, false);
511                 v->setStoredValueState(StoredValue::state_deleted_key);
512                 v->setRevSeqno(revSeqno);
513                 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
514                 queueDirty(vb, v, &lh, false);
515             }
516         }
517     }
518 }
519 
520 void
deleteExpiredItems(std::list<std::pair<uint16_t, std::string> > &keys)521 EventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
522                                                         std::string> > &keys) {
523     std::list<std::pair<uint16_t, std::string> >::iterator it;
524     time_t startTime = ep_real_time();
525     for (it = keys.begin(); it != keys.end(); it++) {
526         deleteExpiredItem(it->first, it->second, startTime, 0);
527     }
528 }
529 
fetchValidValue(RCPtr<VBucket> &vb, const std::string &key, int bucket_num, bool wantDeleted, bool trackReference, bool queueExpired)530 StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
531                                                         const std::string &key,
532                                                         int bucket_num,
533                                                         bool wantDeleted,
534                                                         bool trackReference,
535                                                         bool queueExpired) {
536     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
537                                           trackReference);
538     if (v && !v->isDeleted() && !v->isTempItem()) {
539         // In the deleted case, we ignore expiration time.
540         if (v->isExpired(ep_real_time())) {
541             if (vb->getState() != vbucket_state_active) {
542                 return wantDeleted ? v : NULL;
543             }
544             if (queueExpired) {
545                 incExpirationStat(vb, false);
546                 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
547                 queueDirty(vb, v, NULL, false, true);
548             }
549             return wantDeleted ? v : NULL;
550         }
551     }
552     return v;
553 }
554 
evictKey( const std::string &key, uint16_t vbucket, const char **msg, size_t *msg_size, bool force)555 protocol_binary_response_status EventuallyPersistentStore::evictKey(
556                                                         const std::string &key,
557                                                         uint16_t vbucket,
558                                                         const char **msg,
559                                                         size_t *msg_size,
560                                                         bool force) {
561     RCPtr<VBucket> vb = getVBucket(vbucket);
562     if (!vb || (vb->getState() != vbucket_state_active && !force)) {
563         return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
564     }
565 
566     int bucket_num(0);
567     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
568     StoredValue *v = fetchValidValue(vb, key, bucket_num, force, false);
569 
570     protocol_binary_response_status rv(PROTOCOL_BINARY_RESPONSE_SUCCESS);
571 
572     *msg_size = 0;
573     if (v) {
574         if (force)  {
575             v->markClean();
576         }
577         if (v->isResident()) {
578             if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
579                 *msg = "Ejected.";
580             } else {
581                 *msg = "Can't eject: Dirty object.";
582                 rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
583             }
584         } else {
585             *msg = "Already ejected.";
586         }
587     } else {
588         if (eviction_policy == VALUE_ONLY) {
589             *msg = "Not found.";
590             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
591         } else {
592             *msg = "Already ejected.";
593         }
594     }
595 
596     return rv;
597 }
598 
addTempItemForBgFetch( LockHolder &lock, int bucket_num, const std::string &key, RCPtr<VBucket> &vb, const void *cookie, bool metadataOnly, bool isReplication)599 ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
600                                                         LockHolder &lock,
601                                                         int bucket_num,
602                                                         const std::string &key,
603                                                         RCPtr<VBucket> &vb,
604                                                         const void *cookie,
605                                                         bool metadataOnly,
606                                                         bool isReplication) {
607 
608     add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
609                                                 eviction_policy,
610                                                 isReplication);
611     switch(rv) {
612         case ADD_NOMEM:
613             return ENGINE_ENOMEM;
614         case ADD_EXISTS:
615         case ADD_UNDEL:
616         case ADD_SUCCESS:
617         case ADD_TMP_AND_BG_FETCH:
618             // Since the hashtable bucket is locked, we shouldn't get here
619             abort();
620         case ADD_BG_FETCH:
621             lock.unlock();
622             bgFetch(key, vb->getId(), -1, cookie, metadataOnly);
623     }
624     return ENGINE_EWOULDBLOCK;
625 }
626 
set(const Item &itm, const void *cookie, bool force, uint8_t nru)627 ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
628                                                  const void *cookie,
629                                                  bool force,
630                                                  uint8_t nru) {
631 
632     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
633     if (!vb || vb->getState() == vbucket_state_dead) {
634         ++stats.numNotMyVBuckets;
635         return ENGINE_NOT_MY_VBUCKET;
636     } else if (vb->getState() == vbucket_state_replica && !force) {
637         ++stats.numNotMyVBuckets;
638         return ENGINE_NOT_MY_VBUCKET;
639     } else if (vb->getState() == vbucket_state_pending && !force) {
640         if (vb->addPendingOp(cookie)) {
641             return ENGINE_EWOULDBLOCK;
642         }
643     }
644 
645     bool cas_op = (itm.getCas() != 0);
646     int bucket_num(0);
647     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
648     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
649                                           false);
650     if (v && v->isLocked(ep_current_time()) &&
651         (vb->getState() == vbucket_state_replica ||
652          vb->getState() == vbucket_state_pending)) {
653         v->unlock();
654     }
655     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(),
656                                                 true, false,
657                                                 eviction_policy, nru);
658 
659     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
660     switch (mtype) {
661     case NOMEM:
662         ret = ENGINE_ENOMEM;
663         break;
664     case INVALID_CAS:
665     case IS_LOCKED:
666         ret = ENGINE_KEY_EEXISTS;
667         break;
668     case NOT_FOUND:
669         if (cas_op) {
670             ret = ENGINE_KEY_ENOENT;
671             break;
672         }
673         // FALLTHROUGH
674     case WAS_DIRTY:
675         // Even if the item was dirty, push it into the vbucket's open
676         // checkpoint.
677     case WAS_CLEAN:
678         queueDirty(vb, v, &lh);
679         break;
680     case NEED_BG_FETCH:
681         { // CAS operation with non-resident item + full eviction.
682             if (v) {
683                 // temp item is already created. Simply schedule a bg fetch job
684                 lh.unlock();
685                 bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
686                 return ENGINE_EWOULDBLOCK;
687             }
688             ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
689                                         cookie, true);
690             break;
691         }
692     case INVALID_VBUCKET:
693         ret = ENGINE_NOT_MY_VBUCKET;
694         break;
695     }
696 
697     return ret;
698 }
699 
add(const Item &itm, const void *cookie)700 ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
701                                                  const void *cookie)
702 {
703     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
704     if (!vb || vb->getState() == vbucket_state_dead ||
705         vb->getState() == vbucket_state_replica) {
706         ++stats.numNotMyVBuckets;
707         return ENGINE_NOT_MY_VBUCKET;
708     } else if(vb->getState() == vbucket_state_pending) {
709         if (vb->addPendingOp(cookie)) {
710             return ENGINE_EWOULDBLOCK;
711         }
712     }
713 
714     if (itm.getCas() != 0) {
715         // Adding with a cas value doesn't make sense..
716         return ENGINE_NOT_STORED;
717     }
718 
719     int bucket_num(0);
720     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
721     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
722                                           false);
723     add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
724                                            eviction_policy);
725 
726     switch (atype) {
727     case ADD_NOMEM:
728         return ENGINE_ENOMEM;
729     case ADD_EXISTS:
730         return ENGINE_NOT_STORED;
731     case ADD_TMP_AND_BG_FETCH:
732         return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
733                                      cookie, true);
734     case ADD_BG_FETCH:
735         lh.unlock();
736         bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
737         return ENGINE_EWOULDBLOCK;
738     case ADD_SUCCESS:
739     case ADD_UNDEL:
740         queueDirty(vb, v, &lh);
741         break;
742     }
743     return ENGINE_SUCCESS;
744 }
745 
replace(const Item &itm, const void *cookie)746 ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
747                                                      const void *cookie) {
748     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
749     if (!vb || vb->getState() == vbucket_state_dead ||
750         vb->getState() == vbucket_state_replica) {
751         ++stats.numNotMyVBuckets;
752         return ENGINE_NOT_MY_VBUCKET;
753     } else if (vb->getState() == vbucket_state_pending) {
754         if (vb->addPendingOp(cookie)) {
755             return ENGINE_EWOULDBLOCK;
756         }
757     }
758 
759     int bucket_num(0);
760     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
761     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
762                                           false);
763     if (v) {
764         if (v->isDeleted() || v->isTempDeletedItem() ||
765             v->isTempNonExistentItem()) {
766             return ENGINE_KEY_ENOENT;
767         }
768 
769         mutation_type_t mtype;
770         if (eviction_policy == FULL_EVICTION && v->isTempInitialItem()) {
771             mtype = NEED_BG_FETCH;
772         } else {
773             mtype = vb->ht.unlocked_set(v, itm, 0, true, false, eviction_policy,
774                                         0xff);
775         }
776 
777         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
778         switch (mtype) {
779             case NOMEM:
780                 ret = ENGINE_ENOMEM;
781                 break;
782             case IS_LOCKED:
783                 ret = ENGINE_KEY_EEXISTS;
784                 break;
785             case INVALID_CAS:
786             case NOT_FOUND:
787                 ret = ENGINE_NOT_STORED;
788                 break;
789                 // FALLTHROUGH
790             case WAS_DIRTY:
791                 // Even if the item was dirty, push it into the vbucket's open
792                 // checkpoint.
793             case WAS_CLEAN:
794                 queueDirty(vb, v, &lh);
795                 break;
796             case NEED_BG_FETCH:
797             {
798                 // temp item is already created. Simply schedule a bg fetch job
799                 lh.unlock();
800                 bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
801                 ret = ENGINE_EWOULDBLOCK;
802                 break;
803             }
804             case INVALID_VBUCKET:
805                 ret = ENGINE_NOT_MY_VBUCKET;
806                 break;
807         }
808 
809         return ret;
810     } else {
811         if (eviction_policy == VALUE_ONLY) {
812             return ENGINE_KEY_ENOENT;
813         }
814 
815         return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
816                                      cookie, false);
817     }
818 }
819 
addTAPBackfillItem(const Item &itm, uint8_t nru, bool genBySeqno)820 ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
821                                                                 uint8_t nru,
822                                                                 bool genBySeqno) {
823 
824     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
825     if (!vb ||
826         vb->getState() == vbucket_state_dead ||
827         vb->getState() == vbucket_state_active) {
828         ++stats.numNotMyVBuckets;
829         return ENGINE_NOT_MY_VBUCKET;
830     }
831 
832     int bucket_num(0);
833     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
834     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
835                                           false);
836 
837     // Note that this function is only called on replica or pending vbuckets.
838     if (v && v->isLocked(ep_current_time())) {
839         v->unlock();
840     }
841     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, 0, true, true,
842                                                 eviction_policy, nru);
843 
844     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
845     switch (mtype) {
846     case NOMEM:
847         ret = ENGINE_ENOMEM;
848         break;
849     case INVALID_CAS:
850     case IS_LOCKED:
851         ret = ENGINE_KEY_EEXISTS;
852         break;
853     case WAS_DIRTY:
854         // If a given backfill item is already dirty, don't queue it again.
855         break;
856     case NOT_FOUND:
857         // FALLTHROUGH
858     case WAS_CLEAN:
859         queueDirty(vb, v, &lh, true, true, genBySeqno);
860         break;
861     case INVALID_VBUCKET:
862         ret = ENGINE_NOT_MY_VBUCKET;
863         break;
864     case NEED_BG_FETCH:
865         // SET on a non-active vbucket should not require a bg_metadata_fetch.
866         abort();
867     }
868 
869     return ret;
870 }
871 
872 class KVStatsCallback : public Callback<kvstats_ctx> {
873     public:
KVStatsCallback(EventuallyPersistentStore *store)874         KVStatsCallback(EventuallyPersistentStore *store)
875             : epstore(store) { }
876 
callback(kvstats_ctx &ctx)877         void callback(kvstats_ctx &ctx) {
878             RCPtr<VBucket> vb = epstore->getVBucket(ctx.vbucket);
879             if (vb) {
880                 vb->fileSpaceUsed = ctx.fileSpaceUsed;
881                 vb->fileSize = ctx.fileSize;
882             }
883         }
884 
885     private:
886         EventuallyPersistentStore *epstore;
887 };
888 
snapshotVBuckets(const Priority &priority, uint16_t shardId)889 void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
890                                                  uint16_t shardId) {
891 
892     class VBucketStateVisitor : public VBucketVisitor {
893     public:
894         VBucketStateVisitor(VBucketMap &vb_map, uint16_t sid)
895             : vbuckets(vb_map), shardId(sid) { }
896         bool visitBucket(RCPtr<VBucket> &vb) {
897             if (vbuckets.getShard(vb->getId())->getId() == shardId) {
898                 uint64_t snapStart = 0;
899                 uint64_t snapEnd = 0;
900                 std::string failovers = vb->failovers->toJSON();
901                 uint64_t chkId = vbuckets.getPersistenceCheckpointId(vb->getId());
902 
903                 vb->getCurrentSnapshot(snapStart, snapEnd);
904                 vbucket_state vb_state(vb->getState(), chkId, 0,
905                                        vb->getHighSeqno(), vb->getPurgeSeqno(),
906                                        snapStart, snapEnd, failovers);
907                 states.insert(std::pair<uint16_t, vbucket_state>(vb->getId(), vb_state));
908             }
909             return false;
910         }
911 
912         void visit(StoredValue*) {
913             cb_assert(false); // this does not happen
914         }
915 
916         std::map<uint16_t, vbucket_state> states;
917 
918     private:
919         VBucketMap &vbuckets;
920         uint16_t shardId;
921     };
922 
923     KVShard *shard = vbMap.shards[shardId];
924     if (priority == Priority::VBucketPersistLowPriority) {
925         shard->setLowPriorityVbSnapshotFlag(false);
926     } else {
927         shard->setHighPriorityVbSnapshotFlag(false);
928     }
929 
930     KVStatsCallback kvcb(this);
931     VBucketStateVisitor v(vbMap, shard->getId());
932     visit(v);
933     hrtime_t start = gethrtime();
934 
935     bool success = true;
936     vbucket_map_t::reverse_iterator iter = v.states.rbegin();
937     for (; iter != v.states.rend(); ++iter) {
938         LockHolder lh(vb_mutexes[iter->first], true /*tryLock*/);
939         if (!lh.islocked()) {
940             continue;
941         }
942         KVStore *rwUnderlying = getRWUnderlying(iter->first);
943         if (!rwUnderlying->snapshotVBucket(iter->first, iter->second,
944                     &kvcb)) {
945             LOG(EXTENSION_LOG_WARNING,
946                     "VBucket snapshot task failed!!! Rescheduling");
947             success = false;
948             break;
949         }
950 
951         if (priority == Priority::VBucketPersistHighPriority) {
952             if (vbMap.setBucketCreation(iter->first, false)) {
953                 LOG(EXTENSION_LOG_INFO, "VBucket %d created", iter->first);
954             }
955         }
956     }
957 
958     if (!success) {
959         scheduleVBSnapshot(priority, shard->getId());
960     } else {
961         stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
962     }
963 }
964 
persistVBState(const Priority &priority, uint16_t vbid)965 bool EventuallyPersistentStore::persistVBState(const Priority &priority,
966                                                uint16_t vbid) {
967     schedule_vbstate_persist[vbid] = false;
968 
969     RCPtr<VBucket> vb = getVBucket(vbid);
970     if (!vb) {
971         LOG(EXTENSION_LOG_WARNING,
972             "VBucket %d not exist!!! vb_state persistence task failed!!!", vbid);
973         return false;
974     }
975 
976     KVStatsCallback kvcb(this);
977     uint64_t chkId = vbMap.getPersistenceCheckpointId(vbid);
978     std::string failovers = vb->failovers->toJSON();
979     uint64_t snapStart = 0;
980     uint64_t snapEnd = 0;
981 
982     vb->getCurrentSnapshot(snapStart, snapEnd);
983     vbucket_state vb_state(vb->getState(), chkId, 0, vb->getHighSeqno(),
984                            vb->getPurgeSeqno(), snapStart, snapEnd, failovers);
985 
986     bool inverse = false;
987     LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
988     if (!lh.islocked()) {
989         if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
990                                                                    true)) {
991             return true;
992         } else {
993             return false;
994         }
995     }
996 
997     KVStore *rwUnderlying = getRWUnderlying(vbid);
998     if (rwUnderlying->snapshotVBucket(vbid, vb_state, &kvcb)) {
999         if (vbMap.setBucketCreation(vbid, false)) {
1000             LOG(EXTENSION_LOG_INFO, "VBucket %d created", vbid);
1001         }
1002     } else {
1003         LOG(EXTENSION_LOG_WARNING,
1004             "VBucket %d: vb_state persistence task failed!!! Rescheduling", vbid);
1005 
1006         if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
1007                                                                    true)) {
1008             return true;
1009         } else {
1010             return false;
1011         }
1012     }
1013     return false;
1014 }
1015 
setVBucketState(uint16_t vbid, vbucket_state_t to, bool transfer, bool notify_dcp)1016 ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
1017                                                            vbucket_state_t to,
1018                                                            bool transfer,
1019                                                            bool notify_dcp) {
1020     // Lock to prevent a race condition between a failed update and add.
1021     LockHolder lh(vbsetMutex);
1022     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1023     if (vb && to == vb->getState()) {
1024         return ENGINE_SUCCESS;
1025     }
1026 
1027     if (vb) {
1028         vbucket_state_t oldstate = vb->getState();
1029         if (oldstate != to && notify_dcp) {
1030             engine.getDcpConnMap().vbucketStateChanged(vbid, to);
1031         }
1032 
1033         vb->setState(to, engine.getServerApi());
1034         if (to == vbucket_state_active && !transfer) {
1035             uint64_t snapStart = 0;
1036             uint64_t snapEnd = 0;
1037             vb->getCurrentSnapshot(snapStart, snapEnd);
1038             vb->failovers->createEntry(snapStart);
1039         }
1040 
1041         lh.unlock();
1042         if (oldstate == vbucket_state_pending &&
1043             to == vbucket_state_active) {
1044             ExTask notifyTask = new PendingOpsNotification(engine, vb);
1045             ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
1046         }
1047         scheduleVBStatePersist(Priority::VBucketPersistLowPriority, vbid);
1048     } else {
1049         FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
1050         RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
1051                                          engine.getCheckpointConfig(),
1052                                          vbMap.getShard(vbid), 0, 0, 0, ft));
1053         // The first checkpoint for active vbucket should start with id 2.
1054         uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
1055         newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
1056         if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
1057             lh.unlock();
1058             return ENGINE_ERANGE;
1059         }
1060         vbMap.setPersistenceCheckpointId(vbid, 0);
1061         vbMap.setPersistenceSeqno(vbid, 0);
1062         vbMap.setBucketCreation(vbid, true);
1063         lh.unlock();
1064         scheduleVBStatePersist(Priority::VBucketPersistHighPriority, vbid);
1065     }
1066     return ENGINE_SUCCESS;
1067 }
1068 
scheduleVBSnapshot(const Priority &p)1069 bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
1070     KVShard *shard = NULL;
1071     if (p == Priority::VBucketPersistHighPriority) {
1072         for (size_t i = 0; i < vbMap.numShards; ++i) {
1073             shard = vbMap.shards[i];
1074             if (shard->setHighPriorityVbSnapshotFlag(true)) {
1075                 ExTask task = new VBSnapshotTask(&engine, p, i, false);
1076                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1077             }
1078         }
1079     } else {
1080         for (size_t i = 0; i < vbMap.numShards; ++i) {
1081             shard = vbMap.shards[i];
1082             if (shard->setLowPriorityVbSnapshotFlag(true)) {
1083                 ExTask task = new VBSnapshotTask(&engine, p, i, false);
1084                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1085             }
1086         }
1087     }
1088     if (stats.isShutdown) {
1089         return false;
1090     }
1091     return true;
1092 }
1093 
scheduleVBSnapshot(const Priority &p, uint16_t shardId, bool force)1094 void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p,
1095                                                    uint16_t shardId,
1096                                                    bool force) {
1097     KVShard *shard = vbMap.shards[shardId];
1098     if (p == Priority::VBucketPersistHighPriority) {
1099         if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
1100             ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
1101             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1102         }
1103     } else {
1104         if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
1105             ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
1106             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1107         }
1108     }
1109 }
1110 
scheduleVBStatePersist(const Priority &priority, uint16_t vbid, bool force)1111 void EventuallyPersistentStore::scheduleVBStatePersist(const Priority &priority,
1112                                                        uint16_t vbid,
1113                                                        bool force) {
1114     bool inverse = false;
1115     if (force ||
1116         schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
1117         ExTask task = new VBStatePersistTask(&engine, priority, vbid, false);
1118         ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1119     }
1120 }
1121 
completeVBucketDeletion(uint16_t vbid, const void* cookie)1122 bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
1123                                                         const void* cookie) {
1124     LockHolder lh(vbsetMutex);
1125 
1126     hrtime_t start_time(gethrtime());
1127     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1128     if (!vb || vb->getState() == vbucket_state_dead ||
1129          vbMap.isBucketDeletion(vbid)) {
1130         lh.unlock();
1131         LockHolder vlh(vb_mutexes[vbid]);
1132         getRWUnderlying(vbid)->delVBucket(vbid);
1133         vbMap.setBucketDeletion(vbid, false);
1134         vbMap.setPersistenceSeqno(vbid, 0);
1135         ++stats.vbucketDeletions;
1136     }
1137 
1138     hrtime_t spent(gethrtime() - start_time);
1139     hrtime_t wall_time = spent / 1000;
1140     BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
1141     stats.diskVBDelHisto.add(wall_time);
1142     atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
1143     stats.vbucketDelTotWalltime.fetch_add(wall_time);
1144     if (cookie) {
1145         engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
1146     }
1147 
1148     return true;
1149 }
1150 
scheduleVBDeletion(RCPtr<VBucket> &vb, const void* cookie, double delay)1151 void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
1152                                                    const void* cookie,
1153                                                    double delay) {
1154     ExTask delTask = new VBucketMemoryDeletionTask(engine, vb, delay);
1155     ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
1156 
1157     if (vbMap.setBucketDeletion(vb->getId(), true)) {
1158         ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie,
1159                                        Priority::VBucketDeletionPriority);
1160         ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1161     }
1162 }
1163 
deleteVBucket(uint16_t vbid, const void* c)1164 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteVBucket(uint16_t vbid,
1165                                                            const void* c) {
1166     // Lock to prevent a race condition between a failed update and add
1167     // (and delete).
1168     LockHolder lh(vbsetMutex);
1169 
1170     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1171     if (!vb) {
1172         return ENGINE_NOT_MY_VBUCKET;
1173     }
1174 
1175     engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
1176     vbMap.removeBucket(vbid);
1177     lh.unlock();
1178     scheduleVBDeletion(vb, c);
1179     if (c) {
1180         return ENGINE_EWOULDBLOCK;
1181     }
1182     return ENGINE_SUCCESS;
1183 }
1184 
compactDB(uint16_t vbid, compaction_ctx c, const void *cookie)1185 ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
1186                                                        compaction_ctx c,
1187                                                        const void *cookie) {
1188     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1189     if (!vb) {
1190         return ENGINE_NOT_MY_VBUCKET;
1191     }
1192 
1193     LockHolder lh(compactionLock);
1194     ExTask task = new CompactVBucketTask(&engine, Priority::CompactorPriority,
1195                                          vbid, c, cookie);
1196     compactionTasks.push_back(std::make_pair(vbid, task));
1197     if (compactionTasks.size() > 1) {
1198         if ((stats.diskQueueSize > compactionWriteQueueCap &&
1199             compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
1200             engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
1201             // Snooze a new compaction task.
1202             // We will wake it up when one of the existing compaction tasks is done.
1203             task->snooze(60);
1204         }
1205     }
1206 
1207     ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1208 
1209     LOG(EXTENSION_LOG_DEBUG, "Scheduled compaction task %d on vbucket %d,"
1210         "purge_before_ts = %lld, purge_before_seq = %lld, dropdeletes = %d",
1211         task->getId(), vbid, c.purge_before_ts,
1212         c.purge_before_seq, c.drop_deletes);
1213 
1214    return ENGINE_EWOULDBLOCK;
1215 }
1216 
1217 class ExpiredItemsCallback : public Callback<compaction_ctx> {
1218     public:
ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid)1219         ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid)
1220             : epstore(store), vbucket(vbid) { }
1221 
callback(compaction_ctx &ctx)1222         void callback(compaction_ctx &ctx) {
1223             std::list<expiredItemCtx>::iterator it;
1224             for (it  = ctx.expiredItems.begin();
1225                  it != ctx.expiredItems.end(); it++) {
1226                 if (epstore->compactionCanExpireItems()) {
1227                     epstore->deleteExpiredItem(vbucket, it->keyStr,
1228                                                ctx.curr_time,
1229                                                it->revSeqno);
1230                 }
1231             }
1232         }
1233 
1234     private:
1235         EventuallyPersistentStore *epstore;
1236         uint16_t vbucket;
1237 };
1238 
compactVBucket(const uint16_t vbid, compaction_ctx *ctx, const void *cookie)1239 bool EventuallyPersistentStore::compactVBucket(const uint16_t vbid,
1240                                                compaction_ctx *ctx,
1241                                                const void *cookie) {
1242     ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
1243     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1244     if (vb) {
1245         LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
1246         if (!lh.islocked()) {
1247             return true; // Schedule a compaction task again.
1248         }
1249 
1250         if (vb->getState() == vbucket_state_active) {
1251             // Set the current time ONLY for active vbuckets.
1252             ctx->curr_time = ep_real_time();
1253         } else {
1254             ctx->curr_time = 0;
1255         }
1256         ExpiredItemsCallback cb(this, vbid);
1257         KVStatsCallback kvcb(this);
1258         getRWUnderlying(vbid)->compactVBucket(vbid, ctx, cb, kvcb);
1259         vb->setPurgeSeqno(ctx->max_purged_seq);
1260     } else {
1261         err = ENGINE_NOT_MY_VBUCKET;
1262         engine.storeEngineSpecific(cookie, NULL);
1263         //Decrement session counter here, as memcached thread wouldn't
1264         //visit the engine interface in case of a NOT_MY_VB notification
1265         engine.decrementSessionCtr();
1266 
1267     }
1268 
1269     LockHolder lh(compactionLock);
1270     bool erased = false, woke = false;
1271     std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
1272     while (it != compactionTasks.end()) {
1273         if ((*it).first == vbid) {
1274             it = compactionTasks.erase(it);
1275             erased = true;
1276         } else {
1277             ExTask &task = (*it).second;
1278             if (task->getState() == TASK_SNOOZED) {
1279                 ExecutorPool::get()->wake(task->getId());
1280                 woke = true;
1281             }
1282             ++it;
1283         }
1284         if (erased && woke) {
1285             break;
1286         }
1287     }
1288     lh.unlock();
1289 
1290     if (cookie) {
1291         engine.notifyIOComplete(cookie, err);
1292     }
1293     --stats.pendingCompactions;
1294     return false;
1295 }
1296 
resetVBucket(uint16_t vbid)1297 bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
1298     LockHolder lh(vbsetMutex);
1299     bool rv(false);
1300 
1301     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1302     if (vb) {
1303         vbucket_state_t vbstate = vb->getState();
1304 
1305         vbMap.removeBucket(vbid);
1306         lh.unlock();
1307 
1308         std::list<std::string> tap_cursors = vb->checkpointManager.
1309                                              getTAPCursorNames();
1310         // Delete and recreate the vbucket database file
1311         scheduleVBDeletion(vb, NULL, 0);
1312         setVBucketState(vbid, vbstate, false);
1313 
1314         // Copy the all cursors from the old vbucket into the new vbucket
1315         RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
1316         newvb->checkpointManager.resetTAPCursors(tap_cursors);
1317 
1318         rv = true;
1319     }
1320     return rv;
1321 }
1322 
1323 extern "C" {
1324 
1325     typedef struct {
1326         EventuallyPersistentEngine* engine;
1327         std::map<std::string, std::string> smap;
1328     } snapshot_stats_t;
1329 
add_stat(const char *key, const uint16_t klen, const char *val, const uint32_t vlen, const void *cookie)1330     static void add_stat(const char *key, const uint16_t klen,
1331                          const char *val, const uint32_t vlen,
1332                          const void *cookie) {
1333         cb_assert(cookie);
1334         void *ptr = const_cast<void *>(cookie);
1335         snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1336         ObjectRegistry::onSwitchThread(snap->engine);
1337 
1338         std::string k(key, klen);
1339         std::string v(val, vlen);
1340         snap->smap.insert(std::pair<std::string, std::string>(k, v));
1341     }
1342 }
1343 
snapshotStats()1344 void EventuallyPersistentStore::snapshotStats() {
1345     snapshot_stats_t snap;
1346     snap.engine = &engine;
1347     std::map<std::string, std::string>  smap;
1348     bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1349               engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1350               engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;
1351 
1352     if (rv && stats.isShutdown) {
1353         snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
1354                                                               "true" : "false";
1355         std::stringstream ss;
1356         ss << ep_real_time();
1357         snap.smap["ep_shutdown_time"] = ss.str();
1358     }
1359     getOneRWUnderlying()->snapshotStats(snap.smap);
1360 }
1361 
updateBGStats(const hrtime_t init, const hrtime_t start, const hrtime_t stop)1362 void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
1363                                               const hrtime_t start,
1364                                               const hrtime_t stop) {
1365     if (stop >= start && start >= init) {
1366         // skip the measurement if the counter wrapped...
1367         ++stats.bgNumOperations;
1368         hrtime_t w = (start - init) / 1000;
1369         BlockTimer::log(start - init, "bgwait", stats.timingLog);
1370         stats.bgWaitHisto.add(w);
1371         stats.bgWait.fetch_add(w);
1372         atomic_setIfLess(stats.bgMinWait, w);
1373         atomic_setIfBigger(stats.bgMaxWait, w);
1374 
1375         hrtime_t l = (stop - start) / 1000;
1376         BlockTimer::log(stop - start, "bgload", stats.timingLog);
1377         stats.bgLoadHisto.add(l);
1378         stats.bgLoad.fetch_add(l);
1379         atomic_setIfLess(stats.bgMinLoad, l);
1380         atomic_setIfBigger(stats.bgMaxLoad, l);
1381     }
1382 }
1383 
completeBGFetch(const std::string &key, uint16_t vbucket, uint64_t rowid, const void *cookie, hrtime_t init, bool isMeta)1384 void EventuallyPersistentStore::completeBGFetch(const std::string &key,
1385                                                 uint16_t vbucket,
1386                                                 uint64_t rowid,
1387                                                 const void *cookie,
1388                                                 hrtime_t init,
1389                                                 bool isMeta) {
1390     hrtime_t start(gethrtime());
1391     // Go find the data
1392     RememberingCallback<GetValue> gcb;
1393     if (isMeta) {
1394         gcb.val.setPartial();
1395         ++stats.bg_meta_fetched;
1396     } else {
1397         ++stats.bg_fetched;
1398     }
1399     getROUnderlying(vbucket)->get(key, rowid, vbucket, gcb);
1400     gcb.waitForValue();
1401     cb_assert(gcb.fired);
1402     ENGINE_ERROR_CODE status = gcb.val.getStatus();
1403 
1404     // Lock to prevent a race condition between a fetch for restore and delete
1405     LockHolder lh(vbsetMutex);
1406 
1407     RCPtr<VBucket> vb = getVBucket(vbucket);
1408     if (vb) {
1409         int bucket_num(0);
1410         LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
1411         StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1412         if (isMeta) {
1413             if (v && v->unlocked_restoreMeta(gcb.val.getValue(),
1414                                              gcb.val.getStatus(), vb->ht)) {
1415                 status = ENGINE_SUCCESS;
1416             }
1417         } else {
1418             if (v && v->isResident()) {
1419                 status = ENGINE_SUCCESS;
1420             }
1421 
1422             bool restore = false;
1423             if (eviction_policy == VALUE_ONLY &&
1424                 v && !v->isResident() && !v->isDeleted()) {
1425                 restore = true;
1426             } else if (eviction_policy == FULL_EVICTION &&
1427                        v && v->isTempInitialItem()) {
1428                 restore = true;
1429             }
1430 
1431             if (restore) {
1432                 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1433                     v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
1434                     cb_assert(v->isResident());
1435                     if (vb->getState() == vbucket_state_active &&
1436                         v->getExptime() != gcb.val.getValue()->getExptime() &&
1437                         v->getCas() == gcb.val.getValue()->getCas()) {
1438                         // MB-9306: It is possible that by the time bgfetcher
1439                         // returns, the item may have been updated and queued
1440                         // Hence test the CAS value to be the same first.
1441                         // exptime mutated, schedule it into new checkpoint
1442                         queueDirty(vb, v, &hlh);
1443                     }
1444                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
1445                     v->setStoredValueState(
1446                                           StoredValue::state_non_existent_key);
1447                     if (eviction_policy == FULL_EVICTION) {
1448                         // For the full eviction, we should notify
1449                         // ENGINE_SUCCESS to the memcached worker thread, so
1450                         // that the worker thread can visit the ep-engine and
1451                         // figure out the correct error code.
1452                         status = ENGINE_SUCCESS;
1453                     }
1454                 } else {
1455                     // underlying kvstore couldn't fetch requested data
1456                     // log returned error and notify TMPFAIL to client
1457                     LOG(EXTENSION_LOG_WARNING,
1458                         "Warning: failed background fetch for vb=%d seq=%d "
1459                         "key=%s", vbucket, v->getBySeqno(), key.c_str());
1460                     status = ENGINE_TMPFAIL;
1461                 }
1462             }
1463         }
1464     } else {
1465         LOG(EXTENSION_LOG_INFO, "VBucket %d's file was deleted in the middle of"
1466             " a bg fetch for key %s\n", vbucket, key.c_str());
1467         status = ENGINE_NOT_MY_VBUCKET;
1468     }
1469 
1470     lh.unlock();
1471 
1472     hrtime_t stop = gethrtime();
1473     updateBGStats(init, start, stop);
1474     bgFetchQueue--;
1475 
1476     delete gcb.val.getValue();
1477     engine.notifyIOComplete(cookie, status);
1478 }
1479 
completeBGFetchMulti(uint16_t vbId, std::vector<bgfetched_item_t> &fetchedItems, hrtime_t startTime)1480 void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
1481                                  std::vector<bgfetched_item_t> &fetchedItems,
1482                                  hrtime_t startTime)
1483 {
1484     RCPtr<VBucket> vb = getVBucket(vbId);
1485     if (!vb) {
1486         std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1487         for (; itemItr != fetchedItems.end(); ++itemItr) {
1488             engine.notifyIOComplete((*itemItr).second->cookie,
1489                                     ENGINE_NOT_MY_VBUCKET);
1490         }
1491         LOG(EXTENSION_LOG_WARNING,
1492             "EP Store completes %d of batched background fetch for "
1493             "for vBucket = %d that is already deleted\n",
1494             (int)fetchedItems.size(), vbId);
1495         return;
1496     }
1497 
1498     std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1499     for (; itemItr != fetchedItems.end(); ++itemItr) {
1500         VBucketBGFetchItem *bgitem = (*itemItr).second;
1501         ENGINE_ERROR_CODE status = bgitem->value.getStatus();
1502         Item *fetchedValue = bgitem->value.getValue();
1503         const std::string &key = (*itemItr).first;
1504 
1505         int bucket = 0;
1506         LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
1507         StoredValue *v = fetchValidValue(vb, key, bucket, true);
1508         if (bgitem->metaDataOnly) {
1509             if (v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht)) {
1510                 status = ENGINE_SUCCESS;
1511             }
1512         } else {
1513             if (v && v->isResident()) {
1514                 status = ENGINE_SUCCESS;
1515             }
1516 
1517             bool restore = false;
1518             if (eviction_policy == VALUE_ONLY &&
1519                 v && !v->isResident() && !v->isDeleted()) {
1520                 restore = true;
1521             } else if (eviction_policy == FULL_EVICTION &&
1522                        v && v->isTempInitialItem()) {
1523                 restore = true;
1524             }
1525 
1526             if (restore) {
1527                 if (status == ENGINE_SUCCESS) {
1528                     v->unlocked_restoreValue(fetchedValue, vb->ht);
1529                     cb_assert(v->isResident());
1530                     if (vb->getState() == vbucket_state_active &&
1531                         v->getExptime() != fetchedValue->getExptime() &&
1532                         v->getCas() == fetchedValue->getCas()) {
1533                         // MB-9306: It is possible that by the time
1534                         // bgfetcher returns, the item may have been
1535                         // updated and queued
1536                         // Hence test the CAS value to be the same first.
1537                         // exptime mutated, schedule it into new checkpoint
1538                         queueDirty(vb, v, &blh);
1539                     }
1540                 } else if (status == ENGINE_KEY_ENOENT) {
1541                     v->setStoredValueState(StoredValue::state_non_existent_key);
1542                     if (eviction_policy == FULL_EVICTION) {
1543                         // For the full eviction, we should notify
1544                         // ENGINE_SUCCESS to the memcached worker thread,
1545                         // so that the worker thread can visit the
1546                         // ep-engine and figure out the correct error
1547                         // code.
1548                         status = ENGINE_SUCCESS;
1549                     }
1550                 } else {
1551                     // underlying kvstore couldn't fetch requested data
1552                     // log returned error and notify TMPFAIL to client
1553                     LOG(EXTENSION_LOG_WARNING,
1554                         "Warning: failed background fetch for vb=%d "
1555                         "key=%s", vbId, key.c_str());
1556                     status = ENGINE_TMPFAIL;
1557                 }
1558             }
1559         }
1560         blh.unlock();
1561 
1562         if (bgitem->metaDataOnly) {
1563             ++stats.bg_meta_fetched;
1564         } else {
1565             ++stats.bg_fetched;
1566         }
1567 
1568         hrtime_t endTime = gethrtime();
1569         updateBGStats(bgitem->initTime, startTime, endTime);
1570         engine.notifyIOComplete(bgitem->cookie, status);
1571     }
1572 
1573     LOG(EXTENSION_LOG_DEBUG,
1574         "EP Store completes %d of batched background fetch "
1575         "for vBucket = %d endTime = %lld\n",
1576         fetchedItems.size(), vbId, gethrtime()/1000000);
1577 }
1578 
bgFetch(const std::string &key, uint16_t vbucket, uint64_t rowid, const void *cookie, bool isMeta)1579 void EventuallyPersistentStore::bgFetch(const std::string &key,
1580                                         uint16_t vbucket,
1581                                         uint64_t rowid,
1582                                         const void *cookie,
1583                                         bool isMeta) {
1584     std::stringstream ss;
1585 
1586     if (multiBGFetchEnabled()) {
1587         RCPtr<VBucket> vb = getVBucket(vbucket);
1588         cb_assert(vb);
1589         KVShard *myShard = vbMap.getShard(vbucket);
1590 
1591         // schedule to the current batch of background fetch of the given
1592         // vbucket
1593         VBucketBGFetchItem * fetchThis = new VBucketBGFetchItem(cookie,
1594                                                                 isMeta);
1595         vb->queueBGFetchItem(key, fetchThis, myShard->getBgFetcher());
1596         myShard->getBgFetcher()->notifyBGEvent();
1597         ss << "Queued a background fetch, now at "
1598            << vb->numPendingBGFetchItems() << std::endl;
1599         LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1600     } else {
1601         bgFetchQueue++;
1602         stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
1603                                             bgFetchQueue.load());
1604         ExecutorPool* iom = ExecutorPool::get();
1605         ExTask task = new BGFetchTask(&engine, key, vbucket, rowid, cookie,
1606                                       isMeta,
1607                                       Priority::BgFetcherGetMetaPriority,
1608                                       bgFetchDelay, false);
1609         iom->schedule(task, READER_TASK_IDX);
1610         ss << "Queued a background fetch, now at " << bgFetchQueue.load()
1611            << std::endl;
1612         LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1613     }
1614 }
1615 
getInternal(const std::string &key, uint16_t vbucket, const void *cookie, bool queueBG, bool honorStates, vbucket_state_t allowedState, bool trackReference)1616 GetValue EventuallyPersistentStore::getInternal(const std::string &key,
1617                                                 uint16_t vbucket,
1618                                                 const void *cookie,
1619                                                 bool queueBG,
1620                                                 bool honorStates,
1621                                                 vbucket_state_t allowedState,
1622                                                 bool trackReference) {
1623 
1624     vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1625         vbucket_state_replica : vbucket_state_active;
1626     RCPtr<VBucket> vb = getVBucket(vbucket);
1627     if (!vb) {
1628         ++stats.numNotMyVBuckets;
1629         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1630     } else if (honorStates && vb->getState() == vbucket_state_dead) {
1631         ++stats.numNotMyVBuckets;
1632         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1633     } else if (honorStates && vb->getState() == disallowedState) {
1634         ++stats.numNotMyVBuckets;
1635         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1636     } else if (honorStates && vb->getState() == vbucket_state_pending) {
1637         if (vb->addPendingOp(cookie)) {
1638             return GetValue(NULL, ENGINE_EWOULDBLOCK);
1639         }
1640     }
1641 
1642     int bucket_num(0);
1643     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1644     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
1645                                      trackReference);
1646     if (v) {
1647         if (v->isDeleted() || v->isTempDeletedItem() ||
1648             v->isTempNonExistentItem()) {
1649             GetValue rv;
1650             return rv;
1651         }
1652         // If the value is not resident, wait for it...
1653         if (!v->isResident()) {
1654             if (queueBG) {
1655                 bgFetch(key, vbucket, v->getBySeqno(), cookie);
1656             }
1657             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
1658                             true, v->getNRUValue());
1659         }
1660 
1661         GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
1662                     ENGINE_SUCCESS, v->getBySeqno(), false, v->getNRUValue());
1663         return rv;
1664     } else {
1665         if (eviction_policy == VALUE_ONLY || diskFlushAll) {
1666             GetValue rv;
1667             return rv;
1668         }
1669         ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1670         if (queueBG) { // Full eviction and need a bg fetch.
1671             ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
1672                                        cookie, false);
1673         }
1674         return GetValue(NULL, ec, -1, true);
1675     }
1676 }
1677 
getRandomKey()1678 GetValue EventuallyPersistentStore::getRandomKey() {
1679     long max = vbMap.getSize();
1680 
1681     long start = random() % max;
1682     long curr = start;
1683     Item *itm = NULL;
1684 
1685     while (itm == NULL) {
1686         RCPtr<VBucket> vb = getVBucket(curr++);
1687         while (!vb || vb->getState() != vbucket_state_active) {
1688             if (curr == start) {
1689                 return GetValue(NULL, ENGINE_KEY_ENOENT);
1690             }
1691             if (curr == max) {
1692                 curr = 0;
1693             }
1694 
1695             vb = getVBucket(curr++);
1696         }
1697 
1698         if ((itm = vb->ht.getRandomKey(random())) != NULL) {
1699             GetValue rv(itm, ENGINE_SUCCESS);
1700             return rv;
1701         }
1702 
1703         if (curr == max) {
1704             curr = 0;
1705         }
1706 
1707         if (curr == start) {
1708             return GetValue(NULL, ENGINE_KEY_ENOENT);
1709         }
1710         // Search next vbucket
1711     }
1712 
1713     return GetValue(NULL, ENGINE_KEY_ENOENT);
1714 }
1715 
1716 
getMetaData( const std::string &key, uint16_t vbucket, const void *cookie, ItemMetaData &metadata, uint32_t &deleted, bool trackReferenced)1717 ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
1718                                                         const std::string &key,
1719                                                         uint16_t vbucket,
1720                                                         const void *cookie,
1721                                                         ItemMetaData &metadata,
1722                                                         uint32_t &deleted,
1723                                                         bool trackReferenced)
1724 {
1725     (void) cookie;
1726     RCPtr<VBucket> vb = getVBucket(vbucket);
1727     if (!vb || vb->getState() == vbucket_state_dead ||
1728         vb->getState() == vbucket_state_replica) {
1729         ++stats.numNotMyVBuckets;
1730         return ENGINE_NOT_MY_VBUCKET;
1731     }
1732 
1733     int bucket_num(0);
1734     deleted = 0;
1735     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1736     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true,
1737                                           trackReferenced);
1738 
1739     if (v) {
1740         stats.numOpsGetMeta++;
1741 
1742         if (v->isTempInitialItem()) { // Need bg meta fetch.
1743             bgFetch(key, vbucket, -1, cookie, true);
1744             return ENGINE_EWOULDBLOCK;
1745         } else if (v->isTempNonExistentItem()) {
1746             metadata.cas = v->getCas();
1747             return ENGINE_KEY_ENOENT;
1748         } else {
1749             if (v->isTempDeletedItem() || v->isDeleted() ||
1750                 v->isExpired(ep_real_time())) {
1751                 deleted |= GET_META_ITEM_DELETED_FLAG;
1752             }
1753 
1754             if (v->isLocked(ep_current_time())) {
1755                 metadata.cas = static_cast<uint64_t>(-1);
1756             } else {
1757                 metadata.cas = v->getCas();
1758             }
1759             metadata.flags = v->getFlags();
1760             metadata.exptime = v->getExptime();
1761             metadata.revSeqno = v->getRevSeqno();
1762             return ENGINE_SUCCESS;
1763         }
1764     } else {
1765         // The key wasn't found. However, this may be because it was previously
1766         // deleted or evicted with the full eviction strategy.
1767         // So, add a temporary item corresponding to the key to the hash table
1768         // and schedule a background fetch for its metadata from the persistent
1769         // store. The item's state will be updated after the fetch completes.
1770         return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
1771     }
1772 }
1773 
/**
 * Store an item together with caller-supplied metadata.
 *
 * @param itm           item to store; its key, vbucket id and metadata
 *                      are taken from here
 * @param cas           cas value forwarded to the hash table set
 * @param cookie        connection cookie, used for pending ops / bg fetch
 * @param force         when true, replica/pending vbucket states are
 *                      accepted and conflict resolution is skipped
 * @param allowExisting forwarded to the hash table set
 * @param nru           forwarded to the hash table set
 * @param genBySeqno    forwarded to queueDirty()
 * @param isReplication forwarded to the hash table set and to
 *                      addTempItemForBgFetch()
 * @return ENGINE_SUCCESS when the item was stored and queued;
 *         ENGINE_NOT_MY_VBUCKET, ENGINE_EWOULDBLOCK, ENGINE_KEY_EEXISTS,
 *         ENGINE_KEY_ENOENT or ENGINE_ENOMEM otherwise
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
                                                         uint64_t cas,
                                                         const void *cookie,
                                                         bool force,
                                                         bool allowExisting,
                                                         uint8_t nru,
                                                         bool genBySeqno,
                                                         bool isReplication)
{
    // Vbucket state checks: dead is always rejected; replica and pending
    // are only accepted when the caller forces the operation.
    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
    if (!vb || vb->getState() == vbucket_state_dead) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_replica && !force) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_pending && !force) {
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                          false);

    // Unless forced, the incoming metadata has to win conflict resolution
    // against the existing item, which requires that item's metadata to
    // be in memory.
    if (!force) {
        if (v)  {
            if (v->isTempInitialItem()) {
                // Placeholder without metadata yet: fetch it first.
                bgFetch(itm.getKey(), itm.getVBucketId(), -1, cookie, true);
                return ENGINE_EWOULDBLOCK;
            }
            if (!conflictResolver->resolve(v, itm.getMetaData(), false)) {
                ++stats.numOpsSetMetaResolutionFailed;
                return ENGINE_KEY_EEXISTS;
            }
        } else {
            // Nothing in memory: create a temp item and fetch the metadata.
            return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
                                         cookie, true, isReplication);
        }
    }

    // On replica and pending vbuckets an item lock is dropped before the
    // set is applied.
    if (v && v->isLocked(ep_current_time()) &&
        (vb->getState() == vbucket_state_replica ||
         vb->getState() == vbucket_state_pending)) {
        v->unlock();
    }
    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
                                                true, eviction_policy, nru,
                                                isReplication);

    // Translate the hash table result into an engine status code.
    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (mtype) {
    case NOMEM:
        ret = ENGINE_ENOMEM;
        break;
    case INVALID_CAS:
    case IS_LOCKED:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case INVALID_VBUCKET:
        ret = ENGINE_NOT_MY_VBUCKET;
        break;
    case WAS_DIRTY:
    case WAS_CLEAN:
        // The set succeeded: queue the item (ret stays ENGINE_SUCCESS).
        queueDirty(vb, v, &lh, false, true, genBySeqno);
        break;
    case NOT_FOUND:
        ret = ENGINE_KEY_ENOENT;
        break;
    case NEED_BG_FETCH:
        {            // CAS operation with non-resident item + full eviction.
            if (v) { // temp item is already created. Simply schedule a
                lh.unlock(); // bg fetch job.
                bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
                return ENGINE_EWOULDBLOCK;
            }
            ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
                                        cookie, true, isReplication);
        }
    }

    return ret;
}
1859 
getAndUpdateTtl(const std::string &key, uint16_t vbucket, const void *cookie, time_t exptime)1860 GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
1861                                                     uint16_t vbucket,
1862                                                     const void *cookie,
1863                                                     time_t exptime)
1864 {
1865     RCPtr<VBucket> vb = getVBucket(vbucket);
1866     if (!vb) {
1867         ++stats.numNotMyVBuckets;
1868         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1869     } else if (vb->getState() == vbucket_state_dead) {
1870         ++stats.numNotMyVBuckets;
1871         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1872     } else if (vb->getState() == vbucket_state_replica) {
1873         ++stats.numNotMyVBuckets;
1874         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1875     } else if (vb->getState() == vbucket_state_pending) {
1876         if (vb->addPendingOp(cookie)) {
1877             return GetValue(NULL, ENGINE_EWOULDBLOCK);
1878         }
1879     }
1880 
1881     int bucket_num(0);
1882     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1883     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1884 
1885     if (v) {
1886         if (v->isDeleted() || v->isTempDeletedItem() ||
1887             v->isTempNonExistentItem()) {
1888             GetValue rv;
1889             return rv;
1890         }
1891 
1892         if (!v->isResident()) {
1893             bgFetch(key, vbucket, v->getBySeqno(), cookie);
1894             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
1895         }
1896         if (v->isLocked(ep_current_time())) {
1897             GetValue rv(NULL, ENGINE_KEY_EEXISTS, 0);
1898             return rv;
1899         }
1900 
1901         bool exptime_mutated = exptime != v->getExptime() ? true : false;
1902         if (exptime_mutated) {
1903            v->markDirty();
1904            v->setExptime(exptime);
1905         }
1906 
1907         GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
1908                     ENGINE_SUCCESS, v->getBySeqno());
1909 
1910         if (exptime_mutated) {
1911             // persist the itme in the underlying storage for
1912             // mutated exptime
1913             queueDirty(vb, v, &lh);
1914         }
1915         return rv;
1916     } else {
1917         if (eviction_policy == VALUE_ONLY) {
1918             GetValue rv;
1919             return rv;
1920         } else {
1921             ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
1922                                                          vb, cookie, false);
1923             return GetValue(NULL, ec, -1, true);
1924         }
1925     }
1926 }
1927 
1928 ENGINE_ERROR_CODE
statsVKey(const std::string &key, uint16_t vbucket, const void *cookie)1929 EventuallyPersistentStore::statsVKey(const std::string &key,
1930                                      uint16_t vbucket,
1931                                      const void *cookie) {
1932     RCPtr<VBucket> vb = getVBucket(vbucket);
1933     if (!vb) {
1934         return ENGINE_NOT_MY_VBUCKET;
1935     }
1936 
1937     int bucket_num(0);
1938     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1939     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1940 
1941     if (v) {
1942         if (v->isDeleted() || v->isTempDeletedItem() ||
1943             v->isTempNonExistentItem()) {
1944             return ENGINE_KEY_ENOENT;
1945         }
1946         bgFetchQueue++;
1947         cb_assert(bgFetchQueue > 0);
1948         ExecutorPool* iom = ExecutorPool::get();
1949         ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
1950                                            v->getBySeqno(), cookie,
1951                                            Priority::VKeyStatBgFetcherPriority,
1952                                            bgFetchDelay, false);
1953         iom->schedule(task, READER_TASK_IDX);
1954         return ENGINE_EWOULDBLOCK;
1955     } else {
1956         if (eviction_policy == VALUE_ONLY) {
1957             return ENGINE_KEY_ENOENT;
1958         } else {
1959             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
1960                                                         eviction_policy);
1961             switch(rv) {
1962             case ADD_NOMEM:
1963                 return ENGINE_ENOMEM;
1964             case ADD_EXISTS:
1965             case ADD_UNDEL:
1966             case ADD_SUCCESS:
1967             case ADD_TMP_AND_BG_FETCH:
1968                 // Since the hashtable bucket is locked, we shouldn't get here
1969                 abort();
1970             case ADD_BG_FETCH:
1971                 {
1972                     ++bgFetchQueue;
1973                     cb_assert(bgFetchQueue > 0);
1974                     ExecutorPool* iom = ExecutorPool::get();
1975                     ExTask task = new VKeyStatBGFetchTask(&engine, key,
1976                                                           vbucket, -1, cookie,
1977                                            Priority::VKeyStatBgFetcherPriority,
1978                                                           bgFetchDelay, false);
1979                     iom->schedule(task, READER_TASK_IDX);
1980                 }
1981             }
1982             return ENGINE_EWOULDBLOCK;
1983         }
1984     }
1985 }
1986 
completeStatsVKey(const void* cookie, std::string &key, uint16_t vbid, uint64_t bySeqNum)1987 void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
1988                                                   std::string &key,
1989                                                   uint16_t vbid,
1990                                                   uint64_t bySeqNum) {
1991     RememberingCallback<GetValue> gcb;
1992 
1993     getROUnderlying(vbid)->get(key, bySeqNum, vbid, gcb);
1994     gcb.waitForValue();
1995     cb_assert(gcb.fired);
1996 
1997     if (eviction_policy == FULL_EVICTION) {
1998         RCPtr<VBucket> vb = getVBucket(vbid);
1999         if (vb) {
2000             int bucket_num(0);
2001             LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
2002             StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2003             if (v && v->isTempInitialItem()) {
2004                 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2005                     v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
2006                     cb_assert(v->isResident());
2007                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
2008                     v->setStoredValueState(
2009                                           StoredValue::state_non_existent_key);
2010                 } else {
2011                     // underlying kvstore couldn't fetch requested data
2012                     // log returned error and notify TMPFAIL to client
2013                     LOG(EXTENSION_LOG_WARNING,
2014                         "Warning: failed background fetch for vb=%d seq=%d "
2015                         "key=%s", vbid, v->getBySeqno(), key.c_str());
2016                 }
2017             }
2018         }
2019     }
2020 
2021     if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2022         engine.addLookupResult(cookie, gcb.val.getValue());
2023     } else {
2024         engine.addLookupResult(cookie, NULL);
2025     }
2026 
2027     bgFetchQueue--;
2028     engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
2029 }
2030 
getLocked(const std::string &key, uint16_t vbucket, Callback<GetValue> &cb, rel_time_t currentTime, uint32_t lockTimeout, const void *cookie)2031 bool EventuallyPersistentStore::getLocked(const std::string &key,
2032                                           uint16_t vbucket,
2033                                           Callback<GetValue> &cb,
2034                                           rel_time_t currentTime,
2035                                           uint32_t lockTimeout,
2036                                           const void *cookie) {
2037     RCPtr<VBucket> vb = getVBucket(vbucket, vbucket_state_active);
2038     if (!vb) {
2039         ++stats.numNotMyVBuckets;
2040         GetValue rv(NULL, ENGINE_NOT_MY_VBUCKET);
2041         cb.callback(rv);
2042         return false;
2043     }
2044 
2045     int bucket_num(0);
2046     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2047     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2048 
2049     if (v) {
2050         if (v->isDeleted() || v->isTempNonExistentItem() ||
2051             v->isTempDeletedItem()) {
2052             GetValue rv;
2053             cb.callback(rv);
2054             return true;
2055         }
2056 
2057         // if v is locked return error
2058         if (v->isLocked(currentTime)) {
2059             GetValue rv;
2060             cb.callback(rv);
2061             return false;
2062         }
2063 
2064         // If the value is not resident, wait for it...
2065         if (!v->isResident()) {
2066             if (cookie) {
2067                 bgFetch(key, vbucket, v->getBySeqno(), cookie);
2068             }
2069             GetValue rv(NULL, ENGINE_EWOULDBLOCK, -1, true);
2070             cb.callback(rv);
2071             return false;
2072         }
2073 
2074         // acquire lock and increment cas value
2075         v->lock(currentTime + lockTimeout);
2076 
2077         Item *it = v->toItem(false, vbucket);
2078         it->setCas();
2079         v->setCas(it->getCas());
2080 
2081         GetValue rv(it);
2082         cb.callback(rv);
2083         return true;
2084     } else {
2085         if (eviction_policy == VALUE_ONLY) {
2086             GetValue rv;
2087             cb.callback(rv);
2088             return true;
2089         } else {
2090             ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
2091                                                          vb, cookie, false);
2092             GetValue rv(NULL, ec, -1, true);
2093             cb.callback(rv);
2094             return false;
2095         }
2096     }
2097 }
2098 
2099 ENGINE_ERROR_CODE
unlockKey(const std::string &key, uint16_t vbucket, uint64_t cas, rel_time_t currentTime)2100 EventuallyPersistentStore::unlockKey(const std::string &key,
2101                                      uint16_t vbucket,
2102                                      uint64_t cas,
2103                                      rel_time_t currentTime)
2104 {
2105 
2106     RCPtr<VBucket> vb = getVBucket(vbucket, vbucket_state_active);
2107     if (!vb) {
2108         ++stats.numNotMyVBuckets;
2109         return ENGINE_NOT_MY_VBUCKET;
2110     }
2111 
2112     int bucket_num(0);
2113     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2114     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2115 
2116     if (v) {
2117         if (v->isDeleted() || v->isTempNonExistentItem() ||
2118             v->isTempDeletedItem()) {
2119             return ENGINE_KEY_ENOENT;
2120         }
2121         if (v->isLocked(currentTime)) {
2122             if (v->getCas() == cas) {
2123                 v->unlock();
2124                 return ENGINE_SUCCESS;
2125             }
2126         }
2127         return ENGINE_TMPFAIL;
2128     } else {
2129         if (eviction_policy == VALUE_ONLY) {
2130             return ENGINE_KEY_ENOENT;
2131         } else {
2132             // With the full eviction, an item's lock is automatically
2133             // released when the item is evicted from memory. Therefore,
2134             // we simply return ENGINE_TMPFAIL when we receive unlockKey
2135             // for an item that is not in memocy cache. Note that we don't
2136             // spawn any bg fetch job to figure out if an item actually
2137             // exists in disk or not.
2138             return ENGINE_TMPFAIL;
2139         }
2140     }
2141 }
2142 
2143 
getKeyStats( const std::string &key, uint16_t vbucket, const void *cookie, struct key_stats &kstats, bool bgfetch, bool wantsDeleted)2144 ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
2145                                             const std::string &key,
2146                                             uint16_t vbucket,
2147                                             const void *cookie,
2148                                             struct key_stats &kstats,
2149                                             bool bgfetch,
2150                                             bool wantsDeleted)
2151 {
2152     RCPtr<VBucket> vb = getVBucket(vbucket);
2153     if (!vb) {
2154         return ENGINE_NOT_MY_VBUCKET;
2155     }
2156 
2157     int bucket_num(0);
2158     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2159     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2160 
2161     if (v) {
2162         if ((v->isDeleted() && !wantsDeleted) ||
2163             v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2164             return ENGINE_KEY_ENOENT;
2165         }
2166         if (eviction_policy == FULL_EVICTION &&
2167             v->isTempInitialItem() && bgfetch) {
2168             lh.unlock();
2169             bgFetch(key, vbucket, -1, cookie, true);
2170             return ENGINE_EWOULDBLOCK;
2171         }
2172         kstats.logically_deleted = v->isDeleted();
2173         kstats.dirty = v->isDirty();
2174         kstats.exptime = v->getExptime();
2175         kstats.flags = v->getFlags();
2176         kstats.cas = v->getCas();
2177         kstats.vb_state = vb->getState();
2178         return ENGINE_SUCCESS;
2179     } else {
2180         if (eviction_policy == VALUE_ONLY) {
2181             return ENGINE_KEY_ENOENT;
2182         } else {
2183             if (bgfetch) {
2184                 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2185                                              cookie, true);
2186             } else {
2187                 return ENGINE_KEY_ENOENT;
2188             }
2189         }
2190     }
2191 }
2192 
validateKey(const std::string &key, uint16_t vbucket, Item &diskItem)2193 std::string EventuallyPersistentStore::validateKey(const std::string &key,
2194                                                    uint16_t vbucket,
2195                                                    Item &diskItem) {
2196     int bucket_num(0);
2197     RCPtr<VBucket> vb = getVBucket(vbucket);
2198     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2199     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
2200                                      false, true);
2201 
2202     if (v) {
2203         if (v->isDeleted() || v->isTempNonExistentItem() ||
2204             v->isTempDeletedItem()) {
2205             return "item_deleted";
2206         }
2207 
2208         if (diskItem.getFlags() != v->getFlags()) {
2209             return "flags_mismatch";
2210         } else if (v->isResident() && memcmp(diskItem.getData(),
2211                                              v->getValue()->getData(),
2212                                              diskItem.getNBytes())) {
2213             return "data_mismatch";
2214         } else {
2215             return "valid";
2216         }
2217     } else {
2218         return "item_deleted";
2219     }
2220 
2221 }
2222 
deleteItem(const std::string &key, uint64_t* cas, uint16_t vbucket, const void *cookie, bool force, ItemMetaData *itemMeta, bool tapBackfill)2223 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
2224                                                         uint64_t* cas,
2225                                                         uint16_t vbucket,
2226                                                         const void *cookie,
2227                                                         bool force,
2228                                                         ItemMetaData *itemMeta,
2229                                                         bool tapBackfill)
2230 {
2231     RCPtr<VBucket> vb = getVBucket(vbucket);
2232     if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2233         ++stats.numNotMyVBuckets;
2234         return ENGINE_NOT_MY_VBUCKET;
2235     } else if(vb->getState() == vbucket_state_replica && !force) {
2236         ++stats.numNotMyVBuckets;
2237         return ENGINE_NOT_MY_VBUCKET;
2238     } else if(vb->getState() == vbucket_state_pending && !force) {
2239         if (vb->addPendingOp(cookie)) {
2240             return ENGINE_EWOULDBLOCK;
2241         }
2242     }
2243 
2244     int bucket_num(0);
2245     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2246     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2247     if (!v || v->isDeleted() || v->isTempItem()) {
2248         if (eviction_policy == VALUE_ONLY) {
2249             return ENGINE_KEY_ENOENT;
2250         } else { // Full eviction.
2251             if (!force) {
2252                 if (!v) { // Item might be evicted from cache.
2253                     return addTempItemForBgFetch(lh, bucket_num, key, vb,
2254                                                  cookie, true);
2255                 } else if (v->isTempInitialItem()) {
2256                     lh.unlock();
2257                     bgFetch(key, vbucket, -1, cookie, true);
2258                     return ENGINE_EWOULDBLOCK;
2259                 } else { // Non-existent or deleted key.
2260                     return ENGINE_KEY_ENOENT;
2261                 }
2262             } else {
2263                 if (!v) { // Item might be evicted from cache.
2264                     // Create a temp item and delete it below as it is a
2265                     // force deletion.
2266                     add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num,
2267                                                               key,
2268                                                               eviction_policy);
2269                     if (rv == ADD_NOMEM) {
2270                         return ENGINE_ENOMEM;
2271                     }
2272                     v = vb->ht.unlocked_find(key, bucket_num, true, false);
2273                     v->setStoredValueState(StoredValue::state_deleted_key);
2274                 } else if (v->isTempInitialItem()) {
2275                     v->setStoredValueState(StoredValue::state_deleted_key);
2276                 } else { // Non-existent or deleted key.
2277                     return ENGINE_KEY_ENOENT;
2278                 }
2279             }
2280         }
2281     }
2282 
2283     if (v && v->isLocked(ep_current_time()) &&
2284         (vb->getState() == vbucket_state_replica ||
2285          vb->getState() == vbucket_state_pending)) {
2286         v->unlock();
2287     }
2288     mutation_type_t delrv;
2289     delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);
2290 
2291     if (itemMeta && v) {
2292         itemMeta->revSeqno = v->getRevSeqno();
2293         itemMeta->cas = v->getCas();
2294         itemMeta->flags = v->getFlags();
2295         itemMeta->exptime = v->getExptime();
2296     }
2297     *cas = v ? v->getCas() : 0;
2298 
2299     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2300     switch (delrv) {
2301     case NOMEM:
2302         ret = ENGINE_ENOMEM;
2303         break;
2304     case INVALID_VBUCKET:
2305         ret = ENGINE_NOT_MY_VBUCKET;
2306         break;
2307     case INVALID_CAS:
2308         ret = ENGINE_KEY_EEXISTS;
2309         break;
2310     case IS_LOCKED:
2311         ret = ENGINE_TMPFAIL;
2312         break;
2313     case NOT_FOUND:
2314         ret = ENGINE_KEY_ENOENT;
2315         if (v) {
2316             queueDirty(vb, v, &lh, tapBackfill);
2317         }
2318         break;
2319     case WAS_DIRTY:
2320     case WAS_CLEAN:
2321         queueDirty(vb, v, &lh, tapBackfill);
2322         break;
2323     case NEED_BG_FETCH:
2324         // We already figured out if a bg fetch is requred for a full-evicted
2325         // item above.
2326         abort();
2327     }
2328     return ret;
2329 }
2330 
deleteWithMeta( const std::string &key, uint64_t* cas, uint16_t vbucket, const void *cookie, bool force, ItemMetaData *itemMeta, bool tapBackfill, bool genBySeqno, uint64_t bySeqno, bool isReplication)2331 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
2332                                                         const std::string &key,
2333                                                         uint64_t* cas,
2334                                                         uint16_t vbucket,
2335                                                         const void *cookie,
2336                                                         bool force,
2337                                                         ItemMetaData *itemMeta,
2338                                                         bool tapBackfill,
2339                                                         bool genBySeqno,
2340                                                         uint64_t bySeqno,
2341                                                         bool isReplication)
2342 {
2343     RCPtr<VBucket> vb = getVBucket(vbucket);
2344     if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2345         ++stats.numNotMyVBuckets;
2346         return ENGINE_NOT_MY_VBUCKET;
2347     } else if(vb->getState() == vbucket_state_replica && !force) {
2348         ++stats.numNotMyVBuckets;
2349         return ENGINE_NOT_MY_VBUCKET;
2350     } else if(vb->getState() == vbucket_state_pending && !force) {
2351         if (vb->addPendingOp(cookie)) {
2352             return ENGINE_EWOULDBLOCK;
2353         }
2354     }
2355 
2356     int bucket_num(0);
2357     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2358     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2359     if (!force) { // Need conflict resolution.
2360         if (v)  {
2361             if (v->isTempInitialItem()) {
2362                 bgFetch(key, vbucket, -1, cookie, true);
2363                 return ENGINE_EWOULDBLOCK;
2364             }
2365             if (!conflictResolver->resolve(v, *itemMeta, true)) {
2366                 ++stats.numOpsDelMetaResolutionFailed;
2367                 return ENGINE_KEY_EEXISTS;
2368             }
2369         } else {
2370             // Item is 1) deleted or not existent in the value eviction case OR
2371             // 2) deleted or evicted in the full eviction.
2372             return addTempItemForBgFetch(lh, bucket_num, key, vb,
2373                                          cookie, true, isReplication);
2374         }
2375     } else {
2376         if (!v) {
2377             // Create a temp item and delete it below as it is a force deletion
2378             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2379                                                         eviction_policy,
2380                                                         isReplication);
2381             if (rv == ADD_NOMEM) {
2382                 return ENGINE_ENOMEM;
2383             }
2384             v = vb->ht.unlocked_find(key, bucket_num, true, false);
2385             v->setStoredValueState(StoredValue::state_deleted_key);
2386         } else if (v->isTempInitialItem()) {
2387             v->setStoredValueState(StoredValue::state_deleted_key);
2388         }
2389     }
2390 
2391     if (v && v->isLocked(ep_current_time()) &&
2392         (vb->getState() == vbucket_state_replica ||
2393          vb->getState() == vbucket_state_pending)) {
2394         v->unlock();
2395     }
2396     mutation_type_t delrv;
2397     delrv = vb->ht.unlocked_softDelete(v, *cas, *itemMeta,
2398                                        eviction_policy, true);
2399     *cas = v ? v->getCas() : 0;
2400 
2401     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2402     switch (delrv) {
2403     case NOMEM:
2404         ret = ENGINE_ENOMEM;
2405         break;
2406     case INVALID_VBUCKET:
2407         ret = ENGINE_NOT_MY_VBUCKET;
2408         break;
2409     case INVALID_CAS:
2410         ret = ENGINE_KEY_EEXISTS;
2411         break;
2412     case IS_LOCKED:
2413         ret = ENGINE_TMPFAIL;
2414         break;
2415     case NOT_FOUND:
2416         ret = ENGINE_KEY_ENOENT;
2417         break;
2418     case WAS_DIRTY:
2419     case WAS_CLEAN:
2420         if (!genBySeqno) {
2421             v->setBySeqno(bySeqno);
2422         }
2423         queueDirty(vb, v, &lh, tapBackfill, true, genBySeqno);
2424         break;
2425     case NEED_BG_FETCH:
2426         lh.unlock();
2427         bgFetch(key, vbucket, -1, cookie, true);
2428         ret = ENGINE_EWOULDBLOCK;
2429     }
2430 
2431     return ret;
2432 }
2433 
reset()2434 void EventuallyPersistentStore::reset() {
2435     std::vector<int> buckets = vbMap.getBuckets();
2436     std::vector<int>::iterator it;
2437     for (it = buckets.begin(); it != buckets.end(); ++it) {
2438         RCPtr<VBucket> vb = getVBucket(*it);
2439         if (vb) {
2440             LockHolder lh(vb_mutexes[vb->getId()]);
2441             vb->ht.clear();
2442             vb->checkpointManager.clear(vb->getState());
2443             vb->resetStats();
2444             vb->setCurrentSnapshot(0, 0);
2445         }
2446     }
2447 
2448     bool inverse = false;
2449     if (diskFlushAll.compare_exchange_strong(inverse, true)) {
2450         ++stats.diskQueueSize;
2451         // wake up (notify) one flusher is good enough for diskFlushAll
2452         vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
2453     }
2454 }
2455 
/**
 * Callback invoked after persisting an item from memory to disk.
 *
 * This class exists to create a closure around a few variables within
 * EventuallyPersistentStore::flushOneDelOrSet so that an object can be
 * requeued in case of failure to store in the underlying layer.
 *
 * It implements two callbacks: one taking a mutation_result (used for
 * sets) and one taking an int (used for deletions).
 *
 * Note that the vbucket is held as a reference to the caller's RCPtr, so
 * the caller's RCPtr must outlive this object.
 */
class PersistenceCallback : public Callback<mutation_result>,
                            public Callback<int> {
public:

    /**
     * @param qi the queued item being persisted
     * @param vb the vbucket owning the item (stored by reference);
     *           asserted to be non-null
     * @param st the store, used to look up values and to requeue on error
     * @param s the stats object to update; asserted to be non-null
     * @param c the CAS of the item being persisted; the in-memory value is
     *          only marked clean if its CAS still equals this value
     */
    PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
                        EventuallyPersistentStore *st, EPStats *s, uint64_t c)
        : queuedItem(qi), vbucket(vb), store(st), stats(s), cas(c) {
        cb_assert(vb);
        cb_assert(s);
    }

    // This callback is invoked for set only.
    //
    // value.first == 1 is treated as success, value.first == 0 as "no row
    // updated" (logged, not requeued) and anything else as a failure that
    // requeues the item. On success value.second distinguishes an insert
    // (true) from an update (false) for items flagged as new cache items.
    void callback(mutation_result &value) {
        if (value.first == 1) {
            int bucket_num(0);
            LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
                                                        &bucket_num);
            StoredValue *v = store->fetchValidValue(vbucket,
                                                    queuedItem->getKey(),
                                                    bucket_num, true, false);
            if (v) {
                if (v->getCas() == cas) {
                    // mark this item clean only if current and stored cas
                    // value match
                    v->markClean();
                }
                if (v->isNewCacheItem()) {
                    if (value.second) {
                        // Insert in value-only or full eviction mode.
                        ++vbucket->opsCreate;
                        vbucket->incrMetaDataDisk(*queuedItem);
                    } else { // Update in full eviction mode.
                        --vbucket->ht.numTotalItems;
                        ++vbucket->opsUpdate;
                    }
                    v->setNewCacheItem(false);
                } else { // Update in value-only or full eviction mode.
                    ++vbucket->opsUpdate;
                }
            }

            // The item leaves the disk queue whether or not the value is
            // still present in the hash table.
            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
            stats->decrDiskQueueSize(1);
            stats->totalPersisted++;
        } else {
            // If the return was 0 here, we're in a bad state because
            // we do not know the rowid of this object.
            if (value.first == 0) {
                int bucket_num(0);
                LockHolder lh = vbucket->ht.getLockedBucket(
                                           queuedItem->getKey(), &bucket_num);
                StoredValue *v = store->fetchValidValue(vbucket,
                                                        queuedItem->getKey(),
                                                        bucket_num, true,
                                                        false);
                if (v) {
                    std::stringstream ss;
                    ss << "Persisting ``" << queuedItem->getKey() << "'' on vb"
                       << queuedItem->getVBucketId() << " (rowid="
                       << v->getBySeqno() << ") returned 0 updates\n";
                    LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
                } else {
                    LOG(EXTENSION_LOG_WARNING,
                        "Error persisting now missing ``%s'' from vb%d",
                        queuedItem->getKey().c_str(),
                        queuedItem->getVBucketId());
                }

                // The item is dropped from the disk queue, not requeued.
                vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
                stats->decrDiskQueueSize(1);
            } else {
                std::stringstream ss;
                ss <<
                "Fatal error in persisting SET ``" <<
                queuedItem->getKey() << "'' on vb "
                   << queuedItem->getVBucketId() << "!!! Requeue it...\n";
                LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
                redirty();
            }
        }
    }

    // This callback is invoked for deletions only.
    //
    // The integer is the number of rows the underlying storage deleted,
    // or a negative value if the deletion failed.
    void callback(int &value) {
        // > 1 would be bad.  We were only trying to delete one row.
        cb_assert(value < 2);
        // -1 means fail
        // 1 means we deleted one row
        // 0 means we did not delete a row, but did not fail (did not exist)
        if (value >= 0) {
            // We have successfully removed an item from the disk, we
            // may now remove it from the hash table.
            int bucket_num(0);
            LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
                                                        &bucket_num);
            StoredValue *v = store->fetchValidValue(vbucket,
                                                    queuedItem->getKey(),
                                                    bucket_num, true, false);
            // Only remove the value if it is still marked as deleted.
            if (v && v->isDeleted()) {
                // Read the flag before unlocked_del() removes the value.
                bool newCacheItem = v->isNewCacheItem();
                bool deleted = vbucket->ht.unlocked_del(queuedItem->getKey(),
                                                        bucket_num);
                cb_assert(deleted);
                if (newCacheItem && value > 0) {
                    // Need to decrement the item counter again for an item that
                    // exists on DB file, but not in memory (i.e., full eviction),
                    // because we created the temp item in memory and incremented
                    // the item counter when a deletion is pushed in the queue.
                    --vbucket->ht.numTotalItems;
                }
            }

            // Only count the operation when a row was actually deleted.
            if (value > 0) {
                ++stats->totalPersisted;
                ++vbucket->opsDelete;
            }
            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
            stats->decrDiskQueueSize(1);
            vbucket->decrMetaDataDisk(*queuedItem);
        } else {
            std::stringstream ss;
            ss << "Fatal error in persisting DELETE ``" <<
            queuedItem->getKey() << "'' on vb "
               << queuedItem->getVBucketId() << "!!! Requeue it...\n";
            LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
            redirty();
        }
    }

private:

    // Handle a failed persistence attempt. If the vbucket is flagged for
    // deletion the item is simply dropped from the disk queue stats;
    // otherwise the failure is counted, StoredValue::reDirty is invoked on
    // the stored value and the item is pushed onto the vbucket's reject
    // queue.
    void redirty() {
        if (store->vbMap.isBucketDeletion(vbucket->getId())) {
            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
            stats->decrDiskQueueSize(1);
            return;
        }
        ++stats->flushFailed;
        store->invokeOnLockedStoredValue(queuedItem->getKey(),
                                         queuedItem->getVBucketId(),
                                         &StoredValue::reDirty);
        vbucket->rejectQueue.push(queuedItem);
    }

    // The item this callback reports on.
    const queued_item queuedItem;
    // Reference to the caller's vbucket pointer (not a copy).
    RCPtr<VBucket> &vbucket;
    EventuallyPersistentStore *store;
    EPStats *stats;
    // CAS the item had when it was handed to the underlying store.
    uint64_t cas;
    DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
};
2617 
flushOneDeleteAll()2618 void EventuallyPersistentStore::flushOneDeleteAll() {
2619     for (size_t i = 0; i < vbMap.getSize(); ++i) {
2620         RCPtr<VBucket> vb = getVBucket(i);
2621         if (vb) {
2622             LockHolder lh(vb_mutexes[vb->getId()]);
2623             getRWUnderlying(vb->getId())->reset(i);
2624         }
2625     }
2626 
2627     bool inverse = true;
2628     diskFlushAll.compare_exchange_strong(inverse, false);
2629     stats.decrDiskQueueSize(1);
2630 }
2631 
flushVBucket(uint16_t vbid)2632 int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
2633     KVShard *shard = vbMap.getShard(vbid);
2634     if (diskFlushAll) {
2635         if (shard->getId() == EP_PRIMARY_SHARD) {
2636             flushOneDeleteAll();
2637         } else {
2638             // disk flush is pending just return
2639             return 0;
2640         }
2641     }
2642 
2643     if (vbMap.isBucketCreation(vbid)) {
2644         return RETRY_FLUSH_VBUCKET;
2645     }
2646 
2647     int items_flushed = 0;
2648     rel_time_t flush_start = ep_current_time();
2649 
2650     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
2651     if (vb) {
2652         LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
2653         if (!lh.islocked()) { // Try another bucket if this one is locked
2654             return RETRY_FLUSH_VBUCKET; // to avoid blocking flusher
2655         }
2656 
2657         KVStatsCallback cb(this);
2658         std::vector<queued_item> items;
2659         KVStore *rwUnderlying = getRWUnderlying(vbid);
2660 
2661         while (!vb->rejectQueue.empty()) {
2662             items.push_back(vb->rejectQueue.front());
2663             vb->rejectQueue.pop();
2664         }
2665 
2666         LockHolder slh = vb->getSnapshotLock();
2667         uint64_t snapStart;
2668         uint64_t snapEnd;
2669         vb->getCurrentSnapshot_UNLOCKED(snapStart, snapEnd);
2670 
2671         vb->getBackfillItems(items);
2672         vb->checkpointManager.getAllItemsForPersistence(items);
2673         slh.unlock();
2674 
2675         if (!items.empty()) {
2676             while (!rwUnderlying->begin()) {
2677                 ++stats.beginFailed;
2678                 LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
2679                     "Retry in 1 sec ...");
2680                 sleep(1);
2681             }
2682             rwUnderlying->optimizeWrites(items);
2683 
2684             Item *prev = NULL;
2685             uint64_t maxSeqno = 0;
2686             std::list<PersistenceCallback*> pcbs;
2687             std::vector<queued_item>::iterator it = items.begin();
2688             for(; it != items.end(); ++it) {
2689                 if ((*it)->getOperation() != queue_op_set &&
2690                     (*it)->getOperation() != queue_op_del) {
2691                     continue;
2692                 } else if (!prev || prev->getKey() != (*it)->getKey()) {
2693                     prev = (*it).get();
2694                     ++items_flushed;
2695                     PersistenceCallback *cb = flushOneDelOrSet(*it, vb);
2696                     if (cb) {
2697                         pcbs.push_back(cb);
2698                     }
2699                     maxSeqno = std::max(maxSeqno, (uint64_t)(*it)->getBySeqno());
2700                     ++stats.flusher_todo;
2701                 } else {
2702                     stats.decrDiskQueueSize(1);
2703                     vb->doStatsForFlushing(*(*it), (*it)->size());
2704                 }
2705             }
2706 
2707             BlockTimer timer(&stats.diskCommitHisto, "disk_commit",
2708                              stats.timingLog);
2709             hrtime_t start = gethrtime();
2710 
2711             if (vb->getState() == vbucket_state_active) {
2712                 snapStart = maxSeqno;
2713                 snapEnd = maxSeqno;
2714             }
2715 
2716             while (!rwUnderlying->commit(&cb, snapStart, snapEnd)) {
2717                 ++stats.commitFailed;
2718                 LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
2719                     "1 sec...\n");
2720                 sleep(1);
2721 
2722             }
2723 
2724             if (vb->rejectQueue.empty()) {
2725                 uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
2726                 if (highSeqno > 0 &&
2727                     highSeqno != vbMap.getPersistenceSeqno(vbid)) {
2728                     vbMap.setPersistenceSeqno(vbid, highSeqno);
2729                 }
2730             }
2731 
2732             while (!pcbs.empty()) {
2733                 delete pcbs.front();
2734                 pcbs.pop_front();
2735             }
2736 
2737             ++stats.flusherCommits;
2738             hrtime_t end = gethrtime();
2739             uint64_t commit_time = (end - start) / 1000000;
2740             uint64_t trans_time = (end - flush_start) / 1000000;
2741 
2742             lastTransTimePerItem = (items_flushed == 0) ? 0 :
2743                 static_cast<double>(trans_time) /
2744                 static_cast<double>(items_flushed);
2745             stats.commit_time.store(commit_time);
2746             stats.cumulativeCommitTime.fetch_add(commit_time);
2747             stats.cumulativeFlushTime.fetch_add(ep_current_time()
2748                                                 - flush_start);
2749             stats.flusher_todo.store(0);
2750         }
2751 
2752         rwUnderlying->pendingTasks();
2753 
2754         if (vb->checkpointManager.getNumCheckpoints() > 1) {
2755             wakeUpCheckpointRemover();
2756         }
2757 
2758         if (vb->rejectQueue.empty()) {
2759             vb->checkpointManager.itemsPersisted();
2760             uint64_t seqno = vbMap.getPersistenceSeqno(vbid);
2761             uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
2762             vb->notifyCheckpointPersisted(engine, seqno, true);
2763             vb->notifyCheckpointPersisted(engine, chkid, false);
2764             if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
2765                 vbMap.setPersistenceCheckpointId(vbid, chkid);
2766             }
2767         }
2768     }
2769 
2770     return items_flushed;
2771 }
2772 
2773 PersistenceCallback*
flushOneDelOrSet(const queued_item &qi, RCPtr<VBucket> &vb)2774 EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
2775                                             RCPtr<VBucket> &vb) {
2776 
2777     if (!vb) {
2778         stats.decrDiskQueueSize(1);
2779         return NULL;
2780     }
2781 
2782     int64_t bySeqno = qi->getBySeqno();
2783     bool deleted = qi->isDeleted();
2784     rel_time_t queued(qi->getQueuedTime());
2785 
2786     int dirtyAge = ep_current_time() - queued;
2787     stats.dirtyAgeHisto.add(dirtyAge * 1000000);
2788     stats.dirtyAge.store(dirtyAge);
2789     stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
2790                                          stats.dirtyAgeHighWat.load()));
2791 
2792     // Wait until the vbucket database is created by the vbucket state
2793     // snapshot task.
2794     if (vbMap.isBucketCreation(qi->getVBucketId()) ||
2795         vbMap.isBucketDeletion(qi->getVBucketId())) {
2796         vb->rejectQueue.push(qi);
2797         ++vb->opsReject;
2798         return NULL;
2799     }
2800 
2801     KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
2802     if (!deleted) {
2803         // TODO: Need to separate disk_insert from disk_update because
2804         // bySeqno doesn't give us that information.
2805         BlockTimer timer(bySeqno == -1 ?
2806                          &stats.diskInsertHisto : &stats.diskUpdateHisto,
2807                          bySeqno == -1 ? "disk_insert" : "disk_update",
2808                          stats.timingLog);
2809         PersistenceCallback *cb =
2810             new PersistenceCallback(qi, vb, this, &stats, qi->getCas());
2811         rwUnderlying->set(*qi, *cb);
2812         return cb;
2813     } else {
2814         BlockTimer timer(&stats.diskDelHisto, "disk_delete",
2815                          stats.timingLog);
2816         PersistenceCallback *cb =
2817             new PersistenceCallback(qi, vb, this, &stats, 0);
2818         rwUnderlying->del(*qi, *cb);
2819         return cb;
2820     }
2821 }
2822 
queueDirty(RCPtr<VBucket> &vb, StoredValue* v, LockHolder *plh, bool tapBackfill, bool notifyReplicator, bool genBySeqno)2823 void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
2824                                            StoredValue* v,
2825                                            LockHolder *plh,
2826                                            bool tapBackfill,
2827                                            bool notifyReplicator,
2828                                            bool genBySeqno) {
2829     if (vb) {
2830         queued_item qi(v->toItem(false, vb->getId()));
2831         bool rv;
2832         if (genBySeqno) {
2833             LockHolder slh = vb->getSnapshotLock();
2834             rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
2835                                vb->checkpointManager.queueDirty(vb, qi,
2836                                                                 genBySeqno);
2837             v->setBySeqno(qi->getBySeqno());
2838             vb->setCurrentSnapshot_UNLOCKED(qi->getBySeqno(), qi->getBySeqno());
2839         } else {
2840             rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
2841                                vb->checkpointManager.queueDirty(vb, qi,
2842                                                                 genBySeqno);
2843             v->setBySeqno(qi->getBySeqno());
2844         }
2845 
2846         if (plh) {
2847             plh->unlock();
2848         }
2849 
2850         if (rv) {
2851             KVShard* shard = vbMap.getShard(vb->getId());
2852             shard->getFlusher()->notifyFlushEvent();
2853 
2854         }
2855         if (!tapBackfill && notifyReplicator) {
2856             engine.getTapConnMap().notifyVBConnections(vb->getId());
2857             engine.getDcpConnMap().notifyVBConnections(vb->getId(),
2858                                                        qi->getBySeqno());
2859         }
2860     }
2861 }
2862 
loadVBucketState()2863 std::vector<vbucket_state *> EventuallyPersistentStore::loadVBucketState()
2864 {
2865     return getOneROUnderlying()->listPersistedVbuckets();
2866 }
2867 
warmupCompleted()2868 void EventuallyPersistentStore::warmupCompleted() {
2869     // Run the vbucket state snapshot job once after the warmup
2870     scheduleVBSnapshot(Priority::VBucketPersistHighPriority);
2871 
2872     if (engine.getConfiguration().getAlogPath().length() > 0) {
2873 
2874         if (engine.getConfiguration().