1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2017 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "engine_wrapper.h"
19#include "executors.h"
20#include "utilities.h"
21#include <daemon/mcaudit.h>
22#include <daemon/mcbp.h>
23#include <logger/logger.h>
24#include <memcached/protocol_binary.h>
25#include <string>
26
27void dcp_open_executor(Cookie& cookie) {
28    using cb::mcbp::request::DcpOpenPayload;
29
30    auto& request = cookie.getHeader().getRequest();
31    auto ext = request.getExtdata();
32    const auto* payload = reinterpret_cast<const DcpOpenPayload*>(ext.data());
33    const uint32_t flags = payload->getFlags();
34
35    auto ret = cookie.swapAiostat(ENGINE_SUCCESS);
36
37    auto& connection = cookie.getConnection();
38    connection.enableDatatype(cb::mcbp::Feature::JSON);
39
40    const bool dcpNotifier =
41            (flags & DcpOpenPayload::Notifier) == DcpOpenPayload::Notifier;
42
43    if (ret == ENGINE_SUCCESS) {
44        cb::rbac::Privilege privilege = cb::rbac::Privilege::DcpProducer;
45        if (dcpNotifier) {
46            privilege = cb::rbac::Privilege::DcpConsumer;
47        }
48
49        ret = mcbp::checkPrivilege(cookie, privilege);
50
51        if (ret == ENGINE_SUCCESS) {
52            auto key = request.getKey();
53            auto value = request.getValue();
54
55            ret = dcpOpen(
56                    cookie,
57                    request.getOpaque(),
58                    payload->getSeqno(),
59                    flags,
60                    {reinterpret_cast<const char*>(key.data()), key.size()},
61                    {reinterpret_cast<const char*>(value.data()),
62                     value.size()});
63        }
64    }
65
66    ret = connection.remapErrorCode(ret);
67    switch (ret) {
68    case ENGINE_SUCCESS: {
69        const bool dcpXattrAware =
70                (flags & DcpOpenPayload::IncludeXattrs) != 0 &&
71                connection.selectedBucketIsXattrEnabled();
72        const bool dcpDeletedUserXattr =
73                (flags & DcpOpenPayload::IncludeDeletedUserXattrs) != 0 &&
74                connection.selectedBucketIsXattrEnabled();
75        const bool dcpNoValue = (flags & DcpOpenPayload::NoValue) != 0;
76        const bool dcpDeleteTimes =
77                (flags & DcpOpenPayload::IncludeDeleteTimes) != 0;
78        connection.setDcpXattrAware(dcpXattrAware);
79        connection.setDcpDeletedUserXattr(dcpDeletedUserXattr);
80        connection.setDcpNoValue(dcpNoValue);
81        connection.setDcpDeleteTimeEnabled(dcpDeleteTimes);
82        connection.setDCP(true);
83        connection.disableSaslAuth();
84
85        // String buffer with max length = total length of all possible contents
86        std::string logBuffer;
87
88        const bool dcpProducer =
89                (flags & DcpOpenPayload::Producer) == DcpOpenPayload::Producer;
90        if (dcpProducer) {
91            logBuffer.append("PRODUCER, ");
92        }
93        if (dcpNotifier) {
94            logBuffer.append("NOTIFIER, ");
95        }
96        if (dcpXattrAware) {
97            logBuffer.append("INCLUDE_XATTRS, ");
98        }
99        if (dcpNoValue) {
100            logBuffer.append("NO_VALUE, ");
101        }
102        if (dcpDeleteTimes) {
103            logBuffer.append("DELETE_TIMES, ");
104        }
105        if (connection.isDcpDeletedUserXattr()) {
106            logBuffer.append("INCLUDE_DELETED_USER_XATTRS, ");
107        }
108
109        // Remove trailing whitespace and comma
110        if (!logBuffer.empty()) {
111            logBuffer.resize(logBuffer.size() - 2);
112        }
113
114        LOG_INFO("{}: DCP connection opened successfully. {} {}",
115                 connection.getId(),
116                 logBuffer,
117                 connection.getDescription().c_str());
118
119        audit_dcp_open(connection);
120        cookie.sendResponse(cb::mcbp::Status::Success);
121        break;
122    }
123
124    case ENGINE_DISCONNECT:
125        connection.setState(StateMachine::State::closing);
126        break;
127
128    case ENGINE_EWOULDBLOCK:
129        cookie.setEwouldblock(true);
130        break;
131
132    default:
133        cookie.sendResponse(cb::engine_errc(ret));
134    }
135}
136