xref: /6.0.3/couchstore/src/btree_modify.cc (revision ce50e226)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3
4#include <platform/cb_malloc.h>
5#include <stdlib.h>
6#include <string.h>
7#include <signal.h>
8#include <platform/cbassert.h>
9
10#include "couch_btree.h"
11#include "util.h"
12#include "arena.h"
13#include "node_types.h"
14
15
16static couchstore_error_t flush_mr_partial(couchfile_modify_result *res, size_t mr_quota);
17static couchstore_error_t flush_mr(couchfile_modify_result *res);
18static couchstore_error_t purge_node(couchfile_modify_request *rq,
19                                      node_pointer *nptr,
20                                      couchfile_modify_result *dst);
21
22static couchstore_error_t maybe_flush(couchfile_modify_result *mr)
23{
24    if(mr->rq->compacting) {
25        /* The compactor can (and should), just write out nodes
26         * of size CHUNK_SIZE as soon as it can, so that it can
27         * free memory it no longer needs. Flush at least 2 KP
28         * nodes,so that we don't create one parent node for
29         * each of the large KP nodes (one node size > threshold).
30         */
31        if (mr->modified && (((mr->node_type == KV_NODE) &&
32            mr->node_len > (mr->rq->kv_chunk_threshold * 2 / 3)) ||
33            ((mr->node_type == KP_NODE) &&
34             mr->node_len > (mr->rq->kp_chunk_threshold * 2 / 3) &&
35             mr->count > 1))) {
36            return flush_mr(mr);
37        }
38    } else if (mr->modified &&  mr->count > 3) {
39        /* Don't write out a partial node unless we've collected
40         * at least three items */
41        if ((mr->node_type == KV_NODE) &&
42             mr->node_len > mr->rq->kv_chunk_threshold) {
43            return flush_mr_partial(mr, (mr->rq->kv_chunk_threshold * 2 / 3));
44        }
45        if ((mr->node_type == KP_NODE) &&
46             mr->node_len > mr->rq->kp_chunk_threshold) {
47            return flush_mr_partial(mr, (mr->rq->kp_chunk_threshold * 2 / 3));
48        }
49    }
50
51    return COUCHSTORE_SUCCESS;
52}
53
54
55static nodelist *make_nodelist(arena* a, size_t bufsize)
56{
57    nodelist *r = static_cast<nodelist*>(arena_alloc(a, sizeof(nodelist) + bufsize));
58    if (!r) {
59        return NULL;
60    }
61    memset(r, 0, sizeof(nodelist));
62    r->data.size = bufsize;
63    r->data.buf = ((char *) r) + (sizeof(nodelist));
64    return r;
65}
66
67couchfile_modify_result *make_modres(arena* a, couchfile_modify_request *rq)
68{
69    couchfile_modify_result *res;
70    res = static_cast<couchfile_modify_result*>(arena_alloc(a, sizeof(couchfile_modify_result)));
71    if (!res) {
72        return NULL;
73    }
74    res->arena = a;
75    res->arena_transient = NULL;
76    res->values = make_nodelist(a, 0);
77    if (!res->values) {
78        return NULL;
79    }
80    res->values_end = res->values;
81    res->pointers = make_nodelist(a, 0);
82    if (!res->pointers) {
83        return NULL;
84    }
85    res->pointers_end = res->pointers;
86    res->node_len = 0;
87    res->count = 0;
88    res->modified = 0;
89    res->error_state = 0;
90    res->rq = rq;
91    return res;
92}
93
94couchfile_modify_result *new_btree_modres(arena *a, arena *transient_arena, tree_file *file,
95                                          compare_info* cmp, reduce_fn reduce, reduce_fn rereduce,
96                                          void *user_reduce_ctx,
97                                          int kv_chunk_threshold, int kp_chunk_threshold)
98{
99    couchfile_modify_request* rq;
100    rq = static_cast<couchfile_modify_request*>(arena_alloc(a, sizeof(couchfile_modify_request)));
101    if (!rq) {
102        return NULL;
103    }
104    rq->cmp = *cmp;
105    rq->file = file;
106    rq->num_actions = 0;
107    rq->fetch_callback = NULL;
108    rq->reduce = reduce;
109    rq->rereduce = rereduce;
110    rq->user_reduce_ctx = user_reduce_ctx;
111    rq->compacting = 1;
112    rq->enable_purging = 0;
113    rq->purge_kp = NULL;
114    rq->purge_kv = NULL;
115    rq->kv_chunk_threshold = kv_chunk_threshold;
116    rq->kp_chunk_threshold = kp_chunk_threshold;
117
118    couchfile_modify_result* mr = make_modres(a, rq);
119    if (!mr)
120        return NULL;
121    mr->arena_transient = transient_arena;
122    mr->modified = 1;
123    mr->node_type = KV_NODE;
124
125    return mr;
126}
127
128couchstore_error_t mr_push_item(sized_buf *k, sized_buf *v, couchfile_modify_result *dst)
129{
130    nodelist *itm = NULL;
131    if(dst->arena_transient != NULL)
132    {
133        itm = make_nodelist(dst->arena_transient, 0);
134    } else {
135        itm = make_nodelist(dst->arena, 0);
136    }
137    if (!itm) {
138        return COUCHSTORE_ERROR_ALLOC_FAIL;
139    }
140    itm->key = *k;
141    itm->data = *v;
142    itm->pointer = NULL;
143    dst->values_end->next = itm;
144    dst->values_end = itm;
145    //Encoded size (see flush_mr)
146    dst->node_len += k->size + v->size + sizeof(raw_kv_length);
147    dst->count++;
148    return maybe_flush(dst);
149}
150
151static nodelist *encode_pointer(arena* a, node_pointer *ptr)
152{
153    nodelist *pel = make_nodelist(a, sizeof(raw_node_pointer) + ptr->reduce_value.size);
154    if (!pel) {
155        return NULL;
156    }
157    raw_node_pointer *raw = (raw_node_pointer*)pel->data.buf;
158    encode_raw48(ptr->pointer, &raw->pointer);
159    encode_raw48(ptr->subtreesize, &raw->subtreesize);
160    raw->reduce_value_size = encode_raw16((uint16_t)ptr->reduce_value.size);
161    memcpy(raw + 1, ptr->reduce_value.buf, ptr->reduce_value.size);
162    pel->pointer = ptr;
163    pel->key = ptr->key;
164    return pel;
165}
166
167static couchstore_error_t mr_push_pointerinfo(node_pointer *ptr,
168                                              couchfile_modify_result *dst)
169{
170    nodelist *pel = encode_pointer(dst->arena, ptr);
171    if (!pel) {
172        return COUCHSTORE_ERROR_ALLOC_FAIL;
173    }
174    dst->values_end->next = pel;
175    dst->values_end = pel;
176    dst->node_len += pel->key.size + pel->data.size + sizeof(raw_kv_length);
177    dst->count++;
178    return maybe_flush(dst);
179}
180
181node_pointer *read_pointer(arena* a, sized_buf *key, char *buf)
182{
183    //Parse KP pair into a node_pointer {K, {ptr, reduce_value, subtreesize}}
184    node_pointer *p = (node_pointer *) arena_alloc(a, sizeof(node_pointer));
185    if (!p) {
186        return NULL;
187    }
188    const raw_node_pointer *raw = (const raw_node_pointer*)buf;
189    p->pointer = decode_raw48(raw->pointer);
190    p->subtreesize = decode_raw48(raw->subtreesize);
191    p->reduce_value.size = decode_raw16(raw->reduce_value_size);
192    p->reduce_value.buf = buf + sizeof(*raw);
193    p->key = *key;
194    return p;
195}
196
197//Write the current contents of the values list to disk as a node
198//and add the resulting pointer to the pointers list.
199static couchstore_error_t flush_mr(couchfile_modify_result *res)
200{
201    return flush_mr_partial(res, res->node_len);
202}
203
204//Write a node using enough items from the values list to create a node
205//with uncompressed size of at least mr_quota
206static couchstore_error_t flush_mr_partial(couchfile_modify_result *res, size_t mr_quota)
207{
208    char *dst;
209    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
210    int itmcount = 0;
211    char *nodebuf = NULL;
212    sized_buf writebuf;
213    char reducebuf[MAX_REDUCTION_SIZE];
214    size_t reducesize = 0;
215    uint64_t subtreesize = 0;
216    cs_off_t diskpos;
217    size_t disk_size;
218    sized_buf final_key = {NULL, 0};
219
220    if (res->values_end == res->values || ! res->modified) {
221        //Empty
222        return COUCHSTORE_SUCCESS;
223    }
224
225    // nodebuf/writebuf is very short-lived and can be large, so use regular malloc heap for it:
226    nodebuf = static_cast<char*>(cb_malloc(res->node_len + 1));
227    if (!nodebuf) {
228        return COUCHSTORE_ERROR_ALLOC_FAIL;
229    }
230
231    writebuf.buf = nodebuf;
232
233    dst = nodebuf;
234    *(dst++) = (char) res->node_type;
235
236    nodelist *i = res->values->next;
237    final_key = i->key;
238    //We don't care that we've reached mr_quota if we haven't written out
239    //at least two items and we're not writing a leaf node.
240    while (i != NULL && (mr_quota > 0 || (itmcount < 2 && res->node_type == KP_NODE))) {
241        dst = static_cast<char*>(write_kv(dst, i->key, i->data));
242        if (i->pointer) {
243            subtreesize += i->pointer->subtreesize;
244        }
245        mr_quota -= i->key.size + i->data.size + sizeof(raw_kv_length);
246        final_key = i->key;
247        i = i->next;
248        res->count--;
249        itmcount++;
250    }
251
252    writebuf.size = dst - nodebuf;
253
254    errcode = static_cast<couchstore_error_t>(db_write_buf_compressed(res->rq->file, &writebuf, &diskpos, &disk_size));
255    cb_free(nodebuf);  // here endeth the nodebuf.
256    if (errcode != COUCHSTORE_SUCCESS) {
257        return errcode;
258    }
259
260    if (res->node_type == KV_NODE && res->rq->reduce) {
261        errcode = res->rq->reduce(reducebuf, &reducesize, res->values->next, itmcount, res->rq->user_reduce_ctx);
262        if (errcode != COUCHSTORE_SUCCESS) {
263            return errcode;
264        }
265        cb_assert(reducesize <= sizeof(reducebuf));
266    }
267
268    if (res->node_type == KP_NODE && res->rq->rereduce) {
269        errcode = res->rq->rereduce(reducebuf, &reducesize, res->values->next, itmcount, res->rq->user_reduce_ctx);
270        if (errcode != COUCHSTORE_SUCCESS) {
271            return errcode;
272        }
273        cb_assert(reducesize <= sizeof(reducebuf));
274    }
275
276    node_pointer *ptr = (node_pointer *) arena_alloc(res->arena, sizeof(node_pointer) + final_key.size + reducesize);
277    if (!ptr) {
278        return COUCHSTORE_ERROR_ALLOC_FAIL;
279    }
280
281    ptr->key.buf = ((char *)ptr) + sizeof(node_pointer);
282    ptr->reduce_value.buf = ((char *)ptr) + sizeof(node_pointer) + final_key.size;
283
284    ptr->key.size = final_key.size;
285    ptr->reduce_value.size = reducesize;
286
287    memcpy(ptr->key.buf, final_key.buf, final_key.size);
288    memcpy(ptr->reduce_value.buf, reducebuf, reducesize);
289
290    ptr->subtreesize = subtreesize + disk_size;
291    ptr->pointer = diskpos;
292
293    nodelist *pel = encode_pointer(res->arena, ptr);
294    if (!pel) {
295        return COUCHSTORE_ERROR_ALLOC_FAIL;
296    }
297
298    res->pointers_end->next = pel;
299    res->pointers_end = pel;
300
301    res->node_len -= (writebuf.size - 1);
302
303    res->values->next = i;
304    if(i == NULL) {
305        res->values_end = res->values;
306    }
307
308    return COUCHSTORE_SUCCESS;
309}
310
311//Move this node's pointers list to dst node's values list.
312static couchstore_error_t mr_move_pointers(couchfile_modify_result *src,
313                                           couchfile_modify_result *dst)
314{
315    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
316    if (src->pointers_end == src->pointers) {
317        return COUCHSTORE_SUCCESS;
318    }
319
320    nodelist *ptr = src->pointers->next;
321    nodelist *next = ptr;
322    while (ptr != NULL && errcode == 0) {
323        dst->node_len += ptr->data.size + ptr->key.size + sizeof(raw_kv_length);
324        dst->count++;
325
326        next = ptr->next;
327        ptr->next = NULL;
328
329        dst->values_end->next = ptr;
330        dst->values_end = ptr;
331        ptr = next;
332        error_pass(maybe_flush(dst));
333    }
334
335cleanup:
336    src->pointers->next = next;
337    src->pointers_end = src->pointers;
338    return errcode;
339}
340
341// Perform purging for a kv-node if it qualifies for purging
342static couchstore_error_t maybe_purgekv(couchfile_modify_request *rq,
343                                            sized_buf *key,
344                                            sized_buf *val,
345                                            couchfile_modify_result *result)
346{
347    int action;
348    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
349
350    if (rq->enable_purging && rq->purge_kv) {
351        action = rq->purge_kv(key, val, rq->guided_purge_ctx);
352        if (action < 0) {
353            return (couchstore_error_t) action;
354        }
355    } else {
356        action = PURGE_KEEP;
357    }
358
359    switch (action) {
360    case PURGE_ITEM:
361        result->modified = 1;
362        break;
363
364    case PURGE_STOP:
365        rq->enable_purging = 0;
366
367    case PURGE_KEEP:
368        errcode = mr_push_item(key, val, result);
369        break;
370    }
371
372    return errcode;
373}
374
375// Perform purging for a kp-node if it qualifies for purging
376static couchstore_error_t maybe_purgekp(couchfile_modify_request *rq, node_pointer *node,
377                                        couchfile_modify_result *result)
378{
379    int action;
380    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
381
382    if (rq->enable_purging && rq->purge_kp) {
383        action = rq->purge_kp(node, rq->guided_purge_ctx);
384        if (action < 0) {
385            return (couchstore_error_t) action;
386        }
387    } else {
388        action = PURGE_KEEP;
389    }
390
391    switch (action) {
392    case PURGE_ITEM:
393        result->modified = 1;
394        break;
395
396    case PURGE_PARTIAL:
397        errcode = purge_node(rq, node, result);
398        break;
399
400    case PURGE_STOP:
401        rq->enable_purging = 0;
402
403    case PURGE_KEEP:
404        errcode = mr_push_pointerinfo(node, result);
405        break;
406    }
407
408    return errcode;
409}
410
411static couchstore_error_t modify_node(couchfile_modify_request *rq,
412                                      node_pointer *nptr,
413                                      int start, int end,
414                                      couchfile_modify_result *dst)
415{
416    char *nodebuf = NULL;  // FYI, nodebuf is a malloced block, not in the arena
417    int bufpos = 1;
418    int nodebuflen = 0;
419    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
420    couchfile_modify_result *local_result = NULL;
421
422    if (start == end) {
423        return COUCHSTORE_SUCCESS;
424    }
425
426    if (nptr) {
427        if ((nodebuflen = pread_compressed(rq->file, nptr->pointer, (char **) &nodebuf)) < 0) {
428            error_pass(static_cast<couchstore_error_t>(nodebuflen));
429        }
430    }
431
432    local_result = make_modres(dst->arena, rq);
433    error_unless(local_result, COUCHSTORE_ERROR_ALLOC_FAIL);
434
435    if (nptr == NULL || nodebuf[0] == 1) { //KV Node
436        local_result->node_type = KV_NODE;
437        while (bufpos < nodebuflen) {
438            sized_buf cmp_key, val_buf;
439            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
440            int advance = 0;
441            while (!advance && start < end) {
442                advance = 1;
443                int cmp_val = rq->cmp.compare(&cmp_key, rq->actions[start].key);
444
445                if (cmp_val < 0) { //Key less than action key
446                    errcode = maybe_purgekv(rq, &cmp_key, &val_buf, local_result);
447                    if (errcode != COUCHSTORE_SUCCESS) {
448                        goto cleanup;
449                    }
450                } else if (cmp_val > 0) { //Key greater than action key
451                    switch (rq->actions[start].type) {
452                    case ACTION_INSERT:
453                        local_result->modified = 1;
454                        mr_push_item(rq->actions[start].key, rq->actions[start].value.data, local_result);
455                        break;
456
457                    case ACTION_REMOVE:
458                        local_result->modified = 1;
459                        break;
460
461                    case ACTION_FETCH:
462                        if (rq->fetch_callback) {
463                            //not found
464                            (*rq->fetch_callback)(rq, rq->actions[start].key, NULL, rq->actions[start].value.arg);
465                        }
466                    }
467                    start++;
468                    //Do next action on same item in the node, as our action was
469                    //not >= it.
470                    advance = 0;
471                } else if (cmp_val == 0) { //Node key is equal to action key
472                    switch (rq->actions[start].type) {
473                    case ACTION_INSERT:
474                        local_result->modified = 1;
475                        mr_push_item(rq->actions[start].key, rq->actions[start].value.data, local_result);
476                        break;
477
478                    case ACTION_REMOVE:
479                        local_result->modified = 1;
480                        break;
481
482                    case ACTION_FETCH:
483                        if (rq->fetch_callback) {
484                            (*rq->fetch_callback)(rq, rq->actions[start].key, &val_buf, rq->actions[start].value.arg);
485                        }
486                        //Do next action on same item in the node, as our action was a fetch
487                        //and there may be an equivalent insert or remove
488                        //following.
489                        advance = 0;
490                    }
491                    start++;
492                }
493            }
494            if (start == end && !advance) {
495                //If we've exhausted actions then just keep this key
496                errcode = maybe_purgekv(rq, &cmp_key, &val_buf, local_result);
497                if (errcode != COUCHSTORE_SUCCESS) {
498                    goto cleanup;
499                }
500            }
501        }
502        while (start < end) {
503            //We're at the end of a leaf node.
504            switch (rq->actions[start].type) {
505            case ACTION_INSERT:
506                local_result->modified = 1;
507                mr_push_item(rq->actions[start].key, rq->actions[start].value.data, local_result);
508                break;
509
510            case ACTION_REMOVE:
511                local_result->modified = 1;
512                break;
513
514            case ACTION_FETCH:
515                if (rq->fetch_callback) {
516                    //not found
517                    (*rq->fetch_callback)(rq, rq->actions[start].key, NULL, rq->actions[start].value.arg);
518                }
519                break;
520            }
521            start++;
522        }
523    } else if (nodebuf[0] == 0) { //KP Node
524        local_result->node_type = KP_NODE;
525        while (bufpos < nodebuflen && start < end) {
526            sized_buf cmp_key, val_buf;
527            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
528            int cmp_val = rq->cmp.compare(&cmp_key, rq->actions[start].key);
529            if (bufpos == nodebuflen) {
530                //We're at the last item in the kpnode, must apply all our
531                //actions here.
532                node_pointer *desc = read_pointer(dst->arena, &cmp_key, val_buf.buf);
533                if (!desc) {
534                    errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
535                    goto cleanup;
536                }
537
538                errcode = modify_node(rq, desc, start, end, local_result);
539                if (errcode != COUCHSTORE_SUCCESS) {
540                    goto cleanup;
541                }
542                break;
543            }
544
545            if (cmp_val < 0) {
546                //Key in node item less than action item and not at end
547                //position, so just add it and continue.
548                node_pointer *add = read_pointer(dst->arena, &cmp_key, val_buf.buf);
549                if (!add) {
550                    errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
551                    goto cleanup;
552                }
553
554                errcode = maybe_purgekp(rq, add, local_result);
555                if (errcode != COUCHSTORE_SUCCESS) {
556                    goto cleanup;
557                }
558            } else if (cmp_val >= 0) {
559                //Found a key in the node greater than the one in the current
560                //action. Descend into the pointed node with as many actions as
561                //are less than the key here.
562                int range_end = start;
563                while (range_end < end &&
564                        rq->cmp.compare(rq->actions[range_end].key, &cmp_key) <= 0) {
565                    range_end++;
566                }
567
568                node_pointer *desc = read_pointer(dst->arena, &cmp_key, val_buf.buf);
569                if (!desc) {
570                    errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
571                    goto cleanup;
572                }
573
574                errcode = modify_node(rq, desc, start, range_end, local_result);
575                start = range_end;
576                if (errcode != COUCHSTORE_SUCCESS) {
577                    goto cleanup;
578                }
579            }
580        }
581        while (bufpos < nodebuflen) {
582            sized_buf cmp_key, val_buf;
583            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
584            node_pointer *add = read_pointer(dst->arena, &cmp_key, val_buf.buf);
585            if (!add) {
586                errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
587                goto cleanup;
588            }
589
590            errcode = maybe_purgekp(rq, add, local_result);
591            if (errcode != COUCHSTORE_SUCCESS) {
592                goto cleanup;
593            }
594        }
595    } else {
596        errcode = COUCHSTORE_ERROR_CORRUPT;
597        goto cleanup;
598    }
599    //If we've done modifications, write out the last leaf node.
600    error_pass(flush_mr(local_result));
601    if (!local_result->modified && nptr != NULL) {
602        //If we didn't do anything, give back the pointer to the original
603        mr_push_pointerinfo(nptr, dst);
604    } else {
605        //Otherwise, give back the pointers to the nodes we've created.
606        dst->modified = 1;
607        error_pass(mr_move_pointers(local_result, dst));
608    }
609cleanup:
610    if (nodebuf) {
611        cb_free(nodebuf);
612    }
613
614    return errcode;
615}
616
617node_pointer *finish_root(couchfile_modify_request *rq,
618                                 couchfile_modify_result *root_result,
619                                 couchstore_error_t *errcode)
620{
621    node_pointer *ret_ptr = NULL;
622    couchfile_modify_result *collector = make_modres(root_result->arena, rq);
623    if (!collector) {
624        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
625        return NULL;
626    }
627    collector->modified = 1;
628    collector->node_type = KP_NODE;
629    flush_mr(root_result);
630    while (1) {
631        if (root_result->pointers_end == root_result->pointers->next) {
632            //The root result split into exactly one kp_node.
633            //Return the pointer to it.
634            ret_ptr = root_result->pointers_end->pointer;
635            break;
636        } else {
637            //The root result split into more than one kp_node.
638            //Move the pointer list to the value list and write out the new node.
639            *errcode = mr_move_pointers(root_result, collector);
640
641            if (*errcode < 0) {
642                goto cleanup;
643            }
644
645            *errcode = flush_mr(collector);
646            if (*errcode < 0) {
647                goto cleanup;
648            }
649            //Swap root_result and collector mr's.
650            couchfile_modify_result *tmp = root_result;
651            root_result = collector;
652            collector = tmp;
653        }
654    }
655cleanup:
656    return ret_ptr;
657}
658
659// Copies a node_pointer and its values to the malloc heap.
660node_pointer* copy_node_pointer(node_pointer* ptr)
661{
662    if (!ptr) {
663        return NULL;
664    }
665    node_pointer* ret_ptr;
666    ret_ptr = static_cast<node_pointer*>(cb_malloc(sizeof(node_pointer) + ptr->key.size + ptr->reduce_value.size));
667    if (!ret_ptr) {
668        return NULL;
669    }
670    *ret_ptr = *ptr;
671    ret_ptr->key.buf = (char*)ret_ptr + sizeof(node_pointer);
672    memcpy(ret_ptr->key.buf, ptr->key.buf, ptr->key.size);
673    ret_ptr->reduce_value.buf = (char*)ret_ptr->key.buf + ptr->key.size;
674    memcpy(ret_ptr->reduce_value.buf, ptr->reduce_value.buf, ptr->reduce_value.size);
675    return ret_ptr;
676}
677
678// Finished creating a new b-tree (from compaction), build pointers and get a
679// root node.
680
681node_pointer* complete_new_btree(couchfile_modify_result* mr, couchstore_error_t *errcode)
682{
683    *errcode = flush_mr(mr);
684    if(*errcode != COUCHSTORE_SUCCESS) {
685        return NULL;
686    }
687
688    couchfile_modify_result* targ_mr = make_modres(mr->arena, mr->rq);
689    if (!targ_mr) {
690        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
691        return NULL;
692    }
693    targ_mr->modified = 1;
694    targ_mr->node_type = KP_NODE;
695
696    *errcode = mr_move_pointers(mr, targ_mr);
697    if(*errcode != COUCHSTORE_SUCCESS) {
698        return NULL;
699    }
700
701    node_pointer* ret_ptr;
702    if(targ_mr->count > 1 || targ_mr->pointers != targ_mr->pointers_end) {
703        ret_ptr = finish_root(mr->rq, targ_mr, errcode);
704    } else {
705        ret_ptr = targ_mr->values_end->pointer;
706    }
707
708    if (*errcode != COUCHSTORE_SUCCESS || ret_ptr == NULL) {
709        return NULL;
710    }
711
712    ret_ptr = copy_node_pointer(ret_ptr);
713    if (ret_ptr == NULL) {
714        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
715    }
716    return ret_ptr;
717}
718
719node_pointer *modify_btree(couchfile_modify_request *rq,
720                           node_pointer *root,
721                           couchstore_error_t *errcode)
722{
723    arena* a = new_arena(0);
724    if (!a) {
725        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
726        return NULL;
727    }
728    node_pointer *ret_ptr = root;
729    couchfile_modify_result *root_result = make_modres(a, rq);
730    if (!root_result) {
731        delete_arena(a);
732        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
733        return root;
734    }
735    root_result->node_type = KP_NODE;
736    *errcode = modify_node(rq, root, 0, rq->num_actions, root_result);
737    if (*errcode < 0) {
738        delete_arena(a);
739        return NULL;
740    }
741
742    if (root_result->values_end->pointer == root) {
743        //If we got the root pointer back, remove it from the list
744        //so we don't try to free it.
745        root_result->values_end->pointer = NULL;
746    }
747
748    if (root_result->modified) {
749        if (root_result->count > 1 || root_result->pointers != root_result->pointers_end) {
750            //The root was split
751            //Write it to disk and return the pointer to it.
752            ret_ptr = finish_root(rq, root_result, errcode);
753            if (*errcode < 0) {
754                ret_ptr = NULL;
755            }
756        } else {
757            ret_ptr = root_result->values_end->pointer;
758        }
759    }
760    if (ret_ptr != root) {
761        ret_ptr = copy_node_pointer(ret_ptr);
762    }
763    delete_arena(a);
764    return ret_ptr;
765}
766
767static couchstore_error_t purge_node(couchfile_modify_request *rq,
768                                     node_pointer *nptr,
769                                     couchfile_modify_result *dst)
770{
771    char *nodebuf = NULL;  // FYI, nodebuf is a malloced block, not in the arena
772    int bufpos = 1;
773    int nodebuflen = 0;
774    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
775    couchfile_modify_result *local_result = NULL;
776
777    if (nptr == NULL) {
778        return errcode;
779    }
780
781    // noop: add back current node to destination
782    if (!rq->enable_purging) {
783        dst->modified = 1;
784        return mr_push_pointerinfo(nptr, dst);
785    }
786
787    if ((nodebuflen = pread_compressed(rq->file, nptr->pointer, (char **) &nodebuf)) < 0) {
788        error_pass(static_cast<couchstore_error_t>(nodebuflen));
789    }
790
791    local_result = make_modres(dst->arena, rq);
792    error_unless(local_result, COUCHSTORE_ERROR_ALLOC_FAIL);
793
794    if (nodebuf[0] == 1) { //KV Node
795        local_result->node_type = KV_NODE;
796        while (bufpos < nodebuflen) {
797            sized_buf cmp_key, val_buf;
798            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
799
800            errcode = maybe_purgekv(rq, &cmp_key, &val_buf, local_result);
801            if (errcode != COUCHSTORE_SUCCESS) {
802                goto cleanup;
803            }
804        }
805    } else if (nodebuf[0] == 0) { //KP Node
806        local_result->node_type = KP_NODE;
807        while (bufpos < nodebuflen) {
808            sized_buf cmp_key, val_buf;
809            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
810
811            node_pointer *desc = read_pointer(dst->arena, &cmp_key, val_buf.buf);
812            if (!desc) {
813                errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
814                goto cleanup;
815            }
816
817            errcode = maybe_purgekp(rq, desc, local_result);
818            if (errcode != COUCHSTORE_SUCCESS) {
819                goto cleanup;
820            }
821        }
822    } else {
823        errcode = COUCHSTORE_ERROR_CORRUPT;
824        goto cleanup;
825    }
826
827    // Write out changes and add node back to parent
828    if (local_result->modified) {
829        error_pass(flush_mr(local_result));
830        dst->modified = 1;
831        error_pass(mr_move_pointers(local_result, dst));
832    }
833
834cleanup:
835    cb_free(nodebuf);
836    return errcode;
837}
838
839node_pointer *guided_purge_btree(couchfile_modify_request *rq, node_pointer *root,
840                                                couchstore_error_t *errcode)
841{
842    rq->enable_purging = 1;
843    arena* a = new_arena(0);
844    if (!a) {
845        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
846        return NULL;
847    }
848
849    node_pointer *ret_ptr = root;
850    couchfile_modify_result *root_result = make_modres(a, rq);
851    if (!root_result) {
852        delete_arena(a);
853        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
854        return NULL;
855    }
856
857    root_result->node_type = KP_NODE;
858    *errcode = purge_node(rq, root, root_result);
859    if (*errcode < 0) {
860        delete_arena(a);
861        return NULL;
862    }
863
864    if (root_result->modified) {
865        ret_ptr = root_result->values_end->pointer;
866    }
867
868    if (ret_ptr && ret_ptr != root) {
869        ret_ptr = copy_node_pointer(ret_ptr);
870    }
871
872    delete_arena(a);
873    return ret_ptr;
874}
875