xref: /6.0.3/couchstore/src/couch_save.cc (revision 28ec7f5b)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3
4#include <platform/cb_malloc.h>
5#include <string.h>
6#include <stddef.h>
7#include <stdlib.h>
8
9#include "internal.h"
10#include "node_types.h"
11#include "util.h"
12#include "reduces.h"
13#include "couch_btree.h"
14
15#include "couch_latency_internal.h"
16
17
/* Bytes occupied by a by-sequence index value for `doc_info`: the fixed
 * raw_seq_index_value header, then the document id, then rev_meta. */
#define SEQ_INDEX_RAW_VALUE_SIZE(doc_info) \
    ((doc_info).id.size + (doc_info).rev_meta.size + sizeof(raw_seq_index_value))

/* Bytes occupied by a by-id index value for `doc_info`: the fixed
 * raw_id_index_value header, then rev_meta (the id is the tree key, so it is
 * not repeated in the value). */
#define ID_INDEX_RAW_VALUE_SIZE(doc_info) \
    ((doc_info).rev_meta.size + sizeof(raw_id_index_value))

/* Bytes occupied by a by-sequence key (a raw_48-encoded sequence number). */
#define RAW_SEQ_SIZE (sizeof(raw_48))
25
26
27static size_t assemble_seq_index_value(DocInfo *docinfo, char *dst)
28{
29    char* const start = dst;
30    raw_seq_index_value *raw = (raw_seq_index_value*)dst;
31    raw->sizes = encode_kv_length(docinfo->id.size, docinfo->size);
32    encode_raw48(docinfo->bp | (docinfo->deleted ? 1LL<<47 : 0), &raw->bp);
33    raw->content_meta = encode_raw08(docinfo->content_meta);
34    encode_raw48(docinfo->rev_seq, &raw->rev_seq);
35    dst += sizeof(*raw);
36
37    memcpy(dst, docinfo->id.buf, docinfo->id.size);
38    dst += docinfo->id.size;
39    if (docinfo->rev_meta.size > 0) {
40        memcpy(dst, docinfo->rev_meta.buf, docinfo->rev_meta.size);
41        dst += docinfo->rev_meta.size;
42    }
43    return dst - start;
44}
45
46static size_t assemble_id_index_value(DocInfo *docinfo, char *dst)
47{
48    char* const start = dst;
49    raw_id_index_value *raw = (raw_id_index_value*)dst;
50    encode_raw48(docinfo->db_seq, &raw->db_seq);
51    raw->size = encode_raw32((uint32_t)docinfo->size);
52    encode_raw48(docinfo->bp | (docinfo->deleted ? 1LL<<47 : 0), &raw->bp);
53    raw->content_meta = encode_raw08(docinfo->content_meta);
54    encode_raw48(docinfo->rev_seq, &raw->rev_seq);
55    dst += sizeof(*raw);
56
57    if (docinfo->rev_meta.size > 0) {
58        memcpy(dst, docinfo->rev_meta.buf, docinfo->rev_meta.size);
59        dst += docinfo->rev_meta.size;
60    }
61    return dst - start;
62}
63
64static couchstore_error_t write_doc(Db *db, const Doc *doc, uint64_t *bp,
65                                    size_t* disk_size, couchstore_save_options writeopts)
66{
67    couchstore_error_t errcode;
68    if (writeopts & COMPRESS_DOC_BODIES) {
69        errcode = db_write_buf_compressed(&db->file, &doc->data, (cs_off_t *) bp, disk_size);
70    } else {
71        errcode = static_cast<couchstore_error_t>(db_write_buf(&db->file, &doc->data, (cs_off_t *) bp, disk_size));
72    }
73
74    return errcode;
75}
76
77static int ebin_ptr_compare(const void *a, const void *b)
78{
79    const sized_buf* const* buf1 = static_cast<const sized_buf* const *>(a);
80    const sized_buf* const* buf2 = static_cast<const sized_buf* const *>(b);
81    return ebin_cmp(*buf1, *buf2);
82}
83
84static int seq_action_compare(const void *actv1, const void *actv2)
85{
86    const couchfile_modify_action *act1, *act2;
87    act1 = static_cast<const couchfile_modify_action *>(actv1);
88    act2 = static_cast<const couchfile_modify_action *>(actv2);
89
90    uint64_t seq1, seq2;
91
92    seq1 = decode_sequence_key(act1->key);
93    seq2 = decode_sequence_key(act2->key);
94
95    if (seq1 < seq2) {
96        return -1;
97    }
98    if (seq1 == seq2) {
99        if (act1->type < act2->type) {
100            return -1;
101        }
102        if (act1->type > act2->type) {
103            return 1;
104        }
105        return 0;
106    }
107    if (seq1 > seq2) {
108        return 1;
109    }
110    return 0;
111}
112
113typedef struct _idxupdatectx {
114    couchfile_modify_action *seqacts;
115    int actpos;
116
117    sized_buf **seqs;
118    sized_buf **seqvals;
119    int valpos;
120
121    fatbuf *deltermbuf;
122} index_update_ctx;
123
124static void idfetch_update_cb(couchfile_modify_request *rq,
125                              sized_buf *k, sized_buf *v, void *arg)
126{
127    (void)k;
128    (void)rq;
129    //v contains a seq we need to remove ( {Seq,_,_,_,_} )
130    uint64_t oldseq;
131    sized_buf *delbuf = NULL;
132    index_update_ctx *ctx = (index_update_ctx *) arg;
133
134    if (v == NULL) { //Doc not found
135        return;
136    }
137
138    const raw_id_index_value *raw = (raw_id_index_value*) v->buf;
139    oldseq = decode_raw48(raw->db_seq);
140
141    delbuf = (sized_buf *) fatbuf_get(ctx->deltermbuf, sizeof(sized_buf));
142    delbuf->buf = (char *) fatbuf_get(ctx->deltermbuf, 6);
143    delbuf->size = 6;
144    memset(delbuf->buf, 0, 6);
145    encode_raw48(oldseq, (raw_48*)delbuf->buf);
146
147    ctx->seqacts[ctx->actpos].type = ACTION_REMOVE;
148    ctx->seqacts[ctx->actpos].value.data = NULL;
149    ctx->seqacts[ctx->actpos].key = delbuf;
150
151    ctx->actpos++;
152}
153
154static couchstore_error_t update_indexes(Db *db,
155                                         sized_buf *seqs,
156                                         sized_buf *seqvals,
157                                         sized_buf *ids,
158                                         sized_buf *idvals,
159                                         int numdocs)
160{
161    couchfile_modify_action *idacts;
162    couchfile_modify_action *seqacts;
163    const sized_buf **sorted_ids = NULL;
164    size_t size;
165    fatbuf *actbuf;
166    node_pointer *new_id_root;
167    node_pointer *new_seq_root;
168    couchstore_error_t errcode;
169    couchstore_error_t err;
170    couchfile_modify_request seqrq, idrq;
171    int ii;
172    index_update_ctx fetcharg;
173
174    /*
175    ** Two action list up to numdocs * 2 in size + Compare keys for ids,
176    ** and compare keys for removed seqs found from id index +
177    ** Max size of a int64 erlang term (for deleted seqs)
178    */
179    size = 4 * sizeof(couchfile_modify_action) + 2 * sizeof(sized_buf) + 10;
180
181    actbuf = fatbuf_alloc(numdocs * size);
182    error_unless(actbuf, COUCHSTORE_ERROR_ALLOC_FAIL);
183
184    idacts = static_cast<couchfile_modify_action*>(fatbuf_get(actbuf, numdocs * sizeof(couchfile_modify_action) * 2));
185    seqacts = static_cast<couchfile_modify_action*>(fatbuf_get(actbuf, numdocs * sizeof(couchfile_modify_action) * 2));
186    error_unless(idacts && seqacts, COUCHSTORE_ERROR_ALLOC_FAIL);
187
188    memset(&fetcharg, 0, sizeof(fetcharg));
189    fetcharg.seqacts = seqacts;
190    fetcharg.actpos = 0;
191    fetcharg.seqs = &seqs;
192    fetcharg.seqvals = &seqvals;
193    fetcharg.valpos = 0;
194    fetcharg.deltermbuf = actbuf;
195
196    // Sort the array indexes of ids[] by ascending id. Since we can't pass context info to qsort,
197    // actually sort an array of pointers to the elements of ids[], rather than the array indexes.
198    sorted_ids = static_cast<const sized_buf**>(cb_malloc(numdocs * sizeof(sized_buf*)));
199    error_unless(sorted_ids, COUCHSTORE_ERROR_ALLOC_FAIL);
200    for (ii = 0; ii < numdocs; ++ii) {
201        sorted_ids[ii] = &ids[ii];
202    }
203    qsort(sorted_ids, numdocs, sizeof(sorted_ids[0]), &ebin_ptr_compare);
204
205    // Assemble idacts[] array, in sorted order by id:
206    for (ii = 0; ii < numdocs; ii++) {
207        ptrdiff_t isorted = sorted_ids[ii] - ids;   // recover index of ii'th id in sort order
208
209        idacts[ii * 2].type = ACTION_FETCH;
210        idacts[ii * 2].value.arg = &fetcharg;
211        idacts[ii * 2 + 1].type = ACTION_INSERT;
212        idacts[ii * 2 + 1].value.data = &idvals[isorted];
213        idacts[ii * 2].key = &ids[isorted];
214        idacts[ii * 2 + 1].key = &ids[isorted];
215    }
216
217    idrq.cmp.compare = ebin_cmp;
218    idrq.file = &db->file;
219    idrq.actions = idacts;
220    idrq.num_actions = numdocs * 2;
221    idrq.reduce = by_id_reduce;
222    idrq.rereduce = by_id_rereduce;
223    idrq.fetch_callback = idfetch_update_cb;
224    idrq.compacting = 0;
225    idrq.enable_purging = false;
226    idrq.purge_kp = NULL;
227    idrq.purge_kv = NULL;
228    idrq.kv_chunk_threshold = db->file.options.kv_nodesize;
229    idrq.kp_chunk_threshold = db->file.options.kp_nodesize;
230
231    new_id_root = modify_btree(&idrq, db->header.by_id_root, &err);
232    error_pass(err);
233
234    while (fetcharg.valpos < numdocs) {
235        seqacts[fetcharg.actpos].type = ACTION_INSERT;
236        seqacts[fetcharg.actpos].value.data = &seqvals[fetcharg.valpos];
237        seqacts[fetcharg.actpos].key = &seqs[fetcharg.valpos];
238        fetcharg.valpos++;
239        fetcharg.actpos++;
240    }
241
242    //printf("Total seq actions: %d\n", fetcharg.actpos);
243    qsort(seqacts, fetcharg.actpos, sizeof(couchfile_modify_action),
244          seq_action_compare);
245
246    seqrq.cmp.compare = seq_cmp;
247    seqrq.actions = seqacts;
248    seqrq.num_actions = fetcharg.actpos;
249    seqrq.reduce = by_seq_reduce;
250    seqrq.rereduce = by_seq_rereduce;
251    seqrq.file = &db->file;
252    seqrq.compacting = 0;
253    seqrq.enable_purging = false;
254    seqrq.purge_kp = NULL;
255    seqrq.purge_kv = NULL;
256    seqrq.kv_chunk_threshold = db->file.options.kv_nodesize;
257    seqrq.kp_chunk_threshold = db->file.options.kp_nodesize;
258
259    new_seq_root = modify_btree(&seqrq, db->header.by_seq_root, &errcode);
260    if (errcode != COUCHSTORE_SUCCESS) {
261        cb_free(new_id_root);
262        error_pass(errcode);
263    }
264
265    if (db->header.by_id_root != new_id_root) {
266        cb_free(db->header.by_id_root);
267        db->header.by_id_root = new_id_root;
268    }
269
270    if (db->header.by_seq_root != new_seq_root) {
271        cb_free(db->header.by_seq_root);
272        db->header.by_seq_root = new_seq_root;
273    }
274
275cleanup:
276    cb_free(sorted_ids);
277    fatbuf_free(actbuf);
278    return errcode;
279}
280
281static couchstore_error_t add_doc_to_update_list(Db *db,
282                                                 const Doc *doc,
283                                                 const DocInfo *info,
284                                                 fatbuf *fb,
285                                                 sized_buf *seqterm,
286                                                 sized_buf *idterm,
287                                                 sized_buf *seqval,
288                                                 sized_buf *idval,
289                                                 uint64_t seq,
290                                                 couchstore_save_options options)
291{
292    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
293    DocInfo updated = *info;
294    updated.db_seq = seq;
295
296    seqterm->buf = (char *) fatbuf_get(fb, RAW_SEQ_SIZE);
297    seqterm->size = RAW_SEQ_SIZE;
298    error_unless(seqterm->buf, COUCHSTORE_ERROR_ALLOC_FAIL);
299    encode_raw48(seq, (raw_48*)seqterm->buf);
300
301    if (doc) {
302        size_t disk_size;
303
304        // Don't compress a doc unless the meta flag is set
305        if (!(info->content_meta & COUCH_DOC_IS_COMPRESSED)) {
306            options &= ~COMPRESS_DOC_BODIES;
307        }
308        errcode = write_doc(db, doc, &updated.bp, &disk_size, options);
309
310        if (errcode != COUCHSTORE_SUCCESS) {
311            return errcode;
312        }
313        updated.size = disk_size;
314    } else {
315        updated.deleted = 1;
316        updated.bp = 0;
317        updated.size = 0;
318    }
319
320    *idterm = updated.id;
321
322    seqval->buf = (char *) fatbuf_get(fb, SEQ_INDEX_RAW_VALUE_SIZE(updated));
323    error_unless(seqval->buf, COUCHSTORE_ERROR_ALLOC_FAIL);
324    seqval->size = assemble_seq_index_value(&updated, seqval->buf);
325
326    idval->buf = (char *) fatbuf_get(fb, ID_INDEX_RAW_VALUE_SIZE(updated));
327    error_unless(idval->buf, COUCHSTORE_ERROR_ALLOC_FAIL);
328    idval->size = assemble_id_index_value(&updated, idval->buf);
329
330    //We use 37 + id.size + 2 * rev_meta.size bytes
331cleanup:
332    return errcode;
333}
334
335LIBCOUCHSTORE_API
336couchstore_error_t couchstore_save_documents(Db *db,
337                                             Doc* const docs[],
338                                             DocInfo *infos[],
339                                             unsigned numdocs,
340                                             couchstore_save_options options)
341{
342    COLLECT_LATENCY();
343
344    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
345    unsigned ii;
346    sized_buf *seqklist, *idklist, *seqvlist, *idvlist;
347    size_t term_meta_size = 0;
348    const Doc *curdoc;
349    uint64_t seq = db->header.update_seq;
350
351    fatbuf *fb;
352
353    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
354
355    for (ii = 0; ii < numdocs; ii++) {
356        // Get additional size for terms to be inserted into indexes
357        // IMPORTANT: This must match the sizes of the fatbuf_get calls in add_doc_to_update_list!
358        term_meta_size += RAW_SEQ_SIZE;
359        term_meta_size += SEQ_INDEX_RAW_VALUE_SIZE(*infos[ii]);
360        term_meta_size += ID_INDEX_RAW_VALUE_SIZE(*infos[ii]);
361    }
362
363    fb = fatbuf_alloc(term_meta_size +
364                      numdocs * (sizeof(sized_buf) * 4)); //seq/id key and value lists
365
366    if (fb == NULL) {
367        return COUCHSTORE_ERROR_ALLOC_FAIL;
368    }
369
370
371    seqklist = static_cast<sized_buf*>(fatbuf_get(fb, numdocs * sizeof(sized_buf)));
372    idklist = static_cast<sized_buf*>(fatbuf_get(fb, numdocs * sizeof(sized_buf)));
373    seqvlist = static_cast<sized_buf*>(fatbuf_get(fb, numdocs * sizeof(sized_buf)));
374    idvlist = static_cast<sized_buf*>(fatbuf_get(fb, numdocs * sizeof(sized_buf)));
375
376    for (ii = 0; ii < numdocs; ii++) {
377        if(options & COUCHSTORE_SEQUENCE_AS_IS) {
378            seq = infos[ii]->db_seq;
379        } else {
380            seq++;
381        }
382
383        if (docs) {
384            curdoc = docs[ii];
385        } else {
386            curdoc = NULL;
387        }
388
389        errcode = add_doc_to_update_list(db, curdoc, infos[ii], fb,
390                                         &seqklist[ii], &idklist[ii],
391                                         &seqvlist[ii], &idvlist[ii],
392                                         seq, options);
393        if (errcode != COUCHSTORE_SUCCESS) {
394            break;
395        }
396    }
397
398    if (errcode == COUCHSTORE_SUCCESS) {
399        errcode = update_indexes(db, seqklist, seqvlist,
400                                 idklist, idvlist, numdocs);
401    }
402
403    fatbuf_free(fb);
404    if (errcode == COUCHSTORE_SUCCESS) {
405        if(options & COUCHSTORE_SEQUENCE_AS_IS) {
406            // Sequences are passed as-is, make sure update_seq is >= the highest.
407            seq = db->header.update_seq;
408            for(ii = 0; ii < numdocs; ii++) {
409                if(infos[ii]->db_seq >= seq) {
410                    seq = infos[ii]->db_seq;
411                }
412            }
413            db->header.update_seq = seq;
414        } else {
415            // Fill in the assigned sequence numbers for caller's later use:
416            seq = db->header.update_seq;
417            for (ii = 0; ii < numdocs; ii++) {
418                infos[ii]->db_seq = ++seq;
419            }
420            db->header.update_seq = seq;
421        }
422    }
423 cleanup:
424    return errcode;
425}
426
427LIBCOUCHSTORE_API
428couchstore_error_t couchstore_save_document(Db *db, const Doc *doc,
429                                            DocInfo *info, couchstore_save_options options)
430{
431    return couchstore_save_documents(db, (Doc**)&doc, (DocInfo**)&info, 1, options);
432}
433