1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "systemevent.h"
19 
20 #include "collections/collections_types.h"
21 #include "collections/vbucket_manifest.h"
22 #include "dcp/response.h"
23 #include "item.h"
24 #include "kvstore.h"
25 
make( SystemEvent se, const std::string& keyExtra, size_t itemSize, OptionalSeqno seqno)26 std::unique_ptr<Item> SystemEventFactory::make(
27         SystemEvent se,
28         const std::string& keyExtra,
29         size_t itemSize,
30         OptionalSeqno seqno) {
31     std::string key = makeKey(se, keyExtra);
32 
33     auto item = std::make_unique<Item>(DocKey(key, DocNamespace::System),
34                                        uint32_t(se) /*flags*/,
35                                        0 /*exptime*/,
36                                        nullptr, /*no data to copy-in*/
37                                        itemSize);
38 
39     if (seqno) {
40         item->setBySeqno(seqno.value());
41     }
42 
43     return item;
44 }
45 
make(const DocKey& key, SystemEvent se)46 std::unique_ptr<Item> SystemEventFactory::make(const DocKey& key,
47                                                SystemEvent se) {
48     if (key.getDocNamespace() != DocNamespace::System) {
49         throw std::invalid_argument(
50                 "SystemEventFactory::::make cannot use key with namespace: " +
51                 std::to_string(int(key.getDocNamespace())));
52     }
53 
54     auto item = std::make_unique<Item>(key,
55                                        uint32_t(se) /*flags*/,
56                                        0 /*exptime*/,
57                                        nullptr, /*no data to copy-in*/
58                                        0);
59 
60     return item;
61 }
62 
63 // Build a key using the separator so we can split it if needed
makeKey(SystemEvent se, const std::string& keyExtra)64 std::string SystemEventFactory::makeKey(SystemEvent se,
65                                         const std::string& keyExtra) {
66     std::string key;
67     switch (se) {
68     case SystemEvent::Collection:
69         // $collection:<collection-name>
70         key = Collections::SystemEventPrefixWithSeparator + keyExtra;
71         break;
72     case SystemEvent::DeleteCollectionSoft:
73     case SystemEvent::DeleteCollectionHard: {
74         // $collections:delete:<collection-name>
75         key = Collections::DeleteKey + keyExtra;
76         break;
77     }
78     case SystemEvent::CollectionsSeparatorChanged: {
79         // $collections_separator:<key-extra>
80         key = Collections::SeparatorChangePrefixWithSeparator + keyExtra;
81         break;
82     }
83     }
84     return key;
85 }
86 
process(const queued_item& item)87 ProcessStatus SystemEventFlush::process(const queued_item& item) {
88     if (item->getOperation() != queue_op::system_event) {
89         return ProcessStatus::Continue;
90     }
91 
92     switch (SystemEvent(item->getFlags())) {
93     case SystemEvent::Collection: {
94         saveCollectionsManifestItem(item); // Updates manifest
95         return ProcessStatus::Continue; // And flushes an item
96     }
97     case SystemEvent::CollectionsSeparatorChanged: {
98         if (!item->isDeleted()) {
99             saveCollectionsManifestItem(item); // Updates manifest
100         }
101         return ProcessStatus::Continue; // And flushes an item
102     }
103     case SystemEvent::DeleteCollectionHard:
104     case SystemEvent::DeleteCollectionSoft: {
105         saveCollectionsManifestItem(item); // Updates manifest
106         return ProcessStatus::Skip; // But skips flushing the item
107     }
108     }
109 
110     throw std::invalid_argument("SystemEventFlush::process unknown event " +
111                                 std::to_string(item->getFlags()));
112 }
113 
getCollectionsManifestItem() const114 const Item* SystemEventFlush::getCollectionsManifestItem() const {
115     return collectionManifestItem.get();
116 }
117 
saveCollectionsManifestItem(const queued_item& item)118 void SystemEventFlush::saveCollectionsManifestItem(const queued_item& item) {
119     // For a given checkpoint only the highest system event should be the
120     // one which writes the manifest
121     if ((collectionManifestItem &&
122          item->getBySeqno() > collectionManifestItem->getBySeqno()) ||
123         !collectionManifestItem) {
124         collectionManifestItem = item;
125     }
126 }
127 
process(const Item& item)128 ProcessStatus SystemEventReplicate::process(const Item& item) {
129     if (item.shouldReplicate()) {
130         if (item.getOperation() != queue_op::system_event) {
131             // Not a system event, so no further filtering
132             return ProcessStatus::Continue;
133         } else {
134             switch (SystemEvent(item.getFlags())) {
135             case SystemEvent::Collection:
136             case SystemEvent::CollectionsSeparatorChanged:
137                 // Collection and change separator events both replicate
138                 return ProcessStatus::Continue;
139             case SystemEvent::DeleteCollectionHard:
140             case SystemEvent::DeleteCollectionSoft: {
141                 // Delete H/S do not replicate
142                 return ProcessStatus::Skip;
143             }
144             }
145         }
146     }
147     return ProcessStatus::Skip;
148 }
149 
make( uint32_t opaque, const queued_item& item)150 std::unique_ptr<SystemEventProducerMessage> SystemEventProducerMessage::make(
151         uint32_t opaque, const queued_item& item) {
152     switch (SystemEvent(item->getFlags())) {
153     case SystemEvent::Collection: {
154         // Note: constructor is private and make_unique is a pain to make friend
155         return std::unique_ptr<CollectionsProducerMessage>{
156                 new CollectionsProducerMessage(
157                         opaque,
158                         item,
159                         Collections::VB::Manifest::getSystemEventData(
160                                 {item->getData(), item->getNBytes()}))};
161     }
162     case SystemEvent::CollectionsSeparatorChanged: {
163         // Note: constructor is private and make_unique is a pain to make friend
164         return std::unique_ptr<ChangeSeparatorProducerMessage>{
165                 new ChangeSeparatorProducerMessage(
166                         opaque,
167                         item,
168                         Collections::VB::Manifest::getSystemEventSeparatorData(
169                                 {item->getData(), item->getNBytes()}))};
170     }
171     case SystemEvent::DeleteCollectionHard:
172     case SystemEvent::DeleteCollectionSoft:
173         break;
174     }
175 
176     throw std::logic_error("SystemEventProducerMessage::make not valid for " +
177                            std::to_string(item->getFlags()));
178 }
179