1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013-2020 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "internal.h"
19 #include "clconfig.h"
20 #include "bc_http.h"
21 #include "auth-priv.h"
22 #include <lcbio/ssl.h>
23 #include "ctx-log-inl.h"
24 #include "strcodecs/strcodecs.h"
25 
/* Leading arguments for lcb_log(): the settings of the provider's parent,
 * the "htconfig" subsystem tag, the severity LCB_LOG_<lvlbase>, and the file
 * and line of the call site. `ht` must be a pointer exposing `parent`. */
#define LOGARGS(ht, lvlbase) ht->parent->settings, "htconfig", LCB_LOG_##lvlbase, __FILE__, __LINE__

/* Log-line prefix format and the matching argument list; the arguments are
 * built from the `ioctx` member of the pointer passed to LOGID. */
#define LOGFMT CTX_LOGFMT
#define LOGID(p) CTX_LOGID(p->ioctx)

using namespace lcb::clconfig;

/* Forward declarations of the file-local I/O callbacks defined further down. */
static void io_error_handler(lcbio_CTX *, lcb_STATUS);
static void on_connected(lcbio_SOCKET *, void *, lcb_STATUS, lcbio_OSERR);
static void read_common(lcbio_CTX *, unsigned);
36 
37 /**
38  * Determine if we're in compatibility mode with the previous versions of the
39  * library - where the idle timeout is disabled and a perpetual streaming
40  * connection will always remain open (regardless of whether it was triggered
41  * by start_refresh/get_refresh).
42  */
is_v220_compat() const43 bool HttpProvider::is_v220_compat() const
44 {
45     lcb_uint32_t setting = parent->settings->bc_http_stream_time;
46     return setting == (lcb_uint32_t)-1;
47 }
48 
close_current()49 void HttpProvider::close_current()
50 {
51     disconn_timer.cancel();
52     if (ioctx) {
53         lcbio_ctx_close(ioctx, nullptr, nullptr);
54     } else if (creq) {
55         lcbio_connect_cancel(creq);
56     }
57     creq = nullptr;
58     ioctx = nullptr;
59 }
60 
61 /**
62  * Call when there is an error in I/O. This includes read, write, connect
63  * and timeouts.
64  */
on_io_error(lcb_STATUS origerr)65 lcb_STATUS HttpProvider::on_io_error(lcb_STATUS origerr)
66 {
67     close_current();
68 
69     creq = lcbio_connect_hl(parent->iot, &settings(), nodes, 0, settings().config_node_timeout, on_connected, this);
70     if (creq) {
71         return LCB_SUCCESS;
72     }
73     parent->provider_failed(this, origerr);
74     io_timer.cancel();
75     if (is_v220_compat() && parent->config != nullptr) {
76         lcb_log(LOGARGS(this, INFO), "HTTP node list finished. Trying to obtain connection from first node in list");
77         as_reconnect.arm_if_disarmed(settings().grace_next_cycle);
78     }
79     return origerr;
80 }
81 
set_new_config(HttpProvider *http)82 void set_new_config(HttpProvider *http)
83 {
84     const lcb_host_t *curhost;
85     if (http->current_config) {
86         http->current_config->decref();
87     }
88 
89     curhost = lcbio_get_host(lcbio_ctx_sock(http->ioctx));
90     http->current_config = http->last_parsed;
91     http->current_config->incref();
92     lcbvb_replace_host(http->current_config->vbc, curhost->host);
93     http->parent->provider_got_config(http, http->current_config);
94 }
95 
/**
 * Feed one chunk of received bytes to the HTTP parser and act on the result.
 *
 * Once the response header state changes, the status code is checked; a 404
 * may cause the request to be reissued with the next URI type after the
 * current response has drained. When the body contains CONFIG_DELIMITER, the
 * text before it is parsed as a cluster configuration, stored in
 * `http->last_parsed`, and `http->generation` is incremented.
 *
 * @param http the provider owning the parser and the connection
 * @param buf  bytes received from the socket
 * @param nbuf number of bytes in buf
 * @return LCB_SUCCESS when the chunk was consumed (a config may or may not
 *         have been completed); LCB_ERR_PROTOCOL_ERROR on a parser error or
 *         an unparseable config; LCB_ERR_BUCKET_NOT_FOUND,
 *         LCB_ERR_AUTHENTICATION_FAILURE or LCB_ERR_GENERIC for HTTP status
 *         404, 401 or any other non-200 status; LCB_ERR_NO_MEMORY when the
 *         config object cannot be allocated; or the error returned by
 *         setup_request_header() when reissuing the request
 */
static lcb_STATUS process_chunk(HttpProvider *http, const void *buf, unsigned nbuf)
{
    namespace htp = lcb::htparse;
    lcb_STATUS err = LCB_SUCCESS;
    int rv;
    lcbvb_CONFIG *cfgh;
    unsigned state, oldstate, diff;
    lcb_host_t *host;
    htp::Response &resp = http->htp->get_cur_response();

    /* remember the parser state before this chunk so that the flags which
     * changed while parsing it can be isolated */
    oldstate = resp.state;
    state = http->htp->parse(buf, nbuf);
    diff = state ^ oldstate;

    if (state & htp::Parser::S_ERROR) {
        return LCB_ERR_PROTOCOL_ERROR;
    }

    /* the S_HEADER flag changed during this call, so the status code is
     * inspected exactly once per response */
    if (diff & htp::Parser::S_HEADER) {
        /* see that we got a success? */
        if (resp.status == 200) {
            /* nothing */
        } else if (resp.status == 404) {
            const unsigned urlmode = http->settings().bc_http_urltype;
            err = LCB_ERR_BUCKET_NOT_FOUND;

            /* advance to the next URI type; going past COMPAT means every
             * type has been tried */
            if (++http->uritype > LCB_HTCONFIG_URLTYPE_COMPAT) {
                lcb_log(LOGARGS(http, ERR),
                        LOGFMT "Got 404 on config stream. Assuming bucket does not exist as we've tried both URL types",
                        LOGID(http));
                goto GT_HT_ERROR;

            } else if ((urlmode & LCB_HTCONFIG_URLTYPE_COMPAT) == 0) {
                lcb_log(LOGARGS(http, ERR),
                        LOGFMT "Got 404 on config stream for terse URI. Compat URI disabled, so not trying",
                        LOGID(http));
                /* falls through to GT_HT_ERROR with err still set */

            } else {
                /* reissue the request; but wait for it to drain */
                lcb_log(LOGARGS(http, WARN),
                        LOGFMT "Got 404 on config stream. Assuming terse URI not supported on cluster", LOGID(http));
                http->try_nexturi = true;
                goto GT_CHECKDONE;
            }
        } else if (resp.status == 401) {
            err = LCB_ERR_AUTHENTICATION_FAILURE;
        } else {
            err = LCB_ERR_GENERIC;
        }

    GT_HT_ERROR:
        if (err != LCB_SUCCESS) {
            lcb_log(LOGARGS(http, ERR), LOGFMT "Got non-success HTTP status code %d", LOGID(http), resp.status);
            return err;
        }
    }

GT_CHECKDONE:
    if (http->try_nexturi) {
        /* keep consuming the 404 response until the parser reports it done */
        if (!(state & htp::Parser::S_DONE)) {
            return LCB_SUCCESS;
        }
        host = lcbio_get_host(lcbio_ctx_sock(http->ioctx));
        http->try_nexturi = false;
        /* rebuild the request; uritype was already advanced above */
        if ((err = http->setup_request_header(*host)) != LCB_SUCCESS) {
            return err;
        }

        /* reset the state? */
        http->htp->reset();
        lcbio_ctx_put(http->ioctx, http->request_buf.c_str(), http->request_buf.size());
        return LCB_SUCCESS;
    }

    if (!(state & htp::Parser::S_BODY)) {
        /* nothing to parse yet */
        return LCB_SUCCESS;
    }

    /* seek ahead for strstr */
    size_t termpos = resp.body.find(CONFIG_DELIMITER);
    if (termpos == std::string::npos) {
        /* no complete config in the body yet */
        return LCB_SUCCESS;
    }
    /* terminate the JSON text at the delimiter so that c_str() below yields
     * only the first config */
    resp.body[termpos] = '\0';
    cfgh = lcbvb_create();
    if (!cfgh) {
        return LCB_ERR_NO_MEMORY;
    }
    host = lcbio_get_host(lcbio_ctx_sock(http->ioctx));
    rv = lcbvb_load_json_ex(cfgh, resp.body.c_str(), host->host, &LCBT_SETTING(http->parent, network));
    if (rv != 0) {
        lcb_log(LOGARGS(http, ERR), LOGFMT "Failed to parse a valid config from HTTP stream", LOGID(http));
        lcb_log_badconfig(LOGARGS(http, ERR), cfgh, resp.body.c_str());
        lcbvb_destroy(cfgh);
        return LCB_ERR_PROTOCOL_ERROR;
    }
    /* replace the previously parsed (possibly unpublished) config */
    if (http->last_parsed) {
        http->last_parsed->decref();
    }
    http->last_parsed = ConfigInfo::create(cfgh, CLCONFIG_HTTP, host->host);
    /* the caller compares generations to detect that a new config arrived */
    http->generation++;

    /** Relocate the stream */
    /* drop the consumed config and its delimiter; sizeof() - 1 is the
     * delimiter length assuming CONFIG_DELIMITER is a string literal */
    resp.body.erase(0, termpos + sizeof(CONFIG_DELIMITER) - 1);
    return LCB_SUCCESS;
}
203 
204 /**
205  * Common function to handle parsing the HTTP stream for both v0 and v1 io
206  * implementations.
207  */
read_common(lcbio_CTX *ctx, unsigned nr)208 static void read_common(lcbio_CTX *ctx, unsigned nr)
209 {
210     lcbio_CTXRDITER riter;
211     auto *http = reinterpret_cast<HttpProvider *>(lcbio_ctx_data(ctx));
212     int old_generation = http->generation;
213 
214     lcb_log(LOGARGS(http, TRACE), LOGFMT "Received %d bytes on HTTP stream", LOGID(http), nr);
215     http->io_timer.rearm(http->settings().config_node_timeout);
216 
217     LCBIO_CTX_ITERFOR(ctx, &riter, nr)
218     {
219         unsigned nbuf = lcbio_ctx_risize(&riter);
220         void *buf = lcbio_ctx_ribuf(&riter);
221         lcb_STATUS err = process_chunk(http, buf, nbuf);
222 
223         if (err != LCB_SUCCESS) {
224             http->on_io_error(err);
225             return;
226         }
227     }
228 
229     if (http->generation != old_generation) {
230         lcb_log(LOGARGS(http, DEBUG), LOGFMT "Generation %d -> %d", LOGID(http), old_generation, http->generation);
231         http->io_timer.cancel();
232         set_new_config(http);
233     }
234 
235     lcbio_ctx_rwant(ctx, 1);
236     lcbio_ctx_schedule(ctx);
237 }
238 
setup_request_header(const lcb_host_t &host)239 lcb_STATUS HttpProvider::setup_request_header(const lcb_host_t &host)
240 {
241     request_buf.assign("GET ");
242     if (settings().conntype == LCB_TYPE_BUCKET || settings().conntype == LCB_TYPE_CLUSTER) {
243         if (settings().bucket) {
244             if (uritype == LCB_HTCONFIG_URLTYPE_25PLUS) {
245                 request_buf.append(REQBUCKET_TERSE_PREFIX);
246             } else {
247                 request_buf.append(REQBUCKET_COMPAT_PREFIX);
248             }
249             request_buf.append(settings().bucket);
250         } else {
251             request_buf.append(REQBUCKET_BUCKETLESS_PREFIX);
252         }
253     } else {
254         return LCB_ERR_INVALID_ARGUMENT;
255     }
256 
257     request_buf.append(" HTTP/1.1\r\n");
258     if (!settings().keypath) {
259         // not using SSL client certificate to authenticate
260         auto creds = settings().auth->credentials_for(LCBAUTH_SERVICE_MANAGEMENT, LCBAUTH_REASON_NEW_OPERATION,
261                                                       host.host, host.port, settings().bucket);
262         if (creds.result() == LCBAUTH_RESULT_OK) {
263             std::string cred;
264             cred.append(creds.username()).append(":").append(creds.password());
265             char b64[256] = {0};
266             if (lcb_base64_encode(cred.c_str(), cred.size(), b64, sizeof(b64)) == -1) {
267                 return LCB_ERR_SDK_INTERNAL;
268             }
269             request_buf.append("Authorization: Basic ").append(b64).append("\r\n");
270         } else {
271             return LCB_ERR_AUTHENTICATION_FAILURE;
272         }
273     }
274 
275     request_buf.append("Host: ").append(host.host).append(":").append(host.port).append("\r\n");
276     request_buf.append("User-Agent: ").append(LCB_CLIENT_ID);
277     if (settings().client_string) {
278         request_buf.append(" ").append(settings().client_string);
279     }
280     request_buf.append("\r\n");
281     request_buf.append("\r\n");
282     return LCB_SUCCESS;
283 }
284 
reset_stream_state()285 void HttpProvider::reset_stream_state()
286 {
287     const int urlmode = settings().bc_http_urltype;
288     if (last_parsed) {
289         last_parsed->decref();
290         last_parsed = nullptr;
291     }
292     if (urlmode & LCB_HTCONFIG_URLTYPE_25PLUS) {
293         uritype = LCB_HTCONFIG_URLTYPE_25PLUS;
294     } else {
295         uritype = LCB_HTCONFIG_URLTYPE_COMPAT;
296     }
297     try_nexturi = false;
298     htp->reset();
299 }
300 
on_connected(lcbio_SOCKET *sock, void *arg, lcb_STATUS err, lcbio_OSERR syserr)301 static void on_connected(lcbio_SOCKET *sock, void *arg, lcb_STATUS err, lcbio_OSERR syserr)
302 {
303     auto *http = reinterpret_cast<HttpProvider *>(arg);
304     lcb_host_t *host;
305     lcbio_CTXPROCS procs{};
306     http->creq = nullptr;
307 
308     if (err != LCB_SUCCESS) {
309         lcb_log(LOGARGS(http, ERR), "Connection to REST API failed with %s (os errno = %d)", lcb_strerror_short(err),
310                 syserr);
311         http->on_io_error(err);
312         return;
313     }
314     host = lcbio_get_host(sock);
315     lcb_log(LOGARGS(http, DEBUG), "Successfuly connected to REST API " LCB_HOST_FMT,
316             LCB_HOST_ARG(http->parent->settings, host));
317 
318     lcbio_sslify_if_needed(sock, http->parent->settings);
319     http->reset_stream_state();
320 
321     if ((err = http->setup_request_header(*host)) != LCB_SUCCESS) {
322         lcb_log(LOGARGS(http, ERR), "Couldn't setup request header");
323         http->on_io_error(err);
324         return;
325     }
326 
327     procs.cb_err = io_error_handler;
328     procs.cb_read = read_common;
329     http->ioctx = lcbio_ctx_new(sock, http, &procs);
330     http->ioctx->subsys = "bc_http";
331     sock->service = LCBIO_SERVICE_CFG;
332 
333     lcbio_ctx_put(http->ioctx, http->request_buf.c_str(), http->request_buf.size());
334     lcbio_ctx_rwant(http->ioctx, 1);
335     lcbio_ctx_schedule(http->ioctx);
336     http->io_timer.rearm(http->settings().config_node_timeout);
337 }
338 
on_timeout()339 void HttpProvider::on_timeout()
340 {
341     lcb_log(LOGARGS(this, ERR), LOGFMT "HTTP Provider timed out waiting for I/O", LOGID(this));
342 
343     /**
344      * If we're not the current provider then ignore the timeout until we're
345      * actively requested to do so
346      */
347     if (this != parent->cur_provider || !parent->is_refreshing()) {
348         lcb_log(LOGARGS(this, DEBUG),
349                 LOGFMT "Ignoring timeout because we're either not in a refresh or not the current provider",
350                 LOGID(this));
351         return;
352     }
353 
354     on_io_error(LCB_ERR_TIMEOUT);
355 }
356 
connect_next()357 lcb_STATUS HttpProvider::connect_next()
358 {
359     lcb_log(LOGARGS(this, TRACE), "Starting HTTP Configuration Provider %p", (void *)this);
360     close_current();
361     as_reconnect.cancel();
362 
363     if (nodes->empty()) {
364         lcb_log(LOGARGS(this, ERROR),
365                 "Not scheduling HTTP provider since no nodes have been configured for HTTP bootstrap");
366         return LCB_ERR_CONNECT_ERROR;
367     }
368 
369     creq = lcbio_connect_hl(parent->iot, &settings(), nodes, 1, settings().config_node_timeout, on_connected, this);
370     if (creq) {
371         return LCB_SUCCESS;
372     }
373     lcb_log(LOGARGS(this, ERROR), "%p: Couldn't schedule connection", (void *)this);
374     return LCB_ERR_CONNECT_ERROR;
375 }
376 
delayed_disconn()377 void HttpProvider::delayed_disconn()
378 {
379     lcb_log(LOGARGS(this, DEBUG), "Stopping HTTP provider %p", (void *)this);
380 
381     /** closes the connection and cleans up the timer */
382     close_current();
383     io_timer.cancel();
384 }
385 
delayed_reconnect()386 void HttpProvider::delayed_reconnect()
387 {
388     if (ioctx) {
389         /* have a context already */
390         return;
391     }
392     lcb_STATUS err = connect_next();
393     if (err != LCB_SUCCESS) {
394         on_io_error(err);
395     }
396 }
397 
pause()398 bool HttpProvider::pause()
399 {
400     if (is_v220_compat()) {
401         return LCB_SUCCESS;
402     }
403     disconn_timer.arm_if_disarmed(settings().bc_http_stream_time);
404     return LCB_SUCCESS;
405 }
406 
refresh()407 lcb_STATUS HttpProvider::refresh()
408 {
409     /**
410      * We want a grace interval here because we might already be fetching a
411      * connection. HOWEVER we don't want to indefinitely wait on a socket
412      * so we issue a timer indicating how long we expect to wait for a
413      * streaming update until we get something.
414      */
415 
416     /** If we need a new socket, we do connect_next. */
417     if (ioctx == nullptr && creq == nullptr) {
418         as_reconnect.signal();
419     }
420     disconn_timer.cancel();
421     if (ioctx) {
422         io_timer.rearm(settings().config_node_timeout);
423     }
424     return LCB_SUCCESS;
425 }
426 
get_cached()427 ConfigInfo *HttpProvider::get_cached()
428 {
429     return current_config;
430 }
431 
config_updated(lcbvb_CONFIG *newconfig)432 void HttpProvider::config_updated(lcbvb_CONFIG *newconfig)
433 {
434     lcbvb_SVCMODE mode = LCBT_SETTING_SVCMODE(parent);
435     nodes->clear();
436 
437     for (size_t ii = 0; ii < newconfig->nsrv; ++ii) {
438         const char *ss;
439         lcb_STATUS status;
440         ss = lcbvb_get_hostport(newconfig, ii, LCBVB_SVCTYPE_MGMT, mode);
441         if (!ss) {
442             /* not supported? */
443             continue;
444         }
445         status = nodes->add(ss, LCB_CONFIG_HTTP_PORT);
446         lcb_assert(status == LCB_SUCCESS);
447     }
448     if (nodes->empty()) {
449         lcb_log(LOGARGS(this, FATAL), "New nodes do not contain management ports");
450     }
451 
452     if (settings().randomize_bootstrap_nodes) {
453         nodes->randomize();
454     }
455 }
456 
configure_nodes(const lcb::Hostlist &newnodes)457 void HttpProvider::configure_nodes(const lcb::Hostlist &newnodes)
458 {
459     nodes->assign(newnodes);
460     if (settings().randomize_bootstrap_nodes) {
461         nodes->randomize();
462     }
463 }
464 
get_nodes() const465 const lcb::Hostlist *HttpProvider::get_nodes() const
466 {
467     return nodes;
468 }
469 
/**
 * Tear the provider down: reset the stream state, close the connection,
 * destroy the parser, release the timers and drop the reference on the
 * published configuration before freeing the node list.
 */
HttpProvider::~HttpProvider()
{
    /* must run before `delete htp`: reset_stream_state() calls htp->reset()
     * and also releases last_parsed */
    reset_stream_state();
    close_current();
    delete htp;
    disconn_timer.release();
    io_timer.release();
    as_reconnect.release();

    if (current_config) {
        current_config->decref();
    }
    delete nodes;
}
484 
dump(FILE *fp) const485 void HttpProvider::dump(FILE *fp) const
486 {
487     fprintf(fp, "## BEGIN HTTP PROVIDER DUMP\n");
488     fprintf(fp, "NUMBER OF CONFIGS RECEIVED: %u\n", generation);
489     fprintf(fp, "DUMPING I/O TIMER\n");
490     io_timer.dump(fp);
491     if (ioctx) {
492         fprintf(fp, "DUMPING CURRENT CONNECTION:\n");
493         lcbio_ctx_dump(ioctx, fp);
494     } else if (creq) {
495         fprintf(fp, "CURRENTLY CONNECTING..\n");
496     } else {
497         fprintf(fp, "NO CONNECTION ACTIVE\n");
498     }
499 }
500 
/**
 * Create an HTTP configuration provider attached to the given monitor.
 *
 * Allocates the HTTP parser and an empty node list, binds the three timers
 * to the parent's I/O table and starts with no connection, no configuration,
 * generation 0 and URI type 0.
 *
 * @param parent_ the configuration monitor owning this provider
 */
HttpProvider::HttpProvider(Confmon *parent_)
    : Provider(parent_, CLCONFIG_HTTP), ioctx(nullptr), htp(new lcb::htparse::Parser(parent->settings)),
      disconn_timer(parent->iot, this), io_timer(parent->iot, this), as_reconnect(parent->iot, this),
      nodes(new Hostlist()), current_config(nullptr), last_parsed(nullptr), generation(0), try_nexturi(false),
      uritype(0)
{
}
508 
io_error_handler(lcbio_CTX *ctx, lcb_STATUS err)509 static void io_error_handler(lcbio_CTX *ctx, lcb_STATUS err)
510 {
511     reinterpret_cast<HttpProvider *>(lcbio_ctx_data(ctx))->on_io_error(err);
512 }
513 
http_get_conn(const Provider *p)514 const lcbio_SOCKET *lcb::clconfig::http_get_conn(const Provider *p)
515 {
516     const auto *http = static_cast<const HttpProvider *>(p);
517     if (!http->ioctx) {
518         return nullptr;
519     }
520     return lcbio_ctx_sock(http->ioctx);
521 }
522 
http_get_host(const Provider *p)523 const lcb_host_t *lcb::clconfig::http_get_host(const Provider *p)
524 {
525     const lcbio_SOCKET *sock = http_get_conn(p);
526     if (sock) {
527         return lcbio_get_host(sock);
528     }
529     return nullptr;
530 }
531 
new_http_provider(Confmon *mon)532 Provider *lcb::clconfig::new_http_provider(Confmon *mon)
533 {
534     return new HttpProvider(mon);
535 }
536