xref: /6.6.0/couchstore/src/couch_save.cc (revision cf120ada)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "couchstore_config.h"
3
4#include <platform/cb_malloc.h>
5#include <string.h>
6#include <stddef.h>
7#include <stdlib.h>
8
9#include "internal.h"
10#include "node_types.h"
11#include "util.h"
12#include "reduces.h"
13#include "couch_btree.h"
14
15#include "couch_latency_internal.h"
16
17
/* Bytes needed to hold the by-sequence index value of `doc_info`:
 * the fixed raw_seq_index_value header, then the document id, then the
 * revision metadata. */
#define SEQ_INDEX_RAW_VALUE_SIZE(doc_info)     \
    (sizeof(raw_seq_index_value) +             \
     (doc_info).id.size +                      \
     (doc_info).rev_meta.size)

/* Bytes needed to hold the by-id index value of `doc_info`:
 * the fixed raw_id_index_value header followed by the revision metadata. */
#define ID_INDEX_RAW_VALUE_SIZE(doc_info)      \
    (sizeof(raw_id_index_value) +              \
     (doc_info).rev_meta.size)

/* Size in bytes of an encoded sequence-number key (one raw_48). */
#define RAW_SEQ_SIZE (sizeof(raw_48))
25
26
27static size_t assemble_seq_index_value(DocInfo *docinfo, char *dst)
28{
29    char* const start = dst;
30    raw_seq_index_value *raw = (raw_seq_index_value*)dst;
31    raw->sizes = encode_kv_length(docinfo->id.size, docinfo->size);
32    encode_raw48(docinfo->bp | (docinfo->deleted ? 1LL<<47 : 0), &raw->bp);
33    raw->content_meta = encode_raw08(docinfo->content_meta);
34    encode_raw48(docinfo->rev_seq, &raw->rev_seq);
35    dst += sizeof(*raw);
36
37    memcpy(dst, docinfo->id.buf, docinfo->id.size);
38    dst += docinfo->id.size;
39    if (docinfo->rev_meta.size > 0) {
40        memcpy(dst, docinfo->rev_meta.buf, docinfo->rev_meta.size);
41        dst += docinfo->rev_meta.size;
42    }
43    return dst - start;
44}
45
46static size_t assemble_id_index_value(DocInfo *docinfo, char *dst)
47{
48    char* const start = dst;
49    raw_id_index_value *raw = (raw_id_index_value*)dst;
50    encode_raw48(docinfo->db_seq, &raw->db_seq);
51    raw->size = encode_raw32((uint32_t)docinfo->size);
52    encode_raw48(docinfo->bp | (docinfo->deleted ? 1LL<<47 : 0), &raw->bp);
53    raw->content_meta = encode_raw08(docinfo->content_meta);
54    encode_raw48(docinfo->rev_seq, &raw->rev_seq);
55    dst += sizeof(*raw);
56
57    if (docinfo->rev_meta.size > 0) {
58        memcpy(dst, docinfo->rev_meta.buf, docinfo->rev_meta.size);
59        dst += docinfo->rev_meta.size;
60    }
61    return dst - start;
62}
63
64static couchstore_error_t write_doc(Db *db, const Doc *doc, uint64_t *bp,
65                                    size_t* disk_size, couchstore_save_options writeopts)
66{
67    couchstore_error_t errcode;
68    if (writeopts & COMPRESS_DOC_BODIES) {
69        errcode = db_write_buf_compressed(&db->file, &doc->data, (cs_off_t *) bp, disk_size);
70    } else {
71        errcode = static_cast<couchstore_error_t>(db_write_buf(&db->file, &doc->data, (cs_off_t *) bp, disk_size));
72    }
73
74    return errcode;
75}
76
77static int ebin_ptr_compare(const void *a, const void *b)
78{
79    const sized_buf* const* buf1 = static_cast<const sized_buf* const *>(a);
80    const sized_buf* const* buf2 = static_cast<const sized_buf* const *>(b);
81    return ebin_cmp(*buf1, *buf2);
82}
83
84static int seq_action_compare(const void *actv1, const void *actv2)
85{
86    const couchfile_modify_action *act1, *act2;
87    act1 = static_cast<const couchfile_modify_action *>(actv1);
88    act2 = static_cast<const couchfile_modify_action *>(actv2);
89
90    uint64_t seq1, seq2;
91
92    seq1 = decode_sequence_key(act1->key);
93    seq2 = decode_sequence_key(act2->key);
94
95    if (seq1 < seq2) {
96        return -1;
97    }
98    if (seq1 == seq2) {
99        if (act1->type < act2->type) {
100            return -1;
101        }
102        if (act1->type > act2->type) {
103            return 1;
104        }
105        return 0;
106    }
107    if (seq1 > seq2) {
108        return 1;
109    }
110    return 0;
111}
112
113typedef struct _idxupdatectx {
114    couchfile_modify_action *seqacts;
115    int actpos;
116
117    sized_buf **seqs;
118    sized_buf **seqvals;
119    int valpos;
120
121    fatbuf *deltermbuf;
122} index_update_ctx;
123
124static void idfetch_update_cb(couchfile_modify_request *rq,
125                              sized_buf *k, sized_buf *v, void *arg)
126{
127    (void)k;
128    (void)rq;
129    //v contains a seq we need to remove ( {Seq,_,_,_,_} )
130    uint64_t oldseq;
131    sized_buf *delbuf = NULL;
132    index_update_ctx *ctx = (index_update_ctx *) arg;
133
134    if (v == NULL) { //Doc not found
135        return;
136    }
137
138    const raw_id_index_value *raw = (raw_id_index_value*) v->buf;
139    oldseq = decode_raw48(raw->db_seq);
140
141    delbuf = (sized_buf *) fatbuf_get(ctx->deltermbuf, sizeof(sized_buf));
142    delbuf->buf = (char *) fatbuf_get(ctx->deltermbuf, 6);
143    delbuf->size = 6;
144    memset(delbuf->buf, 0, 6);
145    encode_raw48(oldseq, (raw_48*)delbuf->buf);
146
147    ctx->seqacts[ctx->actpos].type = ACTION_REMOVE;
148    ctx->seqacts[ctx->actpos].value.data = NULL;
149    ctx->seqacts[ctx->actpos].key = delbuf;
150
151    ctx->actpos++;
152}
153
154static couchstore_error_t update_indexes(Db* db,
155                                         sized_buf* seqs,
156                                         sized_buf* seqvals,
157                                         sized_buf* ids,
158                                         sized_buf* idvals,
159                                         int numdocs,
160                                         save_callback_fn save_callback,
161                                         void* save_callback_ctx) {
162    couchfile_modify_action *idacts;
163    couchfile_modify_action *seqacts;
164    const sized_buf **sorted_ids = NULL;
165    size_t size;
166    fatbuf *actbuf;
167    node_pointer *new_id_root;
168    node_pointer *new_seq_root;
169    couchstore_error_t errcode;
170    couchstore_error_t err;
171    couchfile_modify_request seqrq, idrq;
172    int ii;
173    index_update_ctx fetcharg;
174
175    /*
176    ** Two action list up to numdocs * 2 in size + Compare keys for ids,
177    ** and compare keys for removed seqs found from id index +
178    ** Max size of a int64 erlang term (for deleted seqs)
179    */
180    size = 4 * sizeof(couchfile_modify_action) + 2 * sizeof(sized_buf) + 10;
181
182    actbuf = fatbuf_alloc(numdocs * size);
183    error_unless(actbuf, COUCHSTORE_ERROR_ALLOC_FAIL);
184
185    idacts = static_cast<couchfile_modify_action*>(fatbuf_get(actbuf, numdocs * sizeof(couchfile_modify_action) * 2));
186    seqacts = static_cast<couchfile_modify_action*>(fatbuf_get(actbuf, numdocs * sizeof(couchfile_modify_action) * 2));
187    error_unless(idacts && seqacts, COUCHSTORE_ERROR_ALLOC_FAIL);
188
189    memset(&fetcharg, 0, sizeof(fetcharg));
190    fetcharg.seqacts = seqacts;
191    fetcharg.actpos = 0;
192    fetcharg.seqs = &seqs;
193    fetcharg.seqvals = &seqvals;
194    fetcharg.valpos = 0;
195    fetcharg.deltermbuf = actbuf;
196
197    // Sort the array indexes of ids[] by ascending id. Since we can't pass context info to qsort,
198    // actually sort an array of pointers to the elements of ids[], rather than the array indexes.
199    sorted_ids = static_cast<const sized_buf**>(cb_malloc(numdocs * sizeof(sized_buf*)));
200    error_unless(sorted_ids, COUCHSTORE_ERROR_ALLOC_FAIL);
201    for (ii = 0; ii < numdocs; ++ii) {
202        sorted_ids[ii] = &ids[ii];
203    }
204    qsort(sorted_ids, numdocs, sizeof(sorted_ids[0]), &ebin_ptr_compare);
205
206    // Assemble idacts[] array, in sorted order by id:
207    for (ii = 0; ii < numdocs; ii++) {
208        ptrdiff_t isorted = sorted_ids[ii] - ids;   // recover index of ii'th id in sort order
209
210        idacts[ii * 2].type = ACTION_FETCH;
211        idacts[ii * 2].value.arg = &fetcharg;
212        idacts[ii * 2 + 1].type = ACTION_INSERT;
213        idacts[ii * 2 + 1].value.data = &idvals[isorted];
214        // Allow the by_id building to find the by_seqno for each id.
215        // The save_callback method passes back id and seqno to the caller.
216        idacts[ii * 2 + 1].seq = &seqs[isorted];
217        idacts[ii * 2].key = &ids[isorted];
218        idacts[ii * 2 + 1].key = &ids[isorted];
219    }
220
221    idrq.cmp.compare = ebin_cmp;
222    idrq.file = &db->file;
223    idrq.actions = idacts;
224    idrq.num_actions = numdocs * 2;
225    idrq.reduce = by_id_reduce;
226    idrq.rereduce = by_id_rereduce;
227    idrq.fetch_callback = idfetch_update_cb;
228    idrq.compacting = 0;
229    idrq.enable_purging = false;
230    idrq.purge_kp = NULL;
231    idrq.purge_kv = NULL;
232    idrq.kv_chunk_threshold = db->file.options.kv_nodesize;
233    idrq.kp_chunk_threshold = db->file.options.kp_nodesize;
234    idrq.save_callback = save_callback;
235    idrq.save_callback_ctx = save_callback_ctx;
236    idrq.docinfo_callback = by_id_read_docinfo;
237
238    new_id_root = modify_btree(&idrq, db->header.by_id_root, &err);
239    error_pass(err);
240
241    while (fetcharg.valpos < numdocs) {
242        seqacts[fetcharg.actpos].type = ACTION_INSERT;
243        seqacts[fetcharg.actpos].value.data = &seqvals[fetcharg.valpos];
244        seqacts[fetcharg.actpos].key = &seqs[fetcharg.valpos];
245        fetcharg.valpos++;
246        fetcharg.actpos++;
247    }
248
249    //printf("Total seq actions: %d\n", fetcharg.actpos);
250    qsort(seqacts, fetcharg.actpos, sizeof(couchfile_modify_action),
251          seq_action_compare);
252
253    seqrq.cmp.compare = seq_cmp;
254    seqrq.actions = seqacts;
255    seqrq.num_actions = fetcharg.actpos;
256    seqrq.reduce = by_seq_reduce;
257    seqrq.rereduce = by_seq_rereduce;
258    seqrq.file = &db->file;
259    seqrq.compacting = 0;
260    seqrq.enable_purging = false;
261    seqrq.purge_kp = NULL;
262    seqrq.purge_kv = NULL;
263    seqrq.kv_chunk_threshold = db->file.options.kv_nodesize;
264    seqrq.kp_chunk_threshold = db->file.options.kp_nodesize;
265
266    new_seq_root = modify_btree(&seqrq, db->header.by_seq_root, &errcode);
267    if (errcode != COUCHSTORE_SUCCESS) {
268        cb_free(new_id_root);
269        error_pass(errcode);
270    }
271
272    if (db->header.by_id_root != new_id_root) {
273        cb_free(db->header.by_id_root);
274        db->header.by_id_root = new_id_root;
275    }
276
277    if (db->header.by_seq_root != new_seq_root) {
278        cb_free(db->header.by_seq_root);
279        db->header.by_seq_root = new_seq_root;
280    }
281
282cleanup:
283    cb_free(sorted_ids);
284    fatbuf_free(actbuf);
285    return errcode;
286}
287
288static couchstore_error_t add_doc_to_update_list(Db *db,
289                                                 const Doc *doc,
290                                                 const DocInfo *info,
291                                                 fatbuf *fb,
292                                                 sized_buf *seqterm,
293                                                 sized_buf *idterm,
294                                                 sized_buf *seqval,
295                                                 sized_buf *idval,
296                                                 uint64_t seq,
297                                                 couchstore_save_options options)
298{
299    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
300    DocInfo updated = *info;
301    updated.db_seq = seq;
302
303    seqterm->buf = (char *) fatbuf_get(fb, RAW_SEQ_SIZE);
304    seqterm->size = RAW_SEQ_SIZE;
305    error_unless(seqterm->buf, COUCHSTORE_ERROR_ALLOC_FAIL);
306    encode_raw48(seq, (raw_48*)seqterm->buf);
307
308    if (doc) {
309        size_t disk_size;
310
311        // Don't compress a doc unless the meta flag is set
312        if (!(info->content_meta & COUCH_DOC_IS_COMPRESSED)) {
313            options &= ~COMPRESS_DOC_BODIES;
314        }
315        errcode = write_doc(db, doc, &updated.bp, &disk_size, options);
316
317        if (errcode != COUCHSTORE_SUCCESS) {
318            return errcode;
319        }
320        updated.size = disk_size;
321    } else {
322        updated.deleted = 1;
323        updated.bp = 0;
324        updated.size = 0;
325    }
326
327    *idterm = updated.id;
328
329    seqval->buf = (char *) fatbuf_get(fb, SEQ_INDEX_RAW_VALUE_SIZE(updated));
330    error_unless(seqval->buf, COUCHSTORE_ERROR_ALLOC_FAIL);
331    seqval->size = assemble_seq_index_value(&updated, seqval->buf);
332
333    idval->buf = (char *) fatbuf_get(fb, ID_INDEX_RAW_VALUE_SIZE(updated));
334    error_unless(idval->buf, COUCHSTORE_ERROR_ALLOC_FAIL);
335    idval->size = assemble_id_index_value(&updated, idval->buf);
336
337    //We use 37 + id.size + 2 * rev_meta.size bytes
338cleanup:
339    return errcode;
340}
341
342couchstore_error_t couchstore_save_documents_and_callback(
343        Db* db,
344        Doc* const docs[],
345        DocInfo* infos[],
346        unsigned numdocs,
347        couchstore_save_options options,
348        save_callback_fn save_cb,
349        void* save_cb_ctx) {
350    COLLECT_LATENCY();
351
352    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
353    unsigned ii;
354    sized_buf *seqklist, *idklist, *seqvlist, *idvlist;
355    size_t term_meta_size = 0;
356    const Doc *curdoc;
357    uint64_t seq = db->header.update_seq;
358
359    fatbuf *fb;
360
361    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
362
363    for (ii = 0; ii < numdocs; ii++) {
364        // Get additional size for terms to be inserted into indexes
365        // IMPORTANT: This must match the sizes of the fatbuf_get calls in add_doc_to_update_list!
366        term_meta_size += RAW_SEQ_SIZE;
367        term_meta_size += SEQ_INDEX_RAW_VALUE_SIZE(*infos[ii]);
368        term_meta_size += ID_INDEX_RAW_VALUE_SIZE(*infos[ii]);
369    }
370
371    fb = fatbuf_alloc(term_meta_size +
372                      numdocs * (sizeof(sized_buf) * 4)); //seq/id key and value lists
373
374    if (fb == NULL) {
375        return COUCHSTORE_ERROR_ALLOC_FAIL;
376    }
377
378
379    seqklist = static_cast<sized_buf*>(fatbuf_get(fb, numdocs * sizeof(sized_buf)));
380    idklist = static_cast<sized_buf*>(fatbuf_get(fb, numdocs * sizeof(sized_buf)));
381    seqvlist = static_cast<sized_buf*>(fatbuf_get(fb, numdocs * sizeof(sized_buf)));
382    idvlist = static_cast<sized_buf*>(fatbuf_get(fb, numdocs * sizeof(sized_buf)));
383
384    for (ii = 0; ii < numdocs; ii++) {
385        if(options & COUCHSTORE_SEQUENCE_AS_IS) {
386            seq = infos[ii]->db_seq;
387        } else {
388            seq++;
389        }
390
391        if (docs) {
392            curdoc = docs[ii];
393        } else {
394            curdoc = NULL;
395        }
396
397        errcode = add_doc_to_update_list(db, curdoc, infos[ii], fb,
398                                         &seqklist[ii], &idklist[ii],
399                                         &seqvlist[ii], &idvlist[ii],
400                                         seq, options);
401        if (errcode != COUCHSTORE_SUCCESS) {
402            break;
403        }
404    }
405
406    if (errcode == COUCHSTORE_SUCCESS) {
407        errcode = update_indexes(db,
408                                 seqklist,
409                                 seqvlist,
410                                 idklist,
411                                 idvlist,
412                                 numdocs,
413                                 save_cb,
414                                 save_cb_ctx);
415    }
416
417    fatbuf_free(fb);
418    if (errcode == COUCHSTORE_SUCCESS) {
419        if(options & COUCHSTORE_SEQUENCE_AS_IS) {
420            // Sequences are passed as-is, make sure update_seq is >= the highest.
421            seq = db->header.update_seq;
422            for(ii = 0; ii < numdocs; ii++) {
423                if(infos[ii]->db_seq >= seq) {
424                    seq = infos[ii]->db_seq;
425                }
426            }
427            db->header.update_seq = seq;
428        } else {
429            // Fill in the assigned sequence numbers for caller's later use:
430            seq = db->header.update_seq;
431            for (ii = 0; ii < numdocs; ii++) {
432                infos[ii]->db_seq = ++seq;
433            }
434            db->header.update_seq = seq;
435        }
436    }
437 cleanup:
438    return errcode;
439}
440
441couchstore_error_t couchstore_save_documents(Db* db,
442                                             Doc* const docs[],
443                                             DocInfo* infos[],
444                                             unsigned numDocs,
445                                             couchstore_save_options options) {
446    return couchstore_save_documents_and_callback(
447            db, docs, infos, numDocs, options, nullptr, nullptr);
448}
449
450couchstore_error_t couchstore_save_document(Db *db, const Doc *doc,
451                                            DocInfo *info, couchstore_save_options options)
452{
453    return couchstore_save_documents_and_callback(
454            db, (Doc**)&doc, (DocInfo**)&info, 1, options, nullptr, nullptr);
455}
456