1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017-Present Couchbase, Inc.
4  *
5  *   Use of this software is governed by the Business Source License included
6  *   in the file licenses/BSL-Couchbase.txt.  As of the Change Date specified
7  *   in that file, in accordance with the Business Source License, use of this
8  *   software will be governed by the Apache License, Version 2.0, included in
9  *   the file licenses/APL2.txt.
10  */
11 
12 #include "collections/manager.h"
13 #include "bucket_logger.h"
14 #include "collections/flush.h"
15 #include "collections/manifest.h"
16 #include "collections/persist_manifest_task.h"
17 #include "collections/vbucket_manifest_handles.h"
18 #include "ep_bucket.h"
19 #include "ep_engine.h"
20 #include "kv_bucket.h"
21 #include "string_utils.h"
22 #include "vb_visitors.h"
23 #include "vbucket.h"
24 
#include <folly/container/F14Map.h>
#include <memcached/collections.h>
#include <nlohmann/json.hpp>
#include <spdlog/fmt/ostr.h>
#include <statistics/cbstat_collector.h>
#include <statistics/labelled_collector.h>
#include <limits>
#include <optional>
#include <utility>
33 
/// Defaulted constructor: members are default-initialised, no other work is
/// done here.
Collections::Manager::Manager() = default;
35 
36 cb::engine_error Collections::Manager::update(
37         const VBucketStateRLockMap& vbStateLocks,
38         KVBucket& bucket,
39         std::string_view manifestString,
40         CookieIface* cookie) {
41     auto lockedUpdateCookie = updateInProgress.wlock();
42     if (*lockedUpdateCookie != nullptr && *lockedUpdateCookie != cookie) {
43         // log this as it's very unexpected, only ever 1 manager
44         return {cb::engine_errc::too_busy,
45                 "An update is already in-progress for another cookie:" +
46                         std::to_string(uintptr_t(*lockedUpdateCookie))};
47     }
48 
49     // Now getEngineSpecific - if that is null this is a new command, else
50     // it's the IO complete command
51     auto manifest = bucket.getEPEngine().takeEngineSpecific<Manifest*>(*cookie);
52 
53     if (manifest.has_value()) {
54         // I/O complete path?
55         if (!*lockedUpdateCookie) {
56             // This can occur for a DCP connection, cookie is 'reserved'.
57             EP_LOG_WARN(
58                     "Collections::Manager::update aborted as we have found a "
59                     "manifest:{} but updateInProgress:{}",
60                     static_cast<const void*>(*manifest),
61                     static_cast<const void*>(*lockedUpdateCookie));
62             return {cb::engine_errc::failed,
63                     "Collections::Manager::update failure"};
64         }
65 
66         // Final stage of update now happening, clear the cookie and engine
67         // specific so the next update can start after this one returns.
68         *lockedUpdateCookie = nullptr;
69 
70         // Take ownership back of the manifest so it destructs/frees on return
71         std::unique_ptr<Manifest> newManifest(*manifest);
72         return updateFromIOComplete(std::move(vbStateLocks),
73                                     bucket,
74                                     std::move(newManifest),
75                                     cookie);
76     }
77 
78     // Construct a new Manifest (ctor will throw if JSON was illegal)
79     std::unique_ptr<Manifest> newManifest;
80     try {
81         newManifest = std::make_unique<Manifest>(
82                 manifestString,
83                 bucket.getEPEngine().getConfiguration().getMaxVbuckets());
84     } catch (std::exception& e) {
85         EP_LOG_WARN(
86                 "Collections::Manager::update can't construct manifest "
87                 "e.what:{}",
88                 e.what());
89         return {cb::engine_errc::invalid_arguments,
90                 "Collections::Manager::update manifest json invalid:" +
91                         std::string(manifestString)};
92     }
93 
94     // Next compare with current
95     // First get an upgrade lock (which is initially read)
96     // Persistence will schedule a task and drop the lock whereas ephemeral will
97     // upgrade from read to write locking and do the update
98     auto current = currentManifest.ulock();
99     auto isSuccessorResult = current->isSuccessor(*newManifest);
100     if (isSuccessorResult.code() != cb::engine_errc::success) {
101         return isSuccessorResult;
102     }
103 
104     // New manifest is a legal successor the update is going ahead.
105     // Ephemeral bucket can update now, Persistent bucket on wake-up from
106     // successful run of the PeristManifestTask.
107     cb::engine_errc status = cb::engine_errc::success;
108     if (!bucket.maybeScheduleManifestPersistence(cookie, newManifest)) {
109         // Ephemeral case, apply immediately
110         return applyNewManifest(std::move(vbStateLocks),
111                                 bucket,
112                                 current,
113                                 std::move(newManifest));
114     } else {
115         *lockedUpdateCookie = cookie;
116         status = cb::engine_errc::would_block;
117     }
118 
119     return {status, "Collections::Manager::update part one complete"};
120 }
121 
122 cb::engine_error Collections::Manager::updateFromIOComplete(
123         const VBucketStateRLockMap& vbStateLocks,
124         KVBucket& bucket,
125         std::unique_ptr<Manifest> newManifest,
126         CookieIface* cookie) {
127     auto current = currentManifest.ulock(); // Will update to newManifest
128     return applyNewManifest(
129             std::move(vbStateLocks), bucket, current, std::move(newManifest));
130 }
131 
132 // common to ephemeral/persistent, this does the update
133 cb::engine_error Collections::Manager::applyNewManifest(
134         const VBucketStateRLockMap& vbStateLocks,
135         KVBucket& bucket,
136         folly::Synchronized<Manifest>::UpgradeLockedPtr& current,
137         std::unique_ptr<Manifest> newManifest) {
138     auto updated =
139             updateAllVBuckets(std::move(vbStateLocks), bucket, *newManifest);
140     if (updated.has_value()) {
141         return {cb::engine_errc::cannot_apply_collections_manifest,
142                 "Collections::Manager::update aborted on " +
143                         updated->to_string() + ", cannot apply to vbuckets"};
144     }
145 
146     // Now switch to write locking and change the manifest. The lock is
147     // released after this statement.
148     *current.moveFromUpgradeToWrite() = std::move(*newManifest);
149     return {cb::engine_errc::success,
150             "Collections::Manager::update applied new manifest"};
151 }
152 
153 std::optional<Vbid> Collections::Manager::updateAllVBuckets(
154         const VBucketStateRLockMap& vbStateLocks,
155         KVBucket& bucket,
156         const Manifest& newManifest) {
157     for (Vbid::id_type i = 0; i < bucket.getVBuckets().getSize(); i++) {
158         auto vb = bucket.getVBuckets().getBucket(Vbid(i));
159         auto vbStateLockIt = vbStateLocks.find(Vbid(i));
160 
161         // We should have a vbstatelock iff the corresponding vbucket is present
162         bool hasVbStateLock = vbStateLockIt != vbStateLocks.end();
163         if (bool(vb) != hasVbStateLock) {
164             throw std::logic_error(
165                     fmt::format("updateAllVBuckets(): {} is {}present, but the "
166                                 "corresponding lock is{}",
167                                 Vbid(i),
168                                 vb ? "" : "not ",
169                                 hasVbStateLock ? "" : " not"));
170         }
171 
172         // We took a lock on the vbsetMutex (all vBucket states) to guard state
173         // changes here) in KVBucket::setCollections.
174         if (vb && vb->getState() == vbucket_state_active) {
175             bool abort = false;
176             auto status =
177                     vb->updateFromManifest(vbStateLockIt->second, newManifest);
178             using namespace Collections;
179             switch (status) {
180             case VB::ManifestUpdateStatus::EqualUidWithDifferences:
181             case VB::ManifestUpdateStatus::ImmutablePropertyModified:
182 
183                 // This error is unexpected and the best action is not to
184                 // continue applying it
185                 abort = true;
186                 [[fallthrough]];
187             case VB::ManifestUpdateStatus::Behind:
188                 // Applying a manifest which is 'behind' the vbucket is
189                 // expected (certainly for newly promoted replica), however
190                 // still log it for now.
191                 EP_LOG_WARN(
192                         "Collections::Manager::updateAllVBuckets: error:{} {}",
193                         to_string(status),
194                         vb->getId());
195             case VB::ManifestUpdateStatus::Success:
196                 break;
197             }
198             if (abort) {
199                 return vb->getId();
200             }
201         }
202     }
203     return {};
204 }
205 
206 void Collections::Manager::updatePersistManifestTaskDone(
207         EventuallyPersistentEngine& engine,
208         CookieIface* cookie,
209         cb::engine_errc status) {
210     // If !success the command will return to the caller, so must clean-up
211     // ready for any further task.
212     if (status != cb::engine_errc::success) {
213         auto lockedUpdateCookie = updateInProgress.wlock();
214         *lockedUpdateCookie = nullptr;
215         engine.clearEngineSpecific(*cookie);
216     }
217 }
218 
219 std::pair<cb::mcbp::Status, nlohmann::json> Collections::Manager::getManifest(
220         const Collections::IsVisibleFunction& isVisible) const {
221     return {cb::mcbp::Status::Success,
222             currentManifest.rlock()->to_json(isVisible)};
223 }
224 
225 bool Collections::Manager::validateGetCollectionIDPath(std::string_view path) {
226     return std::count(path.begin(), path.end(), '.') == 1;
227 }
228 
229 bool Collections::Manager::validateGetScopeIDPath(std::string_view path) {
230     return std::count(path.begin(), path.end(), '.') <= 1;
231 }
232 
233 cb::EngineErrorGetCollectionIDResult Collections::Manager::getCollectionID(
234         std::string_view path) const {
235     if (!validateGetCollectionIDPath(path)) {
236         return cb::EngineErrorGetCollectionIDResult{
237                 cb::engine_errc::invalid_arguments};
238     }
239 
240     auto current = currentManifest.rlock();
241     auto scope = current->getScopeID(path);
242     if (!scope) {
243         return {cb::engine_errc::unknown_scope, current->getUid()};
244     }
245 
246     auto collection = current->getCollectionID(scope.value(), path);
247     if (!collection) {
248         return {cb::engine_errc::unknown_collection, current->getUid()};
249     }
250 
251     return {current->getUid(), scope.value(), collection.value()};
252 }
253 
254 cb::EngineErrorGetScopeIDResult Collections::Manager::getScopeID(
255         std::string_view path) const {
256     if (!validateGetScopeIDPath(path)) {
257         return cb::EngineErrorGetScopeIDResult{
258                 cb::engine_errc::invalid_arguments};
259     }
260     auto current = currentManifest.rlock();
261     auto scope = current->getScopeID(path);
262     if (!scope) {
263         return cb::EngineErrorGetScopeIDResult{current->getUid()};
264     }
265 
266     return {current->getUid(), scope.value()};
267 }
268 
269 std::pair<uint64_t, std::optional<ScopeID>> Collections::Manager::getScopeID(
270         CollectionID cid) const {
271     // 'shortcut' For the default collection, just return the default scope.
272     // If the default collection was deleted the vbucket will have the final say
273     // but for this interface allow this without taking the rlock.
274     if (cid.isDefaultCollection()) {
275         // Allow the default collection in the default scope...
276         return std::make_pair<uint64_t, std::optional<ScopeID>>(
277                 0, ScopeID{ScopeID::Default});
278     }
279 
280     auto current = currentManifest.rlock();
281     return std::make_pair<uint64_t, std::optional<ScopeID>>(
282             current->getUid(), current->getScopeID(cid));
283 }
284 
285 std::pair<uint64_t, std::optional<Collections::CollectionEntry>>
286 Collections::Manager::getCollectionEntry(CollectionID cid) const {
287     // 'shortcut' For the default collection, just return the default scope.
288     // If the default collection was deleted the vbucket will have the final say
289     // but for this interface allow this without taking the rlock.
290     if (cid.isDefaultCollection()) {
291         // Allow the default collection in the default scope...
292         return std::make_pair<uint64_t,
293                               std::optional<Collections::CollectionEntry>>(
294                 0, Collections::DefaultCollectionEntry);
295     }
296 
297     auto current = currentManifest.rlock();
298     return std::make_pair<uint64_t,
299                           std::optional<Collections::CollectionEntry>>(
300             current->getUid(), current->getCollectionEntry(cid));
301 }
302 
303 cb::EngineErrorGetScopeIDResult Collections::Manager::isScopeIDValid(
304         ScopeID sid) const {
305     auto manifestLocked = currentManifest.rlock();
306     if (manifestLocked->findScope(sid) != manifestLocked->endScopes()) {
307         return cb::EngineErrorGetScopeIDResult{manifestLocked->getUid(), sid};
308     }
309     // Returns unknown_scope + manifestUid
310     return cb::EngineErrorGetScopeIDResult{manifestLocked->getUid()};
311 }
312 
/// @return true if the current manifest's uid is strictly greater than the
///         manifest uid of the given vbucket
bool Collections::Manager::needsUpdating(const VBucket& vb) const {
    // Only a current uid which is strictly ahead of the vbucket's uid
    // requires an update; equal uids compare false here.
    return currentManifest.rlock()->getUid() >
           vb.getManifest().lock().getManifestUid();
}
318 
319 void Collections::Manager::maybeUpdate(VBucketStateLockRef vbStateLock,
320                                        VBucket& vb) const {
321     // Lock manager updates, errors are logged by VB::Manifest
322     currentManifest.withRLock([&vb, vbStateLock](const auto& manifest) {
323         vb.updateFromManifest(vbStateLock, manifest);
324     });
325 }
326 
327 // This method is really to aid development and allow the dumping of the VB
328 // collection data to the logs.
329 void Collections::Manager::logAll(KVBucket& bucket) const {
330     EP_LOG_INFO("{}", *this);
331     for (Vbid::id_type i = 0; i < bucket.getVBuckets().getSize(); i++) {
332         Vbid vbid = Vbid(i);
333         auto vb = bucket.getVBuckets().getBucket(vbid);
334         if (vb) {
335             EP_LOG_INFO("{}: {} {}",
336                         vbid,
337                         VBucket::toString(vb->getState()),
338                         vb->lockCollections());
339         }
340     }
341 }
342 
343 void Collections::Manager::addCollectionStats(
344         KVBucket& bucket, const BucketStatCollector& collector) const {
345     currentManifest.rlock()->addCollectionStats(bucket, collector);
346 }
347 
348 void Collections::Manager::addScopeStats(
349         KVBucket& bucket, const BucketStatCollector& collector) const {
350     currentManifest.rlock()->addScopeStats(bucket, collector);
351 }
352 
353 bool Collections::Manager::warmupLoadManifest(const std::string& dbpath) {
354     auto rv = Collections::PersistManifestTask::tryAndLoad(dbpath);
355     if (rv.has_value()) {
356         EP_LOG_INFO(
357                 "Collections::Manager::warmupLoadManifest: starting at "
358                 "uid:{:#x}",
359                 rv.value().getUid());
360         *currentManifest.wlock() = std::move(rv.value());
361         return true;
362     }
363     // else tryAndLoad detected (and logged) some kind of corruption issue.
364     // If this corruption occurred at the same time as some issue in the
365     // forward flow of the Manifest, KV can't validate that any change to the
366     // manifest is a legal successor (Manifest::isSuccessor) - return false
367     // so Warmup can fail - holding the node::bucket pending.
368     return false;
369 }
370 
371 /**
372  * Perform actions for a completed warmup - currently check if any
373  * collections are 'deleting' and require erase re-triggering.
374  */
375 void Collections::Manager::warmupCompleted(EPBucket& bucket) const {
376     for (Vbid::id_type i = 0; i < bucket.getVBuckets().getSize(); i++) {
377         Vbid vbid = Vbid(i);
378         auto vb = bucket.getVBuckets().getBucket(vbid);
379         if (vb) {
380             if (vb->getManifest().isDropInProgress()) {
381                 Collections::VB::Flush::triggerPurge(vbid, bucket);
382             }
383         }
384     }
385 }
386 
387 SingleThreadedRCPtr<Collections::VB::CollectionSharedMetaData>
388 Collections::Manager::createOrReferenceMeta(
389         CollectionID cid,
390         const Collections::VB::CollectionSharedMetaDataView& view) {
391     return collectionSMT.wlock()->createOrReference(cid, view);
392 }
393 
394 void Collections::Manager::dereferenceMeta(
395         CollectionID cid,
396         SingleThreadedRCPtr<VB::CollectionSharedMetaData>&& meta) {
397     collectionSMT.wlock()->dereference(cid, std::move(meta));
398 }
399 
400 SingleThreadedRCPtr<Collections::VB::ScopeSharedMetaData>
401 Collections::Manager::createOrReferenceMeta(
402         ScopeID sid, const Collections::VB::ScopeSharedMetaDataView& view) {
403     return scopeSMT.wlock()->createOrReference(sid, view);
404 }
405 
406 void Collections::Manager::dereferenceMeta(
407         ScopeID sid, SingleThreadedRCPtr<VB::ScopeSharedMetaData>&& meta) {
408     scopeSMT.wlock()->dereference(sid, std::move(meta));
409 }
410 
411 /// VbucketVisitor that gathers stats for all collections
412 class AllCollectionsGetStatsVBucketVisitor : public VBucketVisitor {
413 public:
414     void visitBucket(VBucket& vb) override {
415         if (vb.getState() == vbucket_state_active) {
416             vb.lockCollections().updateSummary(summary);
417         }
418     }
419     Collections::Summary summary;
420 };
421 
422 /// VbucketVisitor that gathers stats for the given collections
423 class CollectionsGetStatsVBucketVisitor : public VBucketVisitor {
424 public:
425     explicit CollectionsGetStatsVBucketVisitor(
426             const std::vector<Collections::CollectionEntry>& collections)
427         : collections(collections) {
428         for (const auto& entry : collections) {
429             summary.emplace(entry.cid, Collections::AccumulatedStats{});
430         }
431     }
432 
433     void visitBucket(VBucket& vb) override {
434         if (vb.getState() == vbucket_state_active) {
435             vb.lockCollections().accumulateStats(collections, summary);
436         }
437     }
438 
439     const std::vector<Collections::CollectionEntry>& collections;
440     Collections::Summary summary;
441 };
442 
443 class CollectionDetailedVBucketVisitor : public VBucketVisitor {
444 public:
445     CollectionDetailedVBucketVisitor(const BucketStatCollector& collector)
446         : collector(collector) {
447     }
448 
449     void visitBucket(VBucket& vb) override {
450         success = vb.lockCollections().addCollectionStats(vb.getId(),
451                                                           collector) ||
452                   success;
453     }
454 
455     bool getSuccess() const {
456         return success;
457     }
458 
459 private:
460     const BucketStatCollector& collector;
461     bool success = true;
462 };
463 
464 class ScopeDetailedVBucketVisitor : public VBucketVisitor {
465 public:
466     ScopeDetailedVBucketVisitor(const BucketStatCollector& collector)
467         : collector(collector) {
468     }
469 
470     void visitBucket(VBucket& vb) override {
471         success = vb.lockCollections().addScopeStats(vb.getId(), collector) ||
472                   success;
473     }
474 
475     bool getSuccess() const {
476         return success;
477     }
478 
479 private:
480     const BucketStatCollector& collector;
481     bool success = true;
482 };
483 
484 // collections-details
485 //   - return top level stats (manager/manifest)
486 //   - iterate vbuckets returning detailed VB stats
487 // collections-details n
488 //   - return detailed VB stats for n only
489 // collections
490 //   - return top level stats (manager/manifest)
491 //   - return per collection item counts from all active VBs
492 cb::EngineErrorGetCollectionIDResult Collections::Manager::doCollectionStats(
493         KVBucket& bucket,
494         const BucketStatCollector& collector,
495         const std::string& statKey) {
496     std::optional<std::string> arg;
497     if (auto pos = statKey.find_first_of(' '); pos != std::string::npos) {
498         arg = statKey.substr(pos + 1);
499     }
500 
501     if (cb_isPrefix(statKey, "collections-details")) {
502         return doCollectionDetailStats(bucket, collector, arg);
503     }
504 
505     if (!arg) {
506         return doAllCollectionsStats(bucket, collector);
507     }
508     return doOneCollectionStats(bucket, collector, arg.value(), statKey);
509 }
510 
511 // handle key "collections-details"
512 cb::EngineErrorGetCollectionIDResult
513 Collections::Manager::doCollectionDetailStats(
514         KVBucket& bucket,
515         const BucketStatCollector& collector,
516         std::optional<std::string> arg) {
517     bool success = false;
518     if (arg) {
519         // VB may be encoded in statKey
520         uint16_t id;
521         try {
522             id = std::stoi(*arg);
523         } catch (const std::logic_error& e) {
524             EP_LOG_WARN(
525                     "Collections::Manager::doCollectionDetailStats invalid "
526                     "vbid:{}, exception:{}",
527                     *arg,
528                     e.what());
529             return cb::EngineErrorGetCollectionIDResult{
530                     cb::engine_errc::invalid_arguments};
531         }
532 
533         Vbid vbid = Vbid(id);
534         VBucketPtr vb = bucket.getVBucket(vbid);
535         if (!vb) {
536             return cb::EngineErrorGetCollectionIDResult{
537                     cb::engine_errc::not_my_vbucket};
538         }
539 
540         success = vb->lockCollections().addCollectionStats(vbid, collector);
541 
542     } else {
543         bucket.getCollectionsManager().addCollectionStats(bucket, collector);
544         CollectionDetailedVBucketVisitor visitor(collector);
545         bucket.visit(visitor);
546         success = visitor.getSuccess();
547     }
548     return {success ? cb::engine_errc::success : cb::engine_errc::failed,
549             cb::EngineErrorGetCollectionIDResult::allowSuccess{}};
550 }
551 
552 // handle key "collections"
553 cb::EngineErrorGetCollectionIDResult
554 Collections::Manager::doAllCollectionsStats(
555         KVBucket& bucket, const BucketStatCollector& collector) {
556     // no collection ID was provided
557 
558     // Do the high level stats (includes global count)
559     bucket.getCollectionsManager().addCollectionStats(bucket, collector);
560     auto cachedStats = getPerCollectionStats(bucket);
561     auto current = bucket.getCollectionsManager().currentManifest.rlock();
562     // do stats for every collection
563     for (const auto& entry : *current) {
564         // Access check for SimpleStats. Use testPrivilege as it won't log
565         if (collector.testPrivilegeForStat(entry.second.sid, entry.first) !=
566             cb::engine_errc::success) {
567             continue; // skip this collection
568         }
569 
570         const auto scopeItr = current->findScope(entry.second.sid);
571         Expects(scopeItr != current->endScopes());
572         cachedStats.addStatsForCollection(
573                 scopeItr->second.name, entry.second, collector);
574     }
575     return {cb::engine_errc::success,
576             cb::EngineErrorGetCollectionIDResult::allowSuccess{}};
577 }
578 
579 // handle key "collections <path>" or "collections-byid"
580 cb::EngineErrorGetCollectionIDResult Collections::Manager::doOneCollectionStats(
581         KVBucket& bucket,
582         const BucketStatCollector& collector,
583         const std::string& arg,
584         const std::string& statKey) {
585     cb::EngineErrorGetCollectionIDResult res{cb::engine_errc::failed};
586     // An argument was provided, maybe an id or a 'path'
587     if (cb_isPrefix(statKey, "collections-byid")) {
588         CollectionID cid;
589         // provided argument should be a hex collection ID N, 0xN or 0XN
590         try {
591             cid = std::stoi(arg, nullptr, 16);
592         } catch (const std::logic_error& e) {
593             EP_LOG_WARN(
594                     "Collections::Manager::doOneCollectionStats invalid "
595                     "collection arg:{}, exception:{}",
596                     arg,
597                     e.what());
598             return cb::EngineErrorGetCollectionIDResult{
599                     cb::engine_errc::invalid_arguments};
600         }
601         // Collection's scope is needed for privilege check
602         auto [manifestUid, scope] =
603                 bucket.getCollectionsManager().getScopeID(cid);
604         if (scope) {
605             res = {manifestUid, scope.value(), cid};
606         } else {
607             return {cb::engine_errc::unknown_collection, manifestUid};
608         }
609     } else {
610         // provided argument should be a collection path
611         res = bucket.getCollectionsManager().getCollectionID(arg);
612         if (res.result != cb::engine_errc::success) {
613             EP_LOG_WARN(
614                     "Collections::Manager::doOneCollectionStats could not "
615                     "find "
616                     "collection arg:{} error:{}",
617                     arg,
618                     res.result);
619             return res;
620         }
621     }
622 
623     // Access check for SimpleStats
624     res.result = collector.testPrivilegeForStat(res.getScopeId(),
625                                                 res.getCollectionId());
626     if (res.result != cb::engine_errc::success) {
627         return res;
628     }
629 
630     // Take a copy of the two items needed from the manifest and then release
631     // the lock before vbucket visiting.
632     std::string scopeName;
633     Collections::CollectionEntry entry;
634     {
635         auto current = bucket.getCollectionsManager().currentManifest.rlock();
636         auto collectionItr = current->findCollection(res.getCollectionId());
637 
638         if (collectionItr == current->end()) {
639             EP_LOG_WARN(
640                     "Collections::Manager::doOneCollectionStats unknown "
641                     "collection arg:{} cid:{}",
642                     arg,
643                     res.getCollectionId().to_string());
644             return {cb::engine_errc::unknown_collection, current->getUid()};
645         }
646         auto scopeItr = current->findScope(collectionItr->second.sid);
647         Expects(scopeItr != current->endScopes());
648         scopeName = scopeItr->second.name;
649         entry = collectionItr->second;
650     }
651 
652     // Visit the vbuckets and generate the stat payload
653     auto cachedStats = getPerCollectionStats({entry}, bucket);
654     cachedStats.addStatsForCollection(scopeName, entry, collector);
655     return res;
656 }
657 
658 // scopes-details
659 //   - return top level stats (manager/manifest)
660 //   - iterate vbucket returning detailed VB stats
661 // scopes-details n
662 //   - return detailed VB stats for n only
663 // scopes
664 //   - return top level stats (manager/manifest)
665 //   - return number of collections from all active VBs
666 cb::EngineErrorGetScopeIDResult Collections::Manager::doScopeStats(
667         KVBucket& bucket,
668         const BucketStatCollector& collector,
669         const std::string& statKey) {
670     std::optional<std::string> arg;
671     if (auto pos = statKey.find_first_of(' '); pos != std::string_view::npos) {
672         arg = statKey.substr(pos + 1);
673     }
674     if (cb_isPrefix(statKey, "scopes-details")) {
675         return doScopeDetailStats(bucket, collector, arg);
676     }
677 
678     if (!arg) {
679         return doAllScopesStats(bucket, collector);
680     }
681 
682     return doOneScopeStats(bucket, collector, arg.value(), statKey);
683 }
684 
685 // handler for "scope-details"
686 cb::EngineErrorGetScopeIDResult Collections::Manager::doScopeDetailStats(
687         KVBucket& bucket,
688         const BucketStatCollector& collector,
689         std::optional<std::string> arg) {
690     bool success = true;
691     if (arg) {
692         // VB may be encoded in statKey
693         uint16_t id;
694         try {
695             id = std::stoi(*arg);
696         } catch (const std::logic_error& e) {
697             EP_LOG_WARN(
698                     "Collections::Manager::doScopeDetailStats invalid "
699                     "vbid:{}, exception:{}",
700                     *arg,
701                     e.what());
702             return cb::EngineErrorGetScopeIDResult{
703                     cb::engine_errc::invalid_arguments};
704         }
705 
706         Vbid vbid = Vbid(id);
707         VBucketPtr vb = bucket.getVBucket(vbid);
708         if (!vb) {
709             return cb::EngineErrorGetScopeIDResult{
710                     cb::engine_errc::not_my_vbucket};
711         }
712         success = vb->lockCollections().addScopeStats(vbid, collector);
713     } else {
714         bucket.getCollectionsManager().addScopeStats(bucket, collector);
715         ScopeDetailedVBucketVisitor visitor(collector);
716         bucket.visit(visitor);
717         success = visitor.getSuccess();
718     }
719     return {success ? cb::engine_errc::success : cb::engine_errc::failed,
720             cb::EngineErrorGetScopeIDResult::allowSuccess{}};
721 }
722 
723 // handler for "scopes"
724 cb::EngineErrorGetScopeIDResult Collections::Manager::doAllScopesStats(
725         KVBucket& bucket, const BucketStatCollector& collector) {
726     auto cachedStats = getPerCollectionStats(bucket);
727 
728     // Do the high level stats (includes number of collections)
729     bucket.getCollectionsManager().addScopeStats(bucket, collector);
730     auto current = bucket.getCollectionsManager().currentManifest.rlock();
731     for (auto itr = current->beginScopes(); itr != current->endScopes();
732          ++itr) {
733         // Access check for SimpleStats. Use testPrivilege as it won't log
734         if (collector.testPrivilegeForStat(itr->first, {}) !=
735             cb::engine_errc::success) {
736             continue; // skip this scope
737         }
738 
739         cachedStats.addStatsForScope(itr->first,
740                                      itr->second.name,
741                                      itr->second.collections,
742                                      collector);
743     }
744     return {cb::engine_errc::success,
745             cb::EngineErrorGetScopeIDResult::allowSuccess{}};
746 }
747 
748 // handler for "scopes name" or "scopes byid id"
749 cb::EngineErrorGetScopeIDResult Collections::Manager::doOneScopeStats(
750         KVBucket& bucket,
751         const BucketStatCollector& collector,
752         const std::string& arg,
753         const std::string& statKey) {
754     cb::EngineErrorGetScopeIDResult res{cb::engine_errc::failed};
755     if (cb_isPrefix(statKey, "scopes-byid")) {
756         ScopeID scopeID;
757         // provided argument should be a hex scope ID N, 0xN or 0XN
758         try {
759             scopeID = std::stoi(arg, nullptr, 16);
760         } catch (const std::logic_error& e) {
761             EP_LOG_WARN(
762                     "Collections::Manager::doOneScopeStats invalid "
763                     "scope arg:{}, exception:{}",
764                     arg,
765                     e.what());
766             return cb::EngineErrorGetScopeIDResult{
767                     cb::engine_errc::invalid_arguments};
768         }
769         res = bucket.getCollectionsManager().isScopeIDValid(scopeID);
770     } else {
771         // provided argument should be a scope name
772         res = bucket.getCollectionsManager().getScopeID(arg);
773     }
774 
775     if (res.result != cb::engine_errc::success) {
776         return res;
777     }
778 
779     // Access check for SimpleStats
780     res.result = collector.testPrivilegeForStat(res.getScopeId(), {});
781     if (res.result != cb::engine_errc::success) {
782         return res;
783     }
784 
785     // Take a copy of the two items needed from the manifest and then release
786     // the lock before vbucket visiting.
787     std::string scopeName;
788     std::vector<Collections::CollectionEntry> scopeCollections;
789     {
790         auto current = bucket.getCollectionsManager().currentManifest.rlock();
791         auto scopeItr = current->findScope(res.getScopeId());
792 
793         if (scopeItr == current->endScopes()) {
794             EP_LOG_WARN(
795                     "Collections::Manager::doOneScopeStats unknown "
796                     "scope arg:{} sid:{}",
797                     arg,
798                     res.getScopeId().to_string());
799             return cb::EngineErrorGetScopeIDResult{current->getUid()};
800         }
801 
802         scopeName = scopeItr->second.name;
803         scopeCollections = scopeItr->second.collections;
804     }
805     auto cachedStats = getPerCollectionStats(scopeCollections, bucket);
806     cachedStats.addStatsForScope(
807             res.getScopeId(), scopeName, scopeCollections, collector);
808     // add stats for each collection in the scope
809     for (const auto& entry : scopeCollections) {
810         cachedStats.addStatsForCollection(scopeName, entry, collector);
811     }
812     return res;
813 }
814 
815 cb::engine_errc Collections::Manager::doPrometheusCollectionStats(
816         KVBucket& bucket, const BucketStatCollector& collector) {
817     return doAllCollectionsStats(bucket, collector).result;
818 }
819 
820 void Collections::Manager::dump() const {
821     std::cerr << *this;
822 }
823 
824 std::ostream& Collections::operator<<(std::ostream& os,
825                                       const Collections::Manager& manager) {
826     os << "Collections::Manager current:" << *manager.currentManifest.rlock()
827        << "\n";
828     os << "collectionMeta:\n" << *manager.collectionSMT.rlock() << "\n";
829     os << "scopeMeta:\n" << *manager.scopeSMT.rlock() << "\n";
830     return os;
831 }
832 
833 Collections::CachedStats Collections::Manager::getPerCollectionStats(
834         KVBucket& bucket) {
835     auto memUsed = bucket.getEPEngine().getEpStats().getAllCollectionsMemUsed();
836 
837     AllCollectionsGetStatsVBucketVisitor visitor;
838     bucket.visit(visitor);
839 
840     return {std::move(memUsed),
841             std::move(visitor.summary) /* accumulated collection stats */};
842 }
843 
844 Collections::CachedStats Collections::Manager::getPerCollectionStats(
845         const std::vector<Collections::CollectionEntry>& collections,
846         KVBucket& bucket) {
847     // Gather per-vbucket stats for the collections of interest
848     CollectionsGetStatsVBucketVisitor visitor{collections};
849     bucket.visit(visitor);
850 
851     // And the mem_used which is stored in EpStats
852     std::unordered_map<CollectionID, size_t> memUsed;
853     for (const auto& entry : collections) {
854         memUsed.emplace(entry.cid,
855                         bucket.getEPEngine().getEpStats().getCollectionMemUsed(
856                                 entry.cid));
857     }
858     return {std::move(memUsed),
859             std::move(visitor.summary) /* accumulated collection stats */};
860 }
861 
862 Collections::CachedStats::CachedStats(
863         std::unordered_map<CollectionID, size_t>&& colMemUsed,
864         std::unordered_map<CollectionID, AccumulatedStats>&& accumulatedStats)
865     : colMemUsed(std::move(colMemUsed)),
866       accumulatedStats(std::move(accumulatedStats)) {
867 }
868 
869 void Collections::CachedStats::addStatsForCollection(
870         std::string_view scopeName,
871         const CollectionEntry& collection,
872         const BucketStatCollector& collector) {
873     auto collectionC = collector.forScope(scopeName, collection.sid)
874                                .forCollection(collection.name, collection.cid);
875 
876     addAggregatedCollectionStats({collection.cid}, collectionC);
877 
878     using namespace cb::stats;
879     collectionC.addStat(Key::collection_name, collection.name);
880     collectionC.addStat(Key::collection_scope_name, scopeName);
881 
882     // add ttl if valid
883     if (collection.maxTtl.has_value()) {
884         collectionC.addStat(Key::collection_maxTTL,
885                             collection.maxTtl.value().count());
886     }
887 }
888 
889 void Collections::CachedStats::addStatsForScope(
890         ScopeID sid,
891         std::string_view scopeName,
892         const std::vector<Collections::CollectionEntry>& scopeCollections,
893         const BucketStatCollector& collector) {
894     auto scopeC = collector.forScope(scopeName, sid);
895     std::vector<CollectionID> collections;
896     collections.reserve(scopeCollections.size());
897 
898     // get the CollectionIDs - extract the keys from the map
899     for (const auto& entry : scopeCollections) {
900         collections.push_back(entry.cid);
901     }
902     addAggregatedCollectionStats(collections, scopeC);
903 
904     using namespace cb::stats;
905     // add scope name
906     scopeC.addStat(Key::scope_name, scopeName);
907     // add scope collection count
908     scopeC.addStat(Key::scope_collection_count, scopeCollections.size());
909 }
910 
911 void Collections::CachedStats::addAggregatedCollectionStats(
912         const std::vector<CollectionID>& cids, const StatCollector& collector) {
913     size_t memUsed = 0;
914     AccumulatedStats stats;
915 
916     for (const auto& cid : cids) {
917         memUsed += colMemUsed[cid];
918         stats += accumulatedStats[cid];
919     }
920 
921     using namespace cb::stats;
922 
923     collector.addStat(Key::collection_mem_used, memUsed);
924     collector.addStat(Key::collection_item_count, stats.itemCount);
925     collector.addStat(Key::collection_data_size, stats.diskSize);
926 
927     collector.addStat(Key::collection_ops_store, stats.opsStore);
928     collector.addStat(Key::collection_ops_delete, stats.opsDelete);
929     collector.addStat(Key::collection_ops_get, stats.opsGet);
930 }
931