1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "collections/vbucket_filter.h"
19 #include "collections/collections_dockey.h"
20 #include "collections/filter.h"
21 #include "collections/vbucket_manifest.h"
22 #include "dcp/response.h"
23 #include "statwriter.h"
24 
25 #include <platform/checked_snprintf.h>
26 #include <platform/make_unique.h>
27 
Filter(const Collections::Filter& filter, const Collections::VB::Manifest& manifest)28 Collections::VB::Filter::Filter(const Collections::Filter& filter,
29                                 const Collections::VB::Manifest& manifest)
30     : defaultAllowed(false),
31       passthrough(filter.isPassthrough()),
32       systemEventsAllowed(filter.allowSystemEvents()) {
33     // Don't build a filter if all documents are allowed
34     if (passthrough) {
35         defaultAllowed = true;
36         return;
37     }
38 
39     // Lock for reading and create a VB filter
40     auto rh = manifest.lock();
41     separator = rh.getSeparator();
42     if (filter.allowDefaultCollection()) {
43         if (rh.doesDefaultCollectionExist()) {
44             defaultAllowed = true;
45         } else {
46             // The VB::Manifest no longer has $default so don't filter it
47             LOG(EXTENSION_LOG_NOTICE,
48                 "VB::Filter::Filter: dropping $default as it's not in the "
49                 "VB::Manifest");
50         }
51     }
52 
53     for (const auto& c : filter.getFilter()) {
54         bool validFilterEntry = false;
55         if (c.second.is_initialized()) {
56             // strict name/uid lookup
57             validFilterEntry = rh.isCollectionOpen(Identifier{
58                     {c.first.data(), c.first.size()}, c.second.get()});
59         } else {
60             // just name lookup
61             validFilterEntry = rh.isCollectionOpen(
62                     cb::const_char_buffer{c.first.data(), c.first.size()});
63         }
64 
65         if (validFilterEntry) {
66             auto m = std::make_unique<std::string>(c.first);
67             cb::const_char_buffer b{m->data(), m->size()};
68             this->filter.emplace(b, std::move(m));
69         } else {
70             // The VB::Manifest doesn't have the collection, or the collection
71             // is deleted
72             if (c.second.is_initialized()) {
73                 LOG(EXTENSION_LOG_NOTICE,
74                     "VB::Filter::Filter: dropping collection:%s:%" PRIu64
75                     " as it's not open",
76                     c.first.c_str(),
77                     c.second.get());
78             } else {
79                 LOG(EXTENSION_LOG_NOTICE,
80                     "VB::Filter::Filter: dropping collection:%s as it's not "
81                     "open",
82                     c.first.c_str());
83             }
84         }
85     }
86 }
87 
checkAndUpdateSlow(const Item& item)88 bool Collections::VB::Filter::checkAndUpdateSlow(const Item& item) {
89     bool allowed = false;
90     if (item.getKey().getDocNamespace() == DocNamespace::Collections &&
91         !filter.empty()) {
92         // Collections require a look up in the filter
93         const auto cKey = Collections::DocKey::make(item.getKey(), separator);
94         allowed = filter.count(cKey.getCollection());
95     } else if (item.getKey().getDocNamespace() == DocNamespace::System) {
96         allowed = allowSystemEvent(item);
97 
98         if (item.isDeleted()) {
99             remove(item);
100         }
101     }
102     return allowed;
103 }
104 
remove(const Item& item)105 void Collections::VB::Filter::remove(const Item& item) {
106     if (passthrough) {
107         return;
108     }
109 
110     const auto cKey = Collections::DocKey::make(item.getKey());
111     if (cKey.getKey() == DefaultCollectionIdentifier) {
112         defaultAllowed = false;
113     } else {
114         filter.erase(cKey.getKey());
115     }
116 }
117 
empty() const118 bool Collections::VB::Filter::empty() const {
119     return filter.empty() && !defaultAllowed;
120 }
121 
allowSystemEvent(const Item& item) const122 bool Collections::VB::Filter::allowSystemEvent(const Item& item) const {
123     if (!systemEventsAllowed) {
124         return false;
125     }
126 
127     switch (SystemEvent(item.getFlags())) {
128     case SystemEvent::Collection: {
129         const auto cKey = Collections::DocKey::make(item.getKey());
130         if ((cKey.getKey() == DefaultCollectionIdentifier && defaultAllowed) ||
131             passthrough) {
132             return true;
133         } else {
134             // These events are sent only if they relate to a collection in the
135             // filter
136             return filter.count(cKey.getKey()) > 0;
137         }
138     }
139     case SystemEvent::DeleteCollectionHard:
140     case SystemEvent::DeleteCollectionSoft: {
141         return false;
142     }
143     case SystemEvent::CollectionsSeparatorChanged: {
144         // The separator changed event is sent if system events are allowed
145         return systemEventsAllowed;
146     }
147     default: {
148         throw std::invalid_argument(
149                 "VB::Filter::allowSystemEvent:: event unknown:" +
150                 std::to_string(int(item.getFlags())));
151     }
152     }
153 }
154 
addStats(ADD_STAT add_stat, const void* c, const std::string& prefix, uint16_t vb) const155 void Collections::VB::Filter::addStats(ADD_STAT add_stat,
156                                        const void* c,
157                                        const std::string& prefix,
158                                        uint16_t vb) const {
159     try {
160         const int bsize = 1024;
161         char buffer[bsize];
162         checked_snprintf(
163                 buffer, bsize, "%s:filter_%d_passthrough", prefix.c_str(), vb);
164         add_casted_stat(buffer, passthrough, add_stat, c);
165 
166         checked_snprintf(buffer,
167                          bsize,
168                          "%s:filter_%d_default_allowed",
169                          prefix.c_str(),
170                          vb);
171         add_casted_stat(buffer, defaultAllowed, add_stat, c);
172 
173         checked_snprintf(buffer,
174                          bsize,
175                          "%s:filter_%d_system_allowed",
176                          prefix.c_str(),
177                          vb);
178         add_casted_stat(buffer, systemEventsAllowed, add_stat, c);
179 
180         checked_snprintf(
181                 buffer, bsize, "%s:filter_%d_size", prefix.c_str(), vb);
182         add_casted_stat(buffer, filter.size(), add_stat, c);
183     } catch (std::exception& error) {
184         LOG(EXTENSION_LOG_WARNING,
185             "Collections::VB::Filter::addStats: %s:vb:%" PRIu16
186             " exception.what:%s",
187             prefix.c_str(),
188             vb,
189             error.what());
190     }
191 }
192 
dump() const193 void Collections::VB::Filter::dump() const {
194     std::cerr << *this << std::endl;
195 }
196 
operator <<( std::ostream& os, const Collections::VB::Filter& filter)197 std::ostream& Collections::VB::operator<<(
198         std::ostream& os, const Collections::VB::Filter& filter) {
199     os << "VBucket::Filter"
200        << ": defaultAllowed:" << filter.defaultAllowed
201        << ", passthrough:" << filter.passthrough
202        << ", systemEventsAllowed:" << filter.systemEventsAllowed;
203 
204     if (filter.separator.empty()) {
205         os << ", separator empty";
206     } else {
207         os << ", separator:" << filter.separator;
208     }
209 
210     os << ", filter.size:" << filter.filter.size() << std::endl;
211     for (auto& m : filter.filter) {
212         os << *m.second << std::endl;
213     }
214     return os;
215 }
216