1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2012-2020 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #define LCBDUR_PRIV_SYMS 1
19 #define NOMINMAX
20 
21 #include <cstring>
22 
23 #include "internal.h"
24 #include "durability_internal.h"
25 #include <algorithm>
26 #include <lcbio/iotable.h>
27 
28 #include "capi/cmd_store.hh"
29 #include "capi/cmd_observe.hh"
30 
31 using namespace lcb::durability;
32 
/**
 * Expands to the leading arguments of an lcb_log() call: the settings of the
 * instance reachable through `c` (which must expose an `instance` member),
 * the "endure" string literal, the LCB_LOG_<lvl> constant and the call site
 * (__FILE__ / __LINE__).
 */
#define LOGARGS(c, lvl) (c)->instance->settings, "endure", LCB_LOG_##lvl, __FILE__, __LINE__
/** Same as LOGARGS(), with `this` as the object providing the instance. */
#define LOGARGS_T(lvl) LOGARGS(this, lvl)

/** Timer trampoline; declared here because switch_state() refers to it before its definition. */
static void timer_callback(lcb_socket_t sock, short which, void *arg);
37 
is_all_done() const38 bool Item::is_all_done() const
39 {
40     const lcb_DURABILITYOPTSv0 &opts = parent->opts;
41 
42     if (!res().exists_master) {
43         /** Primary cache doesn't have correct version */
44         return false;
45     }
46     if (opts.persist_to) {
47         if (!res().persisted_master) {
48             return false;
49         }
50         if (res().npersisted < opts.persist_to) {
51             return false;
52         }
53     }
54 
55     if (opts.replicate_to) {
56         if (res().nreplicated < opts.replicate_to) {
57             return false;
58         }
59     }
60 
61     return true;
62 }
63 
is_server_done(const ServerInfo & info,bool is_master) const64 bool Item::is_server_done(const ServerInfo &info, bool is_master) const
65 {
66     // Item not in cache. Return false
67     if (!info.exists) {
68         return false;
69     }
70 
71     // Item is already persisted to the server
72     if (info.persisted) {
73         return true;
74     }
75 
76     // Item not persisted, but no persistence requested
77     if (parent->opts.persist_to == 0) {
78         return true;
79     }
80 
81     // Master persistence requested, but server is not master
82     if (parent->opts.persist_to == 1 && !is_master) {
83         return true;
84     }
85 
86     // Require persistence from this server, but item is not persisted.
87     return false;
88 }
89 
prepare(uint16_t ixarray[4])90 size_t Item::prepare(uint16_t ixarray[4])
91 {
92     size_t oix = 0, maxix = 0;
93     lcb_INSTANCE *instance = parent->instance;
94 
95     res().persisted_master = 0;
96     res().exists_master = 0;
97     res().npersisted = 0;
98     res().nreplicated = 0;
99     res().ctx.cas = 0;
100     res().ctx.rc = LCB_SUCCESS;
101 
102     if (parent->opts.persist_to == 1 && parent->opts.replicate_to == 0) {
103         maxix = 1; /* Only master! */
104     } else {
105         maxix = LCBT_NREPLICAS(instance) + 1;
106     }
107 
108     for (size_t ii = 0; ii < maxix; ii++) {
109         int cur_ix;
110         ServerInfo &info = sinfo[ii];
111 
112         cur_ix = lcbvb_vbserver(LCBT_VBCONFIG(instance), vbid, ii);
113         if (cur_ix < 0) {
114             info.clear();
115             continue;
116         }
117 
118         const lcb::Server *s_exp = instance->get_server(cur_ix);
119         if (s_exp != info.server) {
120             info.clear();
121 
122         } else if (is_server_done(info, ii == 0)) {
123             /* Update counters as required */
124             if (ii == 0) {
125                 res().exists_master = 1;
126             } else {
127                 res().nreplicated++;
128             }
129 
130             if (info.persisted) {
131                 res().npersisted++;
132                 if (ii == 0) {
133                     res().persisted_master = 1;
134                 }
135             }
136             continue;
137         }
138 
139         /* Otherwise, write the expected server out */
140         ixarray[oix++] = s_exp->get_index();
141     }
142 
143     return oix;
144 }
145 
update(int flags,int srvix)146 void Item::update(int flags, int srvix)
147 {
148     if (!flags || done) {
149         return;
150     }
151 
152     ServerInfo *info = get_server_info(srvix);
153     if (!info) {
154         lcb_log(LOGARGS(parent, DEBUG), "Ignoring response from server %d. Not a master or replica for vBucket %d",
155                 srvix, vbid);
156         return;
157     }
158 
159     lcb_INSTANCE *instance = parent->instance;
160     bool is_master = lcbvb_vbmaster(LCBT_VBCONFIG(instance), vbid) == srvix;
161     const lcb::Server *server = instance->get_server(srvix);
162 
163     info->clear();
164     info->server = server;
165 
166     if (flags & UPDATE_PERSISTED) {
167         info->persisted = 1;
168         res().npersisted++;
169         if (is_master) {
170             res().persisted_master = 1;
171         }
172     }
173 
174     if (flags & UPDATE_REPLICATED) {
175         info->exists = 1;
176         if (is_master) {
177             res().exists_master = 1;
178         } else {
179             res().nreplicated++;
180         }
181     }
182 
183     if (is_all_done()) {
184         finish(LCB_SUCCESS);
185     }
186 }
187 
get_server_info(int srvix)188 ServerInfo *Item::get_server_info(int srvix)
189 {
190     size_t ii;
191     lcb_INSTANCE *instance = parent->instance;
192 
193     for (ii = 0; ii < LCBT_NREPLICAS(instance) + 1; ii++) {
194         int ix = lcbvb_vbserver(LCBT_VBCONFIG(instance), vbid, ii);
195         if (ix > -1 && ix == srvix) {
196             return &sinfo[ii];
197         }
198     }
199     return NULL;
200 }
201 
finish()202 void Item::finish()
203 {
204     lcb_RESPCALLBACK cb;
205     lcb_INSTANCE *instance;
206 
207     if (done) {
208         return;
209     }
210 
211     done = 1;
212     parent->nremaining--;
213 
214     /** Invoke the callback now :) */
215     result.cookie = (void *)parent->cookie;
216     instance = parent->instance;
217 
218     if (parent->is_durstore) {
219         lcb_RESPSTORE resp{};
220         resp.ctx.key = result.ctx.key;
221         resp.ctx.rc = result.ctx.rc;
222         resp.ctx.cas = reqcas;
223         resp.cookie = result.cookie;
224         resp.store_ok = 1;
225         resp.dur_resp = &result;
226 
227         cb = lcb_find_callback(instance, LCB_CALLBACK_STORE);
228         cb(instance, LCB_CALLBACK_STORE, (lcb_RESPBASE *)&resp);
229     } else {
230         cb = lcb_find_callback(instance, LCB_CALLBACK_ENDURE);
231         cb(instance, LCB_CALLBACK_ENDURE, (lcb_RESPBASE *)&result);
232     }
233 
234     if (parent->nremaining == 0) {
235         parent->decref();
236     }
237 }
238 
239 /**
240  * Called when the last (primitive) OBSERVE response is received for the entry.
241  */
on_poll_done()242 void Durset::on_poll_done()
243 {
244     lcb_assert(waiting || ("Got NULL callback twice!" && 0));
245 
246     waiting = 0;
247 
248     if (nremaining > 0) {
249         switch_state(STATE_OBSPOLL);
250     }
251     decref();
252 }
253 
/**
 * Schedules a single sweep of observe requests.
 * The sweep itself is issued by poll_impl(); depending on its result this
 * either arms the timeout timer or schedules a retry of the poll.
 */
poll()259 void Durset::poll()
260 {
261     lcb_STATUS err;
262 
263     /* We should never be called while an 'iter' operation is still in progress */
264     lcb_assert(waiting == 0);
265     incref();
266 
267     err = poll_impl();
268     if (err == LCB_SUCCESS) {
269         incref();
270         switch_state(STATE_TIMEOUT);
271     } else {
272         lasterr = err;
273         switch_state(STATE_OBSPOLL);
274     }
275 
276     decref();
277 }
278 
279 LIBCOUCHBASE_API
lcb_durability_validate(lcb_INSTANCE * instance,lcb_U16 * persist_to,lcb_U16 * replicate_to,int options)280 lcb_STATUS lcb_durability_validate(lcb_INSTANCE *instance, lcb_U16 *persist_to, lcb_U16 *replicate_to, int options)
281 {
282     if (!LCBT_VBCONFIG(instance)) {
283         return LCB_ERR_NO_CONFIGURATION;
284     }
285     int replica_max = std::min(LCBT_NREPLICAS(instance), LCBT_NDATASERVERS(instance) - 1);
286     int persist_max = replica_max + 1;
287 
288     if (*persist_to == 0 && *replicate_to == 0) {
289         /* Empty values! */
290         return LCB_ERR_INVALID_ARGUMENT;
291     }
292 
293     /* persist_max is always one more than replica_max */
294     if (static_cast<int>(*persist_to) > persist_max) {
295         if (options & LCB_DURABILITY_VALIDATE_CAPMAX) {
296             *persist_to = persist_max;
297         } else {
298             return LCB_ERR_DURABILITY_TOO_MANY;
299         }
300     }
301 
302     if (*replicate_to == 0) {
303         return LCB_SUCCESS;
304     }
305 
306     if (replica_max < 0) {
307         replica_max = 0;
308     }
309 
310     /* now, we need at least as many nodes as we have replicas */
311     if (static_cast<int>(*replicate_to) > replica_max) {
312         if (options & LCB_DURABILITY_VALIDATE_CAPMAX) {
313             *replicate_to = replica_max;
314         } else {
315             return LCB_ERR_DURABILITY_TOO_MANY;
316         }
317     }
318     return LCB_SUCCESS;
319 }
320 
MCTX_setspan(lcbtrace_SPAN * span_)321 void Durset::MCTX_setspan(lcbtrace_SPAN *span_)
322 {
323     span = span_;
324 }
325 
MCTX_add_observe(const lcb_CMDOBSERVE *)326 lcb_STATUS Durset::MCTX_add_observe(const lcb_CMDOBSERVE *)
327 {
328     return LCB_ERR_UNSUPPORTED_OPERATION;
329 }
330 
MCTX_add_endure(const lcb_CMDENDURE * cmd)331 lcb_STATUS Durset::MCTX_add_endure(const lcb_CMDENDURE *cmd)
332 {
333     if (LCB_KEYBUF_IS_EMPTY(&cmd->key)) {
334         return LCB_ERR_EMPTY_KEY;
335     }
336 
337     entries.resize(entries.size() + 1);
338     Item &ent = entries.back();
339 
340     int vbid, srvix;
341     mcreq_map_key(&instance->cmdq, &cmd->key, MCREQ_PKT_BASESIZE, &vbid, &srvix);
342 
343     /* ok. now let's initialize the entry..*/
344     ent.res().ctx.key.assign(static_cast<const char *>(cmd->key.contig.bytes), cmd->key.contig.nbytes);
345     ent.reqcas = cmd->cas;
346     ent.parent = this;
347     ent.vbid = vbid;
348 
349     kvbufs.append(ent.res().ctx.key);
350 
351     return after_add(ent, cmd->mutation_token);
352 }
353 
MCTX_done(void * cookie_)354 lcb_STATUS Durset::MCTX_done(void *cookie_)
355 {
356     lcb_STATUS err;
357 
358     if (entries.empty()) {
359         delete this;
360         return LCB_ERR_INVALID_ARGUMENT;
361     }
362 
363     if ((err = prepare_schedule()) != LCB_SUCCESS) {
364         delete this;
365         return err;
366     }
367 
368     incref();
369 
370     cookie = cookie_;
371     nremaining = entries.size();
372     ns_timeout = gethrtime() + LCB_US2NS(opts.timeout);
373 
374     lcb_aspend_add(&instance->pendops, LCB_PENDTYPE_DURABILITY, this);
375     switch_state(STATE_INIT);
376     return LCB_SUCCESS;
377 }
378 
MCTX_fail()379 void Durset::MCTX_fail()
380 {
381     if (span) {
382         lcbtrace_span_finish(span, LCBTRACE_NOW);
383         span = NULL;
384     }
385     delete this;
386 }
387 
lcbdurctx_set_durstore(lcb_MULTICMD_CTX * mctx,int enabled)388 void lcbdurctx_set_durstore(lcb_MULTICMD_CTX *mctx, int enabled)
389 {
390     static_cast<Durset *>(mctx)->is_durstore = enabled;
391 }
392 
Durset(lcb_INSTANCE * instance_,const lcb_durability_opts_t * options)393 Durset::Durset(lcb_INSTANCE *instance_, const lcb_durability_opts_t *options)
394     : MultiCmdContext(), nremaining(0), waiting(0), refcnt(0), next_state(STATE_OBSPOLL), lasterr(LCB_SUCCESS),
395       is_durstore(false), cookie(NULL), ns_timeout(0), timer(NULL), instance(instance_), span(NULL)
396 {
397     const lcb_DURABILITYOPTSv0 *opts_in = &options->v.v0;
398 
399     std::memset(&opts, 0, sizeof opts);
400 
401     /* Ensure we don't clobber options from older versions */
402     opts.cap_max = opts_in->cap_max;
403     opts.check_delete = opts_in->check_delete;
404     opts.interval = opts_in->interval;
405     opts.persist_to = opts_in->persist_to;
406     opts.replicate_to = opts_in->replicate_to;
407     opts.timeout = opts_in->timeout;
408 
409     if (!opts.timeout) {
410         opts.timeout = LCBT_SETTING(instance, durability_timeout);
411     }
412 
413     if (!opts.interval) {
414         opts.interval = LCBT_SETTING(instance, durability_interval);
415     }
416 
417     lcbio_pTABLE io = instance->iotable;
418     timer = io->timer.create(io->p);
419 
420     lasterr = lcb_durability_validate(instance, &opts.persist_to, &opts.replicate_to,
421                                       opts.cap_max ? LCB_DURABILITY_VALIDATE_CAPMAX : 0);
422 }
423 
424 LIBCOUCHBASE_API
lcb_endure3_ctxnew(lcb_INSTANCE * instance,const lcb_durability_opts_t * options,lcb_STATUS * errp)425 lcb_MULTICMD_CTX *lcb_endure3_ctxnew(lcb_INSTANCE *instance, const lcb_durability_opts_t *options, lcb_STATUS *errp)
426 {
427     lcb_STATUS err_s;
428     if (!errp) {
429         errp = &err_s;
430     }
431 
432     *errp = LCB_SUCCESS;
433 
434     if (!LCBT_VBCONFIG(instance)) {
435         *errp = LCB_ERR_NO_CONFIGURATION;
436         return NULL;
437     }
438 
439     if (LCBT_SETTING(instance, fetch_mutation_tokens)) {
440         for (size_t ii = 0; ii < LCBT_NSERVERS(instance); ii++) {
441             lcb::Server *srv = instance->get_server(ii);
442             if (srv->is_connected() && !srv->supports_mutation_tokens()) {
443                 *errp = LCB_ERR_INVALID_ARGUMENT;
444                 return NULL;
445             }
446         }
447     } else {
448         *errp = LCB_ERR_INVALID_ARGUMENT;
449         return NULL;
450     }
451 
452     Durset *dset = Durset::createSeqnoDurset(instance, options);
453     *errp = dset->lasterr;
454     if (*errp != LCB_SUCCESS) {
455         delete dset;
456         dset = NULL;
457     }
458 
459     return dset;
460 }
461 
462 /**
463  * Actually free the resources allocated by the dset (and all its entries).
464  * Called by some other functions in libcouchbase
465  */
lcbdur_destroy(void * dset)466 void lcbdur_destroy(void *dset)
467 {
468     delete reinterpret_cast<Durset *>(dset);
469 }
470 
~Durset()471 Durset::~Durset()
472 {
473     if (timer) {
474         lcbio_TABLE *io = instance->iotable;
475         io->timer.cancel(io->p, timer);
476         io->timer.destroy(io->p, timer);
477         timer = NULL;
478     }
479 
480     lcb_aspend_del(&instance->pendops, LCB_PENDTYPE_DURABILITY, this);
481     lcb_maybe_breakout(instance);
482 }
483 
484 /**
485  * All-purpose callback dispatcher.
486  */
timer_callback(lcb_socket_t,short,void * arg)487 static void timer_callback(lcb_socket_t, short, void *arg)
488 {
489     reinterpret_cast<Durset *>(arg)->tick();
490 }
491 
tick()492 void Durset::tick()
493 {
494     hrtime_t now = gethrtime();
495 
496     if (ns_timeout && now > ns_timeout) {
497         next_state = STATE_TIMEOUT;
498     }
499 
500     switch (next_state) {
501         case STATE_OBSPOLL:
502         case STATE_INIT:
503             poll();
504             break;
505 
506         case STATE_TIMEOUT: {
507             lcb_STATUS err = lasterr ? lasterr : LCB_ERR_TIMEOUT;
508             ns_timeout = 0;
509             next_state = STATE_IGNORE;
510 
511             lcb_log(LOGARGS_T(WARN), "Polling durability timed out!");
512 
513             incref();
514 
515             for (size_t ii = 0; ii < entries.size(); ii++) {
516                 Item *ent = &entries[ii];
517                 if (ent->done) {
518                     continue;
519                 }
520                 if (ent->res().ctx.rc == LCB_SUCCESS) {
521                     ent->res().ctx.rc = err;
522                 }
523                 ent->finish();
524             }
525 
526             decref();
527             break;
528         }
529 
530         case STATE_IGNORE:
531             break;
532 
533         default:
534             lcb_assert("unexpected state" && 0);
535             break;
536     }
537 }
538 
539 /**
540  * Schedules us to be notified with the given state within a particular amount
541  * of time. This is used both for the timeout and for the interval
542  */
switch_state(State state)543 void Durset::switch_state(State state)
544 {
545     uint32_t delay = 0;
546     lcbio_TABLE *io = instance->iotable;
547     hrtime_t now = gethrtime();
548 
549     if (state == STATE_TIMEOUT) {
550         if (ns_timeout && now < ns_timeout) {
551             delay = LCB_NS2US(ns_timeout - now);
552         } else {
553             delay = 0;
554         }
555     } else if (state == STATE_OBSPOLL) {
556         if (now + LCB_US2NS(opts.interval) < ns_timeout) {
557             delay = opts.interval;
558         } else {
559             delay = 0;
560             state = STATE_TIMEOUT;
561         }
562     } else if (state == STATE_INIT) {
563         delay = 0;
564     }
565 
566     next_state = state;
567     io->timer.cancel(io->p, timer);
568     io->timer.schedule(io->p, timer, delay, this, timer_callback);
569 }
570