1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2017 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <daemon/buckets.h>
19#include <daemon/cccp_notification_task.h>
20#include <daemon/executorpool.h>
21#include <daemon/mcbp.h>
22#include <daemon/memcached.h>
23#include <daemon/session_cas.h>
24
25#include "executors.h"
26
27void get_cluster_config_executor(Cookie& cookie) {
28    auto& connection = cookie.getConnection();
29    auto& bucket = connection.getBucket();
30    if (bucket.type == BucketType::NoBucket) {
31        if (connection.isXerrorSupport()) {
32            cookie.setErrorContext("No bucket selected");
33            cookie.sendResponse(cb::mcbp::Status::NoBucket);
34        } else {
35            LOG_WARNING(
36                    "{}: Can't get cluster configuration without "
37                    "selecting a bucket. Disconnecting {}",
38                    connection.getId(),
39                    connection.getDescription());
40            connection.setState(McbpStateMachine::State::closing);
41        }
42        return;
43    }
44
45    auto pair = bucket.clusterConfiguration.getConfiguration();
46    if (pair.first == ClusterConfiguration::NoConfiguration) {
47        cookie.sendResponse(cb::mcbp::Status::KeyEnoent);
48    } else {
49        cookie.sendResponse(cb::mcbp::Status::Success,
50                            {},
51                            {},
52                            {pair.second->data(), pair.second->size()},
53                            cb::mcbp::Datatype::JSON,
54                            0);
55        connection.setClustermapRevno(pair.first);
56    }
57}
58
59void set_cluster_config_executor(Cookie& cookie) {
60    auto& connection = cookie.getConnection();
61    auto& bucket = connection.getBucket();
62    if (bucket.type == BucketType::NoBucket) {
63        if (connection.isXerrorSupport()) {
64            cookie.setErrorContext("No bucket selected");
65            cookie.sendResponse(cb::mcbp::Status::NoBucket);
66        } else {
67            LOG_WARNING(
68                    "{}: Can't set cluster configuration without "
69                    "selecting a bucket. Disconnecting {}",
70                    connection.getId(),
71                    connection.getDescription());
72            connection.setState(McbpStateMachine::State::closing);
73        }
74        return;
75    }
76
77    // First validate that the provided configuration is a valid payload
78    const auto* req = reinterpret_cast<const cb::mcbp::Request*>(
79            cookie.getPacketAsVoidPtr());
80    auto cas = req->getCas();
81
82    // verify that this is a legal session cas:
83    if (!session_cas.increment_session_counter(cas)) {
84        cookie.sendResponse(cb::mcbp::Status::KeyEexists);
85        return;
86    }
87
88    try {
89        auto payload = req->getValue();
90        cb::const_char_buffer conf{
91                reinterpret_cast<const char*>(payload.data()), payload.size()};
92        bucket.clusterConfiguration.setConfiguration(conf);
93        cookie.setCas(cas);
94        cookie.sendResponse(cb::mcbp::Status::Success);
95
96        const long revision =
97                bucket.clusterConfiguration.getConfiguration().first;
98
99        LOG_INFO(
100                "{}: {} Updated cluster configuration for bucket [{}]. New "
101                "revision: {}",
102                connection.getId(),
103                connection.getDescription(),
104                bucket.name,
105                revision);
106
107        // Start an executor job to walk through the connections and tell
108        // them to push new clustermaps
109        std::shared_ptr<Task> task;
110        task = std::make_shared<CccpNotificationTask>(
111                connection.getBucketIndex(), revision);
112        std::lock_guard<std::mutex> guard(task->getMutex());
113        executorPool->schedule(task, true);
114    } catch (const std::invalid_argument& e) {
115        LOG_WARNING(
116                "{}: {} Failed to update cluster configuration for bucket "
117                "[{}] - {}",
118                connection.getId(),
119                connection.getDescription(),
120                bucket.name,
121                e.what());
122        cookie.setErrorContext(e.what());
123        cookie.sendResponse(cb::mcbp::Status::Einval);
124    } catch (const std::exception& e) {
125        LOG_WARNING(
126                "{}: {} Failed to update cluster configuration for bucket "
127                "[{}] - {}",
128                connection.getId(),
129                connection.getDescription(),
130                bucket.name,
131                e.what());
132        cookie.setErrorContext(e.what());
133        cookie.sendResponse(cb::mcbp::Status::Einternal);
134    }
135
136    session_cas.decrement_session_counter();
137}
138