xref: /5.5.2/couchstore/src/db_compact.cc (revision ea89a543)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3#include "internal.h"
4#include "couch_btree.h"
5#include "reduces.h"
6#include "bitfield.h"
7#include "arena.h"
8#include "tree_writer.h"
9#include "node_types.h"
10#include "util.h"
11#include "couch_latency_internal.h"
12
13#include <platform/cb_malloc.h>
14#include <stdexcept>
15#include <stdio.h>
16#include <stdlib.h>
17
/* State shared by the compaction routines in this file while one source
 * database is copied into a newly opened target database. */
typedef struct compact_ctx {
    /* Receives the by-id index entries produced while the by-seq tree is
     * scanned; it is sorted and written to the target after the scan. */
    TreeWriter* tree_writer;
    /* Using this for stuff that doesn't need to live longer than it takes to write
     * out a b-tree node (the k/v pairs) */
    arena *transient_arena;
    /* This is for stuff that lasts the duration of the b-tree writing (node pointers) */
    arena *persistent_arena;
    /* B-tree build in progress for whichever tree is currently being copied. */
    couchfile_modify_result *target_mr;
    /* The database being written. */
    Db* target;
    /* Optional per-item compaction hook (may be NULL). */
    couchstore_compact_hook hook;
    /* Optional hook that is handed pointers to the DocInfo and the document
     * body before the body is written to the target (may be NULL). */
    couchstore_docinfo_hook dhook;
    /* Opaque pointer passed back to `hook` on every invocation. */
    void* hook_ctx;
    /* Compaction flags supplied by the caller. */
    couchstore_compact_flags flags;
} compact_ctx;
32
/* Forward declarations: both helpers are defined further down in this file
 * and are called from couchstore_compact_db_ex(). */
static couchstore_error_t compact_seq_tree(Db* source, Db* target, compact_ctx *ctx);
static couchstore_error_t compact_localdocs_tree(Db* source, Db* target, compact_ctx *ctx);
35
36couchstore_error_t couchstore_compact_db_ex(Db* source, const char* target_filename,
37                                            couchstore_compact_flags flags,
38                                            couchstore_compact_hook hook,
39                                            couchstore_docinfo_hook dhook,
40                                            void* hook_ctx,
41                                            FileOpsInterface* ops)
42{
43    COLLECT_LATENCY();
44
45    Db* target = NULL;
46    char tmpFile[PATH_MAX]; // keep this on the stack for duration of the call
47    couchstore_error_t errcode;
48    // Local error code for seq-tree scan.
49    couchstore_error_t scan_err = COUCHSTORE_SUCCESS;
50    compact_ctx ctx = {NULL, new_arena(0), new_arena(0), NULL, NULL, hook, dhook, hook_ctx, 0};
51    ctx.flags = flags;
52    couchstore_open_flags open_flags = COUCHSTORE_OPEN_FLAG_CREATE;
53    error_unless(!source->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
54    error_unless(ctx.transient_arena && ctx.persistent_arena, COUCHSTORE_ERROR_ALLOC_FAIL);
55
56    // If the old file is downlevel ...
57    // ... and upgrade is not requested
58    // then the new file must use the old/legacy crc
59    if (source->header.disk_version <= COUCH_DISK_VERSION_11 &&
60        !(flags & COUCHSTORE_COMPACT_FLAG_UPGRADE_DB)) {
61        open_flags |= COUCHSTORE_OPEN_WITH_LEGACY_CRC;
62    }
63
64    if (flags & COUCHSTORE_COMPACT_FLAG_UNBUFFERED) {
65        open_flags |= COUCHSTORE_OPEN_FLAG_UNBUFFERED;
66    }
67
68    if (flags & COUCHSTORE_COMPACT_WITH_PERIODIC_SYNC) {
69        static_assert(uint64_t(COUCHSTORE_OPEN_WITH_PERIODIC_SYNC) ==
70                      uint64_t(COUCHSTORE_COMPACT_WITH_PERIODIC_SYNC),
71                      "COUCHSTORE_OPEN_WITH_PERIODIC_SYNC and "
72                      "COUCHSTORE_COMPACT_WITH_PERIODIC_SYNC should have the same"
73                      "encoding");
74
75        open_flags |= (flags & COUCHSTORE_OPEN_WITH_PERIODIC_SYNC);
76    }
77
78    // Transfer current B+tree node settings to new file.
79    if (source->file.options.kp_nodesize) {
80        uint32_t kp_flag = source->file.options.kp_nodesize / 1024;
81        open_flags |= (kp_flag << 20);
82    }
83    if (source->file.options.kv_nodesize) {
84        uint32_t kv_flag = source->file.options.kv_nodesize / 1024;
85        open_flags |= (kv_flag << 16);
86    }
87
88    error_pass(couchstore_open_db_ex(target_filename, open_flags, ops, &target));
89
90    ctx.target = target;
91    target->file.pos = 1;
92    target->header.update_seq = source->header.update_seq;
93    if (flags & COUCHSTORE_COMPACT_FLAG_DROP_DELETES) {
94        //Count the number of times purge has happened
95        target->header.purge_seq = source->header.purge_seq + 1;
96    } else {
97        target->header.purge_seq = source->header.purge_seq;
98    }
99    target->header.purge_ptr = source->header.purge_ptr;
100
101    if (source->header.by_seq_root) {
102        strcpy(tmpFile, target_filename);
103        strcat(tmpFile, ".btree-tmp_0");
104        error_pass(TreeWriterOpen(tmpFile, ebin_cmp, by_id_reduce, by_id_rereduce, NULL, &ctx.tree_writer));
105        scan_err = compact_seq_tree(source, target, &ctx);
106        if (!(flags & COUCHSTORE_COMPACT_RECOVERY_MODE)) {
107            // Normal mode: 'compact_seq_tree()' should succeed.
108            error_pass(scan_err);
109        } // Recovery mode: we can tolerate corruptions.
110        error_pass(TreeWriterSort(ctx.tree_writer));
111        error_pass(TreeWriterWrite(ctx.tree_writer, &target->file, &target->header.by_id_root));
112        TreeWriterFree(ctx.tree_writer);
113        ctx.tree_writer = NULL;
114    }
115
116    if (source->header.local_docs_root) {
117        error_pass(compact_localdocs_tree(source, target, &ctx));
118    }
119    if(ctx.hook != NULL) {
120        error_pass(
121            static_cast<couchstore_error_t>(ctx.hook(ctx.target,
122                                                     nullptr, // docinfo
123                                                     {},
124                                                     ctx.hook_ctx)));
125    }
126    error_pass(couchstore_commit(target));
127cleanup:
128    TreeWriterFree(ctx.tree_writer);
129    delete_arena(ctx.transient_arena);
130    delete_arena(ctx.persistent_arena);
131    if (target != NULL) {
132        couchstore_close_file(target);
133        couchstore_free_db(target);
134        if (errcode != COUCHSTORE_SUCCESS) {
135            remove(target_filename);
136        }
137    }
138
139    if (errcode == COUCHSTORE_SUCCESS) {
140        return scan_err;
141    }
142    return errcode;
143}
144
145couchstore_error_t couchstore_compact_db(Db* source, const char* target_filename)
146{
147    return couchstore_compact_db_ex(source, target_filename, 0, NULL, NULL, NULL,
148                                    couchstore_get_default_file_ops());
149}
150
151static couchstore_error_t output_seqtree_item(const sized_buf *k,
152                                              const sized_buf *v,
153                                              const DocInfo *docinfo,
154                                              compact_ctx *ctx)
155{
156    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
157    sized_buf *v_c;
158    const raw_seq_index_value* rawSeq;
159    uint32_t idsize, datasize;
160    uint32_t revMetaSize;
161    sized_buf id_k, id_v;
162    raw_id_index_value *raw;
163    sized_buf *k_c = arena_copy_buf(ctx->transient_arena, k);
164
165    if (k_c == NULL) {
166        error_pass(COUCHSTORE_ERROR_READ);
167    }
168
169    if (docinfo) {
170        v_c = arena_special_copy_buf_and_revmeta(ctx->transient_arena,
171                                                 v, docinfo);
172    } else {
173        v_c = arena_copy_buf(ctx->transient_arena, v);
174    }
175
176    if (v_c == NULL) {
177        error_pass(COUCHSTORE_ERROR_READ);
178    }
179
180    error_pass(mr_push_item(k_c, v_c, ctx->target_mr));
181
182    // Decode the by-sequence index value. See the file format doc or
183    // assemble_id_index_value in couch_db.c:
184    rawSeq = (const raw_seq_index_value*)v_c->buf;
185    decode_kv_length(&rawSeq->sizes, &idsize, &datasize);
186    revMetaSize = (uint32_t)v_c->size - (sizeof(raw_seq_index_value) + idsize);
187
188    // Set up sized_bufs for the ID tree key and value:
189    id_k.buf = (char*)(rawSeq + 1);
190    id_k.size = idsize;
191    id_v.size = sizeof(raw_id_index_value) + revMetaSize;
192    id_v.buf = static_cast<char*>(arena_alloc(ctx->transient_arena, id_v.size));
193
194    raw = (raw_id_index_value*)id_v.buf;
195    raw->db_seq = *(raw_48*)k->buf;  //Copy db seq from seq tree key
196    raw->size = encode_raw32(datasize);
197    raw->bp = rawSeq->bp;
198    raw->content_meta = rawSeq->content_meta;
199    raw->rev_seq = rawSeq->rev_seq;
200    memcpy(raw + 1, (uint8_t*)(rawSeq + 1) + idsize, revMetaSize); //Copy rev_meta
201
202    error_pass(TreeWriterAddItem(ctx->tree_writer, id_k, id_v));
203
204    if (ctx->target_mr->count == 0) {
205        /* No items queued, we must have just flushed. We can safely rewind the transient arena. */
206        arena_free_all(ctx->transient_arena);
207    }
208
209cleanup:
210    return errcode;
211}
212
213static couchstore_error_t compact_seq_fetchcb(couchfile_lookup_request *rq,
214                                              const sized_buf *k,
215                                              const sized_buf *v)
216{
217    DocInfo* info = NULL;
218    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
219    compact_ctx *ctx = (compact_ctx *) rq->callback_ctx;
220    raw_seq_index_value* rawSeq = (raw_seq_index_value*)v->buf;
221    uint64_t bpWithDeleted = decode_raw48(rawSeq->bp);
222    uint64_t bp = bpWithDeleted & ~BP_DELETED_FLAG;
223    int ret_val = 0;
224
225    if ((bpWithDeleted & BP_DELETED_FLAG) &&
226       (ctx->hook == NULL) &&
227       (ctx->flags & COUCHSTORE_COMPACT_FLAG_DROP_DELETES)) {
228        return COUCHSTORE_SUCCESS;
229    }
230
231    sized_buf item;
232    item.buf = nullptr;
233    item.size = 0xffffff;
234
235    if (ctx->hook) {
236        error_pass(by_seq_read_docinfo(&info, k, v));
237        /* If the hook returns with the client requiring the whole body,
238         * then the whole body is read from disk and the hook is called
239         * again
240         */
241        int hook_action = ctx->hook(ctx->target, info, item, ctx->hook_ctx);
242        if (hook_action == COUCHSTORE_COMPACT_NEED_BODY) {
243            int size = pread_bin(rq->file, bp, &item.buf);
244            if (size < 0) {
245                couchstore_free_docinfo(info);
246                return static_cast<couchstore_error_t>(size);
247            }
248            item.size = size_t(size);
249            hook_action = ctx->hook(ctx->target, info, item, ctx->hook_ctx);
250        }
251
252        switch (hook_action) {
253        case COUCHSTORE_COMPACT_NEED_BODY:
254            throw std::logic_error(
255                "compact_seq_fetchcb: COUCHSTORE_COMPACT_NEED_BODY should not be returned "
256                "if the body was provided");
257        case COUCHSTORE_COMPACT_KEEP_ITEM:
258            break;
259        case COUCHSTORE_COMPACT_DROP_ITEM:
260            goto cleanup;
261        default:
262            error_pass(static_cast<couchstore_error_t>(hook_action));
263        }
264    }
265
266    if (bp != 0) {
267        cs_off_t new_bp = 0;
268        // Copy the document from the old db file to the new one:
269        size_t new_size = 0;
270
271        if (item.buf == nullptr) {
272            int size = pread_bin(rq->file, bp, &item.buf);
273            if (size < 0) {
274                couchstore_free_docinfo(info);
275                return static_cast<couchstore_error_t>(size);
276            }
277            item.size = size_t(size);
278        }
279
280        if (ctx->dhook) {
281            ret_val = ctx->dhook(&info, &item);
282        }
283        int err = db_write_buf(ctx->target_mr->rq->file, &item, &new_bp,
284                               &new_size);
285
286        bpWithDeleted = (bpWithDeleted & BP_DELETED_FLAG) | new_bp;  //Preserve high bit
287        encode_raw48(bpWithDeleted, &rawSeq->bp);
288        error_pass(static_cast<couchstore_error_t>(err));
289    }
290
291    if (ret_val) {
292        error_pass(output_seqtree_item(k, v, info, ctx));
293    } else {
294        error_pass(output_seqtree_item(k, v, NULL, ctx));
295    }
296
297cleanup:
298    cb_free(item.buf);
299    couchstore_free_docinfo(info);
300    return errcode;
301}
302
303static couchstore_error_t compact_seq_tree(Db* source, Db* target, compact_ctx *ctx)
304{
305    couchstore_error_t errcode;
306    compare_info seqcmp;
307    seqcmp.compare = seq_cmp;
308    couchfile_lookup_request srcfold;
309    sized_buf low_key;
310    //Keys in seq tree are 48-bit numbers, this is 0, lowest possible key
311    low_key.buf = const_cast<char*>("\0\0\0\0\0\0");
312    low_key.size = 6;
313    sized_buf *low_key_list = &low_key;
314
315    ctx->target_mr = new_btree_modres(ctx->persistent_arena,
316                                      ctx->transient_arena,
317                                      &target->file,
318                                      &seqcmp,
319                                      by_seq_reduce,
320                                      by_seq_rereduce,
321                                      NULL,
322                                      source->file.options.kv_nodesize,
323                                      source->file.options.kp_nodesize);
324    if (ctx->target_mr == NULL) {
325        error_pass(COUCHSTORE_ERROR_ALLOC_FAIL);
326    }
327
328    srcfold.cmp = seqcmp;
329    srcfold.file = &source->file;
330    srcfold.num_keys = 1;
331    srcfold.keys = &low_key_list;
332    srcfold.fold = 1;
333    srcfold.in_fold = 1;
334    srcfold.tolerate_corruption =
335            (ctx->flags & COUCHSTORE_COMPACT_RECOVERY_MODE) != 0;
336    srcfold.callback_ctx = ctx;
337    srcfold.fetch_callback = compact_seq_fetchcb;
338    srcfold.node_callback = NULL;
339
340    errcode = btree_lookup(&srcfold, source->header.by_seq_root->pointer);
341    if (errcode == COUCHSTORE_SUCCESS || srcfold.tolerate_corruption) {
342        if(target->header.by_seq_root != nullptr) {
343            cb_free(target->header.by_seq_root);
344        }
345        couchstore_error_t errcode_local;
346        target->header.by_seq_root =
347                complete_new_btree(ctx->target_mr, &errcode_local);
348        error_tolerate(errcode_local);
349    }
350cleanup:
351    arena_free_all(ctx->persistent_arena);
352    arena_free_all(ctx->transient_arena);
353    return errcode;
354}
355
356static couchstore_error_t compact_localdocs_fetchcb(couchfile_lookup_request *rq,
357                                                    const sized_buf *k,
358                                                    const sized_buf *v)
359{
360    compact_ctx *ctx = (compact_ctx *) rq->callback_ctx;
361    //printf("V: '%.*s'\n", v->size, v->buf);
362    return mr_push_item(arena_copy_buf(ctx->persistent_arena, k),
363                        arena_copy_buf(ctx->persistent_arena, v),
364                        ctx->target_mr);
365}
366
367static couchstore_error_t compact_localdocs_tree(Db* source, Db* target, compact_ctx *ctx)
368{
369    couchstore_error_t errcode;
370    compare_info idcmp;
371    idcmp.compare = ebin_cmp;
372    couchfile_lookup_request srcfold;
373
374    sized_buf low_key;
375    low_key.buf = NULL;
376    low_key.size = 0;
377    sized_buf *low_key_list = &low_key;
378
379    ctx->target_mr = new_btree_modres(ctx->persistent_arena, NULL, &target->file,
380                                      &idcmp, NULL, NULL, NULL,
381                                      source->file.options.kv_nodesize,
382                                      source->file.options.kp_nodesize);
383    if (ctx->target_mr == NULL) {
384        error_pass(COUCHSTORE_ERROR_ALLOC_FAIL);
385    }
386
387    srcfold.cmp = idcmp;
388    srcfold.file = &source->file;
389    srcfold.num_keys = 1;
390    srcfold.keys = &low_key_list;
391    srcfold.fold = 1;
392    srcfold.in_fold = 1;
393    srcfold.callback_ctx = ctx;
394    srcfold.fetch_callback = compact_localdocs_fetchcb;
395    srcfold.node_callback = NULL;
396
397    errcode = btree_lookup(&srcfold, source->header.local_docs_root->pointer);
398    if (errcode == COUCHSTORE_SUCCESS) {
399        target->header.local_docs_root = complete_new_btree(ctx->target_mr, &errcode);
400    }
401cleanup:
402    arena_free_all(ctx->persistent_arena);
403    return errcode;
404}
405
406couchstore_error_t couchstore_set_purge_seq(Db* target, uint64_t purge_seq) {
407    target->header.purge_seq = purge_seq;
408    return COUCHSTORE_SUCCESS;
409
410}
411
412