xref: /trunk/libcouchbase/src/operations/store.cc (revision f6822fcc)
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010-2020 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #include "internal.h"
18 #include "collections.h"
19 #include "mc/compress.h"
20 #include "trace.h"
21 #include "defer.h"
22 #include "durability_internal.h"
23 
24 #include "capi/cmd_store.hh"
25 
lcb_mutation_token_is_valid(const lcb_MUTATION_TOKEN * token)26 LIBCOUCHBASE_API int lcb_mutation_token_is_valid(const lcb_MUTATION_TOKEN *token)
27 {
28     return token && !(token->uuid_ == 0 && token->seqno_ == 0 && token->vbid_ == 0);
29 }
30 
lcb_respstore_status(const lcb_RESPSTORE * resp)31 LIBCOUCHBASE_API lcb_STATUS lcb_respstore_status(const lcb_RESPSTORE *resp)
32 {
33     return resp->ctx.rc;
34 }
35 
lcb_respstore_error_context(const lcb_RESPSTORE * resp,const lcb_KEY_VALUE_ERROR_CONTEXT ** ctx)36 LIBCOUCHBASE_API lcb_STATUS lcb_respstore_error_context(const lcb_RESPSTORE *resp,
37                                                         const lcb_KEY_VALUE_ERROR_CONTEXT **ctx)
38 {
39     *ctx = &resp->ctx;
40     return LCB_SUCCESS;
41 }
42 
lcb_respstore_cookie(const lcb_RESPSTORE * resp,void ** cookie)43 LIBCOUCHBASE_API lcb_STATUS lcb_respstore_cookie(const lcb_RESPSTORE *resp, void **cookie)
44 {
45     *cookie = resp->cookie;
46     return LCB_SUCCESS;
47 }
48 
lcb_respstore_cas(const lcb_RESPSTORE * resp,uint64_t * cas)49 LIBCOUCHBASE_API lcb_STATUS lcb_respstore_cas(const lcb_RESPSTORE *resp, uint64_t *cas)
50 {
51     *cas = resp->ctx.cas;
52     return LCB_SUCCESS;
53 }
54 
lcb_respstore_key(const lcb_RESPSTORE * resp,const char ** key,size_t * key_len)55 LIBCOUCHBASE_API lcb_STATUS lcb_respstore_key(const lcb_RESPSTORE *resp, const char **key, size_t *key_len)
56 {
57     *key = resp->ctx.key.c_str();
58     *key_len = resp->ctx.key.size();
59     return LCB_SUCCESS;
60 }
61 
lcb_respstore_operation(const lcb_RESPSTORE * resp,lcb_STORE_OPERATION * operation)62 LIBCOUCHBASE_API lcb_STATUS lcb_respstore_operation(const lcb_RESPSTORE *resp, lcb_STORE_OPERATION *operation)
63 {
64     *operation = resp->op;
65     return LCB_SUCCESS;
66 }
67 
lcb_respstore_observe_stored(const lcb_RESPSTORE * resp,int * store_ok)68 LIBCOUCHBASE_API lcb_STATUS lcb_respstore_observe_stored(const lcb_RESPSTORE *resp, int *store_ok)
69 {
70     if (resp->dur_resp == nullptr) {
71         return LCB_ERR_UNSUPPORTED_OPERATION;
72     }
73     *store_ok = resp->store_ok;
74     return LCB_SUCCESS;
75 }
76 
lcb_respstore_observe_attached(const lcb_RESPSTORE * resp)77 LIBCOUCHBASE_API int lcb_respstore_observe_attached(const lcb_RESPSTORE *resp)
78 {
79     return resp->dur_resp != nullptr;
80 }
81 
lcb_respstore_observe_master_exists(const lcb_RESPSTORE * resp,int * master_exists)82 LIBCOUCHBASE_API lcb_STATUS lcb_respstore_observe_master_exists(const lcb_RESPSTORE *resp, int *master_exists)
83 {
84     if (resp->dur_resp == nullptr) {
85         return LCB_ERR_UNSUPPORTED_OPERATION;
86     }
87     *master_exists = resp->dur_resp->exists_master;
88     return LCB_SUCCESS;
89 }
90 
lcb_respstore_observe_master_persisted(const lcb_RESPSTORE * resp,int * master_persisted)91 LIBCOUCHBASE_API lcb_STATUS lcb_respstore_observe_master_persisted(const lcb_RESPSTORE *resp, int *master_persisted)
92 {
93     if (resp->dur_resp == nullptr) {
94         return LCB_ERR_UNSUPPORTED_OPERATION;
95     }
96     *master_persisted = resp->dur_resp->persisted_master;
97     return LCB_SUCCESS;
98 }
99 
lcb_respstore_observe_num_responses(const lcb_RESPSTORE * resp,uint16_t * num_responses)100 LIBCOUCHBASE_API lcb_STATUS lcb_respstore_observe_num_responses(const lcb_RESPSTORE *resp, uint16_t *num_responses)
101 {
102     if (resp->dur_resp == nullptr) {
103         return LCB_ERR_UNSUPPORTED_OPERATION;
104     }
105     *num_responses = resp->dur_resp->nresponses;
106     return LCB_SUCCESS;
107 }
108 
lcb_respstore_observe_num_persisted(const lcb_RESPSTORE * resp,uint16_t * num_persisted)109 LIBCOUCHBASE_API lcb_STATUS lcb_respstore_observe_num_persisted(const lcb_RESPSTORE *resp, uint16_t *num_persisted)
110 {
111     if (resp->dur_resp == nullptr) {
112         return LCB_ERR_UNSUPPORTED_OPERATION;
113     }
114     *num_persisted = resp->dur_resp->npersisted;
115     return LCB_SUCCESS;
116 }
117 
lcb_respstore_observe_num_replicated(const lcb_RESPSTORE * resp,uint16_t * num_replicated)118 LIBCOUCHBASE_API lcb_STATUS lcb_respstore_observe_num_replicated(const lcb_RESPSTORE *resp, uint16_t *num_replicated)
119 {
120     if (resp->dur_resp == nullptr) {
121         return LCB_ERR_UNSUPPORTED_OPERATION;
122     }
123     *num_replicated = resp->dur_resp->nreplicated;
124     return LCB_SUCCESS;
125 }
126 
lcb_respstore_mutation_token(const lcb_RESPSTORE * resp,lcb_MUTATION_TOKEN * token)127 LIBCOUCHBASE_API lcb_STATUS lcb_respstore_mutation_token(const lcb_RESPSTORE *resp, lcb_MUTATION_TOKEN *token)
128 {
129     if (token) {
130         *token = resp->mt;
131     }
132     return LCB_SUCCESS;
133 }
134 
lcb_cmdstore_create(lcb_CMDSTORE ** cmd,lcb_STORE_OPERATION operation)135 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_create(lcb_CMDSTORE **cmd, lcb_STORE_OPERATION operation)
136 {
137     *cmd = new lcb_CMDSTORE{};
138     (*cmd)->operation(operation);
139     return LCB_SUCCESS;
140 }
141 
lcb_cmdstore_destroy(lcb_CMDSTORE * cmd)142 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_destroy(lcb_CMDSTORE *cmd)
143 {
144     delete cmd;
145     return LCB_SUCCESS;
146 }
147 
lcb_cmdstore_timeout(lcb_CMDSTORE * cmd,uint32_t timeout)148 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_timeout(lcb_CMDSTORE *cmd, uint32_t timeout)
149 {
150     return cmd->timeout_in_microseconds(timeout);
151 }
152 
lcb_cmdstore_parent_span(lcb_CMDSTORE * cmd,lcbtrace_SPAN * span)153 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_parent_span(lcb_CMDSTORE *cmd, lcbtrace_SPAN *span)
154 {
155     return cmd->parent_span(span);
156 }
157 
lcb_cmdstore_collection(lcb_CMDSTORE * cmd,const char * scope,size_t scope_len,const char * collection,size_t collection_len)158 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_collection(lcb_CMDSTORE *cmd, const char *scope, size_t scope_len,
159                                                     const char *collection, size_t collection_len)
160 {
161     try {
162         lcb::collection_qualifier qualifier(scope, scope_len, collection, collection_len);
163         return cmd->collection(std::move(qualifier));
164     } catch (const std::invalid_argument &) {
165         return LCB_ERR_INVALID_ARGUMENT;
166     }
167 }
168 
lcb_cmdstore_key(lcb_CMDSTORE * cmd,const char * key,size_t key_len)169 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_key(lcb_CMDSTORE *cmd, const char *key, size_t key_len)
170 {
171     if (key == nullptr || key_len == 0) {
172         return LCB_ERR_INVALID_ARGUMENT;
173     }
174     return cmd->key(std::string(key, key_len));
175 }
176 
lcb_cmdstore_value(lcb_CMDSTORE * cmd,const char * value,size_t value_len)177 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_value(lcb_CMDSTORE *cmd, const char *value, size_t value_len)
178 {
179     if (value == nullptr || value_len == 0) {
180         return LCB_SUCCESS; /* empty values allowed */
181     }
182 
183     return cmd->value(std::string(value, value_len));
184 }
185 
lcb_cmdstore_value_iov(lcb_CMDSTORE * cmd,const lcb_IOV * value,size_t value_len)186 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_value_iov(lcb_CMDSTORE *cmd, const lcb_IOV *value, size_t value_len)
187 {
188     return cmd->value(value, value_len);
189 }
190 
lcb_cmdstore_expiry(lcb_CMDSTORE * cmd,uint32_t expiration)191 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_expiry(lcb_CMDSTORE *cmd, uint32_t expiration)
192 {
193     return cmd->expiry(expiration);
194 }
195 
lcb_cmdstore_preserve_expiry(lcb_CMDSTORE * cmd,int should_preserve)196 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_preserve_expiry(lcb_CMDSTORE *cmd, int should_preserve)
197 {
198     return cmd->preserve_expiry(should_preserve);
199 }
200 
lcb_cmdstore_cas(lcb_CMDSTORE * cmd,uint64_t cas)201 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_cas(lcb_CMDSTORE *cmd, uint64_t cas)
202 {
203     return cmd->cas(cas);
204 }
205 
lcb_cmdstore_flags(lcb_CMDSTORE * cmd,uint32_t flags)206 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_flags(lcb_CMDSTORE *cmd, uint32_t flags)
207 {
208     return cmd->flags(flags);
209 }
210 
lcb_cmdstore_datatype(lcb_CMDSTORE * cmd,uint8_t datatype)211 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_datatype(lcb_CMDSTORE *cmd, uint8_t datatype)
212 {
213     if (datatype & LCB_VALUE_F_SNAPPYCOMP) {
214         cmd->value_is_compressed(true);
215     }
216     if (datatype & LCB_VALUE_F_JSON) {
217         cmd->value_is_json(true);
218     }
219     return LCB_SUCCESS;
220 }
221 
lcb_cmdstore_durability(lcb_CMDSTORE * cmd,lcb_DURABILITY_LEVEL level)222 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_durability(lcb_CMDSTORE *cmd, lcb_DURABILITY_LEVEL level)
223 {
224     return cmd->durability_level(level);
225 }
226 
lcb_cmdstore_durability_observe(lcb_CMDSTORE * cmd,int persist_to,int replicate_to)227 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_durability_observe(lcb_CMDSTORE *cmd, int persist_to, int replicate_to)
228 {
229     return cmd->durability_poll(persist_to, replicate_to);
230 }
231 
lcb_cmdstore_on_behalf_of(lcb_CMDSTORE * cmd,const char * data,size_t data_len)232 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_on_behalf_of(lcb_CMDSTORE *cmd, const char *data, size_t data_len)
233 {
234     return cmd->on_behalf_of(std::string(data, data_len));
235 }
236 
lcb_cmdstore_on_behalf_of_extra_privilege(lcb_CMDSTORE * cmd,const char * privilege,size_t privilege_len)237 LIBCOUCHBASE_API lcb_STATUS lcb_cmdstore_on_behalf_of_extra_privilege(lcb_CMDSTORE *cmd, const char *privilege,
238                                                                       size_t privilege_len)
239 {
240     return cmd->on_behalf_of_add_extra_privilege(std::string(privilege, privilege_len));
241 }
242 
243 struct DurStoreCtx : mc_REQDATAEX {
244     lcb_INSTANCE *instance;
245     lcb_U16 persist_to;
246     lcb_U16 replicate_to;
247 
248     static mc_REQDATAPROCS proctable;
249 
DurStoreCtxDurStoreCtx250     DurStoreCtx(lcb_INSTANCE *instance_, lcb_U16 persist_, lcb_U16 replicate_, void *cookie_)
251         : mc_REQDATAEX(cookie_, proctable, 0), instance(instance_), persist_to(persist_), replicate_to(replicate_)
252     {
253     }
254 };
255 
256 /** Observe stuff */
handle_dur_storecb(mc_PIPELINE *,mc_PACKET * pkt,lcb_CALLBACK_TYPE,lcb_STATUS err,const void * arg)257 static void handle_dur_storecb(mc_PIPELINE *, mc_PACKET *pkt, lcb_CALLBACK_TYPE /* cbtype */, lcb_STATUS err,
258                                const void *arg)
259 {
260     lcb_RESPCALLBACK cb;
261     lcb_RESPSTORE resp{};
262     lcb_CMDENDURE dcmd = {0};
263     auto *dctx = static_cast<DurStoreCtx *>(pkt->u_rdata.exdata);
264     lcb_MULTICMD_CTX *mctx;
265     lcb_durability_opts_t opts = {0};
266     const auto *sresp = (const lcb_RESPSTORE *)arg;
267     lcbtrace_SPAN *span = nullptr;
268 
269     if (err != LCB_SUCCESS) {
270         goto GT_BAIL;
271     }
272     if (sresp->ctx.rc != LCB_SUCCESS) {
273         err = sresp->ctx.rc;
274         goto GT_BAIL;
275     }
276 
277     resp.store_ok = 1;
278     LCB_CMD_SET_KEY(&dcmd, sresp->ctx.key.c_str(), sresp->ctx.key.size());
279     dcmd.cas = sresp->ctx.cas;
280 
281     if (LCB_MUTATION_TOKEN_ISVALID(&sresp->mt)) {
282         dcmd.mutation_token = &sresp->mt;
283     }
284 
285     /* Set the options.. */
286     opts.v.v0.persist_to = dctx->persist_to;
287     opts.v.v0.replicate_to = dctx->replicate_to;
288 
289     mctx = lcb_endure3_ctxnew(dctx->instance, &opts, &err);
290     if (mctx == nullptr) {
291         goto GT_BAIL;
292     }
293 
294     span = MCREQ_PKT_RDATA(pkt)->span;
295     if (span) {
296         mctx->setspan(mctx, span);
297     }
298 
299     lcbdurctx_set_durstore(mctx, 1);
300     err = mctx->add_endure(mctx, &dcmd);
301     if (err != LCB_SUCCESS) {
302         mctx->fail(mctx);
303         goto GT_BAIL;
304     }
305     lcb_sched_enter(dctx->instance);
306     err = mctx->done(mctx, sresp->cookie);
307     lcb_sched_leave(dctx->instance);
308 
309     if (err == LCB_SUCCESS) {
310         /* Everything OK? */
311         delete dctx;
312         return;
313     }
314 
315 GT_BAIL : {
316     lcb_RESPENDURE dresp{};
317     resp.ctx.key = sresp->ctx.key;
318     resp.cookie = sresp->cookie;
319     resp.ctx.rc = err;
320     resp.dur_resp = &dresp;
321     cb = lcb_find_callback(dctx->instance, LCB_CALLBACK_STORE);
322     cb(dctx->instance, LCB_CALLBACK_STORE, (const lcb_RESPBASE *)&resp);
323     delete dctx;
324 }
325 }
326 
handle_dur_schedfail(mc_PACKET * pkt)327 static void handle_dur_schedfail(mc_PACKET *pkt)
328 {
329     delete static_cast<DurStoreCtx *>(pkt->u_rdata.exdata);
330 }
331 
332 mc_REQDATAPROCS DurStoreCtx::proctable = {handle_dur_storecb, handle_dur_schedfail};
333 
get_value_size(const mc_PACKET * packet)334 static lcb_size_t get_value_size(const mc_PACKET *packet)
335 {
336     if (packet->flags & MCREQ_F_VALUE_IOV) {
337         return packet->u_value.multi.total_length;
338     } else {
339         return packet->u_value.single.size;
340     }
341 }
342 
can_compress(lcb_INSTANCE * instance,const mc_PIPELINE * pipeline,bool already_compressed)343 static bool can_compress(lcb_INSTANCE *instance, const mc_PIPELINE *pipeline, bool already_compressed)
344 {
345     if (already_compressed) {
346         return false;
347     }
348 
349     const auto *server = static_cast<const lcb::Server *>(pipeline);
350     uint8_t compressopts = LCBT_SETTING(instance, compressopts);
351 
352     if ((compressopts & LCB_COMPRESS_OUT) == 0) {
353         return false;
354     }
355     if (server->supports_compression() == false && (compressopts & LCB_COMPRESS_FORCE) == 0) {
356         return false;
357     }
358     return true;
359 }
360 
store_validate(lcb_INSTANCE * instance,const lcb_CMDSTORE * cmd)361 static lcb_STATUS store_validate(lcb_INSTANCE *instance, const lcb_CMDSTORE *cmd)
362 {
363     if (cmd->key().empty()) {
364         return LCB_ERR_EMPTY_KEY;
365     }
366     if (!LCBT_SETTING(instance, use_collections) && !cmd->collection().is_default_collection()) {
367         /* only allow default collection when collections disabled for the instance */
368         return LCB_ERR_SDK_FEATURE_UNAVAILABLE;
369     }
370     if (!LCBT_SETTING(instance, enable_durable_write) && cmd->has_sync_durability_requirements()) {
371         return LCB_ERR_UNSUPPORTED_OPERATION;
372     }
373 
374     return LCB_SUCCESS;
375 }
376 
store_schedule(lcb_INSTANCE * instance,std::shared_ptr<lcb_CMDSTORE> cmd)377 static lcb_STATUS store_schedule(lcb_INSTANCE *instance, std::shared_ptr<lcb_CMDSTORE> cmd)
378 {
379     lcb_STATUS err;
380 
381     mc_PIPELINE *pipeline;
382     mc_PACKET *packet;
383     mc_CMDQUEUE *cq = &instance->cmdq;
384     protocol_binary_request_header hdr{};
385     int new_durability_supported = LCBT_SUPPORT_SYNCREPLICATION(instance);
386 
387     std::vector<std::uint8_t> framing_extras;
388     if (new_durability_supported && cmd->has_sync_durability_requirements()) {
389         auto durability_timeout = htons(lcb_durability_timeout(instance, cmd->timeout_in_microseconds()));
390         std::uint8_t frame_id = 0x01;
391         std::uint8_t frame_size = durability_timeout > 0 ? 3 : 1;
392         framing_extras.emplace_back(frame_id << 4U | frame_size);
393         framing_extras.emplace_back(cmd->durability_level());
394         if (durability_timeout > 0) {
395             framing_extras.emplace_back(durability_timeout >> 8U);
396             framing_extras.emplace_back(durability_timeout & 0xff);
397         }
398     }
399     if (cmd->should_preserve_expiry()) {
400         std::uint8_t frame_id = 0x05;
401         std::uint8_t frame_size = 0x00;
402         framing_extras.emplace_back(frame_id << 4U | frame_size);
403     }
404     if (cmd->want_impersonation()) {
405         err = lcb::flexible_framing_extras::encode_impersonate_user(cmd->impostor(), framing_extras);
406         if (err != LCB_SUCCESS) {
407             return err;
408         }
409         for (const auto &privilege : cmd->extra_privileges()) {
410             err = lcb::flexible_framing_extras::encode_impersonate_users_extra_privilege(privilege, framing_extras);
411             if (err != LCB_SUCCESS) {
412                 return err;
413             }
414         }
415     }
416     auto ffextlen = static_cast<std::uint8_t>(framing_extras.size());
417     hdr.request.magic = (ffextlen == 0) ? PROTOCOL_BINARY_REQ : PROTOCOL_BINARY_AREQ;
418     hdr.request.opcode = cmd->opcode();
419     hdr.request.extlen = cmd->extras_size();
420     lcb_KEYBUF keybuf{LCB_KV_COPY, {cmd->key().c_str(), cmd->key().size()}};
421     err = mcreq_basic_packet(cq, &keybuf, cmd->collection().collection_id(), &hdr, hdr.request.extlen, ffextlen,
422                              &packet, &pipeline, MCREQ_BASICPACKET_F_FALLBACKOK);
423     if (err != LCB_SUCCESS) {
424         return err;
425     }
426 
427     int should_compress = can_compress(instance, pipeline, cmd->value_is_compressed());
428     lcb_VALBUF valuebuf{LCB_KV_COPY, {{cmd->value().c_str(), cmd->value().size()}}};
429     if (should_compress) {
430         int rv = mcreq_compress_value(pipeline, packet, &valuebuf, instance->settings, &should_compress);
431         if (rv != 0) {
432             mcreq_release_packet(pipeline, packet);
433             return LCB_ERR_NO_MEMORY;
434         }
435     } else {
436         mcreq_reserve_value(pipeline, packet, &valuebuf);
437     }
438 
439     if (cmd->need_poll_durability()) {
440         int duropts = 0;
441         std::uint16_t persist_to = cmd->persist_to();
442         std::uint16_t replicate_to = cmd->replicate_to();
443         if (cmd->cap_to_maximum_nodes()) {
444             duropts = LCB_DURABILITY_VALIDATE_CAPMAX;
445         }
446         err = lcb_durability_validate(instance, &persist_to, &replicate_to, duropts);
447         if (err != LCB_SUCCESS) {
448             mcreq_wipe_packet(pipeline, packet);
449             mcreq_release_packet(pipeline, packet);
450             return err;
451         }
452 
453         auto *dctx = new DurStoreCtx(instance, persist_to, replicate_to, cmd->cookie());
454         packet->u_rdata.exdata = dctx;
455         packet->flags |= MCREQ_F_REQEXT;
456     }
457     mc_REQDATA *rdata = MCREQ_PKT_RDATA(packet);
458     rdata->cookie = cmd->cookie();
459     rdata->start = cmd->start_time_or_default_in_nanoseconds(gethrtime());
460     rdata->deadline =
461         rdata->start + cmd->timeout_or_default_in_nanoseconds(LCB_US2NS(LCBT_SETTING(instance, operation_timeout)));
462 
463     hdr.request.cas = lcb_htonll(cmd->cas());
464     hdr.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
465     if (should_compress || cmd->value_is_compressed()) {
466         hdr.request.datatype |= PROTOCOL_BINARY_DATATYPE_COMPRESSED;
467     }
468 
469     if (cmd->value_is_json() && static_cast<const lcb::Server *>(pipeline)->supports_json()) {
470         hdr.request.datatype |= PROTOCOL_BINARY_DATATYPE_JSON;
471     }
472 
473     hdr.request.opaque = packet->opaque;
474     hdr.request.bodylen = htonl(hdr.request.extlen + ffextlen + mcreq_get_key_size(&hdr) + get_value_size(packet));
475 
476     if (cmd->is_cookie_callback()) {
477         packet->flags |= MCREQ_F_PRIVCALLBACK;
478     }
479 
480     memcpy(SPAN_BUFFER(&packet->kh_span), &hdr, sizeof(hdr));
481 
482     std::size_t offset = sizeof(hdr);
483     if (!framing_extras.empty()) {
484         memcpy(SPAN_BUFFER(&packet->kh_span) + offset, framing_extras.data(), framing_extras.size());
485         offset += framing_extras.size();
486     }
487 
488     if (hdr.request.extlen == 2 * sizeof(std::uint32_t)) {
489         std::uint32_t flags = htonl(cmd->flags());
490         memcpy(SPAN_BUFFER(&packet->kh_span) + offset, &flags, sizeof(flags));
491         offset += sizeof(flags);
492 
493         std::uint32_t expiry = htonl(cmd->expiry());
494         memcpy(SPAN_BUFFER(&packet->kh_span) + offset, &expiry, sizeof(expiry));
495     }
496 
497     if (cmd->is_replace_semantics()) {
498         packet->flags |= MCREQ_F_REPLACE_SEMANTICS;
499     }
500     rdata->span = lcb::trace::start_kv_span_with_durability(instance->settings, packet, cmd);
501     LCB_SCHED_ADD(instance, pipeline, packet)
502 
503     TRACE_STORE_BEGIN(instance, &hdr, cmd);
504 
505     return LCB_SUCCESS;
506 }
507 
store_execute(lcb_INSTANCE * instance,std::shared_ptr<lcb_CMDSTORE> cmd)508 static lcb_STATUS store_execute(lcb_INSTANCE *instance, std::shared_ptr<lcb_CMDSTORE> cmd)
509 {
510     if (!LCBT_SETTING(instance, use_collections)) {
511         /* fast path if collections are not enabled */
512         return store_schedule(instance, cmd);
513     }
514 
515     if (collcache_get(instance, cmd->collection()) == LCB_SUCCESS) {
516         return store_schedule(instance, cmd);
517     }
518 
519     return collcache_resolve(
520         instance, cmd,
521         [instance](lcb_STATUS status, const lcb_RESPGETCID *resp, std::shared_ptr<lcb_CMDSTORE> operation) {
522             const auto callback_type = LCB_CALLBACK_STORE;
523             lcb_RESPCALLBACK operation_callback = lcb_find_callback(instance, callback_type);
524             lcb_RESPSTORE response{};
525             if (resp != nullptr) {
526                 response.ctx = resp->ctx;
527             }
528             response.ctx.key = operation->key();
529             response.ctx.scope = operation->collection().scope();
530             response.ctx.collection = operation->collection().collection();
531             response.cookie = operation->cookie();
532             if (status == LCB_ERR_SHEDULE_FAILURE || resp == nullptr) {
533                 response.ctx.rc = LCB_ERR_TIMEOUT;
534                 operation_callback(instance, callback_type, &response);
535                 return;
536             }
537             if (resp->ctx.rc != LCB_SUCCESS) {
538                 operation_callback(instance, callback_type, &response);
539                 return;
540             }
541             response.ctx.rc = store_schedule(instance, operation);
542             if (response.ctx.rc != LCB_SUCCESS) {
543                 operation_callback(instance, callback_type, &response);
544             }
545         });
546 }
547 
lcb_store(lcb_INSTANCE * instance,void * cookie,const lcb_CMDSTORE * command)548 LIBCOUCHBASE_API lcb_STATUS lcb_store(lcb_INSTANCE *instance, void *cookie, const lcb_CMDSTORE *command)
549 {
550     lcb_STATUS rc;
551 
552     rc = store_validate(instance, command);
553     if (rc != LCB_SUCCESS) {
554         return rc;
555     }
556 
557     auto cmd = std::make_shared<lcb_CMDSTORE>(*command);
558     cmd->cookie(cookie);
559 
560     if (instance->cmdq.config == nullptr) {
561         cmd->start_time_in_nanoseconds(gethrtime());
562         return lcb::defer_operation(instance, [instance, cmd](lcb_STATUS status) {
563             const auto callback_type = LCB_CALLBACK_STORE;
564             lcb_RESPCALLBACK operation_callback = lcb_find_callback(instance, callback_type);
565             lcb_RESPSTORE response{};
566             response.ctx.key = cmd->key();
567             response.cookie = cmd->cookie();
568             if (status == LCB_ERR_REQUEST_CANCELED) {
569                 response.ctx.rc = status;
570                 operation_callback(instance, callback_type, &response);
571                 return;
572             }
573             response.ctx.rc = store_execute(instance, cmd);
574             if (response.ctx.rc != LCB_SUCCESS) {
575                 operation_callback(instance, callback_type, &response);
576             }
577         });
578     }
579     return store_execute(instance, cmd);
580 }
581