xref: /4.0.0/couchstore/src/btree_modify.cc (revision d826ffba)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3#include <assert.h>
4#include <stdlib.h>
5#include <string.h>
6#include <signal.h>
7
8#include "couch_btree.h"
9#include "util.h"
10#include "arena.h"
11#include "node_types.h"
12
13
14static couchstore_error_t flush_mr_partial(couchfile_modify_result *res, size_t mr_quota);
15static couchstore_error_t flush_mr(couchfile_modify_result *res);
16static couchstore_error_t purge_node(couchfile_modify_request *rq,
17                                      node_pointer *nptr,
18                                      couchfile_modify_result *dst);
19
20static couchstore_error_t maybe_flush(couchfile_modify_result *mr)
21{
22    if(mr->rq->compacting) {
23        /* The compactor can (and should), just write out nodes
24         * of size CHUNK_SIZE as soon as it can, so that it can
25         * free memory it no longer needs. Flush at least 2 KP
26         * nodes,so that we don't create one parent node for
27         * each of the large KP nodes (one node size > threshold).
28         */
29        if (mr->modified && (((mr->node_type == KV_NODE) &&
30            mr->node_len > (mr->rq->kv_chunk_threshold * 2 / 3)) ||
31            ((mr->node_type == KP_NODE) &&
32             mr->node_len > (mr->rq->kp_chunk_threshold * 2 / 3) &&
33             mr->count > 1))) {
34            return flush_mr(mr);
35        }
36    } else if (mr->modified &&  mr->count > 3) {
37        /* Don't write out a partial node unless we've collected
38         * at least three items */
39        if ((mr->node_type == KV_NODE) &&
40             mr->node_len > mr->rq->kv_chunk_threshold) {
41            return flush_mr_partial(mr, (mr->rq->kv_chunk_threshold * 2 / 3));
42        }
43        if ((mr->node_type == KP_NODE) &&
44             mr->node_len > mr->rq->kp_chunk_threshold) {
45            return flush_mr_partial(mr, (mr->rq->kp_chunk_threshold * 2 / 3));
46        }
47    }
48
49    return COUCHSTORE_SUCCESS;
50}
51
52
53static nodelist *make_nodelist(arena* a, size_t bufsize)
54{
55    nodelist *r = static_cast<nodelist*>(arena_alloc(a, sizeof(nodelist) + bufsize));
56    if (!r) {
57        return NULL;
58    }
59    memset(r, 0, sizeof(nodelist));
60    r->data.size = bufsize;
61    r->data.buf = ((char *) r) + (sizeof(nodelist));
62    return r;
63}
64
65couchfile_modify_result *make_modres(arena* a, couchfile_modify_request *rq)
66{
67    couchfile_modify_result *res;
68    res = static_cast<couchfile_modify_result*>(arena_alloc(a, sizeof(couchfile_modify_result)));
69    if (!res) {
70        return NULL;
71    }
72    res->arena = a;
73    res->arena_transient = NULL;
74    res->values = make_nodelist(a, 0);
75    if (!res->values) {
76        return NULL;
77    }
78    res->values_end = res->values;
79    res->pointers = make_nodelist(a, 0);
80    if (!res->pointers) {
81        return NULL;
82    }
83    res->pointers_end = res->pointers;
84    res->node_len = 0;
85    res->count = 0;
86    res->modified = 0;
87    res->error_state = 0;
88    res->rq = rq;
89    return res;
90}
91
92couchfile_modify_result *new_btree_modres(arena *a, arena *transient_arena, tree_file *file,
93                                          compare_info* cmp, reduce_fn reduce, reduce_fn rereduce,
94                                          void *user_reduce_ctx,
95                                          int kv_chunk_threshold, int kp_chunk_threshold)
96{
97    couchfile_modify_request* rq;
98    rq = static_cast<couchfile_modify_request*>(arena_alloc(a, sizeof(couchfile_modify_request)));
99    rq->cmp = *cmp;
100    rq->file = file;
101    rq->num_actions = 0;
102    rq->fetch_callback = NULL;
103    rq->reduce = reduce;
104    rq->rereduce = rereduce;
105    rq->user_reduce_ctx = user_reduce_ctx;
106    rq->compacting = 1;
107    rq->enable_purging = 0;
108    rq->purge_kp = NULL;
109    rq->purge_kv = NULL;
110    rq->kv_chunk_threshold = kv_chunk_threshold;
111    rq->kp_chunk_threshold = kp_chunk_threshold;
112
113    couchfile_modify_result* mr = make_modres(a, rq);
114    if (!mr)
115        return NULL;
116    mr->arena_transient = transient_arena;
117    mr->modified = 1;
118    mr->node_type = KV_NODE;
119
120    return mr;
121}
122
123couchstore_error_t mr_push_item(sized_buf *k, sized_buf *v, couchfile_modify_result *dst)
124{
125    nodelist *itm = NULL;
126    if(dst->arena_transient != NULL)
127    {
128        itm = make_nodelist(dst->arena_transient, 0);
129    } else {
130        itm = make_nodelist(dst->arena, 0);
131    }
132    if (!itm) {
133        return COUCHSTORE_ERROR_ALLOC_FAIL;
134    }
135    itm->key = *k;
136    itm->data = *v;
137    itm->pointer = NULL;
138    dst->values_end->next = itm;
139    dst->values_end = itm;
140    //Encoded size (see flush_mr)
141    dst->node_len += k->size + v->size + sizeof(raw_kv_length);
142    dst->count++;
143    return maybe_flush(dst);
144}
145
146static nodelist *encode_pointer(arena* a, node_pointer *ptr)
147{
148    nodelist *pel = make_nodelist(a, sizeof(raw_node_pointer) + ptr->reduce_value.size);
149    if (!pel) {
150        return NULL;
151    }
152    raw_node_pointer *raw = (raw_node_pointer*)pel->data.buf;
153    encode_raw48(ptr->pointer, &raw->pointer);
154    encode_raw48(ptr->subtreesize, &raw->subtreesize);
155    raw->reduce_value_size = encode_raw16((uint16_t)ptr->reduce_value.size);
156    memcpy(raw + 1, ptr->reduce_value.buf, ptr->reduce_value.size);
157    pel->pointer = ptr;
158    pel->key = ptr->key;
159    return pel;
160}
161
162static couchstore_error_t mr_push_pointerinfo(node_pointer *ptr,
163                                              couchfile_modify_result *dst)
164{
165    nodelist *pel = encode_pointer(dst->arena, ptr);
166    if (!pel) {
167        return COUCHSTORE_ERROR_ALLOC_FAIL;
168    }
169    dst->values_end->next = pel;
170    dst->values_end = pel;
171    dst->node_len += pel->key.size + pel->data.size + sizeof(raw_kv_length);
172    dst->count++;
173    return maybe_flush(dst);
174}
175
176node_pointer *read_pointer(arena* a, sized_buf *key, char *buf)
177{
178    //Parse KP pair into a node_pointer {K, {ptr, reduce_value, subtreesize}}
179    node_pointer *p = (node_pointer *) arena_alloc(a, sizeof(node_pointer));
180    if (!p) {
181        return NULL;
182    }
183    const raw_node_pointer *raw = (const raw_node_pointer*)buf;
184    p->pointer = decode_raw48(raw->pointer);
185    p->subtreesize = decode_raw48(raw->subtreesize);
186    p->reduce_value.size = decode_raw16(raw->reduce_value_size);
187    p->reduce_value.buf = buf + sizeof(*raw);
188    p->key = *key;
189    return p;
190}
191
192//Write the current contents of the values list to disk as a node
193//and add the resulting pointer to the pointers list.
194static couchstore_error_t flush_mr(couchfile_modify_result *res)
195{
196    return flush_mr_partial(res, res->node_len);
197}
198
199//Write a node using enough items from the values list to create a node
200//with uncompressed size of at least mr_quota
201static couchstore_error_t flush_mr_partial(couchfile_modify_result *res, size_t mr_quota)
202{
203    char *dst;
204    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
205    int itmcount = 0;
206    char *nodebuf = NULL;
207    sized_buf writebuf;
208    char reducebuf[MAX_REDUCTION_SIZE];
209    size_t reducesize = 0;
210    uint64_t subtreesize = 0;
211    cs_off_t diskpos;
212    size_t disk_size;
213    sized_buf final_key = {NULL, 0};
214
215    if (res->values_end == res->values || ! res->modified) {
216        //Empty
217        return COUCHSTORE_SUCCESS;
218    }
219
220    // nodebuf/writebuf is very short-lived and can be large, so use regular malloc heap for it:
221    nodebuf = static_cast<char*>(malloc(res->node_len + 1));
222    if (!nodebuf) {
223        return COUCHSTORE_ERROR_ALLOC_FAIL;
224    }
225
226    writebuf.buf = nodebuf;
227
228    dst = nodebuf;
229    *(dst++) = (char) res->node_type;
230
231    nodelist *i = res->values->next;
232    //We don't care that we've reached mr_quota if we haven't written out
233    //at least two items and we're not writing a leaf node.
234    while (i != NULL && (mr_quota > 0 || (itmcount < 2 && res->node_type == KP_NODE))) {
235        dst = static_cast<char*>(write_kv(dst, i->key, i->data));
236        if (i->pointer) {
237            subtreesize += i->pointer->subtreesize;
238        }
239        mr_quota -= i->key.size + i->data.size + sizeof(raw_kv_length);
240        final_key = i->key;
241        i = i->next;
242        res->count--;
243        itmcount++;
244    }
245
246    writebuf.size = dst - nodebuf;
247
248    errcode = static_cast<couchstore_error_t>(db_write_buf_compressed(res->rq->file, &writebuf, &diskpos, &disk_size));
249    free(nodebuf);  // here endeth the nodebuf.
250    if (errcode != COUCHSTORE_SUCCESS) {
251        return errcode;
252    }
253
254    if (res->node_type == KV_NODE && res->rq->reduce) {
255        errcode = res->rq->reduce(reducebuf, &reducesize, res->values->next, itmcount, res->rq->user_reduce_ctx);
256        if (errcode != COUCHSTORE_SUCCESS) {
257            return errcode;
258        }
259        assert(reducesize <= sizeof(reducebuf));
260    }
261
262    if (res->node_type == KP_NODE && res->rq->rereduce) {
263        errcode = res->rq->rereduce(reducebuf, &reducesize, res->values->next, itmcount, res->rq->user_reduce_ctx);
264        if (errcode != COUCHSTORE_SUCCESS) {
265            return errcode;
266        }
267        assert(reducesize <= sizeof(reducebuf));
268    }
269
270    node_pointer *ptr = (node_pointer *) arena_alloc(res->arena, sizeof(node_pointer) + final_key.size + reducesize);
271    if (!ptr) {
272        return COUCHSTORE_ERROR_ALLOC_FAIL;
273    }
274
275    ptr->key.buf = ((char *)ptr) + sizeof(node_pointer);
276    ptr->reduce_value.buf = ((char *)ptr) + sizeof(node_pointer) + final_key.size;
277
278    ptr->key.size = final_key.size;
279    ptr->reduce_value.size = reducesize;
280
281    memcpy(ptr->key.buf, final_key.buf, final_key.size);
282    memcpy(ptr->reduce_value.buf, reducebuf, reducesize);
283
284    ptr->subtreesize = subtreesize + disk_size;
285    ptr->pointer = diskpos;
286
287    nodelist *pel = encode_pointer(res->arena, ptr);
288    if (!pel) {
289        return COUCHSTORE_ERROR_ALLOC_FAIL;
290    }
291
292    res->pointers_end->next = pel;
293    res->pointers_end = pel;
294
295    res->node_len -= (writebuf.size - 1);
296
297    res->values->next = i;
298    if(i == NULL) {
299        res->values_end = res->values;
300    }
301
302    return COUCHSTORE_SUCCESS;
303}
304
305//Move this node's pointers list to dst node's values list.
306static couchstore_error_t mr_move_pointers(couchfile_modify_result *src,
307                                           couchfile_modify_result *dst)
308{
309    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
310    if (src->pointers_end == src->pointers) {
311        return COUCHSTORE_SUCCESS;
312    }
313
314    nodelist *ptr = src->pointers->next;
315    nodelist *next = ptr;
316    while (ptr != NULL && errcode == 0) {
317        dst->node_len += ptr->data.size + ptr->key.size + sizeof(raw_kv_length);
318        dst->count++;
319
320        next = ptr->next;
321        ptr->next = NULL;
322
323        dst->values_end->next = ptr;
324        dst->values_end = ptr;
325        ptr = next;
326        error_pass(maybe_flush(dst));
327    }
328
329cleanup:
330    src->pointers->next = next;
331    src->pointers_end = src->pointers;
332    return errcode;
333}
334
335// Perform purging for a kv-node if it qualifies for purging
336static couchstore_error_t maybe_purgekv(couchfile_modify_request *rq,
337                                            sized_buf *key,
338                                            sized_buf *val,
339                                            couchfile_modify_result *result)
340{
341    int action;
342    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
343
344    if (rq->enable_purging && rq->purge_kv) {
345        action = rq->purge_kv(key, val, rq->guided_purge_ctx);
346        if (action < 0) {
347            return (couchstore_error_t) action;
348        }
349    } else {
350        action = PURGE_KEEP;
351    }
352
353    switch (action) {
354    case PURGE_ITEM:
355        result->modified = 1;
356        break;
357
358    case PURGE_STOP:
359        rq->enable_purging = 0;
360
361    case PURGE_KEEP:
362        errcode = mr_push_item(key, val, result);
363        break;
364    }
365
366    return errcode;
367}
368
369// Perform purging for a kp-node if it qualifies for purging
370static couchstore_error_t maybe_purgekp(couchfile_modify_request *rq, node_pointer *node,
371                                        couchfile_modify_result *result)
372{
373    int action;
374    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
375
376    if (rq->enable_purging && rq->purge_kp) {
377        action = rq->purge_kp(node, rq->guided_purge_ctx);
378        if (action < 0) {
379            return (couchstore_error_t) action;
380        }
381    } else {
382        action = PURGE_KEEP;
383    }
384
385    switch (action) {
386    case PURGE_ITEM:
387        result->modified = 1;
388        break;
389
390    case PURGE_PARTIAL:
391        errcode = purge_node(rq, node, result);
392        break;
393
394    case PURGE_STOP:
395        rq->enable_purging = 0;
396
397    case PURGE_KEEP:
398        errcode = mr_push_pointerinfo(node, result);
399        break;
400    }
401
402    return errcode;
403}
404
405static couchstore_error_t modify_node(couchfile_modify_request *rq,
406                                      node_pointer *nptr,
407                                      int start, int end,
408                                      couchfile_modify_result *dst)
409{
410    char *nodebuf = NULL;  // FYI, nodebuf is a malloced block, not in the arena
411    int bufpos = 1;
412    int nodebuflen = 0;
413    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
414    couchfile_modify_result *local_result = NULL;
415
416    if (start == end) {
417        return COUCHSTORE_SUCCESS;
418    }
419
420    if (nptr) {
421        if ((nodebuflen = pread_compressed(rq->file, nptr->pointer, (char **) &nodebuf)) < 0) {
422            error_pass(COUCHSTORE_ERROR_READ);
423        }
424    }
425
426    local_result = make_modres(dst->arena, rq);
427    error_unless(local_result, COUCHSTORE_ERROR_ALLOC_FAIL);
428
429    if (nptr == NULL || nodebuf[0] == 1) { //KV Node
430        local_result->node_type = KV_NODE;
431        while (bufpos < nodebuflen) {
432            sized_buf cmp_key, val_buf;
433            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
434            int advance = 0;
435            while (!advance && start < end) {
436                advance = 1;
437                int cmp_val = rq->cmp.compare(&cmp_key, rq->actions[start].key);
438
439                if (cmp_val < 0) { //Key less than action key
440                    errcode = maybe_purgekv(rq, &cmp_key, &val_buf, local_result);
441                    if (errcode != COUCHSTORE_SUCCESS) {
442                        goto cleanup;
443                    }
444                } else if (cmp_val > 0) { //Key greater than action key
445                    switch (rq->actions[start].type) {
446                    case ACTION_INSERT:
447                        local_result->modified = 1;
448                        mr_push_item(rq->actions[start].key, rq->actions[start].value.data, local_result);
449                        break;
450
451                    case ACTION_REMOVE:
452                        local_result->modified = 1;
453                        break;
454
455                    case ACTION_FETCH:
456                        if (rq->fetch_callback) {
457                            //not found
458                            (*rq->fetch_callback)(rq, rq->actions[start].key, NULL, rq->actions[start].value.arg);
459                        }
460                    }
461                    start++;
462                    //Do next action on same item in the node, as our action was
463                    //not >= it.
464                    advance = 0;
465                } else if (cmp_val == 0) { //Node key is equal to action key
466                    switch (rq->actions[start].type) {
467                    case ACTION_INSERT:
468                        local_result->modified = 1;
469                        mr_push_item(rq->actions[start].key, rq->actions[start].value.data, local_result);
470                        break;
471
472                    case ACTION_REMOVE:
473                        local_result->modified = 1;
474                        break;
475
476                    case ACTION_FETCH:
477                        if (rq->fetch_callback) {
478                            (*rq->fetch_callback)(rq, rq->actions[start].key, &val_buf, rq->actions[start].value.arg);
479                        }
480                        //Do next action on same item in the node, as our action was a fetch
481                        //and there may be an equivalent insert or remove
482                        //following.
483                        advance = 0;
484                    }
485                    start++;
486                }
487            }
488            if (start == end && !advance) {
489                //If we've exhausted actions then just keep this key
490                errcode = maybe_purgekv(rq, &cmp_key, &val_buf, local_result);
491                if (errcode != COUCHSTORE_SUCCESS) {
492                    goto cleanup;
493                }
494            }
495        }
496        while (start < end) {
497            //We're at the end of a leaf node.
498            switch (rq->actions[start].type) {
499            case ACTION_INSERT:
500                local_result->modified = 1;
501                mr_push_item(rq->actions[start].key, rq->actions[start].value.data, local_result);
502                break;
503
504            case ACTION_REMOVE:
505                local_result->modified = 1;
506                break;
507
508            case ACTION_FETCH:
509                if (rq->fetch_callback) {
510                    //not found
511                    (*rq->fetch_callback)(rq, rq->actions[start].key, NULL, rq->actions[start].value.arg);
512                }
513                break;
514            }
515            start++;
516        }
517    } else if (nodebuf[0] == 0) { //KP Node
518        local_result->node_type = KP_NODE;
519        while (bufpos < nodebuflen && start < end) {
520            sized_buf cmp_key, val_buf;
521            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
522            int cmp_val = rq->cmp.compare(&cmp_key, rq->actions[start].key);
523            if (bufpos == nodebuflen) {
524                //We're at the last item in the kpnode, must apply all our
525                //actions here.
526                node_pointer *desc = read_pointer(dst->arena, &cmp_key, val_buf.buf);
527                if (!desc) {
528                    errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
529                    goto cleanup;
530                }
531
532                errcode = modify_node(rq, desc, start, end, local_result);
533                if (errcode != COUCHSTORE_SUCCESS) {
534                    goto cleanup;
535                }
536                break;
537            }
538
539            if (cmp_val < 0) {
540                //Key in node item less than action item and not at end
541                //position, so just add it and continue.
542                node_pointer *add = read_pointer(dst->arena, &cmp_key, val_buf.buf);
543                if (!add) {
544                    errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
545                    goto cleanup;
546                }
547
548                errcode = maybe_purgekp(rq, add, local_result);
549                if (errcode != COUCHSTORE_SUCCESS) {
550                    goto cleanup;
551                }
552            } else if (cmp_val >= 0) {
553                //Found a key in the node greater than the one in the current
554                //action. Descend into the pointed node with as many actions as
555                //are less than the key here.
556                int range_end = start;
557                while (range_end < end &&
558                        rq->cmp.compare(rq->actions[range_end].key, &cmp_key) <= 0) {
559                    range_end++;
560                }
561
562                node_pointer *desc = read_pointer(dst->arena, &cmp_key, val_buf.buf);
563                if (!desc) {
564                    errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
565                    goto cleanup;
566                }
567
568                errcode = modify_node(rq, desc, start, range_end, local_result);
569                start = range_end;
570                if (errcode != COUCHSTORE_SUCCESS) {
571                    goto cleanup;
572                }
573            }
574        }
575        while (bufpos < nodebuflen) {
576            sized_buf cmp_key, val_buf;
577            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
578            node_pointer *add = read_pointer(dst->arena, &cmp_key, val_buf.buf);
579            if (!add) {
580                errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
581                goto cleanup;
582            }
583
584            errcode = maybe_purgekp(rq, add, local_result);
585            if (errcode != COUCHSTORE_SUCCESS) {
586                goto cleanup;
587            }
588        }
589    } else {
590        errcode = COUCHSTORE_ERROR_CORRUPT;
591        goto cleanup;
592    }
593    //If we've done modifications, write out the last leaf node.
594    error_pass(flush_mr(local_result));
595    if (!local_result->modified && nptr != NULL) {
596        //If we didn't do anything, give back the pointer to the original
597        mr_push_pointerinfo(nptr, dst);
598    } else {
599        //Otherwise, give back the pointers to the nodes we've created.
600        dst->modified = 1;
601        error_pass(mr_move_pointers(local_result, dst));
602    }
603cleanup:
604    if (nodebuf) {
605        free(nodebuf);
606    }
607
608    return errcode;
609}
610
611node_pointer *finish_root(couchfile_modify_request *rq,
612                                 couchfile_modify_result *root_result,
613                                 couchstore_error_t *errcode)
614{
615    node_pointer *ret_ptr = NULL;
616    couchfile_modify_result *collector = make_modres(root_result->arena, rq);
617    if (!collector) {
618        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
619        return NULL;
620    }
621    collector->modified = 1;
622    collector->node_type = KP_NODE;
623    flush_mr(root_result);
624    while (1) {
625        if (root_result->pointers_end == root_result->pointers->next) {
626            //The root result split into exactly one kp_node.
627            //Return the pointer to it.
628            ret_ptr = root_result->pointers_end->pointer;
629            break;
630        } else {
631            //The root result split into more than one kp_node.
632            //Move the pointer list to the value list and write out the new node.
633            *errcode = mr_move_pointers(root_result, collector);
634
635            if (*errcode < 0) {
636                goto cleanup;
637            }
638
639            *errcode = flush_mr(collector);
640            if (*errcode < 0) {
641                goto cleanup;
642            }
643            //Swap root_result and collector mr's.
644            couchfile_modify_result *tmp = root_result;
645            root_result = collector;
646            collector = tmp;
647        }
648    }
649cleanup:
650    return ret_ptr;
651}
652
653// Copies a node_pointer and its values to the malloc heap.
654node_pointer* copy_node_pointer(node_pointer* ptr)
655{
656    if (!ptr) {
657        return NULL;
658    }
659    node_pointer* ret_ptr;
660    ret_ptr = static_cast<node_pointer*>(malloc(sizeof(node_pointer) + ptr->key.size + ptr->reduce_value.size));
661    if (!ret_ptr) {
662        return NULL;
663    }
664    *ret_ptr = *ptr;
665    ret_ptr->key.buf = (char*)ret_ptr + sizeof(node_pointer);
666    memcpy(ret_ptr->key.buf, ptr->key.buf, ptr->key.size);
667    ret_ptr->reduce_value.buf = (char*)ret_ptr->key.buf + ptr->key.size;
668    memcpy(ret_ptr->reduce_value.buf, ptr->reduce_value.buf, ptr->reduce_value.size);
669    return ret_ptr;
670}
671
672// Finished creating a new b-tree (from compaction), build pointers and get a
673// root node.
674
675node_pointer* complete_new_btree(couchfile_modify_result* mr, couchstore_error_t *errcode)
676{
677    *errcode = flush_mr(mr);
678    if(*errcode != COUCHSTORE_SUCCESS) {
679        return NULL;
680    }
681
682    couchfile_modify_result* targ_mr = make_modres(mr->arena, mr->rq);
683    if (!targ_mr) {
684        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
685        return NULL;
686    }
687    targ_mr->modified = 1;
688    targ_mr->node_type = KP_NODE;
689
690    *errcode = mr_move_pointers(mr, targ_mr);
691    if(*errcode != COUCHSTORE_SUCCESS) {
692        return NULL;
693    }
694
695    node_pointer* ret_ptr;
696    if(targ_mr->count > 1 || targ_mr->pointers != targ_mr->pointers_end) {
697        ret_ptr = finish_root(mr->rq, targ_mr, errcode);
698    } else {
699        ret_ptr = targ_mr->values_end->pointer;
700    }
701
702    if (*errcode != COUCHSTORE_SUCCESS || ret_ptr == NULL) {
703        return NULL;
704    }
705
706    ret_ptr = copy_node_pointer(ret_ptr);
707    if (ret_ptr == NULL) {
708        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
709    }
710    return ret_ptr;
711}
712
713node_pointer *modify_btree(couchfile_modify_request *rq,
714                           node_pointer *root,
715                           couchstore_error_t *errcode)
716{
717    arena* a = new_arena(0);
718    node_pointer *ret_ptr = root;
719    couchfile_modify_result *root_result = make_modres(a, rq);
720    if (!root_result) {
721        delete_arena(a);
722        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
723        return root;
724    }
725    root_result->node_type = KP_NODE;
726    *errcode = modify_node(rq, root, 0, rq->num_actions, root_result);
727    if (*errcode < 0) {
728        delete_arena(a);
729        return NULL;
730    }
731
732    if (root_result->values_end->pointer == root) {
733        //If we got the root pointer back, remove it from the list
734        //so we don't try to free it.
735        root_result->values_end->pointer = NULL;
736    }
737
738    if (root_result->modified) {
739        if (root_result->count > 1 || root_result->pointers != root_result->pointers_end) {
740            //The root was split
741            //Write it to disk and return the pointer to it.
742            ret_ptr = finish_root(rq, root_result, errcode);
743            if (*errcode < 0) {
744                ret_ptr = NULL;
745            }
746        } else {
747            ret_ptr = root_result->values_end->pointer;
748        }
749    }
750    if (ret_ptr != root) {
751        ret_ptr = copy_node_pointer(ret_ptr);
752    }
753    delete_arena(a);
754    return ret_ptr;
755}
756
757static couchstore_error_t purge_node(couchfile_modify_request *rq,
758                                     node_pointer *nptr,
759                                     couchfile_modify_result *dst)
760{
761    char *nodebuf = NULL;  // FYI, nodebuf is a malloced block, not in the arena
762    int bufpos = 1;
763    int nodebuflen = 0;
764    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
765    couchfile_modify_result *local_result = NULL;
766
767    if (nptr == NULL) {
768        return errcode;
769    }
770
771    // noop: add back current node to destination
772    if (!rq->enable_purging) {
773        dst->modified = 1;
774        return mr_push_pointerinfo(nptr, dst);
775    }
776
777    if ((nodebuflen = pread_compressed(rq->file, nptr->pointer, (char **) &nodebuf)) < 0) {
778        error_pass(COUCHSTORE_ERROR_READ);
779    }
780
781    local_result = make_modres(dst->arena, rq);
782    error_unless(local_result, COUCHSTORE_ERROR_ALLOC_FAIL);
783
784    if (nodebuf[0] == 1) { //KV Node
785        local_result->node_type = KV_NODE;
786        while (bufpos < nodebuflen) {
787            sized_buf cmp_key, val_buf;
788            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
789
790            errcode = maybe_purgekv(rq, &cmp_key, &val_buf, local_result);
791            if (errcode != COUCHSTORE_SUCCESS) {
792                goto cleanup;
793            }
794        }
795    } else if (nodebuf[0] == 0) { //KP Node
796        local_result->node_type = KP_NODE;
797        while (bufpos < nodebuflen) {
798            sized_buf cmp_key, val_buf;
799            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
800
801            node_pointer *desc = read_pointer(dst->arena, &cmp_key, val_buf.buf);
802            if (!desc) {
803                errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
804                goto cleanup;
805            }
806
807            errcode = maybe_purgekp(rq, desc, local_result);
808            if (errcode != COUCHSTORE_SUCCESS) {
809                goto cleanup;
810            }
811        }
812    } else {
813        errcode = COUCHSTORE_ERROR_CORRUPT;
814        goto cleanup;
815    }
816
817    // Write out changes and add node back to parent
818    if (local_result->modified) {
819        error_pass(flush_mr(local_result));
820        dst->modified = 1;
821        error_pass(mr_move_pointers(local_result, dst));
822    }
823
824cleanup:
825    free(nodebuf);
826    return errcode;
827}
828
829node_pointer *guided_purge_btree(couchfile_modify_request *rq, node_pointer *root,
830                                                couchstore_error_t *errcode)
831{
832    rq->enable_purging = 1;
833    arena* a = new_arena(0);
834    if (!a) {
835        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
836        return NULL;
837    }
838
839    node_pointer *ret_ptr = root;
840    couchfile_modify_result *root_result = make_modres(a, rq);
841    if (!root_result) {
842        delete_arena(a);
843        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
844        return NULL;
845    }
846
847    root_result->node_type = KP_NODE;
848    *errcode = purge_node(rq, root, root_result);
849    if (*errcode < 0) {
850        delete_arena(a);
851        return NULL;
852    }
853
854    if (root_result->modified) {
855        ret_ptr = root_result->values_end->pointer;
856    }
857
858    if (ret_ptr && ret_ptr != root) {
859        ret_ptr = copy_node_pointer(ret_ptr);
860    }
861
862    delete_arena(a);
863    return ret_ptr;
864}
865