1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2014-2020 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 /**
19  * This file contains the CCCP (Cluster Carrier Configuration Protocol)
20  * implementation of the confmon provider. It utilizes a memcached connection
21  * to retrieve configuration information.
22  */
23 
24 #include "internal.h"
25 #include "clconfig.h"
26 #include "packetutils.h"
27 #include <mcserver/negotiate.h>
28 #include <lcbio/lcbio.h>
29 #include <lcbio/timer-cxx.h>
30 #include <lcbio/ssl.h>
31 #include "ctx-log-inl.h"
32 
33 #include <cstring>
34 
/** Format-string prefix for log messages that identify an I/O context (alias of CTX_LOGFMT). */
#define LOGFMT CTX_LOGFMT
/** Arguments matching LOGFMT, built from the `ioctx` member of the provider `p`. */
#define LOGID(p) CTX_LOGID(p->ioctx)
/**
 * Leading arguments for lcb_log(): the settings reachable through the
 * provider's parent, the "cccp" subsystem tag, the severity `LCB_LOG_<lvl>`,
 * and the current file and line.
 */
#define LOGARGS(cccp, lvl) cccp->parent->settings, "cccp", LCB_LOG_##lvl, __FILE__, __LINE__

/** Declared early because CccpProvider stores a pointer to it; defined after the provider. */
struct CccpCookie;

using namespace lcb::clconfig;
42 
43 struct CccpProvider : public Provider {
44     explicit CccpProvider(Confmon *);
45     ~CccpProvider() override;
46 
47     /**
48      * Stops the current request.
49      * @param is_clean Whether the state of the current request is 'clean',
50      *        i.e. whether we are stopping because of an error condition, or
51      *        because we have received a successful response.
52      */
53     void stop_current_request(bool is_clean);
54     lcb_STATUS schedule_next_request(lcb_STATUS err, bool can_rollover);
55     lcb_STATUS mcio_error(lcb_STATUS err);
on_timeoutCccpProvider56     void on_timeout()
57     {
58         mcio_error(LCB_ERR_TIMEOUT);
59     }
60     lcb_STATUS update(const char *host, const char *data);
61     void request_config();
62     void on_io_read();
63 
64     bool pause() override;
65     void configure_nodes(const lcb::Hostlist &) override;
66     void config_updated(lcbvb_CONFIG *) override;
67     void dump(FILE *) const override;
68     lcb_STATUS refresh() override;
69 
70     ConfigInfo *get_cached() override
71     {
72         return config;
73     }
74 
75     const lcb::Hostlist *get_nodes() const override
76     {
77         return nodes;
78     }
79 
80     void enable(void *arg) override
81     {
82         instance = reinterpret_cast<lcb_INSTANCE *>(arg);
83         Provider::enable();
84     }
85 
86     // Whether there is a pending CCCP config request.
has_pending_requestCccpProvider87     bool has_pending_request() const
88     {
89         return creq != nullptr || cmdcookie != nullptr || ioctx != nullptr;
90     }
91 
92     lcb::Hostlist *nodes;
93     ConfigInfo *config;
94     lcb::io::Timer<CccpProvider, &CccpProvider::on_timeout> timer;
95     lcb_INSTANCE *instance;
96     lcb::io::ConnectionRequest *creq{};
97     lcbio_CTX *ioctx;
98     CccpCookie *cmdcookie;
99 };
100 
101 struct CccpCookie {
102     CccpProvider *parent;
103     bool active;
104     lcb_STATUS select_rc;
105     int refcnt{0};
CccpCookieCccpCookie106     explicit CccpCookie(CccpProvider *parent_) : parent(parent_), active(true), select_rc(LCB_SUCCESS) {}
107 
increfCccpCookie108     void incref()
109     {
110         ++refcnt;
111     }
112 
decrefCccpCookie113     void decref()
114     {
115         --refcnt;
116         if (refcnt <= 0) {
117             delete this;
118         }
119     }
120 };
121 
/** Forward declarations of the socket callbacks defined later in this file. */
static void io_error_handler(lcbio_CTX *, lcb_STATUS);
static void io_read_handler(lcbio_CTX *, unsigned nr);
static void on_connected(lcbio_SOCKET *, void *, lcb_STATUS, lcbio_OSERR);
125 
pooled_close_cb(lcbio_SOCKET *sock, int reusable, void *arg)126 static void pooled_close_cb(lcbio_SOCKET *sock, int reusable, void *arg)
127 {
128     bool *ru_ex = reinterpret_cast<bool *>(arg);
129     lcbio_ref(sock);
130     if (reusable && *ru_ex) {
131         lcb::io::Pool::put(sock);
132     } else {
133         lcb::io::Pool::discard(sock);
134     }
135 }
136 
stop_current_request(bool is_clean)137 void CccpProvider::stop_current_request(bool is_clean)
138 {
139     if (cmdcookie) {
140         cmdcookie->active = false;
141         cmdcookie = nullptr;
142     }
143 
144     lcb::io::ConnectionRequest::cancel(&creq);
145 
146     if (ioctx) {
147         lcbio_ctx_close(ioctx, pooled_close_cb, &is_clean);
148         ioctx = nullptr;
149     }
150 }
151 
schedule_next_request(lcb_STATUS err, bool can_rollover)152 lcb_STATUS CccpProvider::schedule_next_request(lcb_STATUS err, bool can_rollover)
153 {
154     lcb_host_t *next_host = nodes->next(can_rollover);
155     if (!next_host) {
156         timer.cancel();
157         parent->provider_failed(this, err);
158         return err;
159     }
160 
161     lcb::Server *server = instance->find_server(*next_host);
162     if (server) {
163         cmdcookie = new CccpCookie(this);
164         lcb_log(LOGARGS(this, TRACE), "Re-Issuing CCCP Command on server struct %p (" LCB_HOST_FMT ")", (void *)server,
165                 LCB_HOST_ARG(this->parent->settings, next_host));
166         timer.rearm(settings().config_node_timeout);
167         if (settings().bucket && settings().bucket[0] != '\0' && !server->selected_bucket) {
168             cmdcookie->incref();
169             instance->select_bucket(cmdcookie, server);
170         }
171         cmdcookie->incref();
172         instance->request_config(cmdcookie, server);
173 
174     } else {
175 
176         lcb_log(LOGARGS(this, INFO), "Requesting connection to node " LCB_HOST_FMT " for CCCP configuration",
177                 LCB_HOST_ARG(this->parent->settings, next_host));
178         creq = instance->memd_sockpool->get(*next_host, settings().config_node_timeout, on_connected, this);
179     }
180 
181     return LCB_SUCCESS;
182 }
183 
mcio_error(lcb_STATUS err)184 lcb_STATUS CccpProvider::mcio_error(lcb_STATUS err)
185 {
186     if (err != LCB_ERR_UNSUPPORTED_OPERATION) {
187         lcb_log(LOGARGS(this, ERR), LOGFMT "Could not get configuration: %s", LOGID(this), lcb_strerror_short(err));
188     }
189 
190     stop_current_request(err == LCB_ERR_UNSUPPORTED_OPERATION);
191     if (err == LCB_ERR_PROTOCOL_ERROR && LCBT_SETTING(instance, conntype) == LCB_TYPE_CLUSTER) {
192         lcb_log(LOGARGS(this, WARN), LOGFMT "Failed to bootstrap using CCCP", LOGID(this));
193         timer.cancel();
194         parent->provider_failed(this, err);
195         return err;
196     } else {
197         return schedule_next_request(err, false);
198     }
199 }
200 
201 /** Update the configuration from a server. */
cccp_update(Provider *provider, const char *host, const char *data)202 lcb_STATUS lcb::clconfig::cccp_update(Provider *provider, const char *host, const char *data)
203 {
204     return static_cast<CccpProvider *>(provider)->update(host, data);
205 }
206 
update(const char *host, const char *data)207 lcb_STATUS CccpProvider::update(const char *host, const char *data)
208 {
209     lcbvb_CONFIG *vbc;
210     int rv;
211     ConfigInfo *new_config;
212     vbc = lcbvb_create();
213 
214     if (!vbc) {
215         return LCB_ERR_NO_MEMORY;
216     }
217     rv = lcbvb_load_json_ex(vbc, data, host, &LCBT_SETTING(this->parent, network));
218 
219     if (rv) {
220         lcb_log(LOGARGS(this, ERROR), LOGFMT "Failed to parse config", LOGID(this));
221         lcb_log_badconfig(LOGARGS(this, ERROR), vbc, data);
222         lcbvb_destroy(vbc);
223         return LCB_ERR_PROTOCOL_ERROR;
224     }
225 
226     lcbvb_replace_host(vbc, host);
227     new_config = ConfigInfo::create(vbc, CLCONFIG_CCCP, host);
228 
229     if (!new_config) {
230         lcbvb_destroy(vbc);
231         return LCB_ERR_NO_MEMORY;
232     }
233 
234     if (config) {
235         config->decref();
236     }
237 
238     /** TODO: Figure out the comparison vector */
239     config = new_config;
240     parent->provider_got_config(this, new_config);
241     return LCB_SUCCESS;
242 }
243 
select_status(const void *cookie_, lcb_STATUS err)244 void lcb::clconfig::select_status(const void *cookie_, lcb_STATUS err)
245 {
246     auto *cookie = reinterpret_cast<CccpCookie *>(const_cast<void *>(cookie_));
247     cookie->select_rc = err;
248     cookie->decref();
249 }
250 
cccp_update(const void *cookie_, lcb_STATUS err, const void *bytes, size_t nbytes, const lcb_host_t *origin)251 void lcb::clconfig::cccp_update(const void *cookie_, lcb_STATUS err, const void *bytes, size_t nbytes,
252                                 const lcb_host_t *origin)
253 {
254     auto *cookie = reinterpret_cast<CccpCookie *>(const_cast<void *>(cookie_));
255     CccpProvider *cccp = cookie->parent;
256 
257     lcb_STATUS select_rc = cookie->select_rc;
258     bool was_active = cookie->active;
259     if (cookie->active) {
260         cookie->active = false;
261         cccp->timer.cancel();
262         cccp->cmdcookie = nullptr;
263     }
264     cookie->decref();
265 
266     if (select_rc != LCB_SUCCESS) {
267         cccp->mcio_error(select_rc);
268         return;
269     }
270 
271     if (err == LCB_SUCCESS) {
272         std::string ss(reinterpret_cast<const char *>(bytes), nbytes);
273         err = cccp->update(origin->host, ss.c_str());
274     }
275 
276     if (err != LCB_SUCCESS && was_active) {
277         cccp->mcio_error(err);
278     }
279 }
280 
on_connected(lcbio_SOCKET *sock, void *data, lcb_STATUS err, lcbio_OSERR)281 static void on_connected(lcbio_SOCKET *sock, void *data, lcb_STATUS err, lcbio_OSERR)
282 {
283     lcbio_CTXPROCS ioprocs{};
284     auto *cccp = reinterpret_cast<CccpProvider *>(data);
285     lcb_settings *settings = cccp->parent->settings;
286     cccp->creq = nullptr;
287 
288     if (err != LCB_SUCCESS) {
289         if (sock) {
290             lcb::io::Pool::discard(sock);
291         }
292         cccp->mcio_error(err);
293         return;
294     }
295 
296     if (lcbio_protoctx_get(sock, LCBIO_PROTOCTX_SESSINFO) == nullptr) {
297         cccp->creq = lcb::SessionRequest::start(sock, settings, settings->config_node_timeout, on_connected, cccp);
298         return;
299     }
300 
301     ioprocs.cb_err = io_error_handler;
302     ioprocs.cb_read = io_read_handler;
303     cccp->ioctx = lcbio_ctx_new(sock, data, &ioprocs);
304     cccp->ioctx->subsys = "bc_cccp";
305     sock->service = LCBIO_SERVICE_CFG;
306     cccp->request_config();
307 }
308 
refresh()309 lcb_STATUS CccpProvider::refresh()
310 {
311     if (has_pending_request()) {
312         return LCB_ERR_BUSY;
313     }
314 
315     return schedule_next_request(LCB_SUCCESS, true);
316 }
317 
pause()318 bool CccpProvider::pause()
319 {
320     if (!has_pending_request()) {
321         return true;
322     }
323 
324     stop_current_request(false);
325     timer.cancel();
326     return true;
327 }
328 
~CccpProvider()329 CccpProvider::~CccpProvider()
330 {
331     stop_current_request(false);
332 
333     if (config) {
334         config->decref();
335     }
336     delete nodes;
337     timer.release();
338 }
339 
configure_nodes(const lcb::Hostlist &nodes_)340 void CccpProvider::configure_nodes(const lcb::Hostlist &nodes_)
341 {
342     nodes->assign(nodes_);
343     if (parent->settings->randomize_bootstrap_nodes) {
344         nodes->randomize();
345     }
346 }
347 
config_updated(lcbvb_CONFIG *vbc)348 void CccpProvider::config_updated(lcbvb_CONFIG *vbc)
349 {
350     lcbvb_SVCMODE mode = LCBT_SETTING_SVCMODE(parent);
351     if (LCBVB_NSERVERS(vbc) < 1) {
352         return;
353     }
354 
355     nodes->clear();
356     for (size_t ii = 0; ii < LCBVB_NSERVERS(vbc); ii++) {
357         const char *mcaddr = lcbvb_get_hostport(vbc, ii, LCBVB_SVCTYPE_DATA, mode);
358         if (!mcaddr) {
359             lcb_log(LOGARGS(this, DEBUG), "Node %lu has no data service", (unsigned long int)ii);
360             continue;
361         }
362         nodes->add(mcaddr, LCB_CONFIG_MCD_PORT);
363     }
364 
365     if (settings().randomize_bootstrap_nodes) {
366         nodes->randomize();
367     }
368 }
369 
io_error_handler(lcbio_CTX *ctx, lcb_STATUS err)370 static void io_error_handler(lcbio_CTX *ctx, lcb_STATUS err)
371 {
372     auto *cccp = reinterpret_cast<CccpProvider *>(lcbio_ctx_data(ctx));
373     cccp->mcio_error(err);
374 }
375 
io_read_handler(lcbio_CTX *ioctx, unsigned)376 static void io_read_handler(lcbio_CTX *ioctx, unsigned)
377 {
378     reinterpret_cast<CccpProvider *>(lcbio_ctx_data(ioctx))->on_io_read();
379 }
380 
on_io_read()381 void CccpProvider::on_io_read()
382 {
383     unsigned required;
384 
385 #define return_error(e)                                                                                                \
386     resp.release(ioctx);                                                                                               \
387     mcio_error(e);                                                                                                     \
388     return
389 
390     lcb::MemcachedResponse resp;
391     if (!resp.load(ioctx, &required)) {
392         lcbio_ctx_rwant(ioctx, required);
393         lcbio_ctx_schedule(ioctx);
394         return;
395     }
396 
397     if (resp.status() != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
398         std::string value{};
399         if (resp.vallen()) {
400             value.assign(resp.value(), resp.vallen());
401         }
402         lcb_log(LOGARGS(this, WARN), LOGFMT "CCCP Packet responded with 0x%02x; nkey=%d, cmd=0x%x, seq=0x%x, value=%s",
403                 LOGID(this), resp.status(), resp.keylen(), resp.opcode(), resp.opaque(), value.c_str());
404 
405         if (settings().bucket == nullptr) {
406             switch (resp.status()) {
407                 case PROTOCOL_BINARY_RESPONSE_NO_BUCKET:
408                     return_error(LCB_ERR_UNSUPPORTED_OPERATION);
409             }
410         }
411 
412         switch (resp.status()) {
413             case PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED:
414             case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
415                 return_error(LCB_ERR_UNSUPPORTED_OPERATION);
416             default:
417                 return_error(LCB_ERR_PROTOCOL_ERROR);
418         }
419     }
420 
421     if (!resp.bodylen()) {
422         return_error(LCB_ERR_PROTOCOL_ERROR);
423     }
424 
425     std::string jsonstr(resp.value(), resp.vallen());
426     std::string hoststr(lcbio_get_host(lcbio_ctx_sock(ioctx))->host);
427 
428     resp.release(ioctx);
429     stop_current_request(true);
430 
431     lcb_STATUS err = update(hoststr.c_str(), jsonstr.c_str());
432 
433     if (err == LCB_SUCCESS) {
434         timer.cancel();
435     } else {
436         schedule_next_request(LCB_ERR_PROTOCOL_ERROR, false);
437     }
438 
439 #undef return_error
440 }
441 
request_config()442 void CccpProvider::request_config()
443 {
444     lcb::MemcachedRequest req(PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG);
445     req.opaque(0xF00D);
446     lcbio_ctx_put(ioctx, req.data(), req.size());
447     lcbio_ctx_rwant(ioctx, 24);
448     lcbio_ctx_schedule(ioctx);
449     timer.rearm(settings().config_node_timeout);
450 }
451 
dump(FILE *fp) const452 void CccpProvider::dump(FILE *fp) const
453 {
454     if (!enabled) {
455         return;
456     }
457 
458     fprintf(fp, "## BEGIN CCCP PROVIDER DUMP ##\n");
459     fprintf(fp, "TIMER ACTIVE: %s\n", timer.is_armed() ? "YES" : "NO");
460     fprintf(fp, "PIPELINE RESPONSE COOKIE: %p\n", (void *)cmdcookie);
461     if (ioctx) {
462         fprintf(fp, "CCCP Owns connection:\n");
463         lcbio_ctx_dump(ioctx, fp);
464     } else if (creq) {
465         fprintf(fp, "CCCP Is connecting\n");
466     } else {
467         fprintf(fp, "CCCP does not have a dedicated connection\n");
468     }
469 
470     for (size_t ii = 0; ii < nodes->size(); ii++) {
471         const lcb_host_t &curhost = (*nodes)[ii];
472         lcb_settings *dummy = nullptr;
473         fprintf(fp, "CCCP NODE: " LCB_HOST_FMT "\n", LCB_HOST_ARG(dummy, &curhost));
474     }
475     fprintf(fp, "## END CCCP PROVIDER DUMP ##\n");
476 }
477 
/**
 * Creates a CCCP provider attached to the given monitor.
 *
 * The provider starts with an empty host list, no cached configuration, no
 * instance (it is supplied later through enable()) and no outstanding
 * request. The timer is bound to the monitor's I/O table.
 *
 * @param mon the configuration monitor which owns this provider
 */
CccpProvider::CccpProvider(Confmon *mon)
    : Provider(mon, CLCONFIG_CCCP), nodes(new lcb::Hostlist()), config(nullptr), timer(mon->iot, this),
      instance(nullptr), ioctx(nullptr), cmdcookie(nullptr)
{
}
483 
new_cccp_provider(Confmon *mon)484 Provider *lcb::clconfig::new_cccp_provider(Confmon *mon)
485 {
486     return new CccpProvider(mon);
487 }
488