1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2018 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #pragma once
19 
#include "../objectregistry.h"
#include "collections/collection_persisted_stats.h"
#include "kvstore.h"
#include "kvstore_priv.h"
#include "libmagma/magma.h"
#include "rollback_result.h"
#include "vbucket_bgfetch_item.h"

#include <platform/dirutils.h>
#include <platform/non_negative_counter.h>

#include <map>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>
35 
36 namespace magma {
37 class Slice;
38 class Status;
39 } // namespace magma
40 
41 class MagmaRequest;
42 class MagmaKVStoreConfig;
43 class MagmaCompactionCB;
44 struct kvstats_ctx;
45 struct vbucket_state;
46 
47 /**
48  * Magma info is used to mimic info that is stored internally in couchstore.
49  * This info is stored with vbucket_state every time vbucket_state is stored.
50  */
51 class MagmaInfo {
52 public:
53     MagmaInfo() = default;
54 
55     // Note: we don't want to reset the kvstoreRev because it tracks
56     // the kvstore revision which is a monotonically increasing
57     // value each time the kvstore is created.
reset()58     void reset() {
59         docCount = 0;
60         persistedDeletes = 0;
61     }
62     cb::NonNegativeCounter<uint64_t> docCount{0};
63     cb::NonNegativeCounter<uint64_t> persistedDeletes{0};
64     Monotonic<uint64_t> kvstoreRev{1};
65 };
66 
67 /**
68  * A persistence store based on magma.
69  */
70 class MagmaKVStore : public KVStore {
71 public:
    /// Construct a store from the given magma kvstore configuration.
    MagmaKVStore(MagmaKVStoreConfig& config);

    ~MagmaKVStore();

    /// Assignment from another MagmaKVStore is disabled.
    void operator=(MagmaKVStore& from) = delete;
77 
78     /**
79      * Reset database to a clean state.
80      */
81     void reset(Vbid vbid) override;
82 
83     /**
84      * Begin a transaction (if not already in one).
85      */
86     bool begin(std::unique_ptr<TransactionContext> txCtx) override;
87 
88     /**
89      * Commit a transaction (unless not currently in one).
90      *
91      * Returns false if the commit fails.
92      */
93     bool commit(Collections::VB::Flush& collectionsFlush) override;
94 
95     /**
96      * Rollback a transaction (unless not currently in one).
97      */
98     void rollback() override;
99 
100     StorageProperties getStorageProperties() override;
101 
102     /**
103      * Adds a request to a queue for batch processing at commit()
104      */
105     void set(queued_item itm) override;
106 
107     GetValue get(const DiskDocKey& key, Vbid vb) override;
108 
109     GetValue getWithHeader(void* dbHandle,
110                            const DiskDocKey& key,
111                            Vbid vb,
112                            GetMetaOnly getMetaOnly) override;
113 
114     void getMulti(Vbid vb, vb_bgfetch_queue_t& itms) override;
115 
116     void getRange(Vbid vb,
117                   const DiskDocKey& startKey,
118                   const DiskDocKey& endKey,
119                   const GetRangeCb& cb) override;
120 
121     void del(queued_item itm) override;
122 
123     void delVBucket(Vbid vbucket, uint64_t fileRev) override;
124 
125     std::vector<vbucket_state*> listPersistedVbuckets(void) override;
126 
127     /**
128      * Take a snapshot of the stats in the main DB.
129      */
snapshotStats(const std::map<std::string, std::string>& m)130     bool snapshotStats(const std::map<std::string, std::string>& m) {
131         // TODO
132         return false;
133     }
134 
135     /**
136      * Take a snapshot of the vbucket states in the main DB.
137      */
138     bool snapshotVBucket(Vbid vbucketId,
139                          const vbucket_state& vbstate,
140                          VBStatePersist options) override;
141 
    // Compaction in magma is asynchronous. It is triggered by 3 conditions:
143     //  - Level compaction
144     //  - Expiry compaction
145     //    In magma, a histogram of when items will expire is maintained.
146     //    Periodically, magma will trigger a compaction to visit those
147     //    sstables which have expired items to have them removed.
148     //  - Dropped collections removal
149     //    At the end of the flusher loop in kv_engine, a call to magma will be
150     //    made to trigger asynchronous PurgeRange scans to purge the store
151     //    of any dropped collections. For each collectionID, 2 scans will be
152     //    triggered, [Default+CollectionID] and
153     //    [DurabilityPrepare+CollectionID].
154     //    While a purge scan will identify the sstables containing the scan
155     //    key ex.[Default+CollectionID], all items for that CollectionID will
156     //    be removed ie. both Default & DurabilityPrepare. This guarantees that
157     //    the data from each sstable containing the collectionID is visited
158     //    once.
159     //    Also during this call from the bg flusher, any completed compaction
160     //    data will be picked up. The max_purged_seq will update the vbstate
161     //    and any dropped collections will trigger 2 scans (same as above) to
162     //    determine if a collection has been removed and if so, the collection
163     //    manifest is updated.
164     //
    //    Regardless of which type of compaction is running, all compactions
    //    require a kv_engine callback to pick up compaction_ctx and all
    //    compactions can remove expired items or dropped collection items.
168     //
169     //    Synchronous compaction is supported for testing. Normally, kv_engine
170     //    with magma store should never call compactDB. But for testing, we
171     //    need to support a synchronous call. When compactDB is called, it will
172     //    save the compaction_ctx passed in to compactDB and will use it
173     //    to perform compaction.
174     //
    // For DP, we will support only Level and Synchronous compaction. This
    // alleviates the need for a callback to pick up compaction_ctx.
177     bool compactDB(compaction_ctx*) override;
178 
179     Vbid getDBFileId(const cb::mcbp::Request&) override;
180 
181     size_t getNumPersistedDeletes(Vbid vbid) override {
182         // TODO
183         return 0;
184     }
185 
186     DBFileInfo getDbFileInfo(Vbid vbid) override {
187         // Magma does not support DBFileInfo
188         DBFileInfo vbinfo;
189         return vbinfo;
190     }
191 
192     DBFileInfo getAggrDbFileInfo() override {
193         // Magma does not support DBFileInfo
194         DBFileInfo vbinfo;
195         return vbinfo;
196     }
197 
198     size_t getItemCount(Vbid vbid) override;
199 
200     RollbackResult rollback(Vbid vbid,
201                             uint64_t rollbackSeqno,
202                             std::shared_ptr<RollbackCB> cb) override;
203 
    /// Intentionally a no-op for this store.
    void pendingTasks() override {
        // Magma does not use pendingTasks
    }
207 
208     ENGINE_ERROR_CODE getAllKeys(
209             Vbid vbid,
210             const DiskDocKey& start_key,
211             uint32_t count,
212             std::shared_ptr<Callback<const DiskDocKey&>> cb) override;
213 
214     ScanContext* initScanContext(
215             std::shared_ptr<StatusCallback<GetValue>> cb,
216             std::shared_ptr<StatusCallback<CacheLookup>> cl,
217             Vbid vbid,
218             uint64_t startSeqno,
219             DocumentFilter options,
220             ValueFilter valOptions) override;
221 
222     scan_error_t scan(ScanContext* sctx) override;
223 
224     void destroyScanContext(ScanContext* ctx) override;
225 
226     /**
227      * The magmaKVHandle protects magma from a kvstore being dropped
228      * while an API operation is active. This is required because
229      * unlike couchstore which just unlinks the data file, magma
230      * must wait for all threads to exit and then block subsequent
231      * threads from proceeding while the kvstore is being dropped.
232      * Inside the handle is the vbstateMutex. This mutex is used
233      * to protect the vbstate from race conditions when updated.
234      */
    struct MagmaKVHandleStruct {
        // Protects the vbstate from race conditions when it is updated.
        std::shared_timed_mutex vbstateMutex;
    };
    // Handles are shared: every holder of a MagmaKVHandle keeps the
    // underlying MagmaKVHandleStruct alive.
    using MagmaKVHandle = std::shared_ptr<MagmaKVHandleStruct>;

    // One (handle, mutex) pair per vbucket, indexed by vbid.get(). The mutex
    // of a pair is the one getMagmaKVHandle() holds while copying the handle.
    std::vector<std::pair<MagmaKVHandle, std::shared_timed_mutex>>
            magmaKVHandles;
242 
getMagmaKVHandle(Vbid vbid)243     const MagmaKVHandle getMagmaKVHandle(Vbid vbid) {
244         std::lock_guard<std::shared_timed_mutex> lock(
245                 magmaKVHandles[vbid.get()].second);
246         return magmaKVHandles[vbid.get()].first;
247     }
248 
249     class MagmaKVFileHandle : public ::KVFileHandle {
250     public:
MagmaKVFileHandle(MagmaKVStore& kvstore, Vbid vbid)251         MagmaKVFileHandle(MagmaKVStore& kvstore, Vbid vbid)
252             : ::KVFileHandle(kvstore),
253               vbid(vbid),
254               kvHandle(kvstore.getMagmaKVHandle(vbid)) {
255         }
256         Vbid vbid;
257         MagmaKVHandle kvHandle;
258     };
259 
260     std::unique_ptr<KVFileHandle, KVFileHandleDeleter> makeFileHandle(
261             Vbid vbid) override;
262 
    /**
     * Destroy a file handle.
     *
     * @param kvFileHandle handle to destroy; it is deleted here, so the
     *        caller must not use it afterwards
     */
    void freeFileHandle(KVFileHandle* kvFileHandle) const override {
        delete kvFileHandle;
    }
266 
267     Collections::VB::PersistedStats getCollectionStats(
268             const KVFileHandle& kvFileHandle, CollectionID collection) override;
269 
270     /**
271      * Increment the kvstore revision.
272      */
273     void prepareToCreateImpl(Vbid vbid) override;
274 
275     /**
276      * Soft delete the kvstore.
277      */
278     uint64_t prepareToDeleteImpl(Vbid vbid) override;
279 
280     /**
281      * Retrieve the manifest from the local db.
282      * MagmaKVStore implements this method as a read of 3 _local documents
283      * manifest, open collections, open scopes
284      *
285      * @param vbid vbucket id
286      * @return collections manifest
287      */
288     Collections::KVStore::Manifest getCollectionsManifest(Vbid vbid) override;
289 
290     /**
291      * read local document to get the vector of dropped collections
292      * @param vbid vbucket id
293      * @return a vector of dropped collections (can be empty)
294      */
295     std::vector<Collections::KVStore::DroppedCollection> getDroppedCollections(
296             Vbid vbid) override;
297 
298     /**
299      * This function maintains the set of open collections, adding newly opened
300      * collections and removing those which are dropped. To validate the
301      * creation
302      * of new collections, this method must read the dropped collections.
303      *
304      * @param vbid vbucket id
305      * @param commitBatch current magma commit batch
306      * @param collectionsFlush
307      * @return status magma status
308      */
309     magma::Status updateCollectionsMeta(
310             Vbid vbid,
311             magma::Magma::CommitBatch& commitBatch,
312             Collections::VB::Flush& collectionsFlush);
313 
    /**
     * Maintain the current uid committed
     *
     * @param commitBatch current magma commit batch
     * @return status magma status
     */
321     magma::Status updateManifestUid(magma::Magma::CommitBatch& commitBatch);
322 
323     /**
324      * Maintain the list of open collections. The maintenance requires
325      * reading the dropped collections which is passed back to avoid
326      * a reread.
327      *
328      * @param vbid vbucket id
329      * @param commitBatch current magma commit batch
330      * @return pair magma status and dropped collection list
331      */
332     std::pair<magma::Status,
333               std::vector<Collections::KVStore::DroppedCollection>>
334     updateOpenCollections(Vbid vbid, magma::Magma::CommitBatch& commitBatch);
335 
336     /**
337      * Maintain the list of dropped collections
338      *
339      * @param vbid vbucket id
340      * @param commitBatch current magma commit batch
341      * @param dropped This method will only read the dropped collections
342      *   from storage if this optional is not initialised
343      * @return status magma status
344      */
345     magma::Status updateDroppedCollections(
346             Vbid vbid,
347             magma::Magma::CommitBatch& commitBatch,
348             boost::optional<
349                     std::vector<Collections::KVStore::DroppedCollection>>
350                     dropped);
351 
352     /**
353      * Maintain the list of open scopes
354      *
355      * @param vbid vbucket id
356      * @param commitBatch current magma commit batch
357      * @return status magma status
358      */
359     magma::Status updateScopes(Vbid vbid,
360                                magma::Magma::CommitBatch& commitBatch);
361 
362     /**
363      * Given a collection id, return the key used to maintain the
364      * collection stats in the local db.
365      *
366      * @param cid Collection ID
367      */
368     std::string getCollectionsStatsKey(CollectionID cid);
369 
370     /**
371      * Save stats for collection cid
372      *
373      * @param commitBatch current magma commit batch
374      * @param cid Collection ID
375      * @param stats The stats that should be persisted
376      */
377     void saveCollectionStats(magma::Magma::CommitBatch& commitBatch,
378                              CollectionID cid,
379                              const Collections::VB::PersistedStats& stats);
380 
381     /**
382      * Delete the collection stats for the given collection id
383      *
384      * @param commitBatch current magma commit batch
385      * @param cid Collection ID
386      */
387     magma::Status deleteCollectionStats(magma::Magma::CommitBatch& commitBatch,
388                                         CollectionID cid);
389 
390     /**
391      * Encode a document being stored in the local db by prefixing the
392      * MetaData to the value.
393      *
394      * Magma requires all documents stored in local DB to also include
395      * metadata because it uses the callback functions like getExpiryTime,
396      * isDeleted() and getSeqNum() as part of compaction.
397      */
398     std::string encodeLocalDoc(Vbid vbid,
399                                const std::string& value,
400                                bool isDelete);
401 
402     /**
403      * Decode a document being stored in the local db by extracting the
404      * MetaData from the value
405      */
406     std::pair<std::string, bool> decodeLocalDoc(const magma::Slice& valSlice);
407 
408     /**
409      * Read from local DB
410      */
411     std::pair<magma::Status, std::string> readLocalDoc(
412             Vbid vbid, const magma::Slice& keySlice);
413 
414     /**
415      * Add a local document to the commitBatch
416      */
417     magma::Status setLocalDoc(magma::Magma::CommitBatch& commitBatch,
418                               const magma::Slice& keySlice,
419                               std::string& valBuf,
420                               bool deleted = false);
421 
422     /**
423      * Encode the cached vbucket_state and magmaInfo into a nlohmann json struct
424      */
425     nlohmann::json encodeVBState(const vbucket_state& vbstate,
426                                  MagmaInfo& magmaInfo) const;
427 
    /**
     * Read the encoded vbstate + magmaInfo from the local db into the cache.
     */
431     magma::Status readVBStateFromDisk(Vbid vbid);
432 
    /**
     * Write the encoded vbstate + magmaInfo to the local db, i.e. persist
     * the new vbstate to disk.
     */
437     magma::Status writeVBStateToDisk(Vbid vbid,
438                                      magma::Magma::CommitBatch& commitBatch,
439                                      vbucket_state& vbs,
440                                      MagmaInfo& minfo);
441 
    /**
     * Get vbstate from cache. If cache not populated, read it from disk
     * and populate cache. If not on disk, return nullptr.
     *
     * vbstate and magmaInfo always go together, we should never have a
     * case where only 1 of them is initialized. So, if vbstate is
     * uninitialized, assume magmaInfo is as well.
     */
450     vbucket_state* getVBucketState(Vbid vbucketId) override;
451 
452     // Magma uses a unique logger with a prefix of magma so that all logging
453     // calls from the wrapper thru magma will be prefixed with magma.
454     std::shared_ptr<BucketLogger> logger;
455 
456 private:
    /**
     * Magma instance for a shard
     */
460     std::unique_ptr<magma::Magma> magma;
461 
    /**
     * Container for pending Magma requests.
     *
     * Using deque as the expansion behaviour is less aggressive compared to
     * std::vector (MagmaRequest objects are ~176 bytes in size).
     */
468     using PendingRequestQueue = std::deque<MagmaRequest>;
469 
470     /*
471      * The DB for each VBucket is created in a separated subfolder of
472      * `configuration.getDBName()`. This function returns the path of the DB
473      * subfolder for the given `vbid`.
474      *
475      * @param vbid vbucket id for the vbucket DB subfolder to return
476      */
477     std::string getVBDBSubdir(Vbid vbid);
478 
479     std::unique_ptr<Item> makeItem(Vbid vb,
480                                    const magma::Slice& keySlice,
481                                    const magma::Slice& metaSlice,
482                                    const magma::Slice& valueSlice,
483                                    GetMetaOnly getMetaOnly);
484 
485     GetValue makeGetValue(Vbid vb,
486                           const magma::Slice& keySlice,
487                           const magma::Slice& metaSlice,
488                           const magma::Slice& valueSlice,
489                           GetMetaOnly getMetaOnly = GetMetaOnly::No);
490 
491     int saveDocs(Collections::VB::Flush& collectionsFlush,
492                  kvstats_ctx& kvctx,
493                  const MagmaKVHandle& kvHandle);
494 
495     void commitCallback(int status, kvstats_ctx& kvctx);
496 
497     static ENGINE_ERROR_CODE magmaErr2EngineErr(magma::Status::Code err,
498                                                 bool found = true);
499 
500     // Used for queueing mutation requests (in `set` and `del`) and flushing
501     // them to disk (in `commit`).
502     // unique_ptr for pimpl.
503     std::unique_ptr<PendingRequestQueue> pendingReqs;
504 
505     // Magma does *not* need additional synchronisation around
506     // db->Write, but we need to prevent delVBucket racing with
507     // commit, potentially losing data.
508     std::mutex writeLock;
509 
510     // This variable is used to verify that the KVStore API is used correctly
511     // when Magma is used as store. "Correctly" means that the caller must
512     // use the API in the following way:
513     //      - begin() x1
514     //      - set() / del() xN
515     //      - commit()
516     bool in_transaction;
517     std::unique_ptr<TransactionContext> transactionCtx;
518 
519     // Path to magma files. Include shardID.
520     const std::string magmaPath;
521 
522     std::atomic<size_t> scanCounter; // atomic counter for generating scan id
523 
    // Magma does not keep track of docCount, # of persistedDeletes or
    // revFile internally so we need a mechanism to do that. We use magmaInfo
    // as the structure to store that and we save magmaInfo with the vbstate.
527     std::vector<std::unique_ptr<MagmaInfo>> cachedMagmaInfo;
528 
    // We need to mimic couchstore's ability to turn every batch of items
    // into a rollback point. This is used for testing only!
531     bool commitPointEveryBatch{false};
532 
533     // Using upsert for Set means we can't keep accurate document totals.
534     // This is used for testing only!
535     bool useUpsertForSet{false};
536 
537     // Get lock on KVHandle and wait for all threads to exit before
538     // returning the lock to the caller.
539     std::unique_lock<std::shared_timed_mutex> getExclusiveKVHandle(Vbid vbid);
540 
541     struct MagmaCompactionCtx {
MagmaCompactionCtxMagmaKVStore::MagmaCompactionCtx542         MagmaCompactionCtx(compaction_ctx* ctx, MagmaKVHandle kvHandle)
543             : ctx(ctx), kvHandle(kvHandle){};
544         compaction_ctx* ctx;
545         MagmaKVHandle kvHandle;
546     };
547     std::vector<std::unique_ptr<MagmaCompactionCtx>> compaction_ctxList;
548     std::mutex compactionCtxMutex;
549 
550     class MagmaCompactionCB : public magma::Magma::CompactionCallback {
551     public:
552         MagmaCompactionCB(MagmaKVStore& magmaKVStore);
553         ~MagmaCompactionCB();
operator ()(const magma::Slice& keySlice, const magma::Slice& metaSlice, const magma::Slice& valueSlice)554         bool operator()(const magma::Slice& keySlice,
555                         const magma::Slice& metaSlice,
556                         const magma::Slice& valueSlice) {
557             return magmaKVStore.compactionCallBack(
558                     *this, keySlice, metaSlice, valueSlice);
559         }
560         MagmaKVStore& magmaKVStore;
561         bool initialized = false;
562         compaction_ctx* ctx = nullptr;
563         MagmaKVStore::MagmaKVHandle kvHandle;
564         Vbid vbid;
565         // TODO add code for collections to keep track of # of deletes.
566         // Requires code in ~MagmaCompactionCB() to update magmaInfo docCount
567     };
568 
    /**
     * Per-item compaction hook. MagmaCompactionCB::operator() forwards every
     * call here, passing itself as cbCtx.
     *
     * @param cbCtx callback object the call came through
     * @param keySlice key of the item handed over by the callback
     * @param metaSlice metadata of that item
     * @param valueSlice value of that item
     * @return passed straight back to the caller of
     *         MagmaCompactionCB::operator().
     *         NOTE(review): the meaning of true/false is defined by magma's
     *         CompactionCallback contract - confirm against libmagma.
     */
    bool compactionCallBack(MagmaKVStore::MagmaCompactionCB& cbCtx,
                            const magma::Slice& keySlice,
                            const magma::Slice& metaSlice,
                            const magma::Slice& valueSlice);
573 
574     friend class MagmaCompactionCB;
575 };
576