xref: /6.0.3/couchstore/src/btree_modify.cc (revision c2c458ff)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3
4#include "arena.h"
5#include "couch_btree.h"
6#include "node_types.h"
7#include "util.h"
8#include <phosphor/phosphor.h>
9#include <platform/cb_malloc.h>
10#include <platform/cbassert.h>
11#include <signal.h>
12#include <stdlib.h>
13#include <string.h>
14
15static couchstore_error_t flush_mr_partial(couchfile_modify_result *res, size_t mr_quota);
16static couchstore_error_t flush_mr(couchfile_modify_result *res);
17static couchstore_error_t purge_node(couchfile_modify_request *rq,
18                                      node_pointer *nptr,
19                                      couchfile_modify_result *dst);
20
21static couchstore_error_t maybe_flush(couchfile_modify_result *mr)
22{
23    if(mr->rq->compacting) {
24        /* The compactor can (and should), just write out nodes
25         * of size CHUNK_SIZE as soon as it can, so that it can
26         * free memory it no longer needs. Flush at least 2 KP
27         * nodes,so that we don't create one parent node for
28         * each of the large KP nodes (one node size > threshold).
29         */
30        if (mr->modified && (((mr->node_type == KV_NODE) &&
31            mr->node_len > (mr->rq->kv_chunk_threshold * 2 / 3)) ||
32            ((mr->node_type == KP_NODE) &&
33             mr->node_len > (mr->rq->kp_chunk_threshold * 2 / 3) &&
34             mr->count > 1))) {
35            return flush_mr(mr);
36        }
37    } else if (mr->modified &&  mr->count > 3) {
38        /* Don't write out a partial node unless we've collected
39         * at least three items */
40        if ((mr->node_type == KV_NODE) &&
41             mr->node_len > mr->rq->kv_chunk_threshold) {
42            return flush_mr_partial(mr, (mr->rq->kv_chunk_threshold * 2 / 3));
43        }
44        if ((mr->node_type == KP_NODE) &&
45             mr->node_len > mr->rq->kp_chunk_threshold) {
46            return flush_mr_partial(mr, (mr->rq->kp_chunk_threshold * 2 / 3));
47        }
48    }
49
50    return COUCHSTORE_SUCCESS;
51}
52
53
54static nodelist *make_nodelist(arena* a, size_t bufsize)
55{
56    nodelist *r = static_cast<nodelist*>(arena_alloc(a, sizeof(nodelist) + bufsize));
57    if (!r) {
58        return NULL;
59    }
60    memset(r, 0, sizeof(nodelist));
61    r->data.size = bufsize;
62    r->data.buf = ((char *) r) + (sizeof(nodelist));
63    return r;
64}
65
66couchfile_modify_result *make_modres(arena* a, couchfile_modify_request *rq)
67{
68    couchfile_modify_result *res;
69    res = static_cast<couchfile_modify_result*>(arena_alloc(a, sizeof(couchfile_modify_result)));
70    if (!res) {
71        return NULL;
72    }
73    res->arena = a;
74    res->arena_transient = NULL;
75    res->values = make_nodelist(a, 0);
76    if (!res->values) {
77        return NULL;
78    }
79    res->values_end = res->values;
80    res->pointers = make_nodelist(a, 0);
81    if (!res->pointers) {
82        return NULL;
83    }
84    res->pointers_end = res->pointers;
85    res->node_len = 0;
86    res->count = 0;
87    res->modified = 0;
88    res->error_state = 0;
89    res->rq = rq;
90    return res;
91}
92
93couchfile_modify_result *new_btree_modres(arena *a, arena *transient_arena, tree_file *file,
94                                          compare_info* cmp, reduce_fn reduce, reduce_fn rereduce,
95                                          void *user_reduce_ctx,
96                                          int kv_chunk_threshold, int kp_chunk_threshold)
97{
98    couchfile_modify_request* rq;
99    rq = static_cast<couchfile_modify_request*>(arena_alloc(a, sizeof(couchfile_modify_request)));
100    if (!rq) {
101        return NULL;
102    }
103    rq->cmp = *cmp;
104    rq->file = file;
105    rq->num_actions = 0;
106    rq->fetch_callback = NULL;
107    rq->reduce = reduce;
108    rq->rereduce = rereduce;
109    rq->user_reduce_ctx = user_reduce_ctx;
110    rq->compacting = 1;
111    rq->enable_purging = 0;
112    rq->purge_kp = NULL;
113    rq->purge_kv = NULL;
114    rq->kv_chunk_threshold = kv_chunk_threshold;
115    rq->kp_chunk_threshold = kp_chunk_threshold;
116
117    couchfile_modify_result* mr = make_modres(a, rq);
118    if (!mr)
119        return NULL;
120    mr->arena_transient = transient_arena;
121    mr->modified = 1;
122    mr->node_type = KV_NODE;
123
124    return mr;
125}
126
127couchstore_error_t mr_push_item(sized_buf *k, sized_buf *v, couchfile_modify_result *dst)
128{
129    nodelist *itm = NULL;
130    if(dst->arena_transient != NULL)
131    {
132        itm = make_nodelist(dst->arena_transient, 0);
133    } else {
134        itm = make_nodelist(dst->arena, 0);
135    }
136    if (!itm) {
137        return COUCHSTORE_ERROR_ALLOC_FAIL;
138    }
139    itm->key = *k;
140    itm->data = *v;
141    itm->pointer = NULL;
142    dst->values_end->next = itm;
143    dst->values_end = itm;
144    //Encoded size (see flush_mr)
145    dst->node_len += k->size + v->size + sizeof(raw_kv_length);
146    dst->count++;
147    return maybe_flush(dst);
148}
149
150static nodelist *encode_pointer(arena* a, node_pointer *ptr)
151{
152    nodelist *pel = make_nodelist(a, sizeof(raw_node_pointer) + ptr->reduce_value.size);
153    if (!pel) {
154        return NULL;
155    }
156    raw_node_pointer *raw = (raw_node_pointer*)pel->data.buf;
157    encode_raw48(ptr->pointer, &raw->pointer);
158    encode_raw48(ptr->subtreesize, &raw->subtreesize);
159    raw->reduce_value_size = encode_raw16((uint16_t)ptr->reduce_value.size);
160    memcpy(raw + 1, ptr->reduce_value.buf, ptr->reduce_value.size);
161    pel->pointer = ptr;
162    pel->key = ptr->key;
163    return pel;
164}
165
166static couchstore_error_t mr_push_pointerinfo(node_pointer *ptr,
167                                              couchfile_modify_result *dst)
168{
169    nodelist *pel = encode_pointer(dst->arena, ptr);
170    if (!pel) {
171        return COUCHSTORE_ERROR_ALLOC_FAIL;
172    }
173    dst->values_end->next = pel;
174    dst->values_end = pel;
175    dst->node_len += pel->key.size + pel->data.size + sizeof(raw_kv_length);
176    dst->count++;
177    return maybe_flush(dst);
178}
179
180node_pointer *read_pointer(arena* a, sized_buf *key, char *buf)
181{
182    //Parse KP pair into a node_pointer {K, {ptr, reduce_value, subtreesize}}
183    node_pointer *p = (node_pointer *) arena_alloc(a, sizeof(node_pointer));
184    if (!p) {
185        return NULL;
186    }
187    const raw_node_pointer *raw = (const raw_node_pointer*)buf;
188    p->pointer = decode_raw48(raw->pointer);
189    p->subtreesize = decode_raw48(raw->subtreesize);
190    p->reduce_value.size = decode_raw16(raw->reduce_value_size);
191    p->reduce_value.buf = buf + sizeof(*raw);
192    p->key = *key;
193    return p;
194}
195
196//Write the current contents of the values list to disk as a node
197//and add the resulting pointer to the pointers list.
198static couchstore_error_t flush_mr(couchfile_modify_result *res)
199{
200    return flush_mr_partial(res, res->node_len);
201}
202
203//Write a node using enough items from the values list to create a node
204//with uncompressed size of at least mr_quota
205static couchstore_error_t flush_mr_partial(couchfile_modify_result *res, size_t mr_quota)
206{
207    char *dst;
208    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
209    int itmcount = 0;
210    char *nodebuf = NULL;
211    sized_buf writebuf;
212    char reducebuf[MAX_REDUCTION_SIZE];
213    size_t reducesize = 0;
214    uint64_t subtreesize = 0;
215    cs_off_t diskpos;
216    size_t disk_size;
217    sized_buf final_key = {NULL, 0};
218
219    if (res->values_end == res->values || ! res->modified) {
220        //Empty
221        return COUCHSTORE_SUCCESS;
222    }
223
224    // nodebuf/writebuf is very short-lived and can be large, so use regular malloc heap for it:
225    nodebuf = static_cast<char*>(cb_malloc(res->node_len + 1));
226    if (!nodebuf) {
227        return COUCHSTORE_ERROR_ALLOC_FAIL;
228    }
229
230    writebuf.buf = nodebuf;
231
232    dst = nodebuf;
233    *(dst++) = (char) res->node_type;
234
235    nodelist *i = res->values->next;
236    final_key = i->key;
237    //We don't care that we've reached mr_quota if we haven't written out
238    //at least two items and we're not writing a leaf node.
239    while (i != NULL && (mr_quota > 0 || (itmcount < 2 && res->node_type == KP_NODE))) {
240        dst = static_cast<char*>(write_kv(dst, i->key, i->data));
241        if (i->pointer) {
242            subtreesize += i->pointer->subtreesize;
243        }
244        mr_quota -= i->key.size + i->data.size + sizeof(raw_kv_length);
245        final_key = i->key;
246        i = i->next;
247        res->count--;
248        itmcount++;
249    }
250
251    writebuf.size = dst - nodebuf;
252
253    errcode = static_cast<couchstore_error_t>(db_write_buf_compressed(res->rq->file, &writebuf, &diskpos, &disk_size));
254    cb_free(nodebuf);  // here endeth the nodebuf.
255    if (errcode != COUCHSTORE_SUCCESS) {
256        if (res->rq->file->options.tracing_enabled) {
257            TRACE_INSTANT1("couchstore_write",
258                           "flush_mr_partial",
259                           "errcode",
260                           int(errcode));
261        }
262        return errcode;
263    }
264
265    if (res->node_type == KV_NODE && res->rq->reduce) {
266        errcode = res->rq->reduce(reducebuf, &reducesize, res->values->next, itmcount, res->rq->user_reduce_ctx);
267        if (errcode != COUCHSTORE_SUCCESS) {
268            return errcode;
269        }
270        cb_assert(reducesize <= sizeof(reducebuf));
271    }
272
273    if (res->node_type == KP_NODE && res->rq->rereduce) {
274        errcode = res->rq->rereduce(reducebuf, &reducesize, res->values->next, itmcount, res->rq->user_reduce_ctx);
275        if (errcode != COUCHSTORE_SUCCESS) {
276            return errcode;
277        }
278        cb_assert(reducesize <= sizeof(reducebuf));
279    }
280
281    node_pointer *ptr = (node_pointer *) arena_alloc(res->arena, sizeof(node_pointer) + final_key.size + reducesize);
282    if (!ptr) {
283        return COUCHSTORE_ERROR_ALLOC_FAIL;
284    }
285
286    ptr->key.buf = ((char *)ptr) + sizeof(node_pointer);
287    ptr->reduce_value.buf = ((char *)ptr) + sizeof(node_pointer) + final_key.size;
288
289    ptr->key.size = final_key.size;
290    ptr->reduce_value.size = reducesize;
291
292    memcpy(ptr->key.buf, final_key.buf, final_key.size);
293    memcpy(ptr->reduce_value.buf, reducebuf, reducesize);
294
295    ptr->subtreesize = subtreesize + disk_size;
296    ptr->pointer = diskpos;
297
298    nodelist *pel = encode_pointer(res->arena, ptr);
299    if (!pel) {
300        return COUCHSTORE_ERROR_ALLOC_FAIL;
301    }
302
303    res->pointers_end->next = pel;
304    res->pointers_end = pel;
305
306    res->node_len -= (writebuf.size - 1);
307
308    res->values->next = i;
309    if(i == NULL) {
310        res->values_end = res->values;
311    }
312
313    return COUCHSTORE_SUCCESS;
314}
315
316//Move this node's pointers list to dst node's values list.
317static couchstore_error_t mr_move_pointers(couchfile_modify_result *src,
318                                           couchfile_modify_result *dst)
319{
320    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
321    if (src->pointers_end == src->pointers) {
322        return COUCHSTORE_SUCCESS;
323    }
324
325    nodelist *ptr = src->pointers->next;
326    nodelist *next = ptr;
327    while (ptr != NULL && errcode == 0) {
328        dst->node_len += ptr->data.size + ptr->key.size + sizeof(raw_kv_length);
329        dst->count++;
330
331        next = ptr->next;
332        ptr->next = NULL;
333
334        dst->values_end->next = ptr;
335        dst->values_end = ptr;
336        ptr = next;
337        error_pass(maybe_flush(dst));
338    }
339
340cleanup:
341    src->pointers->next = next;
342    src->pointers_end = src->pointers;
343    return errcode;
344}
345
346// Perform purging for a kv-node if it qualifies for purging
347static couchstore_error_t maybe_purgekv(couchfile_modify_request *rq,
348                                            sized_buf *key,
349                                            sized_buf *val,
350                                            couchfile_modify_result *result)
351{
352    int action;
353    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
354
355    if (rq->enable_purging && rq->purge_kv) {
356        action = rq->purge_kv(key, val, rq->guided_purge_ctx);
357        if (action < 0) {
358            return (couchstore_error_t) action;
359        }
360    } else {
361        action = PURGE_KEEP;
362    }
363
364    switch (action) {
365    case PURGE_ITEM:
366        result->modified = 1;
367        break;
368
369    case PURGE_STOP:
370        rq->enable_purging = 0;
371
372    case PURGE_KEEP:
373        errcode = mr_push_item(key, val, result);
374        break;
375    }
376
377    return errcode;
378}
379
380// Perform purging for a kp-node if it qualifies for purging
381static couchstore_error_t maybe_purgekp(couchfile_modify_request *rq, node_pointer *node,
382                                        couchfile_modify_result *result)
383{
384    int action;
385    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
386
387    if (rq->enable_purging && rq->purge_kp) {
388        action = rq->purge_kp(node, rq->guided_purge_ctx);
389        if (action < 0) {
390            return (couchstore_error_t) action;
391        }
392    } else {
393        action = PURGE_KEEP;
394    }
395
396    switch (action) {
397    case PURGE_ITEM:
398        result->modified = 1;
399        break;
400
401    case PURGE_PARTIAL:
402        errcode = purge_node(rq, node, result);
403        break;
404
405    case PURGE_STOP:
406        rq->enable_purging = 0;
407
408    case PURGE_KEEP:
409        errcode = mr_push_pointerinfo(node, result);
410        break;
411    }
412
413    return errcode;
414}
415
416static couchstore_error_t modify_node(couchfile_modify_request *rq,
417                                      node_pointer *nptr,
418                                      int start, int end,
419                                      couchfile_modify_result *dst)
420{
421    char *nodebuf = NULL;  // FYI, nodebuf is a malloced block, not in the arena
422    int bufpos = 1;
423    int nodebuflen = 0;
424    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
425    couchfile_modify_result *local_result = NULL;
426
427    if (start == end) {
428        return COUCHSTORE_SUCCESS;
429    }
430
431    if (nptr) {
432        if ((nodebuflen = pread_compressed(rq->file, nptr->pointer, (char **) &nodebuf)) < 0) {
433            error_pass(static_cast<couchstore_error_t>(nodebuflen));
434        }
435    }
436
437    local_result = make_modres(dst->arena, rq);
438    error_unless(local_result, COUCHSTORE_ERROR_ALLOC_FAIL);
439
440    if (nptr == NULL || nodebuf[0] == 1) { //KV Node
441        local_result->node_type = KV_NODE;
442        while (bufpos < nodebuflen) {
443            sized_buf cmp_key, val_buf;
444            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
445            int advance = 0;
446            while (!advance && start < end) {
447                advance = 1;
448                int cmp_val = rq->cmp.compare(&cmp_key, rq->actions[start].key);
449
450                if (cmp_val < 0) { //Key less than action key
451                    errcode = maybe_purgekv(rq, &cmp_key, &val_buf, local_result);
452                    if (errcode != COUCHSTORE_SUCCESS) {
453                        goto cleanup;
454                    }
455                } else if (cmp_val > 0) { //Key greater than action key
456                    switch (rq->actions[start].type) {
457                    case ACTION_INSERT:
458                        local_result->modified = 1;
459                        mr_push_item(rq->actions[start].key, rq->actions[start].value.data, local_result);
460                        break;
461
462                    case ACTION_REMOVE:
463                        local_result->modified = 1;
464                        break;
465
466                    case ACTION_FETCH:
467                        if (rq->fetch_callback) {
468                            //not found
469                            (*rq->fetch_callback)(rq, rq->actions[start].key, NULL, rq->actions[start].value.arg);
470                        }
471                    }
472                    start++;
473                    //Do next action on same item in the node, as our action was
474                    //not >= it.
475                    advance = 0;
476                } else if (cmp_val == 0) { //Node key is equal to action key
477                    switch (rq->actions[start].type) {
478                    case ACTION_INSERT:
479                        local_result->modified = 1;
480                        mr_push_item(rq->actions[start].key, rq->actions[start].value.data, local_result);
481                        break;
482
483                    case ACTION_REMOVE:
484                        local_result->modified = 1;
485                        break;
486
487                    case ACTION_FETCH:
488                        if (rq->fetch_callback) {
489                            (*rq->fetch_callback)(rq, rq->actions[start].key, &val_buf, rq->actions[start].value.arg);
490                        }
491                        //Do next action on same item in the node, as our action was a fetch
492                        //and there may be an equivalent insert or remove
493                        //following.
494                        advance = 0;
495                    }
496                    start++;
497                }
498            }
499            if (start == end && !advance) {
500                //If we've exhausted actions then just keep this key
501                errcode = maybe_purgekv(rq, &cmp_key, &val_buf, local_result);
502                if (errcode != COUCHSTORE_SUCCESS) {
503                    goto cleanup;
504                }
505            }
506        }
507        while (start < end) {
508            //We're at the end of a leaf node.
509            switch (rq->actions[start].type) {
510            case ACTION_INSERT:
511                local_result->modified = 1;
512                mr_push_item(rq->actions[start].key, rq->actions[start].value.data, local_result);
513                break;
514
515            case ACTION_REMOVE:
516                local_result->modified = 1;
517                break;
518
519            case ACTION_FETCH:
520                if (rq->fetch_callback) {
521                    //not found
522                    (*rq->fetch_callback)(rq, rq->actions[start].key, NULL, rq->actions[start].value.arg);
523                }
524                break;
525            }
526            start++;
527        }
528    } else if (nodebuf[0] == 0) { //KP Node
529        local_result->node_type = KP_NODE;
530        while (bufpos < nodebuflen && start < end) {
531            sized_buf cmp_key, val_buf;
532            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
533            int cmp_val = rq->cmp.compare(&cmp_key, rq->actions[start].key);
534            if (bufpos == nodebuflen) {
535                //We're at the last item in the kpnode, must apply all our
536                //actions here.
537                node_pointer *desc = read_pointer(dst->arena, &cmp_key, val_buf.buf);
538                if (!desc) {
539                    errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
540                    goto cleanup;
541                }
542
543                errcode = modify_node(rq, desc, start, end, local_result);
544                if (errcode != COUCHSTORE_SUCCESS) {
545                    goto cleanup;
546                }
547                break;
548            }
549
550            if (cmp_val < 0) {
551                //Key in node item less than action item and not at end
552                //position, so just add it and continue.
553                node_pointer *add = read_pointer(dst->arena, &cmp_key, val_buf.buf);
554                if (!add) {
555                    errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
556                    goto cleanup;
557                }
558
559                errcode = maybe_purgekp(rq, add, local_result);
560                if (errcode != COUCHSTORE_SUCCESS) {
561                    goto cleanup;
562                }
563            } else if (cmp_val >= 0) {
564                //Found a key in the node greater than the one in the current
565                //action. Descend into the pointed node with as many actions as
566                //are less than the key here.
567                int range_end = start;
568                while (range_end < end &&
569                        rq->cmp.compare(rq->actions[range_end].key, &cmp_key) <= 0) {
570                    range_end++;
571                }
572
573                node_pointer *desc = read_pointer(dst->arena, &cmp_key, val_buf.buf);
574                if (!desc) {
575                    errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
576                    goto cleanup;
577                }
578
579                errcode = modify_node(rq, desc, start, range_end, local_result);
580                start = range_end;
581                if (errcode != COUCHSTORE_SUCCESS) {
582                    goto cleanup;
583                }
584            }
585        }
586        while (bufpos < nodebuflen) {
587            sized_buf cmp_key, val_buf;
588            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
589            node_pointer *add = read_pointer(dst->arena, &cmp_key, val_buf.buf);
590            if (!add) {
591                errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
592                goto cleanup;
593            }
594
595            errcode = maybe_purgekp(rq, add, local_result);
596            if (errcode != COUCHSTORE_SUCCESS) {
597                goto cleanup;
598            }
599        }
600    } else {
601        errcode = COUCHSTORE_ERROR_CORRUPT;
602        goto cleanup;
603    }
604    //If we've done modifications, write out the last leaf node.
605    error_pass(flush_mr(local_result));
606    if (!local_result->modified && nptr != NULL) {
607        //If we didn't do anything, give back the pointer to the original
608        mr_push_pointerinfo(nptr, dst);
609    } else {
610        //Otherwise, give back the pointers to the nodes we've created.
611        dst->modified = 1;
612        error_pass(mr_move_pointers(local_result, dst));
613    }
614cleanup:
615    if (nodebuf) {
616        cb_free(nodebuf);
617    }
618
619    return errcode;
620}
621
622node_pointer *finish_root(couchfile_modify_request *rq,
623                                 couchfile_modify_result *root_result,
624                                 couchstore_error_t *errcode)
625{
626    node_pointer *ret_ptr = NULL;
627    couchfile_modify_result *collector = make_modres(root_result->arena, rq);
628    if (!collector) {
629        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
630        return NULL;
631    }
632    collector->modified = 1;
633    collector->node_type = KP_NODE;
634    flush_mr(root_result);
635    while (1) {
636        if (root_result->pointers_end == root_result->pointers->next) {
637            //The root result split into exactly one kp_node.
638            //Return the pointer to it.
639            ret_ptr = root_result->pointers_end->pointer;
640            break;
641        } else {
642            //The root result split into more than one kp_node.
643            //Move the pointer list to the value list and write out the new node.
644            *errcode = mr_move_pointers(root_result, collector);
645
646            if (*errcode < 0) {
647                goto cleanup;
648            }
649
650            *errcode = flush_mr(collector);
651            if (*errcode < 0) {
652                goto cleanup;
653            }
654            //Swap root_result and collector mr's.
655            couchfile_modify_result *tmp = root_result;
656            root_result = collector;
657            collector = tmp;
658        }
659    }
660cleanup:
661    return ret_ptr;
662}
663
664// Copies a node_pointer and its values to the malloc heap.
665node_pointer* copy_node_pointer(node_pointer* ptr)
666{
667    if (!ptr) {
668        return NULL;
669    }
670    node_pointer* ret_ptr;
671    ret_ptr = static_cast<node_pointer*>(cb_malloc(sizeof(node_pointer) + ptr->key.size + ptr->reduce_value.size));
672    if (!ret_ptr) {
673        return NULL;
674    }
675    *ret_ptr = *ptr;
676    ret_ptr->key.buf = (char*)ret_ptr + sizeof(node_pointer);
677    memcpy(ret_ptr->key.buf, ptr->key.buf, ptr->key.size);
678    ret_ptr->reduce_value.buf = (char*)ret_ptr->key.buf + ptr->key.size;
679    memcpy(ret_ptr->reduce_value.buf, ptr->reduce_value.buf, ptr->reduce_value.size);
680    return ret_ptr;
681}
682
683// Finished creating a new b-tree (from compaction), build pointers and get a
684// root node.
685
686node_pointer* complete_new_btree(couchfile_modify_result* mr, couchstore_error_t *errcode)
687{
688    *errcode = flush_mr(mr);
689    if(*errcode != COUCHSTORE_SUCCESS) {
690        return NULL;
691    }
692
693    couchfile_modify_result* targ_mr = make_modres(mr->arena, mr->rq);
694    if (!targ_mr) {
695        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
696        return NULL;
697    }
698    targ_mr->modified = 1;
699    targ_mr->node_type = KP_NODE;
700
701    *errcode = mr_move_pointers(mr, targ_mr);
702    if(*errcode != COUCHSTORE_SUCCESS) {
703        return NULL;
704    }
705
706    node_pointer* ret_ptr;
707    if(targ_mr->count > 1 || targ_mr->pointers != targ_mr->pointers_end) {
708        ret_ptr = finish_root(mr->rq, targ_mr, errcode);
709    } else {
710        ret_ptr = targ_mr->values_end->pointer;
711    }
712
713    if (*errcode != COUCHSTORE_SUCCESS || ret_ptr == NULL) {
714        return NULL;
715    }
716
717    ret_ptr = copy_node_pointer(ret_ptr);
718    if (ret_ptr == NULL) {
719        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
720    }
721    return ret_ptr;
722}
723
724node_pointer *modify_btree(couchfile_modify_request *rq,
725                           node_pointer *root,
726                           couchstore_error_t *errcode)
727{
728    arena* a = new_arena(0);
729    if (!a) {
730        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
731        return NULL;
732    }
733    node_pointer *ret_ptr = root;
734    couchfile_modify_result *root_result = make_modres(a, rq);
735    if (!root_result) {
736        delete_arena(a);
737        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
738        return root;
739    }
740    root_result->node_type = KP_NODE;
741    *errcode = modify_node(rq, root, 0, rq->num_actions, root_result);
742    if (*errcode < 0) {
743        delete_arena(a);
744        return NULL;
745    }
746
747    if (root_result->values_end->pointer == root) {
748        //If we got the root pointer back, remove it from the list
749        //so we don't try to free it.
750        root_result->values_end->pointer = NULL;
751    }
752
753    if (root_result->modified) {
754        if (root_result->count > 1 || root_result->pointers != root_result->pointers_end) {
755            //The root was split
756            //Write it to disk and return the pointer to it.
757            ret_ptr = finish_root(rq, root_result, errcode);
758            if (*errcode < 0) {
759                ret_ptr = NULL;
760            }
761        } else {
762            ret_ptr = root_result->values_end->pointer;
763        }
764    }
765    if (ret_ptr != root) {
766        ret_ptr = copy_node_pointer(ret_ptr);
767    }
768    delete_arena(a);
769    return ret_ptr;
770}
771
772static couchstore_error_t purge_node(couchfile_modify_request *rq,
773                                     node_pointer *nptr,
774                                     couchfile_modify_result *dst)
775{
776    char *nodebuf = NULL;  // FYI, nodebuf is a malloced block, not in the arena
777    int bufpos = 1;
778    int nodebuflen = 0;
779    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
780    couchfile_modify_result *local_result = NULL;
781
782    if (nptr == NULL) {
783        return errcode;
784    }
785
786    // noop: add back current node to destination
787    if (!rq->enable_purging) {
788        dst->modified = 1;
789        return mr_push_pointerinfo(nptr, dst);
790    }
791
792    if ((nodebuflen = pread_compressed(rq->file, nptr->pointer, (char **) &nodebuf)) < 0) {
793        error_pass(static_cast<couchstore_error_t>(nodebuflen));
794    }
795
796    local_result = make_modres(dst->arena, rq);
797    error_unless(local_result, COUCHSTORE_ERROR_ALLOC_FAIL);
798
799    if (nodebuf[0] == 1) { //KV Node
800        local_result->node_type = KV_NODE;
801        while (bufpos < nodebuflen) {
802            sized_buf cmp_key, val_buf;
803            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
804
805            errcode = maybe_purgekv(rq, &cmp_key, &val_buf, local_result);
806            if (errcode != COUCHSTORE_SUCCESS) {
807                goto cleanup;
808            }
809        }
810    } else if (nodebuf[0] == 0) { //KP Node
811        local_result->node_type = KP_NODE;
812        while (bufpos < nodebuflen) {
813            sized_buf cmp_key, val_buf;
814            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
815
816            node_pointer *desc = read_pointer(dst->arena, &cmp_key, val_buf.buf);
817            if (!desc) {
818                errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
819                goto cleanup;
820            }
821
822            errcode = maybe_purgekp(rq, desc, local_result);
823            if (errcode != COUCHSTORE_SUCCESS) {
824                goto cleanup;
825            }
826        }
827    } else {
828        errcode = COUCHSTORE_ERROR_CORRUPT;
829        goto cleanup;
830    }
831
832    // Write out changes and add node back to parent
833    if (local_result->modified) {
834        error_pass(flush_mr(local_result));
835        dst->modified = 1;
836        error_pass(mr_move_pointers(local_result, dst));
837    }
838
839cleanup:
840    cb_free(nodebuf);
841    return errcode;
842}
843
844node_pointer *guided_purge_btree(couchfile_modify_request *rq, node_pointer *root,
845                                                couchstore_error_t *errcode)
846{
847    rq->enable_purging = 1;
848    arena* a = new_arena(0);
849    if (!a) {
850        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
851        return NULL;
852    }
853
854    node_pointer *ret_ptr = root;
855    couchfile_modify_result *root_result = make_modres(a, rq);
856    if (!root_result) {
857        delete_arena(a);
858        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
859        return NULL;
860    }
861
862    root_result->node_type = KP_NODE;
863    *errcode = purge_node(rq, root, root_result);
864    if (*errcode < 0) {
865        delete_arena(a);
866        return NULL;
867    }
868
869    if (root_result->modified) {
870        ret_ptr = root_result->values_end->pointer;
871    }
872
873    if (ret_ptr && ret_ptr != root) {
874        ret_ptr = copy_node_pointer(ret_ptr);
875    }
876
877    delete_arena(a);
878    return ret_ptr;
879}
880