1042bf504SJim Walker/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2042bf504SJim Walker/*
3042bf504SJim Walker *     Copyright 2017 Couchbase, Inc
4042bf504SJim Walker *
5042bf504SJim Walker *   Licensed under the Apache License, Version 2.0 (the "License");
6042bf504SJim Walker *   you may not use this file except in compliance with the License.
7042bf504SJim Walker *   You may obtain a copy of the License at
8042bf504SJim Walker *
9042bf504SJim Walker *       http://www.apache.org/licenses/LICENSE-2.0
10042bf504SJim Walker *
11042bf504SJim Walker *   Unless required by applicable law or agreed to in writing, software
12042bf504SJim Walker *   distributed under the License is distributed on an "AS IS" BASIS,
13042bf504SJim Walker *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14042bf504SJim Walker *   See the License for the specific language governing permissions and
15042bf504SJim Walker *   limitations under the License.
16042bf504SJim Walker */
17042bf504SJim Walker
18042bf504SJim Walker#include "collections/vbucket_filter.h"
19042bf504SJim Walker#include "collections/collections_dockey.h"
20042bf504SJim Walker#include "collections/filter.h"
21042bf504SJim Walker#include "collections/vbucket_manifest.h"
22042bf504SJim Walker#include "dcp/response.h"
23042bf504SJim Walker#include "statwriter.h"
24042bf504SJim Walker
25042bf504SJim Walker#include <platform/checked_snprintf.h>
26042bf504SJim Walker#include <platform/make_unique.h>
27042bf504SJim Walker
28042bf504SJim WalkerCollections::VB::Filter::Filter(const Collections::Filter& filter,
29042bf504SJim Walker                                const Collections::VB::Manifest& manifest)
30042bf504SJim Walker    : defaultAllowed(false),
31042bf504SJim Walker      passthrough(filter.isPassthrough()),
32042bf504SJim Walker      systemEventsAllowed(filter.allowSystemEvents()) {
33042bf504SJim Walker    // Don't build a filter if all documents are allowed
34042bf504SJim Walker    if (passthrough) {
35042bf504SJim Walker        defaultAllowed = true;
36042bf504SJim Walker        return;
37042bf504SJim Walker    }
38042bf504SJim Walker
39042bf504SJim Walker    // Lock for reading and create a VB filter
40042bf504SJim Walker    auto rh = manifest.lock();
41042bf504SJim Walker    separator = rh.getSeparator();
42042bf504SJim Walker    if (filter.allowDefaultCollection()) {
43042bf504SJim Walker        if (rh.doesDefaultCollectionExist()) {
44042bf504SJim Walker            defaultAllowed = true;
45042bf504SJim Walker        } else {
46042bf504SJim Walker            // The VB::Manifest no longer has $default so don't filter it
47042bf504SJim Walker            LOG(EXTENSION_LOG_NOTICE,
48042bf504SJim Walker                "VB::Filter::Filter: dropping $default as it's not in the "
49042bf504SJim Walker                "VB::Manifest");
50042bf504SJim Walker        }
51042bf504SJim Walker    }
52042bf504SJim Walker
53042bf504SJim Walker    for (const auto& c : filter.getFilter()) {
541f8437cfSJim Walker        bool validFilterEntry = false;
551f8437cfSJim Walker        if (c.second.is_initialized()) {
561f8437cfSJim Walker            // strict name/uid lookup
571f8437cfSJim Walker            validFilterEntry = rh.isCollectionOpen(Identifier{
581f8437cfSJim Walker                    {c.first.data(), c.first.size()}, c.second.get()});
591f8437cfSJim Walker        } else {
601f8437cfSJim Walker            // just name lookup
611f8437cfSJim Walker            validFilterEntry = rh.isCollectionOpen(
621f8437cfSJim Walker                    cb::const_char_buffer{c.first.data(), c.first.size()});
631f8437cfSJim Walker        }
641f8437cfSJim Walker
651f8437cfSJim Walker        if (validFilterEntry) {
661f8437cfSJim Walker            auto m = std::make_unique<std::string>(c.first);
67042bf504SJim Walker            cb::const_char_buffer b{m->data(), m->size()};
68042bf504SJim Walker            this->filter.emplace(b, std::move(m));
69042bf504SJim Walker        } else {
701f8437cfSJim Walker            // The VB::Manifest doesn't have the collection, or the collection
715cf47c05SJim Walker            // is deleted
721f8437cfSJim Walker            if (c.second.is_initialized()) {
731f8437cfSJim Walker                LOG(EXTENSION_LOG_NOTICE,
741f8437cfSJim Walker                    "VB::Filter::Filter: dropping collection:%s:%" PRIu64
751f8437cfSJim Walker                    " as it's not open",
761f8437cfSJim Walker                    c.first.c_str(),
771f8437cfSJim Walker                    c.second.get());
781f8437cfSJim Walker            } else {
791f8437cfSJim Walker                LOG(EXTENSION_LOG_NOTICE,
801f8437cfSJim Walker                    "VB::Filter::Filter: dropping collection:%s as it's not "
811f8437cfSJim Walker                    "open",
821f8437cfSJim Walker                    c.first.c_str());
831f8437cfSJim Walker            }
84042bf504SJim Walker        }
85042bf504SJim Walker    }
86042bf504SJim Walker}
87042bf504SJim Walker
8812e87fb6SDave Rigbybool Collections::VB::Filter::checkAndUpdateSlow(const Item& item) {
89f8ac377cSJim Walker    bool allowed = false;
9012e87fb6SDave Rigby    if (item.getKey().getDocNamespace() == DocNamespace::Collections &&
9112e87fb6SDave Rigby        !filter.empty()) {
92042bf504SJim Walker        // Collections require a look up in the filter
93513d8e79SJim Walker        const auto cKey = Collections::DocKey::make(item.getKey(), separator);
94f8ac377cSJim Walker        allowed = filter.count(cKey.getCollection());
95f8ac377cSJim Walker    } else if (item.getKey().getDocNamespace() == DocNamespace::System) {
96f8ac377cSJim Walker        allowed = allowSystemEvent(item);
97f8ac377cSJim Walker
98f8ac377cSJim Walker        if (item.isDeleted()) {
99f8ac377cSJim Walker            remove(item);
100f8ac377cSJim Walker        }
101042bf504SJim Walker    }
102f8ac377cSJim Walker    return allowed;
103042bf504SJim Walker}
104042bf504SJim Walker
105f8ac377cSJim Walkervoid Collections::VB::Filter::remove(const Item& item) {
106042bf504SJim Walker    if (passthrough) {
107f8ac377cSJim Walker        return;
108042bf504SJim Walker    }
109042bf504SJim Walker
110dea18910SJim Walker    const auto cKey = Collections::DocKey::make(item.getKey());
111f8ac377cSJim Walker    if (cKey.getKey() == DefaultCollectionIdentifier) {
112042bf504SJim Walker        defaultAllowed = false;
113042bf504SJim Walker    } else {
114f8ac377cSJim Walker        filter.erase(cKey.getKey());
115042bf504SJim Walker    }
116f8ac377cSJim Walker}
117042bf504SJim Walker
118f8ac377cSJim Walkerbool Collections::VB::Filter::empty() const {
119042bf504SJim Walker    return filter.empty() && !defaultAllowed;
120042bf504SJim Walker}
121042bf504SJim Walker
122513d8e79SJim Walkerbool Collections::VB::Filter::allowSystemEvent(const Item& item) const {
123f8ac377cSJim Walker    if (!systemEventsAllowed) {
124f8ac377cSJim Walker        return false;
125f8ac377cSJim Walker    }
126f8ac377cSJim Walker
127513d8e79SJim Walker    switch (SystemEvent(item.getFlags())) {
128513d8e79SJim Walker    case SystemEvent::Collection: {
129dea18910SJim Walker        const auto cKey = Collections::DocKey::make(item.getKey());
130513d8e79SJim Walker        if ((cKey.getKey() == DefaultCollectionIdentifier && defaultAllowed) ||
131042bf504SJim Walker            passthrough) {
132042bf504SJim Walker            return true;
133042bf504SJim Walker        } else {
134042bf504SJim Walker            // These events are sent only if they relate to a collection in the
135042bf504SJim Walker            // filter
136513d8e79SJim Walker            return filter.count(cKey.getKey()) > 0;
137042bf504SJim Walker        }
138042bf504SJim Walker    }
139513d8e79SJim Walker    case SystemEvent::DeleteCollectionHard:
140513d8e79SJim Walker    case SystemEvent::DeleteCollectionSoft: {
141513d8e79SJim Walker        return false;
142513d8e79SJim Walker    }
143513d8e79SJim Walker    case SystemEvent::CollectionsSeparatorChanged: {
144042bf504SJim Walker        // The separator changed event is sent if system events are allowed
145042bf504SJim Walker        return systemEventsAllowed;
146042bf504SJim Walker    }
147513d8e79SJim Walker    default: {
148513d8e79SJim Walker        throw std::invalid_argument(
149513d8e79SJim Walker                "VB::Filter::allowSystemEvent:: event unknown:" +
150513d8e79SJim Walker                std::to_string(int(item.getFlags())));
151513d8e79SJim Walker    }
152513d8e79SJim Walker    }
153042bf504SJim Walker}
154042bf504SJim Walker
155042bf504SJim Walkervoid Collections::VB::Filter::addStats(ADD_STAT add_stat,
156042bf504SJim Walker                                       const void* c,
157042bf504SJim Walker                                       const std::string& prefix,
158042bf504SJim Walker                                       uint16_t vb) const {
159042bf504SJim Walker    try {
160042bf504SJim Walker        const int bsize = 1024;
161042bf504SJim Walker        char buffer[bsize];
162042bf504SJim Walker        checked_snprintf(
163042bf504SJim Walker                buffer, bsize, "%s:filter_%d_passthrough", prefix.c_str(), vb);
164042bf504SJim Walker        add_casted_stat(buffer, passthrough, add_stat, c);
165042bf504SJim Walker
166042bf504SJim Walker        checked_snprintf(buffer,
167042bf504SJim Walker                         bsize,
168042bf504SJim Walker                         "%s:filter_%d_default_allowed",
169042bf504SJim Walker                         prefix.c_str(),
170042bf504SJim Walker                         vb);
171042bf504SJim Walker        add_casted_stat(buffer, defaultAllowed, add_stat, c);
172042bf504SJim Walker
173f8ac377cSJim Walker        checked_snprintf(buffer,
174f8ac377cSJim Walker                         bsize,
175f8ac377cSJim Walker                         "%s:filter_%d_system_allowed",
176f8ac377cSJim Walker                         prefix.c_str(),
177f8ac377cSJim Walker                         vb);
178f8ac377cSJim Walker        add_casted_stat(buffer, systemEventsAllowed, add_stat, c);
179f8ac377cSJim Walker
180042bf504SJim Walker        checked_snprintf(
181042bf504SJim Walker                buffer, bsize, "%s:filter_%d_size", prefix.c_str(), vb);
182042bf504SJim Walker        add_casted_stat(buffer, filter.size(), add_stat, c);
183042bf504SJim Walker    } catch (std::exception& error) {
184042bf504SJim Walker        LOG(EXTENSION_LOG_WARNING,
185042bf504SJim Walker            "Collections::VB::Filter::addStats: %s:vb:%" PRIu16
186042bf504SJim Walker            " exception.what:%s",
187042bf504SJim Walker            prefix.c_str(),
188042bf504SJim Walker            vb,
189042bf504SJim Walker            error.what());
190042bf504SJim Walker    }
191042bf504SJim Walker}
192042bf504SJim Walker
193042bf504SJim Walkervoid Collections::VB::Filter::dump() const {
194042bf504SJim Walker    std::cerr << *this << std::endl;
195042bf504SJim Walker}
196042bf504SJim Walker
197042bf504SJim Walkerstd::ostream& Collections::VB::operator<<(
198042bf504SJim Walker        std::ostream& os, const Collections::VB::Filter& filter) {
199042bf504SJim Walker    os << "VBucket::Filter"
200042bf504SJim Walker       << ": defaultAllowed:" << filter.defaultAllowed
201042bf504SJim Walker       << ", passthrough:" << filter.passthrough
202042bf504SJim Walker       << ", systemEventsAllowed:" << filter.systemEventsAllowed;
203042bf504SJim Walker
204042bf504SJim Walker    if (filter.separator.empty()) {
205042bf504SJim Walker        os << ", separator empty";
206042bf504SJim Walker    } else {
207042bf504SJim Walker        os << ", separator:" << filter.separator;
208042bf504SJim Walker    }
209042bf504SJim Walker
210042bf504SJim Walker    os << ", filter.size:" << filter.filter.size() << std::endl;
211042bf504SJim Walker    for (auto& m : filter.filter) {
212042bf504SJim Walker        os << *m.second << std::endl;
213042bf504SJim Walker    }
214042bf504SJim Walker    return os;
21512e87fb6SDave Rigby}