xref: /5.5.2/couchstore/src/db_compact.cc (revision 50b449d5)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3#include "internal.h"
4#include "couch_btree.h"
5#include "reduces.h"
6#include "bitfield.h"
7#include "arena.h"
8#include "tree_writer.h"
9#include "node_types.h"
10#include "util.h"
11#include "couch_latency_internal.h"
12
13#include <platform/cb_malloc.h>
14#include <stdexcept>
15#include <stdio.h>
16#include <stdlib.h>
17
18typedef struct compact_ctx {
19    TreeWriter* tree_writer;
20    /* Using this for stuff that doesn't need to live longer than it takes to write
21     * out a b-tree node (the k/v pairs) */
22    arena *transient_arena;
23    /* This is for stuff that lasts the duration of the b-tree writing (node pointers) */
24    arena *persistent_arena;
25    couchfile_modify_result *target_mr;
26    Db* target;
27    couchstore_compact_hook hook;
28    couchstore_docinfo_hook dhook;
29    void* hook_ctx;
30    couchstore_compact_flags flags;
31} compact_ctx;
32
33static couchstore_error_t compact_seq_tree(Db* source, Db* target, compact_ctx *ctx);
34static couchstore_error_t compact_localdocs_tree(Db* source, Db* target, compact_ctx *ctx);
35
36couchstore_error_t couchstore_compact_db_ex(Db* source, const char* target_filename,
37                                            couchstore_compact_flags flags,
38                                            couchstore_compact_hook hook,
39                                            couchstore_docinfo_hook dhook,
40                                            void* hook_ctx,
41                                            FileOpsInterface* ops)
42{
43    COLLECT_LATENCY();
44
45    Db* target = NULL;
46    char tmpFile[PATH_MAX]; // keep this on the stack for duration of the call
47    couchstore_error_t errcode;
48    // Local error code for seq-tree scan.
49    couchstore_error_t scan_err = COUCHSTORE_SUCCESS;
50    compact_ctx ctx = {NULL, new_arena(0), new_arena(0), NULL, NULL, hook, dhook, hook_ctx, 0};
51    ctx.flags = flags;
52    couchstore_open_flags open_flags = COUCHSTORE_OPEN_FLAG_CREATE;
53    error_unless(!source->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
54    error_unless(ctx.transient_arena && ctx.persistent_arena, COUCHSTORE_ERROR_ALLOC_FAIL);
55
56    // If the old file is downlevel ...
57    // ... and upgrade is not requested
58    // then the new file must use the old/legacy crc
59    if (source->header.disk_version <= COUCH_DISK_VERSION_11 &&
60        !(flags & COUCHSTORE_COMPACT_FLAG_UPGRADE_DB)) {
61        open_flags |= COUCHSTORE_OPEN_WITH_LEGACY_CRC;
62    }
63
64    if (flags & COUCHSTORE_COMPACT_FLAG_UNBUFFERED) {
65        open_flags |= COUCHSTORE_OPEN_FLAG_UNBUFFERED;
66    }
67
68    // Transfer current B+tree node settings to new file.
69    if (source->file.options.kp_nodesize) {
70        uint32_t kp_flag = source->file.options.kp_nodesize / 1024;
71        open_flags |= (kp_flag << 20);
72    }
73    if (source->file.options.kv_nodesize) {
74        uint32_t kv_flag = source->file.options.kv_nodesize / 1024;
75        open_flags |= (kv_flag << 16);
76    }
77
78    error_pass(couchstore_open_db_ex(target_filename, open_flags, ops, &target));
79
80    ctx.target = target;
81    target->file.pos = 1;
82    target->header.update_seq = source->header.update_seq;
83    if (flags & COUCHSTORE_COMPACT_FLAG_DROP_DELETES) {
84        //Count the number of times purge has happened
85        target->header.purge_seq = source->header.purge_seq + 1;
86    } else {
87        target->header.purge_seq = source->header.purge_seq;
88    }
89    target->header.purge_ptr = source->header.purge_ptr;
90
91    if (source->header.by_seq_root) {
92        strcpy(tmpFile, target_filename);
93        strcat(tmpFile, ".btree-tmp_0");
94        error_pass(TreeWriterOpen(tmpFile, ebin_cmp, by_id_reduce, by_id_rereduce, NULL, &ctx.tree_writer));
95        scan_err = compact_seq_tree(source, target, &ctx);
96        if (!(flags & COUCHSTORE_COMPACT_RECOVERY_MODE)) {
97            // Normal mode: 'compact_seq_tree()' should succeed.
98            error_pass(scan_err);
99        } // Recovery mode: we can tolerate corruptions.
100        error_pass(TreeWriterSort(ctx.tree_writer));
101        error_pass(TreeWriterWrite(ctx.tree_writer, &target->file, &target->header.by_id_root));
102        TreeWriterFree(ctx.tree_writer);
103        ctx.tree_writer = NULL;
104    }
105
106    if (source->header.local_docs_root) {
107        error_pass(compact_localdocs_tree(source, target, &ctx));
108    }
109    if(ctx.hook != NULL) {
110        error_pass(
111            static_cast<couchstore_error_t>(ctx.hook(ctx.target,
112                                                     nullptr, // docinfo
113                                                     {},
114                                                     ctx.hook_ctx)));
115    }
116    error_pass(couchstore_commit(target));
117cleanup:
118    TreeWriterFree(ctx.tree_writer);
119    delete_arena(ctx.transient_arena);
120    delete_arena(ctx.persistent_arena);
121    if (target != NULL) {
122        couchstore_close_file(target);
123        couchstore_free_db(target);
124        if (errcode != COUCHSTORE_SUCCESS) {
125            remove(target_filename);
126        }
127    }
128
129    if (errcode == COUCHSTORE_SUCCESS) {
130        return scan_err;
131    }
132    return errcode;
133}
134
135couchstore_error_t couchstore_compact_db(Db* source, const char* target_filename)
136{
137    return couchstore_compact_db_ex(source, target_filename, 0, NULL, NULL, NULL,
138                                    couchstore_get_default_file_ops());
139}
140
141static couchstore_error_t output_seqtree_item(const sized_buf *k,
142                                              const sized_buf *v,
143                                              const DocInfo *docinfo,
144                                              compact_ctx *ctx)
145{
146    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
147    sized_buf *v_c;
148    const raw_seq_index_value* rawSeq;
149    uint32_t idsize, datasize;
150    uint32_t revMetaSize;
151    sized_buf id_k, id_v;
152    raw_id_index_value *raw;
153    sized_buf *k_c = arena_copy_buf(ctx->transient_arena, k);
154
155    if (k_c == NULL) {
156        error_pass(COUCHSTORE_ERROR_READ);
157    }
158
159    if (docinfo) {
160        v_c = arena_special_copy_buf_and_revmeta(ctx->transient_arena,
161                                                 v, docinfo);
162    } else {
163        v_c = arena_copy_buf(ctx->transient_arena, v);
164    }
165
166    if (v_c == NULL) {
167        error_pass(COUCHSTORE_ERROR_READ);
168    }
169
170    error_pass(mr_push_item(k_c, v_c, ctx->target_mr));
171
172    // Decode the by-sequence index value. See the file format doc or
173    // assemble_id_index_value in couch_db.c:
174    rawSeq = (const raw_seq_index_value*)v_c->buf;
175    decode_kv_length(&rawSeq->sizes, &idsize, &datasize);
176    revMetaSize = (uint32_t)v_c->size - (sizeof(raw_seq_index_value) + idsize);
177
178    // Set up sized_bufs for the ID tree key and value:
179    id_k.buf = (char*)(rawSeq + 1);
180    id_k.size = idsize;
181    id_v.size = sizeof(raw_id_index_value) + revMetaSize;
182    id_v.buf = static_cast<char*>(arena_alloc(ctx->transient_arena, id_v.size));
183
184    raw = (raw_id_index_value*)id_v.buf;
185    raw->db_seq = *(raw_48*)k->buf;  //Copy db seq from seq tree key
186    raw->size = encode_raw32(datasize);
187    raw->bp = rawSeq->bp;
188    raw->content_meta = rawSeq->content_meta;
189    raw->rev_seq = rawSeq->rev_seq;
190    memcpy(raw + 1, (uint8_t*)(rawSeq + 1) + idsize, revMetaSize); //Copy rev_meta
191
192    error_pass(TreeWriterAddItem(ctx->tree_writer, id_k, id_v));
193
194    if (ctx->target_mr->count == 0) {
195        /* No items queued, we must have just flushed. We can safely rewind the transient arena. */
196        arena_free_all(ctx->transient_arena);
197    }
198
199cleanup:
200    return errcode;
201}
202
203static couchstore_error_t compact_seq_fetchcb(couchfile_lookup_request *rq,
204                                              const sized_buf *k,
205                                              const sized_buf *v)
206{
207    DocInfo* info = NULL;
208    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
209    compact_ctx *ctx = (compact_ctx *) rq->callback_ctx;
210    raw_seq_index_value* rawSeq = (raw_seq_index_value*)v->buf;
211    uint64_t bpWithDeleted = decode_raw48(rawSeq->bp);
212    uint64_t bp = bpWithDeleted & ~BP_DELETED_FLAG;
213    int ret_val = 0;
214
215    if ((bpWithDeleted & BP_DELETED_FLAG) &&
216       (ctx->hook == NULL) &&
217       (ctx->flags & COUCHSTORE_COMPACT_FLAG_DROP_DELETES)) {
218        return COUCHSTORE_SUCCESS;
219    }
220
221    sized_buf item;
222    item.buf = nullptr;
223    item.size = 0xffffff;
224
225    if (ctx->hook) {
226        error_pass(by_seq_read_docinfo(&info, k, v));
227        /* If the hook returns with the client requiring the whole body,
228         * then the whole body is read from disk and the hook is called
229         * again
230         */
231        int hook_action = ctx->hook(ctx->target, info, item, ctx->hook_ctx);
232        if (hook_action == COUCHSTORE_COMPACT_NEED_BODY) {
233            int size = pread_bin(rq->file, bp, &item.buf);
234            if (size < 0) {
235                couchstore_free_docinfo(info);
236                return static_cast<couchstore_error_t>(size);
237            }
238            item.size = size_t(size);
239            hook_action = ctx->hook(ctx->target, info, item, ctx->hook_ctx);
240        }
241
242        switch (hook_action) {
243        case COUCHSTORE_COMPACT_NEED_BODY:
244            throw std::logic_error(
245                "compact_seq_fetchcb: COUCHSTORE_COMPACT_NEED_BODY should not be returned "
246                "if the body was provided");
247        case COUCHSTORE_COMPACT_KEEP_ITEM:
248            break;
249        case COUCHSTORE_COMPACT_DROP_ITEM:
250            goto cleanup;
251        default:
252            error_pass(static_cast<couchstore_error_t>(hook_action));
253        }
254    }
255
256    if (bp != 0) {
257        cs_off_t new_bp = 0;
258        // Copy the document from the old db file to the new one:
259        size_t new_size = 0;
260
261        if (item.buf == nullptr) {
262            int size = pread_bin(rq->file, bp, &item.buf);
263            if (size < 0) {
264                couchstore_free_docinfo(info);
265                return static_cast<couchstore_error_t>(size);
266            }
267            item.size = size_t(size);
268        }
269
270        if (ctx->dhook) {
271            ret_val = ctx->dhook(&info, &item);
272        }
273        int err = db_write_buf(ctx->target_mr->rq->file, &item, &new_bp,
274                               &new_size);
275
276        bpWithDeleted = (bpWithDeleted & BP_DELETED_FLAG) | new_bp;  //Preserve high bit
277        encode_raw48(bpWithDeleted, &rawSeq->bp);
278        error_pass(static_cast<couchstore_error_t>(err));
279    }
280
281    if (ret_val) {
282        error_pass(output_seqtree_item(k, v, info, ctx));
283    } else {
284        error_pass(output_seqtree_item(k, v, NULL, ctx));
285    }
286
287cleanup:
288    cb_free(item.buf);
289    couchstore_free_docinfo(info);
290    return errcode;
291}
292
293static couchstore_error_t compact_seq_tree(Db* source, Db* target, compact_ctx *ctx)
294{
295    couchstore_error_t errcode;
296    compare_info seqcmp;
297    seqcmp.compare = seq_cmp;
298    couchfile_lookup_request srcfold;
299    sized_buf low_key;
300    //Keys in seq tree are 48-bit numbers, this is 0, lowest possible key
301    low_key.buf = const_cast<char*>("\0\0\0\0\0\0");
302    low_key.size = 6;
303    sized_buf *low_key_list = &low_key;
304
305    ctx->target_mr = new_btree_modres(ctx->persistent_arena,
306                                      ctx->transient_arena,
307                                      &target->file,
308                                      &seqcmp,
309                                      by_seq_reduce,
310                                      by_seq_rereduce,
311                                      NULL,
312                                      source->file.options.kv_nodesize,
313                                      source->file.options.kp_nodesize);
314    if (ctx->target_mr == NULL) {
315        error_pass(COUCHSTORE_ERROR_ALLOC_FAIL);
316    }
317
318    srcfold.cmp = seqcmp;
319    srcfold.file = &source->file;
320    srcfold.num_keys = 1;
321    srcfold.keys = &low_key_list;
322    srcfold.fold = 1;
323    srcfold.in_fold = 1;
324    srcfold.tolerate_corruption =
325            (ctx->flags & COUCHSTORE_COMPACT_RECOVERY_MODE) != 0;
326    srcfold.callback_ctx = ctx;
327    srcfold.fetch_callback = compact_seq_fetchcb;
328    srcfold.node_callback = NULL;
329
330    errcode = btree_lookup(&srcfold, source->header.by_seq_root->pointer);
331    if (errcode == COUCHSTORE_SUCCESS || srcfold.tolerate_corruption) {
332        if(target->header.by_seq_root != nullptr) {
333            cb_free(target->header.by_seq_root);
334        }
335        couchstore_error_t errcode_local;
336        target->header.by_seq_root =
337                complete_new_btree(ctx->target_mr, &errcode_local);
338        error_tolerate(errcode_local);
339    }
340cleanup:
341    arena_free_all(ctx->persistent_arena);
342    arena_free_all(ctx->transient_arena);
343    return errcode;
344}
345
346static couchstore_error_t compact_localdocs_fetchcb(couchfile_lookup_request *rq,
347                                                    const sized_buf *k,
348                                                    const sized_buf *v)
349{
350    compact_ctx *ctx = (compact_ctx *) rq->callback_ctx;
351    //printf("V: '%.*s'\n", v->size, v->buf);
352    return mr_push_item(arena_copy_buf(ctx->persistent_arena, k),
353                        arena_copy_buf(ctx->persistent_arena, v),
354                        ctx->target_mr);
355}
356
357static couchstore_error_t compact_localdocs_tree(Db* source, Db* target, compact_ctx *ctx)
358{
359    couchstore_error_t errcode;
360    compare_info idcmp;
361    idcmp.compare = ebin_cmp;
362    couchfile_lookup_request srcfold;
363
364    sized_buf low_key;
365    low_key.buf = NULL;
366    low_key.size = 0;
367    sized_buf *low_key_list = &low_key;
368
369    ctx->target_mr = new_btree_modres(ctx->persistent_arena, NULL, &target->file,
370                                      &idcmp, NULL, NULL, NULL,
371                                      source->file.options.kv_nodesize,
372                                      source->file.options.kp_nodesize);
373    if (ctx->target_mr == NULL) {
374        error_pass(COUCHSTORE_ERROR_ALLOC_FAIL);
375    }
376
377    srcfold.cmp = idcmp;
378    srcfold.file = &source->file;
379    srcfold.num_keys = 1;
380    srcfold.keys = &low_key_list;
381    srcfold.fold = 1;
382    srcfold.in_fold = 1;
383    srcfold.callback_ctx = ctx;
384    srcfold.fetch_callback = compact_localdocs_fetchcb;
385    srcfold.node_callback = NULL;
386
387    errcode = btree_lookup(&srcfold, source->header.local_docs_root->pointer);
388    if (errcode == COUCHSTORE_SUCCESS) {
389        target->header.local_docs_root = complete_new_btree(ctx->target_mr, &errcode);
390    }
391cleanup:
392    arena_free_all(ctx->persistent_arena);
393    return errcode;
394}
395
396couchstore_error_t couchstore_set_purge_seq(Db* target, uint64_t purge_seq) {
397    target->header.purge_seq = purge_seq;
398    return COUCHSTORE_SUCCESS;
399
400}
401
402