1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2021-Present Couchbase, Inc.
4  *
5  *   Use of this software is governed by the Business Source License included
6  *   in the file licenses/BSL-Couchbase.txt.  As of the Change Date specified
7  *   in that file, in accordance with the Business Source License, use of this
8  *   software will be governed by the Apache License, Version 2.0, included in
9  *   the file licenses/APL2.txt.
10  */
11 
12 #include "nexus-kvstore.h"
13 
14 #include "bucket_logger.h"
15 #include "collections/collection_persisted_stats.h"
16 #include "collections/vbucket_manifest.h"
17 #include "collections/vbucket_manifest_handles.h"
18 #include "error_handler.h"
19 #include "kvstore/kvstore.h"
20 #include "kvstore/kvstore_transaction_context.h"
21 #ifdef EP_USE_MAGMA
22 #include "kvstore/magma-kvstore/magma-kvstore.h"
23 #endif
24 #include "kvstore/rollback_callback.h"
25 #include "nexus-kvstore-config.h"
26 #include "nexus-kvstore-persistence-callback.h"
27 #include "nexus-kvstore-transaction-context.h"
28 #include "rollback_result.h"
29 #include "vb_commit.h"
30 #include "vbucket.h"
31 #include "vbucket_bgfetch_item.h"
32 #include "vbucket_state.h"
33 
34 #include <platform/dirutils.h>
35 #include <statistics/cbstat_collector.h>
36 #include <utilities/logtags.h>
37 #include <utilities/string_utilities.h>
38 
39 #include <utility>
40 
41 class NexusKVFileHandle : public KVFileHandle {
42 public:
43     NexusKVFileHandle(std::unique_ptr<KVFileHandle> primary,
44                       std::unique_ptr<KVFileHandle> secondary)
45         : primaryFileHandle(std::move(primary)),
46           secondaryFileHandle(std::move(secondary)) {
47     }
48 
49     std::unique_ptr<KVFileHandle> primaryFileHandle;
50     std::unique_ptr<KVFileHandle> secondaryFileHandle;
51 };
52 
53 NexusKVStore::NexusKVStore(NexusKVStoreConfig& config) : configuration(config) {
54     try {
55         std::filesystem::create_directories(
56                 configuration.getPrimaryConfig().getDBName());
57         std::filesystem::create_directories(
58                 configuration.getSecondaryConfig().getDBName());
59     } catch (const std::system_error& error) {
60         throw std::runtime_error(
61                 fmt::format("Failed to create nexus data directories {}",
62                             error.code().message()));
63     }
64 
65     primary = KVStoreFactory::create(configuration.getPrimaryConfig());
66     secondary = KVStoreFactory::create(configuration.getSecondaryConfig());
67 
68     auto cacheSize = configuration.getCacheSize();
69     purgeSeqno =
70             std::vector<AtomicMonotonic<uint64_t, IgnorePolicy>>(cacheSize);
71     vbMutexes = std::vector<std::mutex>(cacheSize);
72     skipGetWithHeaderChecksForRollback =
73             std::vector<std::atomic_bool>(cacheSize);
74     compactionRunning = std::vector<std::atomic_bool>(cacheSize);
75 
76     loadPurgeSeqnoCache();
77 }
78 
79 void NexusKVStore::loadPurgeSeqnoCache() {
80     auto primaryPurgeSeqnos = std::vector<uint64_t>();
81     auto secondaryPurgeSeqnos = std::vector<uint64_t>();
82 
83     for (auto& state : primary->listPersistedVbuckets()) {
84         primaryPurgeSeqnos.push_back(state ? state->purgeSeqno : 0);
85     }
86 
87     for (auto& state : secondary->listPersistedVbuckets()) {
88         secondaryPurgeSeqnos.push_back(state ? state->purgeSeqno : 0);
89     }
90 
91     if (primaryPurgeSeqnos.size() != secondaryPurgeSeqnos.size() ||
92         primaryPurgeSeqnos.size() != purgeSeqno.size()) {
93         auto msg = fmt::format(
94                 "NexusKVStore::NexusKVStore: {} purge seqno"
95                 "vectors are different sizes primary:{}"
96                 "secondary:{} nexus:{}",
97                 configuration.getShardId(),
98                 primaryPurgeSeqnos.size(),
99                 secondaryPurgeSeqnos.size(),
100                 purgeSeqno.size());
101         handleError(msg, {} /*vbid*/);
102     }
103 
104     for (size_t i = 0; i < primaryPurgeSeqnos.size(); i++) {
105         purgeSeqno[i] =
106                 std::max(primaryPurgeSeqnos[i], secondaryPurgeSeqnos[i]);
107     }
108 }
109 
// Deinitialize both underlying KVStores, primary first.
void NexusKVStore::deinitialize() {
    primary->deinitialize();
    secondary->deinitialize();
}
114 
115 bool NexusKVStore::pause() {
116     bool primary_result = primary->pause();
117     bool secondary_result = secondary->pause();
118     if (primary_result != secondary_result) {
119         auto msg = fmt::format(
120                 "NexusKVStore::pause: {} results are different primary:{}"
121                 "secondary:{}",
122                 configuration.getShardId(),
123                 primary_result,
124                 secondary_result);
125         handleError(msg, {} /*vbid*/);
126     }
127     return primary_result;
128 }
129 
// Resume both underlying KVStores, primary first.
void NexusKVStore::resume() {
    primary->resume();
    secondary->resume();
}
134 
135 void NexusKVStore::addStats(const AddStatFn& add_stat, CookieIface& c) const {
136     // We want to print both sets of stats here for debug-ability, but we don't
137     // want to break anything relying on these stats so print primary stats
138     // as-is and the secondary stats with an additional prefix
139     primary->addStats(add_stat, c);
140 
141     auto prefixedAddStatFn =
142             [&add_stat](std::string_view key, std::string_view value, auto& c) {
143                 add_prefixed_stat("secondary", key, value, add_stat, c);
144             };
145     secondary->addStats(prefixedAddStatFn, c);
146 
147     add_prefixed_stat("nexus_" + std::to_string(getConfig().getShardId()),
148                       "skipped_checks_due_to_purge",
149                       skippedChecksDueToPurging,
150                       add_stat,
151                       c);
152 }
153 
154 bool NexusKVStore::getStat(std::string_view name, size_t& value) const {
155     // As far as I can tell stats exist for either the primary or secondary, and
156     // names aren't common between the two... We'll assert for now that that
157     // must be the case as it makes things a little simpler here to return.
158     auto primaryResult = primary->getStat(name, value);
159 
160     size_t secondaryValue;
161     auto secondaryResult = secondary->getStat(name, secondaryValue);
162 
163     if (primaryResult) {
164         Expects(!secondaryResult);
165         return primaryResult;
166     }
167 
168     if (secondaryResult) {
169         Expects(!primaryResult);
170         value = secondaryValue;
171         return primaryResult;
172     }
173 
174     return false;
175 }
176 
// Return the requested stats from the primary KVStore only; the secondary
// KVStore is not consulted here.
GetStatsMap NexusKVStore::getStats(
        gsl::span<const std::string_view> keys) const {
    return primary->getStats(keys);
}
181 
182 void NexusKVStore::addTimingStats(const AddStatFn& add_stat,
183                                   CookieIface& c) const {
184     primary->addTimingStats(add_stat, c);
185 
186     auto prefixedAddStatFn =
187             [&add_stat](std::string_view key, std::string_view value, auto& c) {
188                 add_prefixed_stat("secondary", key, value, add_stat, c);
189             };
190     secondary->addTimingStats(prefixedAddStatFn, c);
191 }
192 
// Reset the stats of both underlying KVStores, primary first.
void NexusKVStore::resetStats() {
    primary->resetStats();
    secondary->resetStats();
}
197 
198 size_t NexusKVStore::getMemFootPrint() const {
199     return primary->getMemFootPrint() + secondary->getMemFootPrint();
200 }
201 
202 Collections::VB::Manifest NexusKVStore::generateSecondaryVBManifest(
203         Vbid vbid, const VB::Commit& primaryCommitData) {
204     // Create a copy of the primary manifest for the secondary KVStore
205     auto secondaryManifest = primaryCommitData.collections.getManifest();
206 
207     // Having generated the Manifest object we now need to correct the disk
208     // sizes as they may differ between KVStores. We'll load the disk sizes of
209     // each collection now...
210 
211     // First each scope's dataSize must begin at zero
212     {
213         auto secondary = secondaryManifest.lock();
214         for (auto itr = secondary.beginScopes(); itr != secondary.endScopes();
215              ++itr) {
216             itr->second.setDataSize(0);
217         }
218     } // end locking scope
219 
220     // Check if vbucket state is on disk, if not it will cause secondary
221     // KVStore::getCollectionStats() to log a warning message for each
222     // collection in 'collections. This can happen in the situation where this
223     // is method is being called for the first NexusKVStore::commit() to disk
224     // since the vbucket has been created and we're trying to persist new
225     // collections.
226     try {
227         auto vbstate = secondary->getPersistedVBucketState(vbid);
228     } catch (std::exception& e) {
229         return secondaryManifest;
230     }
231 
232     // read lock the primary so we can iterate all collections
233     auto rHandle = primaryCommitData.collections.getManifest().lock();
234 
235     // iterate over the primary collections and update the copy with the
236     // diskSizes read from secondary kvstore
237     for (const auto& entry : rHandle) {
238         auto [status, stats] = secondary->getCollectionStats(vbid, entry.first);
239         if (status == GetCollectionStatsStatus::Success) {
240             auto statsHandle = secondaryManifest.lock(entry.first);
241             statsHandle.setDiskSize(stats.diskSize);
242             statsHandle.updateScopeDataSize(stats.diskSize);
243         }
244     }
245 
246     return secondaryManifest;
247 }
248 
249 void NexusKVStore::doCollectionsMetadataChecks(
250         Vbid vbid,
251         const VB::Commit* primaryVBCommit,
252         const VB::Commit* secondaryVBCommit) {
253     auto* primaryVBManifest =
254             primaryVBCommit ? &primaryVBCommit->collections.getManifest()
255                             : nullptr;
256     auto* secondaryVBManifest =
257             secondaryVBCommit ? &secondaryVBCommit->collections.getManifest()
258                               : nullptr;
259 
260     // 1) Compare on disk manifests
261     auto [primaryManifestResult, primaryKVStoreManifest] =
262             primary->getCollectionsManifest(vbid);
263     auto [secondaryManifestResult, secondaryKVStoreManifest] =
264             secondary->getCollectionsManifest(vbid);
265     if (!primaryManifestResult || !secondaryManifestResult) {
266         auto msg = fmt::format(
267                 "NexusKVStore::doCollectionsMetadataChecks: {}: issue getting "
268                 "collections manifest primary:{} secondary:{}",
269                 vbid,
270                 primaryManifestResult,
271                 secondaryManifestResult);
272         handleError(msg, vbid);
273     }
274 
275     if (primaryVBCommit && primaryVBCommit->collections.getManifestUid() != 0 &&
276         primaryKVStoreManifest.manifestUid !=
277                 primaryVBCommit->collections.getManifestUid()) {
278         auto msg = fmt::format(
279                 "NexusKVStore::doCollectionsMetadataChecks: {}: collections "
280                 "manifest uid not flushed with expected value for primary "
281                 "disk:{}, "
282                 "flush:{}",
283                 vbid,
284                 primaryKVStoreManifest.manifestUid,
285                 primaryVBCommit->collections.getManifestUid());
286         handleError(msg, vbid);
287     }
288 
289     if (secondaryVBCommit &&
290         secondaryVBCommit->collections.getManifestUid() != 0 &&
291         secondaryKVStoreManifest.manifestUid !=
292                 secondaryVBCommit->collections.getManifestUid()) {
293         auto msg = fmt::format(
294                 "NexusKVStore::doCollectionsMetadataChecks: {}: collections "
295                 "manifest uid not flushed with expected value for secondary "
296                 "disk:{}, flush:{}",
297                 vbid,
298                 secondaryKVStoreManifest.manifestUid,
299                 secondaryVBCommit->collections.getManifestUid());
300         handleError(msg, vbid);
301     }
302 
303     if (primaryKVStoreManifest != secondaryKVStoreManifest) {
304         auto msg = fmt::format(
305                 "NexusKVStore::doCollectionsMetadataChecks: {}: collections "
306                 "manifest not equal primary:{} secondary: {}",
307                 vbid,
308                 primaryKVStoreManifest,
309                 secondaryKVStoreManifest);
310         handleError(msg, vbid);
311     }
312 
313     // 2) Compare collections stats doc values
314     for (const auto& collection : primaryKVStoreManifest.collections) {
315         auto& cid = collection.metaData.cid;
316 
317         auto [primaryResult, primaryStats] =
318                 primary->getCollectionStats(vbid, cid);
319 
320         auto [secondaryResult, secondaryStats] =
321                 secondary->getCollectionStats(vbid, cid);
322         if (primaryResult != secondaryResult) {
323             auto msg = fmt::format(
324                     "NexusKVStore::doCollectionsMetadataChecks: {}: issue "
325                     "getting "
326                     "collection stats primary:{} secondary:{}",
327                     vbid,
328                     primaryResult,
329                     secondaryResult);
330             handleError(msg, vbid);
331         }
332 
333         if (primaryStats.itemCount != secondaryStats.itemCount) {
334             auto msg = fmt::format(
335                     "NexusKVStore::doCollectionsMetadataChecks: {}: cid:{} "
336                     "item "
337                     "count mismatch primary:{} secondary:{}",
338                     vbid,
339                     cid,
340                     primaryStats.itemCount,
341                     secondaryStats.itemCount);
342             handleError(msg, vbid);
343         }
344 
345         if (primaryStats.highSeqno != secondaryStats.highSeqno) {
346             auto msg = fmt::format(
347                     "NexusKVStore::doCollectionsMetadataChecks: {}: cid:{} "
348                     "high "
349                     "seqno mismatch primary:{} secondary:{}",
350                     vbid,
351                     cid,
352                     primaryStats.highSeqno,
353                     secondaryStats.highSeqno);
354             handleError(msg, vbid);
355         }
356 
357         // All checks from here down need the (in-memory) VBManifests
358         if (!primaryVBManifest || !secondaryVBManifest) {
359             return;
360         }
361 
362         auto primaryManifestHandle = primaryVBManifest->lock(cid);
363         auto secondaryManifestHandle = secondaryVBManifest->lock(cid);
364 
365         if (primaryManifestHandle.valid() &&
366             primaryStats.itemCount != primaryManifestHandle.getItemCount()) {
367             auto msg = fmt::format(
368                     "NexusKVStore::doCollectionsMetadataChecks: {}: cid:{} "
369                     "item "
370                     "count mismatch for primary disk:{} VBManifest:{}",
371                     vbid,
372                     cid,
373                     primaryStats.itemCount,
374                     primaryVBManifest->lock(cid).getItemCount());
375             handleError(msg, vbid);
376         }
377         if (secondaryManifestHandle.valid() &&
378             secondaryStats.itemCount !=
379                     secondaryManifestHandle.getItemCount()) {
380             auto msg = fmt::format(
381                     "NexusKVStore::doCollectionsMetadataChecks: {}: cid:{} "
382                     "item "
383                     "count mismatch for secondary disk:{} VBManifest:{}",
384                     vbid,
385                     cid,
386                     secondaryStats.itemCount,
387                     secondaryVBManifest->lock(cid).getItemCount());
388             handleError(msg, vbid);
389         }
390 
391         if (primaryManifestHandle.valid() &&
392             primaryStats.highSeqno !=
393                     primaryManifestHandle.getPersistedHighSeqno()) {
394             auto msg = fmt::format(
395                     "NexusKVStore::doCollectionsMetadataChecks: {}: cid:{} "
396                     "high "
397                     "seqno mismatch for primary disk:{} VBManifest:{}",
398                     vbid,
399                     cid,
400                     primaryStats.highSeqno,
401                     primaryVBManifest->lock(cid).getPersistedHighSeqno());
402             handleError(msg, vbid);
403         }
404         if (secondaryManifestHandle.valid() &&
405             secondaryStats.highSeqno !=
406                     secondaryManifestHandle.getPersistedHighSeqno()) {
407             auto msg = fmt::format(
408                     "NexusKVStore::doCollectionsMetadataChecks: {}: cid:{} "
409                     "high "
410                     "seqno mismatch for secondary disk:{} VBManifest:{}",
411                     vbid,
412                     cid,
413                     secondaryStats.highSeqno,
414                     secondaryVBManifest->lock(cid).getPersistedHighSeqno());
415             handleError(msg, vbid);
416         }
417 
418         // We can't compare disk size between primary and secondary as they
419         // will differ if the underlying KVStore type is different. We can
420         // check them against the VB Manifest though.
421         if (primaryManifestHandle.valid() &&
422             primaryStats.diskSize != primaryManifestHandle.getDiskSize()) {
423             auto msg = fmt::format(
424                     "NexusKVStore::doCollectionsMetadataChecks: {}: cid:{} "
425                     "disk "
426                     "size mismatch for primary disk:{} VBManifest:{}",
427                     vbid,
428                     cid,
429                     primaryStats.diskSize,
430                     primaryVBManifest->lock(cid).getDiskSize());
431             handleError(msg, vbid);
432         }
433         if (secondaryManifestHandle.valid() &&
434             secondaryStats.diskSize != secondaryManifestHandle.getDiskSize()) {
435             auto msg = fmt::format(
436                     "NexusKVStore::doCollectionsMetadataChecks: {}: cid:{} "
437                     "disk "
438                     "size mismatch for secondary disk:{} VBManifest:{}",
439                     vbid,
440                     cid,
441                     secondaryStats.diskSize,
442                     secondaryVBManifest->lock(cid).getDiskSize());
443             handleError(msg, vbid);
444         }
445     }
446 }
447 
448 bool NexusKVStore::commit(std::unique_ptr<TransactionContext> txnCtx,
449                           VB::Commit& primaryCommitData) {
450     auto& nexusTxnCtx = dynamic_cast<NexusKVStoreTransactionContext&>(*txnCtx);
451     auto vbid = txnCtx->vbid;
452 
453     auto lh = getLock(vbid);
454 
455     // Need to create the manifest for the secondary KVStore
456     auto secondaryVBManifest =
457             generateSecondaryVBManifest(vbid, primaryCommitData);
458 
459     // Copy the flush tracking before we go and update it for each doc we see.
460     // We need this copy for the secondary to start the counts from the same
461     // place. We need to swap the manifest that the collections flush object
462     // points to though so that it doesn't try to update the original manifest
463     // for the secondary KVStore. We'll check that manifest against the disk
464     // state to make sure the secondary KVStore is correct.
465     VB::Commit secondaryCommitData = primaryCommitData;
466     secondaryCommitData.collections.setManifest(secondaryVBManifest);
467 
468     // Secondary commit data needs some tweaking if prepares are dealt with
469     // differently
470     auto* secondaryVbState = secondary->getCachedVBucketState(vbid);
471     if (!primary->getStorageProperties().hasPrepareCounting() &&
472         secondary->getStorageProperties().hasPrepareCounting() &&
473         secondaryVbState) {
474         // Secondary supports prepare counting but primary doesn't. This means
475         // that flushes which call getCachedVBucketState and other things that
476         // call getPersistedVBucketState will have incorrect numbers for
477         // prepares as we'll reset counters on every flush. We can fix this by
478         // passing back the secondary count (under the assumption that the
479         // primary doesn't care about it).
480         secondaryCommitData.proposedVBState.onDiskPrepares =
481                 secondaryVbState->onDiskPrepares;
482         secondaryCommitData.proposedVBState.setOnDiskPrepareBytes(
483                 secondaryVbState->getOnDiskPrepareBytes());
484     }
485 
486     // Sanity check that some interesting parts of our commitData are the same
487     if (primaryCommitData.collections.getManifestUid() !=
488         secondaryCommitData.collections.getManifestUid()) {
489         auto msg = fmt::format(
490                 "NexusKVStore::commit: {}: manifest uids not "
491                 "the same before commit primary: {} "
492                 "secondary: {}",
493                 vbid,
494                 primaryCommitData.collections.getManifestUid(),
495                 secondaryCommitData.collections.getManifestUid());
496         handleError(msg, vbid);
497     }
498 
499     if (primaryCommitData.collections.isReadyForCommit() !=
500         secondaryCommitData.collections.isReadyForCommit()) {
501         auto msg = fmt::format(
502                 "NexusKVStore::commit: {}: ready for commit not "
503                 "the same before commit primary: {} "
504                 "secondary: {}",
505                 vbid,
506                 primaryCommitData.collections.isReadyForCommit(),
507                 secondaryCommitData.collections.isReadyForCommit());
508         handleError(msg, vbid);
509     }
510 
511     auto primaryResult = primary->commit(std::move(nexusTxnCtx.primaryContext),
512                                          primaryCommitData);
513     auto secondaryResult = secondary->commit(
514             std::move(nexusTxnCtx.secondaryContext), secondaryCommitData);
515     if (primaryResult != secondaryResult) {
516         auto msg = fmt::format(
517                 "NexusKVStore::commit: {}: primaryResult:{} secondaryResult:{}",
518                 vbid,
519                 primaryResult,
520                 secondaryResult);
521         handleError(msg, vbid);
522     }
523 
524     // Concurrent compaction may have modified one KVStore but not the other.
525     // In this case we need to skip the checks here as the state is probably not
526     // going to be the same.
527     if (compactionRunning[getCacheSlot(vbid)]) {
528         return primaryResult;
529     }
530 
531     doCollectionsMetadataChecks(vbid, &primaryCommitData, &secondaryCommitData);
532 
533     return primaryResult;
534 }
535 
536 StorageProperties NexusKVStore::getStorageProperties() const {
537     auto primaryProperties = primary->getStorageProperties();
538     auto secondaryProperties = secondary->getStorageProperties();
539 
540     // ByIdScan adds an extra DCP feature that clients should not assume
541     // exists (so we should only enable it if bost KVStores support it).
542     auto byIdScan = StorageProperties::ByIdScan::No;
543     if (primaryProperties.hasByIdScan() && secondaryProperties.hasByIdScan()) {
544         byIdScan = StorageProperties::ByIdScan::Yes;
545     }
546 
547     // Auto de-dupe will change the flush batches and not all KVStores can deal
548     // with that so we can only set it to true if it's true for both. All
549     // KVStores should be able to deal with a deduped flush batch.
550     auto autoDedupe = StorageProperties::AutomaticDeduplication::No;
551     if (primaryProperties.hasAutomaticDeduplication() &&
552         secondaryProperties.hasAutomaticDeduplication()) {
553         autoDedupe = StorageProperties::AutomaticDeduplication::Yes;
554     }
555 
556     // Not all KVStores can count prepares
557     auto prepareCounting = StorageProperties::PrepareCounting::No;
558     if (primaryProperties.hasPrepareCounting() &&
559         secondaryProperties.hasPrepareCounting()) {
560         prepareCounting = StorageProperties::PrepareCounting::Yes;
561     }
562 
563     // Nexus calls back from compaction with the callbacks from the primary.
564     // The bucket should be able to deal with either.
565     auto compactionStaleItemCallbacks =
566             primary->getStorageProperties().hasCompactionStaleItemCallbacks()
567                     ? StorageProperties::CompactionStaleItemCallbacks::Yes
568                     : StorageProperties::CompactionStaleItemCallbacks::No;
569 
570     return StorageProperties(byIdScan,
571                              autoDedupe,
572                              prepareCounting,
573                              compactionStaleItemCallbacks);
574 }
575 
576 void NexusKVStore::set(TransactionContext& txnCtx, queued_item item) {
577     auto& nexusTxnCtx = dynamic_cast<NexusKVStoreTransactionContext&>(txnCtx);
578     primary->set(*nexusTxnCtx.primaryContext, item);
579     secondary->set(*nexusTxnCtx.secondaryContext, item);
580 }
581 
582 void NexusKVStore::doPostGetChecks(std::string_view caller,
583                                    Vbid vb,
584                                    const DiskDocKey& key,
585                                    const GetValue& primaryGetValue,
586                                    const GetValue& secondaryGetValue) const {
587     if (primaryGetValue.getStatus() != secondaryGetValue.getStatus()) {
588         // One of the KVStores may have purged something, we can only make the
589         // comparisons here if the seqno fetched is greater than the purgeSeqno
590         // (highest purged seqno of both KVStores).
591         if ((primaryGetValue.getStatus() == cb::engine_errc::no_such_key &&
592              secondaryGetValue.getStatus() == cb::engine_errc::success &&
593              static_cast<uint64_t>(secondaryGetValue.item->getBySeqno()) <=
594                      getPurgeSeqno(vb)) ||
595             (secondaryGetValue.getStatus() == cb::engine_errc::no_such_key &&
596              primaryGetValue.getStatus() == cb::engine_errc::success &&
597              static_cast<uint64_t>(primaryGetValue.item->getBySeqno()) <=
598                      getPurgeSeqno(vb))) {
599             skippedChecksDueToPurging++;
600             return;
601         }
602 
603         auto msg = fmt::format(
604                 "NexusKVStore::{}: {} key:{} status mismatch primary:{} "
605                 "secondary:{}",
606                 caller,
607                 vb,
608                 cb::UserData(key.to_string()),
609                 primaryGetValue.getStatus(),
610                 secondaryGetValue.getStatus());
611         handleError(msg, vb);
612     }
613 
614     if (primaryGetValue.getStatus() == cb::engine_errc::success &&
615         !compareItem(*primaryGetValue.item, *secondaryGetValue.item)) {
616         auto msg = fmt::format(
617                 "NexusKVStore::{}: {} key:{} item mismatch primary:{} "
618                 "secondary:{}",
619                 caller,
620                 vb,
621                 cb::UserData(key.to_string()),
622                 *primaryGetValue.item,
623                 *secondaryGetValue.item);
624         handleError(msg, vb);
625     }
626 }
627 
628 bool NexusKVStore::compareItem(Item primaryItem, Item secondaryItem) const {
629     // We can't use the Item comparator as that's going to check datatype and
630     // value fields which may be different if we asked for a compressed item and
631     // the KVStore returned it de-compressed because it stored it decompressed.
632     if (primaryItem.isCommitted() != secondaryItem.isCommitted() ||
633         primaryItem.getOperation() != secondaryItem.getOperation() ||
634         primaryItem.getRevSeqno() != secondaryItem.getRevSeqno() ||
635         primaryItem.getVBucketId() != secondaryItem.getVBucketId() ||
636         primaryItem.getCas() != secondaryItem.getCas() ||
637         primaryItem.getExptime() != secondaryItem.getExptime() ||
638         primaryItem.getPrepareSeqno() != secondaryItem.getPrepareSeqno() ||
639         primaryItem.getBySeqno() != secondaryItem.getBySeqno() ||
640         primaryItem.getKey() != secondaryItem.getKey() ||
641         primaryItem.isDeleted() != secondaryItem.isDeleted()) {
642         return false;
643     }
644 
645     if (primaryItem.isDeleted() &&
646         primaryItem.deletionSource() != secondaryItem.deletionSource()) {
647         // If deleted, source should be the same
648         return false;
649     }
650 
651     if (primaryItem.getDataType() == secondaryItem.getDataType()) {
652         // Direct comparison of value is possible
653         return primaryItem.getValueView() == secondaryItem.getValueView();
654     }
655 
656     // Datatypes not the same... we want to check the value but we're going to
657     // have to make sure that both items are in the same state of compression to
658     // compare them.
659     std::string decompressedValue;
660     if (cb::mcbp::datatype::is_snappy(primaryItem.getDataType())) {
661         primaryItem.decompressValue();
662         decompressedValue = primaryItem.getValueView();
663     } else {
664         decompressedValue = primaryItem.getValueView();
665     }
666 
667     std::string otherDecompressedValue;
668     if (cb::mcbp::datatype::is_snappy(secondaryItem.getDataType())) {
669         secondaryItem.decompressValue();
670         otherDecompressedValue = secondaryItem.getValueView();
671     } else {
672         otherDecompressedValue = secondaryItem.getValueView();
673     }
674 
675     if (decompressedValue != otherDecompressedValue) {
676         return false;
677     }
678 
679     return true;
680 }
681 
682 GetValue NexusKVStore::get(const DiskDocKey& key,
683                            Vbid vb,
684                            ValueFilter filter) const {
685     auto lh = getLock(vb);
686     auto primaryGetValue = primary->get(key, vb, filter);
687     auto secondaryGetValue = secondary->get(key, vb, filter);
688 
689     doPostGetChecks(__FUNCTION__, vb, key, primaryGetValue, secondaryGetValue);
690     return primaryGetValue;
691 }
692 
693 GetValue NexusKVStore::getWithHeader(const KVFileHandle& kvFileHandle,
694                                      const DiskDocKey& key,
695                                      Vbid vb,
696                                      ValueFilter filter) const {
697     if (skipGetWithHeaderChecksForRollback[getCacheSlot(vb)]) {
698         // We're calling this from rollback, and the primary KVStore will have
699         // been rolled back already and we're looking for the pre-rollback seqno
700         // state of a doc in the callback in EPBucket. Any comparison here would
701         // be invalid so we just return early. We use the raw file handle passed
702         // in here rather than case to the Nexus variant as rollback will invoke
703         // this with a file handle that it create (i.e. a primary or secondary
704         // specific one).
705         return primary->getWithHeader(kvFileHandle, key, vb, filter);
706     }
707 
708     auto& nexusFileHandle =
709             dynamic_cast<const NexusKVFileHandle&>(kvFileHandle);
710     auto lh = getLock(vb);
711 
712     auto primaryGetValue = primary->getWithHeader(
713             *nexusFileHandle.primaryFileHandle, key, vb, filter);
714 
715     auto secondaryGetValue = secondary->getWithHeader(
716             *nexusFileHandle.secondaryFileHandle, key, vb, filter);
717 
718     doPostGetChecks(__FUNCTION__, vb, key, primaryGetValue, secondaryGetValue);
719     return primaryGetValue;
720 }
721 
722 void NexusKVStore::setMaxDataSize(size_t size) {
723     primary->setMaxDataSize(size);
724     secondary->setMaxDataSize(size);
725 }
726 
727 /**
728  * BGFetchItem created by NexusKVStore to perform the same BGFetch operation
729  * against the secondary KVStore in NexusKVStore.
730  */
731 class NexusBGFetchItem : public BGFetchItem {
732 public:
733     explicit NexusBGFetchItem(std::chrono::steady_clock::time_point initTime,
734                               ValueFilter filter,
735                               uint64_t token)
736         : BGFetchItem(initTime, token), filter(filter) {
737     }
738 
739     void complete(EventuallyPersistentEngine& engine,
740                   VBucketPtr& vb,
741                   std::chrono::steady_clock::time_point startTime,
742                   const DiskDocKey& key) const override {
743         // Do nothing, we will compare the GetValues later
744     }
745 
746     void abort(
747             EventuallyPersistentEngine& engine,
748             cb::engine_errc status,
749             std::map<CookieIface*, cb::engine_errc>& toNotify) const override {
750         // Same as above
751     }
752 
753     ValueFilter getValueFilter() const override {
754         return filter;
755     }
756 
757 private:
758     ValueFilter filter;
759 };
760 
761 void NexusKVStore::getMulti(Vbid vb, vb_bgfetch_queue_t& primaryQueue) const {
762     auto lh = getLock(vb);
763     vb_bgfetch_queue_t secondaryQueue;
764     for (const auto& [key, primaryCtx] : primaryQueue) {
765         auto [itr, inserted] =
766                 secondaryQueue.emplace(key, vb_bgfetch_item_ctx_t());
767         Expects(inserted);
768 
769         for (const auto& bgFetchItem : primaryCtx.getRequests()) {
770             itr->second.addBgFetch(std::make_unique<NexusBGFetchItem>(
771                     bgFetchItem->initTime, bgFetchItem->getValueFilter(), 0));
772         }
773     }
774 
775     primary->getMulti(vb, primaryQueue);
776     secondary->getMulti(vb, secondaryQueue);
777 
778     if (primaryQueue.size() != secondaryQueue.size()) {
779         auto msg = fmt::format(
780                 "NexusKVStore::getMulti: {}: primary queue and secondary "
781                 "queue are different sizes",
782                 vb);
783         handleError(msg, vb);
784     }
785 
786     for (auto& [key, value] : primaryQueue) {
787         auto secondaryItr = secondaryQueue.find(key);
788         if (secondaryItr == secondaryQueue.end()) {
789             auto msg = fmt::format(
790                     "NexusKVStore::getMulti: {}: found key:{} in primary queue "
791                     "but not secondary",
792                     vb,
793                     cb::UserData(key.to_string()));
794             handleError(msg, vb);
795         }
796 
797         doPostGetChecks(
798                 __FUNCTION__, vb, key, value.value, secondaryItr->second.value);
799     }
800 }
801 
802 void NexusKVStore::getRange(Vbid vb,
803                             const DiskDocKey& startKey,
804                             const DiskDocKey& endKey,
805                             ValueFilter filter,
806                             const KVStoreIface::GetRangeCb& cb) const {
807     auto lh = getLock(vb);
808 
809     std::deque<GetValue> primaryGetValues;
810     auto primaryCb = [&primaryGetValues](GetValue&& gv) {
811         primaryGetValues.emplace_back(std::move(gv));
812     };
813 
814     primary->getRange(vb, startKey, endKey, filter, primaryCb);
815 
816     std::deque<GetValue> secondaryGetValues;
817     auto secondaryCb = [&secondaryGetValues](GetValue&& gv) {
818         secondaryGetValues.emplace_back(std::move(gv));
819     };
820 
821     secondary->getRange(vb, startKey, endKey, filter, secondaryCb);
822 
823     // Callbacks should be in the same order, but purging could mean that
824     // there are gaps in either. getRange doens't use a file handle so the purge
825     // seqno is not consistent with the scanned items so we can only make a best
826     // effort here and check thoroughly if the purge seqno for both is 0.
827     if (getPurgeSeqno(vb) != 0) {
828         skippedChecksDueToPurging++;
829         return;
830     }
831 
832     if (primaryGetValues.size() != secondaryGetValues.size()) {
833         auto msg = fmt::format(
834                 "NexusKVStore::getMulti: {}: primary getvalues  and secondary "
835                 "get values are different sizes",
836                 vb);
837         handleError(msg, vb);
838     }
839 
840     auto size = primaryGetValues.size();
841     for (size_t i = 0; i < size; i++) {
842         // Check primary vs secondary
843         if (primaryGetValues.front().getStatus() !=
844             secondaryGetValues.front().getStatus()) {
845             // Might be able to log key if one is success
846             std::string key = "";
847             if (primaryGetValues.front().getStatus() ==
848                 cb::engine_errc::success) {
849                 key = primaryGetValues.front().item->getKey().to_string();
850             }
851             if (secondaryGetValues.front().getStatus() ==
852                 cb::engine_errc::success) {
853                 key = secondaryGetValues.front().item->getKey().to_string();
854             }
855 
856             auto msg = fmt::format(
857                     "NexusKVStore::getRange: {}: different result for item "
858                     "with key {}"
859                     "primary:{} secondary:{}",
860                     vb,
861                     key,
862                     primaryGetValues.front().getStatus(),
863                     secondaryGetValues.front().getStatus());
864             handleError(msg, vb);
865         }
866 
867         if (primaryGetValues.front().getStatus() == cb::engine_errc::success) {
868             doPostGetChecks(__FUNCTION__,
869                             vb,
870                             DiskDocKey(primaryGetValues.front().item->getKey()),
871                             primaryGetValues.front(),
872                             secondaryGetValues.front());
873         }
874 
875         cb(std::move(primaryGetValues.front()));
876         primaryGetValues.pop_front();
877         secondaryGetValues.pop_front();
878     }
879 
880     if (!secondaryGetValues.empty()) {
881         std::stringstream ss;
882         for (auto& gv : secondaryGetValues) {
883             ss << *gv.item << ",";
884         }
885         ss.unget();
886 
887         auto msg = fmt::format(
888                 "NexusKVStore::getRange: {}: secondary callbacks not made by "
889                 "primary:{}",
890                 vb,
891                 cb::UserData(ss.str()));
892         handleError(msg, vb);
893     }
894 }
895 
896 void NexusKVStore::del(TransactionContext& txnCtx, queued_item item) {
897     auto& nexusTxnCtx = dynamic_cast<NexusKVStoreTransactionContext&>(txnCtx);
898     primary->del(*nexusTxnCtx.primaryContext, item);
899     secondary->del(*nexusTxnCtx.secondaryContext, item);
900 }
901 
902 class NexusKVStoreRevision : public KVStoreRevision {
903 public:
904     NexusKVStoreRevision(uint64_t primaryRevision, uint64_t secondaryRevision)
905         : KVStoreRevision(primaryRevision),
906           secondaryRevision(secondaryRevision) {
907     }
908 
909     uint64_t getPrimaryRevision() const {
910         return getRevision();
911     }
912 
913     uint64_t getSecondaryRevision() const {
914         return secondaryRevision;
915     }
916 
917 protected:
918     uint64_t secondaryRevision;
919 };
920 
921 void NexusKVStore::delVBucket(Vbid vbucket,
922                               std::unique_ptr<KVStoreRevision> fileRev) {
923     auto& nexusFileRev = dynamic_cast<NexusKVStoreRevision&>(*fileRev);
924 
925     auto primaryFileRev = std::make_unique<KVStoreRevision>(
926             nexusFileRev.getPrimaryRevision());
927     auto secondaryFileRev = std::make_unique<KVStoreRevision>(
928             nexusFileRev.getSecondaryRevision());
929 
930     primary->delVBucket(vbucket, std::move(primaryFileRev));
931     secondary->delVBucket(vbucket, std::move(secondaryFileRev));
932 }
933 
934 bool NexusKVStore::compareVBucketState(Vbid vbid,
935                                        vbucket_state primaryVbState,
936                                        vbucket_state secondaryVbState) const {
937     if (!getStorageProperties().hasPrepareCounting()) {
938         // Can't compare prepare counts so zero them out
939         primaryVbState.onDiskPrepares = 0;
940         secondaryVbState.onDiskPrepares = 0;
941         primaryVbState.setOnDiskPrepareBytes(0);
942         secondaryVbState.setOnDiskPrepareBytes(0);
943     }
944 
945     if (getPurgeSeqno(vbid) != 0) {
946         // Purged something - purge seqnos are likely to be no longer
947         // comparable.
948         skippedChecksDueToPurging++;
949         primaryVbState.purgeSeqno = 0;
950         secondaryVbState.purgeSeqno = 0;
951     }
952 
953     return primaryVbState == secondaryVbState;
954 }
955 
956 std::vector<vbucket_state*> NexusKVStore::listPersistedVbuckets() {
957     auto primaryVbStates = primary->listPersistedVbuckets();
958     auto secondaryVbStates = secondary->listPersistedVbuckets();
959 
960     // listPersistedVbuckets returns the array of cached vbucket states (as that
961     // should be populated with what's on disk). cachedVbucketStates is sized
962     // such that it only tracks the vBuckets a shard cares about (i.e. max
963     // vBuckets / max shards). Were one KVStore to return too many or too few
964     // vBuckets then we'd have messed up the construction. To map the vector
965     // index to vbid we have to multiply by max shards and add the shard id.
966     // Should the sizes be different here then that implies that vBuckets
967     // returned are entirely un-comparable and there's not much point printing
968     // them because somthing fundamental has gone wrong.
969     if (primaryVbStates.size() != secondaryVbStates.size()) {
970         auto msg = fmt::format(
971                 "NexusKVStore::listPersistedVbuckets: size of "
972                 "listPersistedVbuckets not equal primary: {} "
973                 "secondary:{} shard id:{} max shards:{}",
974                 primaryVbStates.size(),
975                 secondaryVbStates.size(),
976                 configuration.getShardId(),
977                 configuration.getMaxShards());
978         handleError(msg, {} /*vbid*/);
979 
980         // This isn't comparable, just return.
981         return primaryVbStates;
982     }
983 
984     for (size_t i = 0; i < primaryVbStates.size(); i++) {
985         Vbid vbid = Vbid(i * configuration.getMaxShards() +
986                          configuration.getShardId());
987         if (primaryVbStates[i] == nullptr || secondaryVbStates[i] == nullptr) {
988             if (primaryVbStates[i] != secondaryVbStates[i]) {
989                 auto msg = fmt::format(
990                         "NexusKVStore::listPersistedVbuckets: {} "
991                         "vbucket state found primary:{} secondary:{}",
992                         vbid,
993                         primaryVbStates[i] != nullptr,
994                         secondaryVbStates[i] != nullptr);
995                 handleError(msg, {} /*vbid*/);
996             }
997             continue;
998         }
999 
1000         if (!compareVBucketState(
1001                     vbid, *primaryVbStates[i], *secondaryVbStates[i])) {
1002             auto msg = fmt::format(
1003                     "NexusKVStore::listPersistedVbuckets: {} "
1004                     "vbucket state not equal primary:{} secondary:{}",
1005                     vbid,
1006                     *primaryVbStates[i],
1007                     *secondaryVbStates[i]);
1008             handleError(msg, vbid);
1009         }
1010     }
1011 
1012     return primaryVbStates;
1013 }
1014 
1015 bool NexusKVStore::snapshotVBucket(Vbid vbucketId,
1016                                    const vbucket_state& vbstate) {
1017     auto primaryResult = primary->snapshotVBucket(vbucketId, vbstate);
1018     auto secondaryResult = secondary->snapshotVBucket(vbucketId, vbstate);
1019 
1020     if (primaryResult != secondaryResult) {
1021         auto msg = fmt::format(
1022                 "NexusKVStore::snapshotVBucket: {} primaryResult:{} "
1023                 "secondaryResult:{}",
1024                 vbucketId,
1025                 primaryResult,
1026                 secondaryResult);
1027         handleError(msg, vbucketId);
1028     }
1029 
1030     auto primaryVbState = primary->getPersistedVBucketState(vbucketId);
1031     auto secondaryVbState = secondary->getPersistedVBucketState(vbucketId);
1032 
1033     if (primaryVbState.status != secondaryVbState.status) {
1034         auto msg = fmt::format(
1035                 "NexusKVStore::getPersistedVBucketState: {}:"
1036                 "difference in status primary:{} secondary:{}",
1037                 vbucketId,
1038                 to_string(primaryVbState.status),
1039                 to_string(secondaryVbState.status));
1040         handleError(msg, vbucketId);
1041     }
1042 
1043     if (primaryVbState.status != ReadVBStateStatus::Success) {
1044         // Only valid to compare below if we have a good state.
1045         return primaryResult;
1046     }
1047 
1048     if (!compareVBucketState(
1049                 vbucketId, primaryVbState.state, secondaryVbState.state)) {
1050         auto msg = fmt::format(
1051                 "NexusKVStore::snapshotVBucket: {} difference in vbstate "
1052                 "primary:{} secondary:{}",
1053                 vbucketId,
1054                 primaryVbState.state,
1055                 secondaryVbState.state);
1056         handleError(msg, vbucketId);
1057     }
1058 
1059     return primaryResult;
1060 }
1061 
1062 /**
1063  * Expiry callback variant that stores a set of callback invocations and
1064  * (if supplied) forwards the callback on to the real expiry callback
1065  */
1066 class NexusExpiryCB : public Callback<Item&, time_t&> {
1067 public:
1068     explicit NexusExpiryCB(std::shared_ptr<Callback<Item&, time_t&>> cb = {})
1069         : cb(std::move(cb)) {
1070     }
1071 
1072     void callback(Item& it, time_t& startTime) override {
1073         // Time is not interesting here
1074         callbacks.emplace(it.getKey(), it.getBySeqno());
1075         if (cb) {
1076             cb->callback(it, startTime);
1077         }
1078     }
1079 
1080     std::unordered_map<DiskDocKey, int64_t> callbacks;
1081     std::shared_ptr<Callback<Item&, time_t&>> cb;
1082 };
1083 
1084 struct NexusCompactionContext {
1085     KVStoreIface* kvStoreToCompactFirst;
1086     KVStoreIface* kvStoreToCompactSecond;
1087     std::shared_ptr<CompactionContext> firstCtx;
1088     std::shared_ptr<CompactionContext> secondCtx;
1089 
1090     bool attemptToPruneStaleCallbacks;
1091 };
1092 
1093 NexusCompactionContext NexusKVStore::calculateCompactionOrder(
1094         std::shared_ptr<CompactionContext> primaryCtx,
1095         std::shared_ptr<CompactionContext> secondaryCtx) {
1096     auto primaryStaleCallbacks =
1097             primary->getStorageProperties().hasCompactionStaleItemCallbacks();
1098     auto secondaryStaleCallbacks =
1099             secondary->getStorageProperties().hasCompactionStaleItemCallbacks();
1100 
1101     if (!primaryStaleCallbacks && !secondaryStaleCallbacks) {
1102         // Couchstore + Couchstore
1103         // Neither has stale call backs, comparisons are simple and it should
1104         // not matter in which order we run compaction
1105         return {primary.get(),
1106                 secondary.get(),
1107                 primaryCtx,
1108                 secondaryCtx,
1109                 false};
1110     } else if (primaryStaleCallbacks && !secondaryStaleCallbacks) {
1111         // Magma + Couchstore
1112         // Run primary first to attempt to prune the stale callbacks
1113         return {primary.get(), secondary.get(), primaryCtx, secondaryCtx, true};
1114     } else if (!primaryStaleCallbacks && secondaryStaleCallbacks) {
1115         // Couchstore + Magma
1116         // Run secondary first to attempt to prune the stale callbacks
1117         return {secondary.get(), primary.get(), secondaryCtx, primaryCtx, true};
1118     } else {
1119         // Magma + Magma
1120         // Order shouldn't matter, the stale callback pruning may/may not work
1121         // depending on how/when files are compacted in magma
1122         return {primary.get(),
1123                 secondary.get(),
1124                 primaryCtx,
1125                 secondaryCtx,
1126                 false};
1127     }
1128 }
1129 
1130 /**
1131  * Special PurgedItemCtx hook to update the purgeSeqno member of NexusKVStore
1132  * when we move the purge seqno in one of the underlying KVStores
1133  */
1134 class NexusPurgedItemCtx : public PurgedItemCtx {
1135 public:
1136     NexusPurgedItemCtx(NexusKVStore& kvstore, Vbid vbid, uint64_t purgeSeq)
1137         : PurgedItemCtx(purgeSeq), kvstore(kvstore), vbid(vbid) {
1138     }
1139 
1140     void purgedItem(PurgedItemType type, uint64_t seqno) override {
1141         PurgedItemCtx::purgedItem(type, seqno);
1142 
1143         // Can't use getPurgeSeqno as it is const
1144         kvstore.purgeSeqno[kvstore.getCacheSlot(vbid)] = seqno;
1145     }
1146 
1147 protected:
1148     NexusKVStore& kvstore;
1149     Vbid vbid;
1150 };
1151 
1152 CompactDBStatus NexusKVStore::compactDB(
1153         std::unique_lock<std::mutex>& vbLock,
1154         std::shared_ptr<CompactionContext> primaryCtx) {
1155     auto primaryVbPtr = primaryCtx->getVBucket();
1156     auto vbid = primaryVbPtr->getId();
1157 
1158     // We take the nexus lock at this point as we may/may not be running with
1159     // support for concurrent flushing and compaciton. If we are, then we will
1160     // unlock this lock later.
1161     auto nexusLock = getLock(vbid);
1162 
1163     // compactionLock* is a pointer to the lock holder that we will pass to the
1164     // individual KVStores. We either pass the vbLock if we support concurrent
1165     // flushing and compaction, or we pass dummyLh so that we can lock/unlock
1166     // freely with no effect.
1167     auto* compactionLock = &vbLock;
1168 
1169     // dummyLock is passed into the KVStores as a replacement for the vbLock if
1170     // we are not running with concurrent flushing and compaction. As the API
1171     // requires a lock holder (unique_lock) we need to take that now and we'll
1172     // unlock it later if we don't need it
1173     std::mutex dummyLock;
1174     auto dummyLh = std::unique_lock<std::mutex>(dummyLock);
1175 
1176     // If we're running with support for concurrent flushing and compaction then
1177     // the main consideration in terms of the comparisons we can make now is if
1178     // compaction runs partially and we do something else in between the
1179     // compactions of the KVstores. We can deal with purges quite simply by
1180     // moving the NexusKVStore::purgeSeqno which all operations check. Any
1181     // comparisons lower than that may not be valid. Expiries are more
1182     // interesting here though as we compare the callback invocations. One
1183     // example situation is if the KVStore we compact first generates an expiry
1184     // which gets flushed before we compact the second KVStore. In this case the
1185     // expiry invocations may be different. There is also an interesting case
1186     // with logical deletions and collection recreation/surrection, so we'll
1187     // deal with them both in the same way.
1188     if (configuration.isConcurrentFlushCompactionEnabled()) {
1189         // We unlock the nexusLock to allow flushes in, we're still holding the
1190         // vbLock at this point but the underlying KVStore will unlock it when
1191         // it's ready
1192         nexusLock.unlock();
1193         // We unlock the dummyLock at this point to prevent lock order
1194         // inversions with the vbLock and the dummyLock. Because we have a lock
1195         // holder we have to take it outside of this scope and unlock it
1196         // manually.
1197         dummyLh.unlock();
1198     } else {
1199         compactionLock = &dummyLh;
1200     }
1201 
1202     compactionRunning[getCacheSlot(vbid)] = true;
1203 
1204     // Scope guard to reset compactionRunning just in case something throws
1205     auto guard = folly::makeGuard(
1206             [this, vbid] { compactionRunning[getCacheSlot(vbid)] = false; });
1207 
1208     // Create a new context to avoid calling things like the completion callback
1209     // which sets in memory state after the secondary compacts
1210     auto secondaryCtx = std::make_shared<CompactionContext>(
1211             std::move(primaryVbPtr),
1212             primaryCtx->compactConfig,
1213             primaryCtx->getRollbackPurgeSeqno(),
1214             primaryCtx->timeToExpireFrom);
1215 
1216     secondaryCtx->isShuttingDown = primaryCtx->isShuttingDown;
1217 
1218     // Don't set the NexusExpiryCB cb member to avoid forwarding expiries to
1219     // the engine (we will do so for the primary)
1220     auto secondaryExpiryCb = std::make_shared<NexusExpiryCB>();
1221     secondaryCtx->expiryCallback = secondaryExpiryCb;
1222 
1223     // Replace the ExpiredItemsCallback with our own that stores the result for
1224     // later comparison with the secondary and forwards the result on
1225     auto primaryExpiryCb =
1226             std::make_shared<NexusExpiryCB>(primaryCtx->expiryCallback);
1227     primaryCtx->expiryCallback = primaryExpiryCb;
1228 
1229     std::unordered_map<DiskDocKey, int64_t> primaryDrops;
1230     std::unordered_map<DiskDocKey, int64_t> secondaryDrops;
1231     Collections::KVStore::DroppedCb originalDroppedKeyCb =
1232             primaryCtx->droppedKeyCb;
1233     primaryCtx->droppedKeyCb = [&primaryDrops, &originalDroppedKeyCb](
1234                                        const DiskDocKey& key,
1235                                        int64_t seqno,
1236                                        bool aborted,
1237                                        int64_t pcs) {
1238         auto [itr, inserted] = primaryDrops.try_emplace(key, seqno);
1239         itr->second = std::max<uint64_t>(itr->second, seqno);
1240 
1241         originalDroppedKeyCb(key, seqno, aborted, pcs);
1242     };
1243 
1244     secondaryCtx->droppedKeyCb = [&secondaryDrops](const DiskDocKey& key,
1245                                                    int64_t seqno,
1246                                                    bool aborted,
1247                                                    int64_t pcs) {
1248         auto itr = secondaryDrops.find(key);
1249         if (itr != secondaryDrops.end()) {
1250             itr->second = std::max<uint64_t>(itr->second, seqno);
1251         } else {
1252             secondaryDrops[key] = seqno;
1253         }
1254     };
1255 
1256     primaryCtx->purgedItemCtx = std::make_unique<NexusPurgedItemCtx>(
1257             *this, vbid, getPurgeSeqno(vbid));
1258     secondaryCtx->purgedItemCtx = std::make_unique<NexusPurgedItemCtx>(
1259             *this, vbid, getPurgeSeqno(vbid));
1260 
1261     // Comparisons in the callbacks made may be difficult to make if one KVStore
1262     // may call back with stale items but the other does not. If we know that
1263     // one of the KVStores will do so then we can run the compaction for that
1264     // KVStore first and check the item against the other to see if it is stale
1265     // or not. If the callback is for a stale item, we remove it from the list
1266     // to compare.
1267     auto nexusCompactionContext =
1268             calculateCompactionOrder(primaryCtx, secondaryCtx);
1269 
1270     if (!vbLock.owns_lock()) {
1271         throw std::logic_error(
1272                 fmt::format("NexusKVStore::compactDB: Passed vbLock for {} but "
1273                             "it is not locked",
1274                             vbid));
1275     }
1276 
1277     preCompactionHook();
1278 
1279     // We're going to take a copy of the high seqno before we compact the
1280     // first KVStore so that we can determine later on if it's valid to make
1281     // comparisons (if a concurrent flush ran then expiry or logical deletions
1282     // callbacks may be different and not comparable).
1283     auto beforeFirstCompactionHighSeqno = getLastPersistedSeqno(vbid);
1284 
1285     auto firstResult = nexusCompactionContext.kvStoreToCompactFirst->compactDB(
1286             *compactionLock, nexusCompactionContext.firstCtx);
1287 
1288     if (nexusCompactionContext.attemptToPruneStaleCallbacks) {
1289         // Iterate over the callbacks made by the first compaction and run
1290         // a get against the other KVStore to check if the item exists. If it
1291         // does and the seqno of the drop is lower than that of the primary
1292         // then we should just ignore the callback invocation as it's probably
1293         // a stale key.
1294         auto& firstDrops =
1295                 nexusCompactionContext.kvStoreToCompactFirst == primary.get()
1296                         ? primaryDrops
1297                         : secondaryDrops;
1298         for (auto itr = firstDrops.begin(); itr != firstDrops.end();) {
1299             auto key = itr->first;
1300             auto seqno = itr->second;
1301 
1302             auto gv = nexusCompactionContext.kvStoreToCompactSecond->get(key,
1303                                                                          vbid);
1304             if (gv.getStatus() == cb::engine_errc::success &&
1305                 gv.item->getBySeqno() > seqno) {
1306                 // Remove stale callback invocation
1307                 firstDrops.erase(itr++);
1308             } else {
1309                 itr++;
1310             }
1311         }
1312 
1313         auto& firstExpiries =
1314                 nexusCompactionContext.kvStoreToCompactFirst == primary.get()
1315                         ? primaryExpiryCb->callbacks
1316                         : secondaryExpiryCb->callbacks;
1317 
1318         for (auto itr = firstExpiries.begin(); itr != firstExpiries.end();) {
1319             auto key = itr->first;
1320             auto seqno = itr->second;
1321 
1322             auto gv = nexusCompactionContext.kvStoreToCompactSecond->get(key,
1323                                                                          vbid);
1324 
1325             if (gv.getStatus() == cb::engine_errc::success &&
1326                 gv.item->getBySeqno() > seqno) {
1327                 // Remove stale callback invocation
1328                 firstExpiries.erase(itr++);
1329             } else {
1330                 itr++;
1331             }
1332         }
1333     }
1334 
1335     midCompactionHook(vbLock);
1336 
1337     // Might have to re-acquire the lock, depending what the first kvstore does
1338     // with it...
1339     if (!compactionLock->owns_lock()) {
1340         compactionLock->lock();
1341     }
1342 
1343     // We've just locked the vBucket lock so flushers are going to be blocked
1344     // now. As we allow flushing and compaction to run concurrently (in both
1345     // the full server and NexusKVStore) it may be the case that flushes may
1346     // have happened that have changed the state of documents such that the
1347     // callbacks made by the second (not secondary) KVStore are not the same as
1348     // those made by the primary. We can detect a flush having run now by
1349     // checking the high seqno, and should this have moved since the original
1350     // compaction we'll skip comparing callbacks.
1351     auto beforeSecondCompactionHighSeqno = getLastPersistedSeqno(vbid);
1352 
1353     auto secondResult =
1354             nexusCompactionContext.kvStoreToCompactSecond->compactDB(
1355                     *compactionLock, nexusCompactionContext.secondCtx);
1356 
1357     if (firstResult != secondResult) {
1358         auto msg = fmt::format(
1359                 "NexusKVStore::compactDB: {}: compaction result mismatch "
1360                 "first:{} second:{}",
1361                 vbid,
1362                 firstResult,
1363                 secondResult);
1364         handleError(msg, vbid);
1365     }
1366 
1367     // We bump the collectionsPurged stat when we erase collections, magma only
1368     // purged the range rather than the full data set if it is purging
1369     // collections so comparisons won't be valid if we are purging collections.
1370     // We check both the primary and secondary as when we enable concurrent
1371     // flushing and compaction they may differ.
1372     if (primaryCtx->stats.collectionsPurged != 0 ||
1373         secondaryCtx->stats.collectionsPurged != 0) {
1374         return nexusCompactionContext.kvStoreToCompactFirst == primary.get()
1375                        ? firstResult
1376                        : secondResult;
1377     }
1378 
1379     if (beforeFirstCompactionHighSeqno != beforeSecondCompactionHighSeqno) {
1380         // Not valid to compare expiries or logical deletions as the high seqno
1381         // has moved
1382         return nexusCompactionContext.kvStoreToCompactFirst == primary.get()
1383                        ? firstResult
1384                        : secondResult;
1385     }
1386 
1387     // Compare the collections state if successful
1388     if (firstResult == CompactDBStatus::Success) {
1389         doCollectionsMetadataChecks(vbid, nullptr, nullptr);
1390     }
1391 
1392     // The expiration callback invocations should be the same
1393     for (auto& [key, seqno] : primaryExpiryCb->callbacks) {
1394         if (secondaryExpiryCb->callbacks.find(key) ==
1395             secondaryExpiryCb->callbacks.end()) {
1396             auto msg = fmt::format(
1397                     "NexusKVStore::compactDB: {}: Expiry callback found with "
1398                     "key:{} seqno:{} for primary but not secondary",
1399                     vbid,
1400                     cb::UserData(key.to_string()),
1401                     seqno);
1402             handleError(msg, vbid);
1403         } else {
1404             secondaryExpiryCb->callbacks.erase(key);
1405         }
1406     }
1407 
1408     if (!secondaryExpiryCb->callbacks.empty()) {
1409         std::stringstream ss;
1410         for (auto& [key, seqno] : secondaryExpiryCb->callbacks) {
1411             ss << "key: " << cb::UserData(key.to_string())
1412                << " seqno: " << seqno << ",";
1413         }
1414         ss.unget();
1415 
1416         auto msg = fmt::format(
1417                 "NexusKVStore::compactDB: {}: secondary expiry callbacks not "
1418                 "made by primary:{}",
1419                 vbid,
1420                 ss.str());
1421         handleError(msg, vbid);
1422     }
1423 
1424     for (auto& [key, seqno] : primaryDrops) {
1425         auto itr = secondaryDrops.find(key);
1426         if (itr == secondaryDrops.end()) {
1427             if (static_cast<uint64_t>(seqno) <= getPurgeSeqno(vbid)) {
1428                 // Seqno may have been purged, skip to next key as any
1429                 // comparison is not valid.
1430                 skippedChecksDueToPurging++;
1431                 continue;
1432             }
1433 
1434             auto msg = fmt::format(
1435                     "NexusKVStore::compactDB: {}: drop callback found with "
1436                     "key:{} for primary but not secondary",
1437                     vbid,
1438                     cb::UserData(key.to_string()));
1439             handleError(msg, vbid);
1440         } else if (seqno != itr->second) {
1441             auto msg = fmt::format(
1442                     "NexusKVStore::compactDB: {}: drop callback found with "
1443                     "key:{} and different seqno primary:{} secondary:{}",
1444                     vbid,
1445                     cb::UserData(key.to_string()),
1446                     seqno,
1447                     itr->second);
1448             handleError(msg, vbid);
1449         } else {
1450             secondaryDrops.erase(key);
1451         }
1452     }
1453 
1454     // We may have purged a bunch of stuff from secondary that was not purged
1455     // from the primary (as it already had been). Erase it all from
1456     // secondaryDrops if it's lower than the purgeSeqno as comparisons are not
1457     // valid. Anything above purgeSeqno will be kept and printed below as that's
1458     // an error (or bug).
1459     auto secondaryItr = secondaryDrops.begin();
1460     while (secondaryItr != secondaryDrops.end()) {
1461         if (static_cast<uint64_t>(secondaryItr->second) <=
1462             getPurgeSeqno(vbid)) {
1463             secondaryItr = secondaryDrops.erase(secondaryItr);
1464             skippedChecksDueToPurging++;
1465             continue;
1466         }
1467         secondaryItr++;
1468     }
1469 
1470     if (!secondaryDrops.empty()) {
1471         std::stringstream ss;
1472         for (auto& [key, seqno] : secondaryDrops) {
1473             ss << "[key:" << cb::UserData(key.to_string()) << ",seqno:" << seqno
1474                << "],";
1475         }
1476         ss.unget();
1477 
1478         auto msg = fmt::format(
1479                 "NexusKVStore::compactDB: {}: secondary callbacks not made by "
1480                 "primary:{}",
1481                 vbid,
1482                 ss.str());
1483         handleError(msg, vbid);
1484     }
1485 
1486     return nexusCompactionContext.kvStoreToCompactFirst == primary.get()
1487                    ? firstResult
1488                    : secondResult;
1489 }
1490 
1491 void NexusKVStore::abortCompactionIfRunning(
1492         std::unique_lock<std::mutex>& vbLock, Vbid vbid) {
1493     primary->abortCompactionIfRunning(vbLock, vbid);
1494     secondary->abortCompactionIfRunning(vbLock, vbid);
1495 }
1496 
1497 vbucket_state* NexusKVStore::getCachedVBucketState(Vbid vbid) {
1498     auto* primaryVbState = primary->getCachedVBucketState(vbid);
1499     auto* secondaryVbState = secondary->getCachedVBucketState(vbid);
1500 
1501     if (static_cast<bool>(primaryVbState) !=
1502         static_cast<bool>(secondaryVbState)) {
1503         auto msg = fmt::format(
1504                 "NexusKVStore::getCachedVBucketState: {}:"
1505                 "vbstate returned for only one KVStore"
1506                 "primary:{} secondary:{}",
1507                 vbid,
1508                 static_cast<bool>(primaryVbState),
1509                 static_cast<bool>(secondaryVbState));
1510         handleError(msg, vbid);
1511     }
1512 
1513     if (primaryVbState && secondaryVbState &&
1514         !compareVBucketState(vbid, *primaryVbState, *secondaryVbState)) {
1515         auto msg = fmt::format(
1516                 "NexusKVStore::getCachedVBucketState: {}: "
1517                 "difference in vBucket state primary:{} "
1518                 "secondary:{}",
1519                 vbid,
1520                 *primaryVbState,
1521                 *secondaryVbState);
1522         handleError(msg, vbid);
1523     }
1524 
1525     return primary->getCachedVBucketState(vbid);
1526 }
1527 
1528 KVStoreIface::ReadVBStateResult NexusKVStore::getPersistedVBucketState(
1529         Vbid vbid) const {
1530     auto primaryVbState = primary->getPersistedVBucketState(vbid);
1531     auto secondaryVbState = secondary->getPersistedVBucketState(vbid);
1532 
1533     if (primaryVbState.status != secondaryVbState.status) {
1534         auto msg = fmt::format(
1535                 "NexusKVStore::getPersistedVBucketState: {}:"
1536                 "difference in status primary:{} secondary:{}",
1537                 vbid,
1538                 to_string(primaryVbState.status),
1539                 to_string(secondaryVbState.status));
1540         handleError(msg, vbid);
1541     }
1542 
1543     if (primaryVbState.status != ReadVBStateStatus::Success) {
1544         // Only valid to compare below if we have a good state.
1545         return primaryVbState;
1546     }
1547 
1548     if (!compareVBucketState(
1549                 vbid, primaryVbState.state, secondaryVbState.state)) {
1550         auto msg = fmt::format(
1551                 "NexusKVStore::getPersistedVBucketState: {}: "
1552                 "difference in vBucket state primary:{} "
1553                 "secondary:{}",
1554                 vbid,
1555                 primaryVbState.state,
1556                 secondaryVbState.state);
1557         handleError(msg, vbid);
1558     }
1559     return primaryVbState;
1560 }
1561 
1562 KVStoreIface::ReadVBStateResult NexusKVStore::getPersistedVBucketState(
1563         KVFileHandle& handle, Vbid vbid) const {
1564     auto& nexusFileHandle = dynamic_cast<const NexusKVFileHandle&>(handle);
1565     auto primaryVbState = primary->getPersistedVBucketState(
1566             *nexusFileHandle.primaryFileHandle, vbid);
1567     auto secondaryVbState = secondary->getPersistedVBucketState(
1568             *nexusFileHandle.secondaryFileHandle, vbid);
1569 
1570     if (primaryVbState.status != secondaryVbState.status) {
1571         auto msg = fmt::format(
1572                 "NexusKVStore::getPersistedVBucketState(handle): {}:"
1573                 "difference in status primary:{} secondary:{}",
1574                 vbid,
1575                 to_string(primaryVbState.status),
1576                 to_string(secondaryVbState.status));
1577         handleError(msg, vbid);
1578     }
1579 
1580     if (primaryVbState.status != ReadVBStateStatus::Success) {
1581         // Only valid to compare below if we have a good state.
1582         return primaryVbState;
1583     }
1584 
1585     if (!compareVBucketState(
1586                 vbid, primaryVbState.state, secondaryVbState.state)) {
1587         auto msg = fmt::format(
1588                 "NexusKVStore::getPersistedVBucketState(handle): {}: "
1589                 "difference in vBucket state primary:{} "
1590                 "secondary:{}",
1591                 vbid,
1592                 primaryVbState.state,
1593                 secondaryVbState.state);
1594         handleError(msg, vbid);
1595     }
1596     return primaryVbState;
1597 }
1598 
1599 size_t NexusKVStore::getNumPersistedDeletes(Vbid vbid) {
1600     return primary->getNumPersistedDeletes(vbid);
1601 }
1602 
1603 DBFileInfo NexusKVStore::getDbFileInfo(Vbid dbFileId) {
1604     return primary->getDbFileInfo(dbFileId);
1605 }
1606 
1607 DBFileInfo NexusKVStore::getAggrDbFileInfo() {
1608     return primary->getAggrDbFileInfo();
1609 }
1610 
1611 size_t NexusKVStore::getItemCount(Vbid vbid) {
1612     auto primaryCount = primary->getItemCount(vbid);
1613     auto secondaryCount = secondary->getItemCount(vbid);
1614 
1615     // If primary supports prepare counting then a test is valid as we should be
1616     // able to adjust the value of the primary by the onDiskPrepares in the
1617     // vbstate. If the primary /does not/ support prepare counting though and
1618     // the secondary /does/ then we can't adjust correctly as we store the
1619     // vbstate of the primary everywhere. In this case, just skip the test and
1620     // return.
1621     if (!primary->getStorageProperties().hasPrepareCounting() &&
1622         secondary->getStorageProperties().hasPrepareCounting()) {
1623         return primaryCount;
1624     }
1625 
1626     // We return the primary value so we need to copy it to adjust for
1627     // comparison
1628     auto correctedPrimaryCount = primaryCount;
1629 
1630     size_t primaryPrepares = 0;
1631     if (primary->getStorageProperties().hasPrepareCounting()) {
1632         auto vbState = primary->getPersistedVBucketState(vbid);
1633         if (vbState.status != KVStoreIface::ReadVBStateStatus::Success) {
1634             auto msg = fmt::format(
1635                     "NexusKVStore::getItemCount: {}: failed to "
1636                     "get vbucket state for primary:{}",
1637                     vbid,
1638                     to_string(vbid));
1639             handleError(msg, vbid);
1640         }
1641         primaryPrepares = vbState.state.onDiskPrepares;
1642         correctedPrimaryCount -= vbState.state.onDiskPrepares;
1643     }
1644 
1645     size_t secondaryPrepares = 0;
1646     if (secondary->getStorageProperties().hasPrepareCounting()) {
1647         auto vbState = secondary->getPersistedVBucketState(vbid);
1648         if (vbState.status != KVStoreIface::ReadVBStateStatus::Success) {
1649             auto msg = fmt::format(
1650                     "NexusKVStore::getItemCount: {}: failed to "
1651                     "get vbucket state for secondary:{}",
1652                     vbid,
1653                     to_string(vbid));
1654             handleError(msg, vbid);
1655         }
1656         secondaryPrepares = vbState.state.onDiskPrepares;
1657         secondaryCount -= vbState.state.onDiskPrepares;
1658     }
1659 
1660     if (correctedPrimaryCount != secondaryCount) {
1661         auto msg = fmt::format(
1662                 "NexusKVStore::getItemCount: {}: difference in "
1663                 "item count primary:{} secondary:{} prepare count primary:{} "
1664                 "secondary:{}",
1665                 vbid,
1666                 correctedPrimaryCount,
1667                 secondaryCount,
1668                 primaryPrepares,
1669                 secondaryPrepares);
1670         handleError(msg, vbid);
1671     }
1672 
1673     // Return the primary result again
1674     return primaryCount;
1675 }
1676 
1677 /**
1678  * Rollback callback for NexusKVStore. This callback:
1679  *
1680  * a) forwards the callback invocation on to the original callback (if the
1681  *    original callback is supplied during construction)
1682  * b) forwards gets and sets of the file handle on to the original callback (if
1683  *    original callback is supplied during construction)
1684  * b) stores a copy of the key and seqno for comparison of the callback
1685  *    invocations between primary and secondary KVStores
1686  */
1687 class NexusRollbackCB : public RollbackCB {
1688 public:
1689     NexusRollbackCB(NexusKVStore& kvstore,
1690                     Vbid vbid,
1691                     std::unordered_map<DiskDocKey, uint64_t>& rollbacks,
1692                     std::unique_ptr<RollbackCB> originalCb = {})
1693         : kvstore(kvstore),
1694           vbid(vbid),
1695           rolledBack(rollbacks),
1696           originalCb(std::move(originalCb)) {
1697     }
1698 
1699     void callback(GetValue& val) override {
1700         // The item passed in here is the post-rollback item. We should compare
1701         // it to the items rolled back by the primary.
1702         Expects(val.item);
1703         auto [itr, emplaceResult] = rolledBack.try_emplace(
1704                 DiskDocKey(*val.item), val.item->getBySeqno());
1705         if (!emplaceResult) {
1706             auto msg = fmt::format(
1707                     "NexusRollbackCB::callback: {}: called back for {} with "
1708                     "seqno {} but callback already exists with seqno {}",
1709                     vbid,
1710                     cb::UserData(val.item->getKey().to_string()),
1711                     val.item->getBySeqno(),
1712                     itr->second);
1713             kvstore.handleError(msg, vbid);
1714         }
1715 
1716         if (originalCb) {
1717             originalCb->callback(val);
1718         }
1719     }
1720 
1721     void setKVFileHandle(std::unique_ptr<KVFileHandle> handle) override {
1722         // Give the file handle to the original (if it exists), otherwise we
1723         // need to store it ourselves.
1724         if (originalCb) {
1725             originalCb->setKVFileHandle(std::move(handle));
1726             return;
1727         }
1728 
1729         RollbackCB::setKVFileHandle(std::move(handle));
1730     }
1731 
1732     const KVFileHandle* getKVFileHandle() const override {
1733         // Get the file handle from the original if it exists (we should have
1734         // given it to the orignal via setKVFileHandle), otherwise we should
1735         // have store it in the parent so we should return that one.
1736         if (originalCb) {
1737             return originalCb->getKVFileHandle();
1738         }
1739 
1740         return RollbackCB::getKVFileHandle();
1741     }
1742 
1743     // Used for logging errors
1744     NexusKVStore& kvstore;
1745 
1746     // Used for logging errors
1747     Vbid vbid;
1748 
1749     /**
1750      * Map of DiskDocKey (includes prepare namespace) to seqno
1751      */
1752     std::unordered_map<DiskDocKey, uint64_t>& rolledBack;
1753 
1754     /**
1755      * Original callback to be invoked if set
1756      */
1757     std::unique_ptr<RollbackCB> originalCb;
1758 };
1759 
1760 struct NexusRollbackContext {
1761     KVStoreIface* kvstoreToRollbackFirst;
1762     KVStoreIface* kvstoreToRollbackSecond;
1763 };
1764 
1765 NexusRollbackContext NexusKVStore::calculateRollbackOrder() {
1766 #ifdef EP_USE_MAGMA
1767 
1768     bool primaryIsMagma = dynamic_cast<MagmaKVStore*>(primary.get());
1769     bool secondaryIsMagma = dynamic_cast<MagmaKVStore*>(secondary.get());
1770 
1771     if (!primaryIsMagma && secondaryIsMagma) {
1772         // Got to do magma (secondary) first
1773         return {secondary.get(), primary.get()};
1774     }
1775 #endif
1776 
1777     return {primary.get(), secondary.get()};
1778 }
1779 
1780 RollbackResult NexusKVStore::rollback(Vbid vbid,
1781                                       uint64_t rollbackseqno,
1782                                       std::unique_ptr<RollbackCB> ptr) {
1783     // We're not taking the lock for the vBucket here because the callback in
1784     // EPDiskRollbackCB is going to call getWithHeader for each item we roll
1785     // back. We're protected from getting into odd states though as:
1786     //
1787     // 1) This vBucket must be a replica so no bg fetches
1788     // 2) During rollback we take the vBucket write lock so no flushes
1789 
1790     // Skip checks, see member declaration for more details.
1791     skipGetWithHeaderChecksForRollback[getCacheSlot(vbid)] = true;
1792     auto guard = folly::makeGuard([this, vbid] {
1793         skipGetWithHeaderChecksForRollback[getCacheSlot(vbid)] = false;
1794     });
1795 
1796     std::unordered_map<DiskDocKey, uint64_t> primaryRollbacks;
1797     auto primaryCb = std::make_unique<NexusRollbackCB>(
1798             *this, vbid, primaryRollbacks, std::move(ptr));
1799 
1800     std::unordered_map<DiskDocKey, uint64_t> secondaryRollbacks;
1801     auto secondaryCb =
1802             std::make_unique<NexusRollbackCB>(*this, vbid, secondaryRollbacks);
1803 
1804     // Magma is only going to keep n checkpoints (i.e. n rollback points) in
1805     // memory and as we're checkpointing every flush batch (to ensure that
1806     // rollback points are consistent between magma and couchstore) that's
1807     // effectively n flush batches. Best thing to do here is to do the magma
1808     // rollback first (assuming we are running magma) and then assert later that
1809     // couchstore rolls back to the same seqno. Should magma be unable to roll
1810     // back to anything other than 0 then there's no point rolling back
1811     // couchstore.
1812     auto nexusRollbackContext = calculateRollbackOrder();
1813 
1814     auto firstResult = nexusRollbackContext.kvstoreToRollbackFirst->rollback(
1815             vbid,
1816             rollbackseqno,
1817             nexusRollbackContext.kvstoreToRollbackFirst == primary.get()
1818                     ? std::move(primaryCb)
1819                     : std::move(secondaryCb));
1820 
1821     if (!firstResult.success) {
1822         // Need to roll back to zero, may as well just return now
1823         return firstResult;
1824     }
1825 
1826     auto secondResult = nexusRollbackContext.kvstoreToRollbackSecond->rollback(
1827             vbid,
1828             rollbackseqno,
1829             nexusRollbackContext.kvstoreToRollbackSecond == primary.get()
1830                     ? std::move(primaryCb)
1831                     : std::move(secondaryCb));
1832 
1833     if (firstResult != secondResult) {
1834         auto msg = fmt::format(
1835                 "NexusKVStore::rollback: {}: rollback result not equal "
1836                 "first:{} second:{}",
1837                 vbid,
1838                 firstResult,
1839                 secondResult);
1840         handleError(msg, vbid);
1841     }
1842 
1843     for (const auto& [key, seqno] : primaryRollbacks) {
1844         auto itr = secondaryRollbacks.find(key);
1845         if (itr == secondaryRollbacks.end()) {
1846             if (seqno <= getPurgeSeqno(vbid)) {
1847                 // Below the purge seqno, comparison not valid
1848                 skippedChecksDueToPurging++;
1849                 continue;
1850             }
1851 
1852             auto msg = fmt::format(
1853                     "NexusKVStore::rollback: {}: primary invoked rollback "
1854                     "callback for {} at seqno {} but secondary did not",
1855                     vbid,
1856                     cb::UserData(key.to_string()),
1857                     seqno);
1858             handleError(msg, vbid);
1859         }
1860         secondaryRollbacks.erase(itr);
1861     }
1862 
1863     if (!secondaryRollbacks.empty()) {
1864         auto msg = fmt::format(
1865                 "NexusKVStore::rollback: {}: secondary callbacks invocations "
1866                 "not made by primary:",
1867                 vbid);
1868         for (const auto& [key, seqno] : secondaryRollbacks) {
1869             fmt::format_to(std::back_inserter(msg),
1870                            "[key:{},seqno:{}],",
1871                            cb::UserData(key.to_string()),
1872                            seqno);
1873         }
1874         msg.pop_back();
1875         handleError(msg, vbid);
1876     }
1877 
1878     doCollectionsMetadataChecks(vbid, nullptr, nullptr);
1879 
1880     return nexusRollbackContext.kvstoreToRollbackFirst == primary.get()
1881                    ? firstResult
1882                    : secondResult;
1883 }
1884 
1885 void NexusKVStore::pendingTasks() {
1886     primary->pendingTasks();
1887     secondary->pendingTasks();
1888 }
1889 
/**
 * Queue of GetAllKeys callback invocations made by the primary KVStore, in
 * the order in which they were made. Each entry holds:
 *
 * - the key, stored for comparison with the key that is returned by the
 *   secondary.
 * - the error code, stored to forward on the error code from the primary to
 *   the secondary (i.e. if we stop scanning the primary after 5 items then
 *   the secondary should stop too).
 */
using NexusGetAllKeysCallbackCallbacks =
        std::deque<std::pair<DiskDocKey, cb::engine_errc>>;
1899 
1900 /**
1901  * GetAllKeysCallback for use with the primary KVStore. Passes the callback
1902  * invocations along to the original callback which will:
1903  *
1904  * a) do the actual logic with the key
1905  * b) return a cancel status if we should stop scanning
1906  */
1907 class NexusKVStorePrimaryGetAllKeysCallback
1908     : public StatusCallback<const DiskDocKey&> {
1909 public:
1910     NexusKVStorePrimaryGetAllKeysCallback(
1911             std::shared_ptr<StatusCallback<const DiskDocKey&>> cb)
1912         : originalCb(std::move(cb)) {
1913     }
1914 
1915     void callback(const DiskDocKey& key) override {
1916         // Forward on to the original callback first to get the status
1917         originalCb->callback(key);
1918 
1919         // Set our status to that of the original callback to stop scanning if
1920         // required
1921         setStatus(originalCb->getStatus());
1922 
1923         // Store this invocation for later comparison with the secondary
1924         callbacks.emplace_back(key, cb::engine_errc(getStatus()));
1925     }
1926 
1927     NexusGetAllKeysCallbackCallbacks callbacks;
1928     std::shared_ptr<StatusCallback<const DiskDocKey&>> originalCb;
1929 };
1930 
1931 /**
1932  * GetAllKeysCallback for use with the secondary KVStore. Invocations are
1933  * compared with those made by the primary and the status that the primary
1934  * returned is then returned by this callback to stop scanning at the same point
1935  */
1936 class NexusKVStoreSecondaryGetAllKeysCallback
1937     : public StatusCallback<const DiskDocKey&> {
1938 public:
1939     NexusKVStoreSecondaryGetAllKeysCallback(
1940             const NexusKVStore& kvstore,
1941             Vbid vbid,
1942             NexusGetAllKeysCallbackCallbacks& primaryCallbacks)
1943         : kvstore(kvstore), vbid(vbid), primaryCallbacks(primaryCallbacks) {
1944     }
1945 
1946     void callback(const DiskDocKey& key) override {
1947         // Callbacks should be in the same order, but purging could mean that
1948         // there are gaps in either. getAllKeys doens't use a file handle so
1949         // the purge seqno is not consistent with the scanned items so we can
1950         // only make a best effort here and check thoroughly if the purge seqno
1951         // for both is 0.
1952         if (kvstore.getPurgeSeqno(vbid) != 0) {
1953             EP_LOG_DEBUG(
1954                     "NexusKVStore::SecondaryGetAllKeys::callback {}: purge "
1955                     "seqno is non-zero ({}) so no checks are valid",
1956                     vbid,
1957                     kvstore.getPurgeSeqno(vbid));
1958             primaryCallbacks.clear();
1959             kvstore.skippedChecksDueToPurging++;
1960             return;
1961         }
1962 
1963         if (primaryCallbacks.empty()) {
1964             auto msg = fmt::format(
1965                     "NexusSecondaryGetAllKeysCallback::callback: {}: primary "
1966                     "made fewer invocations. Secondary key:{}",
1967                     vbid,
1968                     cb::UserData(key.to_string()));
1969             kvstore.handleError(msg, vbid);
1970         }
1971 
1972         const auto& [primaryKey, primaryResult] = primaryCallbacks.front();
1973         if (primaryKey != key) {
1974             auto msg = fmt::format(
1975                     "NexusSecondaryGetAllKeysCallback::callback: {}: invoked "
1976                     "with different key primary:{} secondary:{}",
1977                     vbid,
1978                     cb::UserData(primaryKey.to_string()),
1979                     cb::UserData(key.to_string()));
1980             kvstore.handleError(msg, vbid);
1981         }
1982 
1983         // Set our status so that we stop scanning after the same number of
1984         // items as the primary
1985         setStatus(primaryResult);
1986 
1987         primaryCallbacks.pop_front();
1988     }
1989 
1990     // For logging discrepancies
1991     const NexusKVStore& kvstore;
1992     Vbid vbid;
1993     NexusGetAllKeysCallbackCallbacks& primaryCallbacks;
1994 };
1995 
1996 cb::engine_errc NexusKVStore::getAllKeys(
1997         Vbid vbid,
1998         const DiskDocKey& start_key,
1999         uint32_t count,
2000         std::shared_ptr<StatusCallback<const DiskDocKey&>> cb) const {
2001     auto lh = getLock(vbid);
2002 
2003     auto primaryCallback =
2004             std::make_shared<NexusKVStorePrimaryGetAllKeysCallback>(cb);
2005     auto secondaryCallback =
2006             std::make_shared<NexusKVStoreSecondaryGetAllKeysCallback>(
2007                     *this, vbid, primaryCallback->callbacks);
2008 
2009     auto primaryResult =
2010             primary->getAllKeys(vbid, start_key, count, primaryCallback);
2011     auto secondaryResult =
2012             secondary->getAllKeys(vbid, start_key, count, secondaryCallback);
2013 
2014     if (primaryResult != secondaryResult) {
2015         auto msg = fmt::format(
2016                 "NexusKVStore::getAllKeys: {}: different result "
2017                 "primary:{} secondary:{}",
2018                 vbid,
2019                 primaryResult,
2020                 secondaryResult);
2021         handleError(msg, vbid);
2022     }
2023 
2024     if (!secondaryCallback->primaryCallbacks.empty()) {
2025         std::stringstream ss;
2026         for (auto& [key, errc] : secondaryCallback->primaryCallbacks) {
2027             ss << cb::UserData(key.to_string()) << ",";
2028         }
2029         ss.unget();
2030 
2031         auto msg = fmt::format(
2032                 "NexusKVStore::getAllKeys: {}: callbacks made by primary but "
2033                 "not secondary: {}",
2034                 vbid,
2035                 ss.str());
2036         handleError(msg, vbid);
2037     }
2038 
2039     return primaryResult;
2040 }
2041 
2042 bool NexusKVStore::supportsHistoricalSnapshots() const {
2043     return primary->supportsHistoricalSnapshots() &&
2044            secondary->supportsHistoricalSnapshots();
2045 }
2046 
/**
 * Queue of recorded callback invocations. Each entry pairs the Element that
 * a callback was invoked with and a cb::engine_errc stored alongside it.
 */
template <class Element>
using NexusScanCallbackQueue = std::deque<std::pair<Element, cb::engine_errc>>;
2049 
/**
 * Scan callback invocations.
 *
 * The Item is stored so that the value returned from the secondary can be
 * compared to the value returned from the primary. We copy the Item rather
 * than the GetValue that the callback was invoked with as the GetValue holds
 * a unique_ptr to the Item which can't be copied.
 *
 * The error code is stored to forward on the error code from the primary to
 * the secondary (i.e. if we stop scanning the primary after 5 items then the
 * secondary should stop too).
 */
using NexusScanCallbacks = NexusScanCallbackQueue<Item>;
2061 
2062 /**
2063  * ScanCallback for use with the primary KVStore. This ScanCallback will pass
2064  * the callback invocations along to the original callback which will:
2065  *
2066  * a) do the "actual" logic with the item
2067  * b) give us a no mem return if we should stop scanning
2068  */
2069 class NexusPrimaryScanCallback : public StatusCallback<GetValue> {
2070 public:
2071     NexusPrimaryScanCallback(
2072             std::unique_ptr<StatusCallback<GetValue>> originalCb)
2073         : originalCb(std::move(originalCb)) {
2074     }
2075 
2076     void callback(GetValue& val) override {
2077         // Copy our item now as the originalCb will consume it
2078         auto item = *val.item;
2079 
2080         originalCb->callback(val);
2081         setStatus(originalCb->getStatus());
2082 
2083         // Now that we've set our status we can store this "invocation"
2084         callbacks.emplace_back(std::move(item), cb::engine_errc(getStatus()));
2085     }
2086 
2087     NexusScanCallbacks callbacks;
2088     std::unique_ptr<StatusCallback<GetValue>> originalCb;
2089 };
2090 
2091 /**
2092  * ScanCallback for use with the secondary KVStore. This ScanCallback will check
2093  * the invocation made by the secondary KVStore again the one made by the
2094  * primary.
2095  */
2096 class NexusSecondaryScanCallback : public StatusCallback<GetValue> {
2097 public:
2098     NexusSecondaryScanCallback(const NexusKVStore& kvstore,
2099                                Vbid vb,
2100                                NexusScanCallbacks& primaryCbs)
2101         : kvstore(kvstore), vbid(vb), primaryCallbacks(primaryCbs) {
2102     }
2103 
2104     void callback(GetValue& val) override {
2105         if (primaryCallbacks.empty() &&
2106             static_cast<uint64_t>(val.item->getBySeqno()) <=
2107                     kvstore.getPurgeSeqno(vbid)) {
2108             // primaryCallbacks could be empty if we're below the purge seqno
2109             // and paused in an inconvenient place. Yield because we don't want
2110             // the secondary scanning farther than the primary did and getting
2111             // out of sync
2112             yield();
2113             kvstore.skippedChecksDueToPurging++;
2114             return;
2115         }
2116 
2117         // Pop anything the primary visited that is under the purge seqno if
2118         // it's not our item as the secondary may have purged something
2119         while (!primaryCallbacks.empty() &&
2120                static_cast<uint64_t>(
2121                        primaryCallbacks.front().first.getBySeqno()) <=
2122                        kvstore.getPurgeSeqno(vbid) &&
2123                !kvstore.compareItem(primaryCallbacks.front().first,
2124                                     *val.item)) {
2125             primaryCallbacks.pop_front();
2126             kvstore.skippedChecksDueToPurging++;
2127         }
2128 
2129         if (primaryCallbacks.empty()) {
2130             // Shouldn't happen provided that the KVStores are in sync, but we
2131             // don't want to use primaryCallbacks.front() below if it is empty
2132             // or we'd segfault.
2133             auto msg = fmt::format(
2134                     "NexusSecondaryScanCallback::callback: {} key:{} scanned "
2135                     "item:{} but did not find primaryCallback",
2136                     vbid,
2137                     cb::UserData(val.item->getKey().to_string()),
2138                     *val.item);
2139             kvstore.handleError(msg, vbid);
2140         }
2141 
2142         auto& [primaryVal, primaryStatus] = primaryCallbacks.front();
2143 
2144         // Item should match the one returned by the primary
2145         if (!kvstore.compareItem(primaryVal, *val.item)) {
2146             if (static_cast<uint64_t>(val.item->getBySeqno()) <=
2147                 kvstore.getPurgeSeqno(vbid)) {
2148                 kvstore.skippedChecksDueToPurging++;
2149                 return;
2150             }
2151 
2152             auto msg = fmt::format(
2153                     "NexusSecondaryScanCallback::callback: {} key:{} "
2154                     "item mismatch primary:{} secondary:{}",
2155                     vbid,
2156                     cb::UserData(primaryVal.getKey().to_string()),
2157                     primaryVal,
2158                     *val.item);
2159             kvstore.handleError(msg, vbid);
2160         }
2161 
2162         // Set our status to that of the primary so we can stop scanning after
2163         // the same number of items
2164         setStatus(primaryStatus);
2165         primaryCallbacks.pop_front();
2166     }
2167 
2168     // For logging discrepancies
2169     const NexusKVStore& kvstore;
2170     Vbid vbid;
2171     NexusScanCallbacks& primaryCallbacks;
2172 };
2173 
2174 /**
2175  * ScanCallback for use in the NexusScanContext. This ScanCallback shouldn't get
2176  * called on but needs to exist to compile
2177  */
2178 class NexusDummyScanCallback : public StatusCallback<GetValue> {
2179     void callback(GetValue& val) override {
2180         folly::assume_unreachable();
2181     }
2182 };
2183 
/**
 * Cache lookup invocations.
 *
 * The CacheLookup is stored for comparison with the one made by the secondary
 * KVStore.
 *
 * The error code is stored to forward on the error code from the primary to
 * the secondary (i.e. if we stop scanning the primary after 5 items then the
 * secondary should stop too).
 */
using NexusCacheLookups = NexusScanCallbackQueue<CacheLookup>;
2193 
2194 /**
2195  * CacheLookup for use with the primary KVStore. Usage is similar to the
2196  * NexusPrimaryScanCallback.
2197  */
2198 class NexusPrimaryCacheLookup : public StatusCallback<CacheLookup> {
2199 public:
2200     NexusPrimaryCacheLookup(
2201             std::unique_ptr<StatusCallback<CacheLookup>> originalCb)
2202         : originalCb(std::move(originalCb)) {
2203     }
2204 
2205     void callback(CacheLookup& val) override {
2206         originalCb->callback(val);
2207         setStatus(originalCb->getStatus());
2208 
2209         callbacks.emplace_back(val, cb::engine_errc(getStatus()));
2210     }
2211 
2212     NexusCacheLookups callbacks;
2213     std::unique_ptr<StatusCallback<CacheLookup>> originalCb;
2214 };
2215 
2216 /**
2217  * CacheLookup for use with the secondary KVStore. Usage is similar to
2218  * NexusSecondaryScanContext.
2219  */
2220 class NexusSecondaryCacheLookup : public StatusCallback<CacheLookup> {
2221 public:
2222     NexusSecondaryCacheLookup(const NexusKVStore& kvstore,
2223                               Vbid vbid,
2224                               NexusCacheLookups& primaryCbs)
2225         : kvstore(kvstore), vbid(vbid), primaryCallbacks(primaryCbs) {
2226     }
2227 
2228     void callback(CacheLookup& val) override {
2229         // primaryCallbacks could be empty if we're below the purge seqno and
2230         // paused in an inconvenient place. Yield because we don't want the
2231         // secondary scanning farther than the primary and getting out of sync
2232         if (primaryCallbacks.empty() &&
2233             static_cast<uint64_t>(val.getBySeqno()) <=
2234                     kvstore.getPurgeSeqno(vbid)) {
2235             yield();
2236             kvstore.skippedChecksDueToPurging++;
2237             return;
2238         }
2239 
2240         // We should have at least one invocation because we filter out when we
2241         // forward on the callback so we should see every invocation here
2242         // regardless of status.
2243         Expects(!primaryCallbacks.empty());
2244 
2245         // Special case where every item seen by the primary scan has been
2246         // purged by the secondary already. We need to stop now to keep the scan
2247         // in-sync.
2248         for (auto itr = primaryCallbacks.rbegin();
2249              itr != primaryCallbacks.rend();
2250              itr++) {
2251             if (itr->second == cb::engine_errc::success) {
2252                 if (static_cast<uint64_t>(itr->first.getBySeqno()) <=
2253                             kvstore.getPurgeSeqno(vbid) &&
2254                     itr->first.getBySeqno() < val.getBySeqno()) {
2255                     // We want to stop the scan now, but we need to set
2256                     // the last read seqno to resume from the same point as the
2257                     // primary
2258                     primaryCallbacks.clear();
2259                     yield();
2260                     return;
2261                 } else {
2262                     break;
2263                 };
2264             }
2265         }
2266 
2267         // Remove anything under the purge seqno from the start of
2268         // primaryCallbacks if it's not the item that the secondary has made
2269         // this callback for
2270         auto [primaryVal, primaryStatus] = primaryCallbacks.front();
2271         while (static_cast<uint64_t>(primaryVal.getBySeqno()) <=
2272                        kvstore.getPurgeSeqno(vbid) &&
2273                primaryVal != val) {
2274             primaryCallbacks.pop_front();
2275 
2276             // Nothing to read here...
2277             if (primaryCallbacks.empty()) {
2278                 break;
2279             }
2280 
2281             std::tie(primaryVal, primaryStatus) = primaryCallbacks.front();
2282             kvstore.skippedChecksDueToPurging++;
2283         }
2284 
2285         if (primaryCallbacks.empty()) {
2286             // If primary has purged more than secondary then we may see that
2287             // primaryCallbacks is now empty. Pause the scan now rather than
2288             // continue so that we can pick up more primary items and deal with
2289             // this later. If we continued now we'd skip keys for the secondary
2290             // that the primary may see.
2291             if (static_cast<uint64_t>(val.getBySeqno()) <=
2292                 kvstore.getPurgeSeqno(vbid)) {
2293                 kvstore.skippedChecksDueToPurging++;
2294                 yield();
2295                 return;
2296             }
2297 
2298             // Shouldn't happen provided that the KVStores are in sync, but we
2299             // don't want to use primaryCallbacks.front() below if it is empty
2300             // or we'd segfault.
2301             auto msg = fmt::format(
2302                     "NexusSecondaryCacheCallback::callback: {} key:{} scanned "
2303                     "item seqno:{} but did not find primaryCallback",
2304                     vbid,
2305                     cb::UserData(val.getKey().to_string()),
2306                     val.getBySeqno());
2307             kvstore.handleError(msg, vbid);
2308         }
2309 
2310         if (primaryVal != val) {
2311             if (static_cast<uint64_t>(val.getBySeqno()) <=
2312                 kvstore.getPurgeSeqno(vbid)) {
2313                 kvstore.skippedChecksDueToPurging++;
2314                 return;
2315             }
2316 
2317             auto msg = fmt::format(
2318                     "NexusSecondaryCacheLookup::callback: {} "
2319                     "cache lookup mismatch key:{} primary seqno:{} secondary "
2320                     "seqno:{}",
2321                     vbid,
2322                     cb::UserData(primaryVal.getKey().to_string()),
2323                     primaryVal.getBySeqno(),
2324                     val.getBySeqno());
2325             kvstore.handleError(msg, vbid);
2326         }
2327 
2328         // Set our status to that of the primary so we can stop scanning after
2329         // the same number of items
2330         setStatus(primaryStatus);
2331         primaryCallbacks.pop_front();
2332     }
2333 
2334     // For logging discrepancies
2335     const NexusKVStore& kvstore;
2336     Vbid vbid;
2337     NexusCacheLookups& primaryCallbacks;
2338 };
2339 
2340 /**
2341  * CacheLookup for use in the NexusScanContext. This CacheLookup shouldn't get
2342  * called on but needs to exist to compile
2343  */
2344 class NexusDummyCacheLookup : public StatusCallback<CacheLookup> {
2345     void callback(CacheLookup& val) override {
2346         folly::assume_unreachable();
2347     }
2348 };
2349 
2350 class NexusKVStoreBySeqnoScanContext : public BySeqnoScanContext {
2351 public:
2352     NexusKVStoreBySeqnoScanContext(
2353             std::unique_ptr<StatusCallback<GetValue>> cb,
2354             std::unique_ptr<StatusCallback<CacheLookup>> cl,
2355             Vbid vb,
2356             std::unique_ptr<KVFileHandle> handle,
2357             int64_t start,
2358             int64_t end,
2359             uint64_t purgeSeqno,
2360             DocumentFilter _docFilter,
2361             ValueFilter _valFilter,
2362             uint64_t _documentCount,
2363             const vbucket_state& vbucketState,
2364             const std::vector<Collections::KVStore::DroppedCollection>&
2365                     droppedCollections)
2366         : BySeqnoScanContext(std::move(cb),
2367                              std::move(cl),
2368                              vb,
2369                              std::move(handle),
2370                              start,
2371                              end,
2372                              purgeSeqno,
2373                              _docFilter,
2374                              _valFilter,
2375                              _documentCount,
2376                              vbucketState,
2377                              droppedCollections) {
2378     }
2379 
2380     /**
2381      * @return the original callback that now lives in the
2382      * NexusPrimaryScanCallback as the caller wants it's own callback
2383      */
2384     const StatusCallback<GetValue>& getValueCallback() const override {
2385         auto& nexusCallback = dynamic_cast<NexusPrimaryScanCallback&>(
2386                 primaryCtx->getValueCallback());
2387         return *nexusCallback.originalCb;
2388     }
2389 
2390     /**
2391      * @return the original callback that now lives in the
2392      * NexusPrimaryScanCallback as the caller wants it's own callback
2393      */
2394     StatusCallback<GetValue>& getValueCallback() override {
2395         auto& nexusCallback = dynamic_cast<NexusPrimaryScanCallback&>(
2396                 primaryCtx->getValueCallback());
2397         return *nexusCallback.originalCb;
2398     }
2399 
2400     /**
2401      * @return the original callback that now lives in the
2402      * NexusPrimaryCacheLookup as the caller wants it's own callback
2403      */
2404     const StatusCallback<CacheLookup>& getCacheCallback() const override {
2405         auto& nexusCallback = dynamic_cast<NexusPrimaryCacheLookup&>(
2406                 primaryCtx->getCacheCallback());
2407         return *nexusCallback.originalCb;
2408     }
2409 
2410     /**
2411      * @return the original callback that now lives in the
2412      * NexusPrimaryCacheLookup as the caller wants it's own callback
2413      */
2414     StatusCallback<CacheLookup>& getCacheCallback() override {
2415         auto& nexusCallback = dynamic_cast<NexusPrimaryCacheLookup&>(
2416                 primaryCtx->getCacheCallback());
2417         return *nexusCallback.originalCb;
2418     }
2419 
2420     std::unique_ptr<BySeqnoScanContext> primaryCtx;
2421     std::unique_ptr<BySeqnoScanContext> secondaryCtx;
2422 };
2423 
2424 std::unique_ptr<BySeqnoScanContext> NexusKVStore::initBySeqnoScanContext(
2425         std::unique_ptr<StatusCallback<GetValue>> cb,
2426         std::unique_ptr<StatusCallback<CacheLookup>> cl,
2427         Vbid vbid,
2428         uint64_t startSeqno,
2429         DocumentFilter options,
2430         ValueFilter valOptions,
2431         SnapshotSource source,
2432         std::unique_ptr<KVFileHandle> fileHandle) const {
2433     // Need to take the Nexus lock for the vBucket to stop racing flushes (or
2434     // compactions) from modifying one of the KVStores and not the other
2435     auto lh = getLock(vbid);
2436 
2437     // The primary KVStore ScanContext will own and invoke the original
2438     // callbacks as we need to invoke them to work out how many items the
2439     // secondary KVStore has to scan
2440     auto primaryCb = std::make_unique<NexusPrimaryScanCallback>(std::move(cb));
2441     auto primaryCl = std::make_unique<NexusPrimaryCacheLookup>(std::move(cl));
2442     auto primaryCtx = primary->initBySeqnoScanContext(std::move(primaryCb),
2443                                                       std::move(primaryCl),
2444                                                       vbid,
2445                                                       startSeqno,
2446                                                       options,
2447                                                       valOptions,
2448                                                       source);
2449 
2450     std::unique_ptr<BySeqnoScanContext> secondaryCtx;
2451 
2452     if (!primaryCtx) {
2453         // This could happen if we try to scan when nothing exists, returning
2454         // nullptr is what the underlying KVStores do too. We need the ctx for
2455         // further construction so may as well abort now. The underlying KVStore
2456         // should have logged some error...
2457         return nullptr;
2458     }
2459 
2460     auto& primaryScanCallback = dynamic_cast<NexusPrimaryScanCallback&>(
2461             primaryCtx->getValueCallback());
2462     auto secondaryCb = std::make_unique<NexusSecondaryScanCallback>(
2463             *this, vbid, primaryScanCallback.callbacks);
2464 
2465     auto& primaryCacheCallback = dynamic_cast<NexusPrimaryCacheLookup&>(
2466             primaryCtx->getCacheCallback());
2467     auto secondaryCl = std::make_unique<NexusSecondaryCacheLookup>(
2468             *this, vbid, primaryCacheCallback.callbacks);
2469     secondaryCtx = secondary->initBySeqnoScanContext(std::move(secondaryCb),
2470                                                      std::move(secondaryCl),
2471                                                      vbid,
2472                                                      startSeqno,
2473                                                      options,
2474                                                      valOptions,
2475                                                      source);
2476 
2477     if (!secondaryCtx) {
2478         // If we could build the primaryCtx but not the secondary then something
2479         // is wrong.
2480         auto msg = fmt::format(
2481                 "NexusKVStore::initBySeqnoScanContext: {}: "
2482                 "failed to create the secondary scan context. "
2483                 "Check secondary KVStore logs for details.",
2484                 vbid);
2485         handleError(msg, vbid);
2486     }
2487 
2488     // Some error checking for the two contexts before we create the
2489     // NexusScanContext
2490     if (primaryCtx->startSeqno != secondaryCtx->startSeqno) {
2491         auto msg = fmt::format(
2492                 "NexusKVStore::initBySeqnoScanContext: {}: "
2493                 "scan ctx start seqno not equal primary:{} "
2494                 "secondary:{}",
2495                 vbid,
2496                 primaryCtx->startSeqno,
2497                 secondaryCtx->startSeqno);
2498         handleError(msg, vbid);
2499     }
2500 
2501     if (primaryCtx->maxVisibleSeqno != secondaryCtx->maxVisibleSeqno) {
2502         auto msg = fmt::format(
2503                 "NexusKVStore::initBySeqnoScanContext: {}: "
2504                 "scan ctx max visible seqno not equal "
2505                 "primary:{} secondary:{}",
2506                 vbid,
2507                 primaryCtx->purgeSeqno,
2508                 secondaryCtx->purgeSeqno);
2509         handleError(msg, vbid);
2510     }
2511 
2512     if (primaryCtx->persistedCompletedSeqno !=
2513         secondaryCtx->persistedCompletedSeqno) {
2514         auto msg = fmt::format(
2515                 "NexusKVStore::initBySeqnoScanContext: {}: "
2516                 "scan ctx persisted completed seqno not equal "
2517                 "primary:{} secondary:{}",
2518                 vbid,
2519                 primaryCtx->persistedCompletedSeqno,
2520                 secondaryCtx->persistedCompletedSeqno);
2521         handleError(msg, vbid);
2522     }
2523 
2524     if (primaryCtx->collectionsContext != secondaryCtx->collectionsContext) {
2525         auto msg = fmt::format(
2526                 "NexusKVStore::initBySeqnoScanContext: {}: "
2527                 "scan ctx collections context not equal "
2528                 "primary:{} secondary:{}",
2529                 vbid,
2530                 primaryCtx->collectionsContext,
2531                 secondaryCtx->collectionsContext);
2532         handleError(msg, vbid);
2533     }
2534 
2535     if (primaryCtx->maxSeqno != secondaryCtx->maxSeqno) {
2536         auto msg = fmt::format(
2537                 "NexusKVStore::initBySeqnoScanContext: {}: "
2538                 "scan ctx maxSeqno not equal "
2539                 "primary:{} secondary:{}",
2540                 vbid,
2541                 primaryCtx->maxSeqno,
2542                 secondaryCtx->maxSeqno);
2543         handleError(msg, vbid);
2544     }
2545 
2546     // Acquiring the lock at the start of this function means that nothing
2547     // should be running that can modify the file handle that we grab here. We
2548     // need this in the NexusScanContext as it's exposed to callers
2549     // to use
2550     auto handle = makeFileHandle(vbid);
2551     if (!handle) {
2552         auto msg = fmt::format(
2553                 "NexusKVStore::initBySeqnoScanContext: {}: "
2554                 "failed to get the primary file handle. Check "
2555                 "primary KVStore logs for details.",
2556                 vbid);
2557         handleError(msg, vbid);
2558     }
2559 
2560     // We need the vbstate and dropped collections to construct the scan
2561     // context. Again, the lock acquired at the start of this function means
2562     // that these should be consistent even though we're not getting them from a
2563     // snapshot
2564     auto vbstate = getPersistedVBucketState(vbid);
2565     if (vbstate.status != ReadVBStateStatus::Success) {
2566         auto msg = fmt::format(
2567                 "NexusKVStore::initBySeqnoScanContext: {}:"
2568                 "failed to get the primary vbstate. Check primary KVStore "
2569                 "logs for details",
2570                 vbid);
2571         handleError(msg, vbid);
2572     }
2573 
2574     auto [droppedStatus, droppedCollections] = getDroppedCollections(vbid);
2575 
2576     // Dummy callbacks won't get invoked
2577     auto dummyCb = std::make_unique<NexusDummyScanCallback>();
2578     auto dummyCl = std::make_unique<NexusDummyCacheLookup>();
2579     auto nexusScanContext = std::make_unique<NexusKVStoreBySeqnoScanContext>(
2580             std::move(dummyCb),
2581             std::move(dummyCl),
2582             vbid,
2583             std::move(handle),
2584             startSeqno,
2585             primaryCtx->maxSeqno,
2586             primaryCtx->purgeSeqno,
2587             options,
2588             valOptions,
2589             primaryCtx->documentCount,
2590             vbstate.state,
2591             droppedCollections);
2592 
2593     nexusScanContext->primaryCtx = std::move(primaryCtx);
2594     nexusScanContext->secondaryCtx = std::move(secondaryCtx);
2595 
2596     return nexusScanContext;
2597 }
2598 
2599 class NexusKVStoreByIdScanContext : public ByIdScanContext {
2600 public:
2601     NexusKVStoreByIdScanContext(
2602             std::unique_ptr<StatusCallback<GetValue>> cb,
2603             std::unique_ptr<StatusCallback<CacheLookup>> cl,
2604             Vbid vb,
2605             std::unique_ptr<KVFileHandle> handle,
2606             std::vector<ByIdRange> ranges,
2607             DocumentFilter _docFilter,
2608             ValueFilter _valFilter,
2609             const std::vector<Collections::KVStore::DroppedCollection>&
2610                     droppedCollections,
2611             int64_t maxSeqno)
2612         : ByIdScanContext(std::move(cb),
2613                           std::move(cl),
2614                           vb,
2615                           std::move(handle),
2616                           ranges,
2617                           _docFilter,
2618                           _valFilter,
2619                           droppedCollections,
2620                           maxSeqno) {
2621     }
2622 
2623     /**
2624      * @return the original callback that now lives in the
2625      * NexusPrimaryScanCallback as the caller wants it's own callback
2626      */
2627     const StatusCallback<GetValue>& getValueCallback() const override {
2628         auto& nexusCallback = dynamic_cast<NexusPrimaryScanCallback&>(
2629                 primaryCtx->getValueCallback());
2630         return *nexusCallback.originalCb;
2631     }
2632 
2633     /**
2634      * @return the original callback that now lives in the
2635      * NexusPrimaryScanCallback as the caller wants it's own callback
2636      */
2637     StatusCallback<GetValue>& getValueCallback() override {
2638         auto& nexusCallback = dynamic_cast<NexusPrimaryScanCallback&>(
2639                 primaryCtx->getValueCallback());
2640         return *nexusCallback.originalCb;
2641     }
2642 
2643     /**
2644      * @return the original callback that now lives in the
2645      * NexusPrimaryCacheLookup as the caller wants it's own callback
2646      */
2647     const StatusCallback<CacheLookup>& getCacheCallback() const override {
2648         auto& nexusCallback = dynamic_cast<NexusPrimaryCacheLookup&>(
2649                 primaryCtx->getCacheCallback());
2650         return *nexusCallback.originalCb;
2651     }
2652 
2653     /**
2654      * @return the original callback that now lives in the
2655      * NexusPrimaryCacheLookup as the caller wants it's own callback
2656      */
2657     StatusCallback<CacheLookup>& getCacheCallback() override {
2658         auto& nexusCallback = dynamic_cast<NexusPrimaryCacheLookup&>(
2659                 primaryCtx->getCacheCallback());
2660         return *nexusCallback.originalCb;
2661     }
2662 
2663     std::unique_ptr<ByIdScanContext> primaryCtx;
2664     std::unique_ptr<ByIdScanContext> secondaryCtx;
2665 };
2666 
2667 static std::string formatElement(const Item& item, cb::engine_errc status) {
2668     return fmt::format("[key:'{}',seqno:{},deleted:{},status:{}],",
2669                        cb::UserData(item.getKey().to_string()),
2670                        item.getBySeqno(),
2671                        item.isDeleted(),
2672                        status);
2673 }
2674 
2675 static std::string formatElement(const CacheLookup& cl,
2676                                  cb::engine_errc status) {
2677     return fmt::format("[key:'{}',seqno:{},status:{}],",
2678                        cb::UserData(cl.getKey().to_string()),
2679                        cl.getBySeqno(),
2680                        status);
2681 }
2682 
2683 template <class T>
2684 void NexusKVStore::checkScanCallbacks(Vbid vbid, const T& callbacks) const {
2685     if (!callbacks.empty()) {
2686         auto firstItem = callbacks.back().first;
2687         if (static_cast<uint64_t>(firstItem.getBySeqno()) >
2688             getPurgeSeqno(vbid)) {
2689             auto msg = fmt::format(
2690                     "NexusKVStore::checkScanCallbacks: {}: {} scan "
2691                     "callbacks were not matched by secondary scan "
2692                     "callbacks ",
2693                     vbid,
2694                     callbacks.size());
2695             for (auto& [e, status] : callbacks) {
2696                 msg.append(formatElement(e, status));
2697             }
2698             msg.pop_back();
2699             handleError(msg, vbid);
2700         } else {
2701             skippedChecksDueToPurging++;
2702         }
2703     }
2704 }
2705 
2706 std::unique_ptr<ByIdScanContext> NexusKVStore::initByIdScanContext(
2707         std::unique_ptr<StatusCallback<GetValue>> cb,
2708         std::unique_ptr<StatusCallback<CacheLookup>> cl,
2709         Vbid vbid,
2710         const std::vector<ByIdRange>& ranges,
2711         DocumentFilter docFilter,
2712         ValueFilter valFilter,
2713         std::unique_ptr<KVFileHandle> fileHandle) const {
2714     // Need to take the Nexus lock for the vBucket to stop racing flushes (or
2715     // compactions) from modifying one of the KVStores and not the other
2716     auto lh = getLock(vbid);
2717 
2718     // The primary KVStore ScanContext will own and invoke the original
2719     // callbacks as we need to invoke them to work out how many items the
2720     // secondary KVStore has to scan
2721     auto primaryCb = std::make_unique<NexusPrimaryScanCallback>(std::move(cb));
2722     auto primaryCl = std::make_unique<NexusPrimaryCacheLookup>(std::move(cl));
2723     auto primaryCtx = primary->initByIdScanContext(std::move(primaryCb),
2724                                                    std::move(primaryCl),
2725                                                    vbid,
2726                                                    ranges,
2727                                                    docFilter,
2728                                                    valFilter);
2729     if (!primaryCtx) {
2730         return nullptr;
2731     }
2732 
2733     auto& primaryScanCallback = dynamic_cast<NexusPrimaryScanCallback&>(
2734             primaryCtx->getValueCallback());
2735     auto secondaryCb = std::make_unique<NexusSecondaryScanCallback>(
2736             *this, vbid, primaryScanCallback.callbacks);
2737 
2738     auto& primaryCacheCallback = dynamic_cast<NexusPrimaryCacheLookup&>(
2739             primaryCtx->getCacheCallback());
2740     auto secondaryCl = std::make_unique<NexusSecondaryCacheLookup>(
2741             *this, vbid, primaryCacheCallback.callbacks);
2742     auto secondaryCtx = secondary->initByIdScanContext(std::move(secondaryCb),
2743                                                        std::move(secondaryCl),
2744                                                        vbid,
2745                                                        ranges,
2746                                                        docFilter,
2747                                                        valFilter);
2748 
2749     if (!secondaryCtx) {
2750         // If we could build the primaryCtx but not the secondary then something
2751         // is wrong.
2752         auto msg = fmt::format(
2753                 "NexusKVStore::initByIdScanContext: {}: "
2754                 "failed to create the secondary scan context. "
2755                 "Check secondary KVStore logs for details.",
2756                 vbid);
2757         handleError(msg, vbid);
2758     }
2759 
2760     auto handle = makeFileHandle(vbid);
2761     if (!handle) {
2762         auto msg = fmt::format(
2763                 "NexusKVStore::initByIdScanContext: {}: "
2764                 "failed to get the primary file handle. Check "
2765                 "primary KVStore logs for details.",
2766                 vbid);
2767         handleError(msg, vbid);
2768     }
2769 
2770     auto vbstate = getPersistedVBucketState(vbid);
2771     auto [droppedStatus, droppedCollections] = getDroppedCollections(vbid);
2772 
2773     // Dummy callbacks won't get invoked
2774     auto dummyCb = std::make_unique<NexusDummyScanCallback>();
2775     auto dummyCl = std::make_unique<NexusDummyCacheLookup>();
2776     auto nexusScanContext =
2777             std::make_unique<NexusKVStoreByIdScanContext>(std::move(dummyCb),
2778                                                           std::move(dummyCl),
2779                                                           vbid,
2780                                                           std::move(handle),
2781                                                           ranges,
2782                                                           docFilter,
2783                                                           valFilter,
2784                                                           droppedCollections,
2785                                                           primaryCtx->maxSeqno);
2786 
2787     nexusScanContext->primaryCtx = std::move(primaryCtx);
2788     nexusScanContext->secondaryCtx = std::move(secondaryCtx);
2789 
2790     return nexusScanContext;
2791 }
2792 
2793 ScanStatus NexusKVStore::scan(BySeqnoScanContext& ctx) const {
2794     auto& nexusCtx = dynamic_cast<NexusKVStoreBySeqnoScanContext&>(ctx);
2795     auto& primaryCtx = *nexusCtx.primaryCtx;
2796     auto& secondaryCtx = *nexusCtx.secondaryCtx;
2797 
2798     auto primaryScanResult = primary->scan(*nexusCtx.primaryCtx);
2799     auto secondaryScanResult = secondary->scan(*nexusCtx.secondaryCtx);
2800 
2801     if (primaryScanResult != secondaryScanResult) {
2802         auto msg = fmt::format(
2803                 "NexusKVStore::scan: {}: scan result not equal "
2804                 "primary:{} secondary:{}",
2805                 ctx.vbid,
2806                 primaryScanResult,
2807                 secondaryScanResult);
2808         handleError(msg, ctx.vbid);
2809     }
2810 
2811     if (primaryCtx.lastReadSeqno != secondaryCtx.lastReadSeqno) {
2812         if (static_cast<uint64_t>(primaryCtx.lastReadSeqno) >
2813                     getPurgeSeqno(ctx.vbid) &&
2814             static_cast<uint64_t>(secondaryCtx.lastReadSeqno) >
2815                     getPurgeSeqno(ctx.vbid)) {
2816             auto msg = fmt::format(
2817                     "NexusKVStore::scan: {}: last read seqno not "
2818                     "equal primary:{} secondary:{}",
2819                     ctx.vbid,
2820                     primaryCtx.lastReadSeqno,
2821                     secondaryCtx.lastReadSeqno);
2822             handleError(msg, ctx.vbid);
2823         } else {
2824             skippedChecksDueToPurging++;
2825         }
2826     }
2827 
2828     checkScanCallbacks(ctx.vbid,
2829                        dynamic_cast<NexusPrimaryScanCallback&>(
2830                                primaryCtx.getValueCallback())
2831                                .callbacks);
2832     checkScanCallbacks(ctx.vbid,
2833                        dynamic_cast<NexusPrimaryCacheLookup&>(
2834                                primaryCtx.getCacheCallback())
2835                                .callbacks);
2836 
2837     // lastReadSeqno gets checked by backfill so we need to set it in the
2838     // Nexus ctx.
2839     nexusCtx.lastReadSeqno = primaryCtx.lastReadSeqno;
2840 
2841     return primaryScanResult;
2842 }
2843 
2844 ScanStatus NexusKVStore::scan(ByIdScanContext& ctx) const {
2845     auto& nexusCtx = dynamic_cast<NexusKVStoreByIdScanContext&>(ctx);
2846     auto& primaryCtx = *nexusCtx.primaryCtx;
2847     auto& secondaryCtx = *nexusCtx.secondaryCtx;
2848 
2849     // Set the primary/secondary ranges from the nexus context.
2850     // The start/end do get adjusted since creation (if scans yield), so must be
2851     // copied over.
2852     primaryCtx.ranges = nexusCtx.ranges;
2853     secondaryCtx.ranges = nexusCtx.ranges;
2854 
2855     auto primaryScanResult = primary->scan(*nexusCtx.primaryCtx);
2856     auto secondaryScanResult = secondary->scan(*nexusCtx.secondaryCtx);
2857 
2858     if (primaryScanResult != secondaryScanResult) {
2859         auto msg = fmt::format(
2860                 "NexusKVStore::scan: {}: scan result not equal "
2861                 "primary:{} secondary:{}",
2862                 ctx.vbid,
2863                 primaryScanResult,
2864                 secondaryScanResult);
2865         handleError(msg, ctx.vbid);
2866     }
2867 
2868     if (primaryCtx.lastReadKey != secondaryCtx.lastReadKey) {
2869         auto msg = fmt::format(
2870                 "NexusKVStore::scan: {}: lastReadKey not "
2871                 "equal primary:{} secondary:{}",
2872                 ctx.vbid,
2873                 primaryCtx.lastReadKey,
2874                 secondaryCtx.lastReadKey);
2875         handleError(msg, ctx.vbid);
2876     }
2877 
2878     if (primaryCtx.ranges != secondaryCtx.ranges) {
2879         auto msg = fmt::format("NexusKVStore::scan: {}: ranges not equal",
2880                                ctx.vbid);
2881         handleError(msg, ctx.vbid);
2882     }
2883 
2884     checkScanCallbacks(ctx.vbid,
2885                        dynamic_cast<NexusPrimaryScanCallback&>(
2886                                primaryCtx.getValueCallback())
2887                                .callbacks);
2888     checkScanCallbacks(ctx.vbid,
2889                        dynamic_cast<NexusPrimaryCacheLookup&>(
2890                                primaryCtx.getCacheCallback())
2891                                .callbacks);
2892 
2893     // Both ranges and lastReadKey are updated by the underlying scan, must copy
2894     // over to the nexus context in-case of a scan being resumed.
2895     nexusCtx.lastReadKey = primaryCtx.lastReadKey;
2896     nexusCtx.ranges = primaryCtx.ranges;
2897 
2898     return primaryScanResult;
2899 }
2900 
2901 std::unique_ptr<KVFileHandle> NexusKVStore::makeFileHandle(Vbid vbid) const {
2902     return std::make_unique<NexusKVFileHandle>(primary->makeFileHandle(vbid),
2903                                                secondary->makeFileHandle(vbid));
2904 }
2905 
2906 std::pair<KVStore::GetCollectionStatsStatus, Collections::VB::PersistedStats>
2907 NexusKVStore::getCollectionStats(const KVFileHandle& kvFileHandle,
2908                                  CollectionID collection) const {
2909     auto& nexusFileHandle =
2910             dynamic_cast<const NexusKVFileHandle&>(kvFileHandle);
2911 
2912     const auto [primaryResult, primaryStats] = primary->getCollectionStats(
2913             *nexusFileHandle.primaryFileHandle, collection);
2914     const auto [secondaryResult, secondaryStats] =
2915             secondary->getCollectionStats(*nexusFileHandle.secondaryFileHandle,
2916                                           collection);
2917 
2918     if (primaryResult != secondaryResult) {
2919         auto msg = fmt::format(
2920                 "NexusKVStore::getCollectionStats: issue getting stats for {} "
2921                 "primary:{} secondary:{}",
2922                 collection,
2923                 primaryResult,
2924                 secondaryResult);
2925         handleError(msg, {} /*vbid*/);
2926     }
2927 
2928     // Can't check disk size as that may differ
2929     if (primaryStats.itemCount != secondaryStats.itemCount ||
2930         primaryStats.highSeqno != secondaryStats.highSeqno) {
2931         auto msg = fmt::format(
2932                 "NexusKVStore::getCollectionStats: difference in stats for "
2933                 "collection {} primary:{} secondary:{}",
2934                 collection,
2935                 primaryStats,
2936                 secondaryStats);
2937         handleError(msg, {} /*vbid*/);
2938     }
2939 
2940     return {primaryResult, primaryStats};
2941 }
2942 
2943 std::pair<KVStore::GetCollectionStatsStatus, Collections::VB::PersistedStats>
2944 NexusKVStore::getCollectionStats(Vbid vbid, CollectionID collection) const {
2945     const auto [primaryResult, primaryStats] =
2946             primary->getCollectionStats(vbid, collection);
2947     const auto [secondaryResult, secondaryStats] =
2948             secondary->getCollectionStats(vbid, collection);
2949 
2950     if (primaryResult != secondaryResult) {
2951         auto msg = fmt::format(
2952                 "NexusKVStore::getCollectionStats: {} issue getting stats for "
2953                 "{} primary:{} secondary:{}",
2954                 vbid,
2955                 collection,
2956                 primaryResult,
2957                 secondaryResult);
2958         handleError(msg, vbid);
2959     }
2960 
2961     // Can't check disk size as that may differ
2962     if (primaryStats.itemCount != secondaryStats.itemCount ||
2963         primaryStats.highSeqno != secondaryStats.highSeqno) {
2964         auto msg = fmt::format(
2965                 "NexusKVStore::getCollectionStats: {} difference in stats for "
2966                 "collection {} primary:{} secondary:{}",
2967                 vbid,
2968                 collection,
2969                 primaryStats,
2970                 secondaryStats);
2971         handleError(msg, vbid);
2972     }
2973 
2974     return {primaryResult, primaryStats};
2975 }
2976 
2977 std::optional<Collections::ManifestUid> NexusKVStore::getCollectionsManifestUid(
2978         KVFileHandle& kvFileHandle) const {
2979     auto& nexusFileHandle =
2980             dynamic_cast<const NexusKVFileHandle&>(kvFileHandle);
2981     const auto primaryResult = primary->getCollectionsManifestUid(
2982             *nexusFileHandle.primaryFileHandle);
2983     const auto secondaryResult = secondary->getCollectionsManifestUid(
2984             *nexusFileHandle.secondaryFileHandle);
2985 
2986     if (primaryResult != secondaryResult) {
2987         auto msg = fmt::format(
2988                 "NexusKVStore::getCollectionsManifestUid: Difference in "
2989                 "collection stats primary:{} secondary:{}",
2990                 primaryResult,
2991                 secondaryResult);
2992         handleError(msg, {} /*vbid*/);
2993     }
2994 
2995     return primaryResult;
2996 }
2997 
2998 std::pair<bool, Collections::KVStore::Manifest>
2999 NexusKVStore::getCollectionsManifest(Vbid vbid) const {
3000     auto [primaryResult, primaryManifest] =
3001             primary->getCollectionsManifest(vbid);
3002     auto [secondaryResult, secondaryManifest] =
3003             secondary->getCollectionsManifest(vbid);
3004 
3005     if (primaryResult != secondaryResult) {
3006         auto msg = fmt::format(
3007                 "NexusKVStore::getCollectionsManifest: {}: different result "
3008                 "primary:{} "
3009                 "secondary:{}",
3010                 vbid,
3011                 primaryResult,
3012                 secondaryResult);
3013         handleError(msg, vbid);
3014     }
3015 
3016     if (primaryManifest != secondaryManifest) {
3017         auto msg = fmt::format(
3018                 "NexusKVStore::getCollectionsManifest: {}: different manifest "
3019                 "primary:{} secondary:{}",
3020                 vbid,
3021                 primaryManifest,
3022                 secondaryManifest);
3023         handleError(msg, vbid);
3024     }
3025 
3026     return {primaryResult, primaryManifest};
3027 }
3028 
3029 std::pair<bool, std::vector<Collections::KVStore::DroppedCollection>>
3030 NexusKVStore::getDroppedCollections(Vbid vbid) const {
3031     auto [primaryResult, primaryDropped] = primary->getDroppedCollections(vbid);
3032     auto [secondaryResult, secondaryDropped] =
3033             secondary->getDroppedCollections(vbid);
3034 
3035     if (primaryResult != secondaryResult) {
3036         auto msg = fmt::format(
3037                 "NexusKVStore::getDroppedCollections: {}: primaryResult:{} "
3038                 "secondaryResult:{}",
3039                 vbid,
3040                 primaryResult,
3041                 secondaryResult);
3042         handleError(msg, vbid);
3043     }
3044 
3045     for (const auto& dc : primaryDropped) {
3046         auto itr =
3047                 std::find(secondaryDropped.begin(), secondaryDropped.end(), dc);
3048         if (itr == secondaryDropped.end()) {
3049             auto msg = fmt::format(
3050                     "NexusKVStore::getDroppedCollections: {}: found dropped "
3051                     "collection for primary but not secondary, cid:{} start:{} "
3052                     "end:{}",
3053                     vbid,
3054                     dc.collectionId,
3055                     dc.startSeqno,
3056                     dc.endSeqno);
3057             handleError(msg, vbid);
3058         }
3059 
3060         secondaryDropped.erase(itr);
3061     }
3062 
3063     if (!secondaryDropped.empty()) {
3064         auto msg = fmt::format(
3065                 "NexusKVStore::getDroppedCollections: {}: found dropped "
3066                 "collections for secondary but not primary ",
3067                 vbid);
3068         for (auto& dc : secondaryDropped) {
3069             fmt::format_to(std::back_inserter(msg),
3070                            "[cid:{},start:{},end:{}],",
3071                            dc.collectionId,
3072                            dc.startSeqno,
3073                            dc.endSeqno);
3074         }
3075         msg.pop_back();
3076         handleError(msg, vbid);
3077     }
3078 
3079     return {primaryResult, primaryDropped};
3080 }
3081 
3082 const KVStoreConfig& NexusKVStore::getConfig() const {
3083     return primary->getConfig();
3084 }
3085 
3086 GetValue NexusKVStore::getBySeqno(KVFileHandle& handle,
3087                                   Vbid vbid,
3088                                   uint64_t seq,
3089                                   ValueFilter filter) const {
3090     auto& nexusFileHandle = dynamic_cast<NexusKVFileHandle&>(handle);
3091 
3092     auto primaryGetValue = primary->getBySeqno(
3093             *nexusFileHandle.primaryFileHandle, vbid, seq, filter);
3094     const auto secondaryGetValue = secondary->getBySeqno(
3095             *nexusFileHandle.secondaryFileHandle, vbid, seq, filter);
3096 
3097     // There's no point comparing values below the purge seqno as one of the
3098     // KVStores may have purged the value that we're looking for
3099     if (seq <= getPurgeSeqno(vbid)) {
3100         skippedChecksDueToPurging++;
3101         return primaryGetValue;
3102     }
3103 
3104     if (primaryGetValue.getStatus() != secondaryGetValue.getStatus()) {
3105         auto msg = fmt::format(
3106                 "NexusKVStore::getBySeqno: {} seqno:{} status mismatch "
3107                 "primary:{} "
3108                 "secondary:{}",
3109                 vbid,
3110                 seq,
3111                 primaryGetValue.getStatus(),
3112                 secondaryGetValue.getStatus());
3113         handleError(msg, vbid);
3114     }
3115 
3116     if (primaryGetValue.getStatus() == cb::engine_errc::success &&
3117         !compareItem(*primaryGetValue.item, *secondaryGetValue.item)) {
3118         auto msg = fmt::format(
3119                 "NexusKVStore::{}: {} seqno:{} item mismatch primary:{} "
3120                 "secondary:{}",
3121                 vbid,
3122                 seq,
3123                 *primaryGetValue.item,
3124                 *secondaryGetValue.item);
3125         handleError(msg, vbid);
3126     }
3127 
3128     return primaryGetValue;
3129 }
3130 
3131 void NexusKVStore::setStorageThreads(ThreadPoolConfig::StorageThreadCount num) {
3132     primary->setStorageThreads(num);
3133     secondary->setStorageThreads(num);
3134 }
3135 
3136 std::unique_ptr<TransactionContext> NexusKVStore::begin(
3137         Vbid vbid, std::unique_ptr<PersistenceCallback> pcb) {
3138     auto ctx = std::make_unique<NexusKVStoreTransactionContext>(*this, vbid);
3139 
3140     ctx->primaryContext = primary->begin(
3141             vbid,
3142             std::make_unique<NexusKVStorePrimaryPersistenceCallback>(
3143                     std::move(pcb), ctx->primarySets, ctx->primaryDeletions));
3144     ctx->secondaryContext = secondary->begin(
3145             vbid,
3146             std::make_unique<NexusKVStoreSecondaryPersistenceCallback>(
3147                     *this, ctx->primarySets, ctx->primaryDeletions));
3148 
3149     return ctx;
3150 }
3151 
3152 const KVStoreStats& NexusKVStore::getKVStoreStat() const {
3153     return primary->getKVStoreStat();
3154 }
3155 
3156 void NexusKVStore::setMakeCompactionContextCallback(
3157         MakeCompactionContextCallback cb) {
3158     if (!configuration.isImplicitCompactionEnabled()) {
3159         return;
3160     }
3161 
3162     auto nexusPrimaryCb =
3163             [this, cb](Vbid vbid, CompactionConfig& cfg, uint64_t purgeSeqno) {
3164                 auto ctx = cb(vbid, cfg, purgeSeqno);