xref: /3.0.2-MP2/couchstore/src/btree_modify.cc (revision 744ff400)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3#include <assert.h>
4#include <stdlib.h>
5#include <string.h>
6#include <signal.h>
7
8#include "couch_btree.h"
9#include "util.h"
10#include "arena.h"
11#include "node_types.h"
12
13
14static couchstore_error_t flush_mr_partial(couchfile_modify_result *res, size_t mr_quota);
15static couchstore_error_t flush_mr(couchfile_modify_result *res);
16static couchstore_error_t purge_node(couchfile_modify_request *rq,
17                                      node_pointer *nptr,
18                                      couchfile_modify_result *dst);
19
20static couchstore_error_t maybe_flush(couchfile_modify_result *mr)
21{
22    if(mr->rq->compacting) {
23        /* The compactor can (and should), just write out nodes
24         * of size CHUNK_SIZE as soon as it can, so that it can
25         * free memory it no longer needs. */
26        if (mr->modified && (((mr->node_type == KV_NODE) &&
27            mr->node_len > (mr->rq->kv_chunk_threshold * 2 / 3)) ||
28            ((mr->node_type == KP_NODE) &&
29             mr->node_len > (mr->rq->kp_chunk_threshold * 2 / 3)))) {
30            return flush_mr(mr);
31        }
32    } else if (mr->modified &&  mr->count > 3) {
33        /* Don't write out a partial node unless we've collected
34         * at least three items */
35        if ((mr->node_type == KV_NODE) &&
36             mr->node_len > mr->rq->kv_chunk_threshold) {
37            return flush_mr_partial(mr, (mr->rq->kv_chunk_threshold * 2 / 3));
38        }
39        if ((mr->node_type == KP_NODE) &&
40             mr->node_len > mr->rq->kp_chunk_threshold) {
41            return flush_mr_partial(mr, (mr->rq->kp_chunk_threshold * 2 / 3));
42        }
43    }
44
45    return COUCHSTORE_SUCCESS;
46}
47
48
49static nodelist *make_nodelist(arena* a, size_t bufsize)
50{
51    nodelist *r = static_cast<nodelist*>(arena_alloc(a, sizeof(nodelist) + bufsize));
52    if (!r) {
53        return NULL;
54    }
55    memset(r, 0, sizeof(nodelist));
56    r->data.size = bufsize;
57    r->data.buf = ((char *) r) + (sizeof(nodelist));
58    return r;
59}
60
61static couchfile_modify_result *make_modres(arena* a, couchfile_modify_request *rq)
62{
63    couchfile_modify_result *res;
64    res = static_cast<couchfile_modify_result*>(arena_alloc(a, sizeof(couchfile_modify_result)));
65    if (!res) {
66        return NULL;
67    }
68    res->arena = a;
69    res->arena_transient = NULL;
70    res->values = make_nodelist(a, 0);
71    if (!res->values) {
72        return NULL;
73    }
74    res->values_end = res->values;
75    res->pointers = make_nodelist(a, 0);
76    if (!res->pointers) {
77        return NULL;
78    }
79    res->pointers_end = res->pointers;
80    res->node_len = 0;
81    res->count = 0;
82    res->modified = 0;
83    res->error_state = 0;
84    res->rq = rq;
85    return res;
86}
87
88couchfile_modify_result *new_btree_modres(arena *a, arena *transient_arena, tree_file *file,
89                                          compare_info* cmp, reduce_fn reduce, reduce_fn rereduce,
90                                          void *user_reduce_ctx,
91                                          int kv_chunk_threshold, int kp_chunk_threshold)
92{
93    couchfile_modify_request* rq;
94    rq = static_cast<couchfile_modify_request*>(arena_alloc(a, sizeof(couchfile_modify_request)));
95    rq->cmp = *cmp;
96    rq->file = file;
97    rq->num_actions = 0;
98    rq->fetch_callback = NULL;
99    rq->reduce = reduce;
100    rq->rereduce = rereduce;
101    rq->user_reduce_ctx = user_reduce_ctx;
102    rq->compacting = 1;
103    rq->enable_purging = 0;
104    rq->purge_kp = NULL;
105    rq->purge_kv = NULL;
106    rq->kv_chunk_threshold = kv_chunk_threshold;
107    rq->kp_chunk_threshold = kp_chunk_threshold;
108
109    couchfile_modify_result* mr = make_modres(a, rq);
110    if (!mr)
111        return NULL;
112    mr->arena_transient = transient_arena;
113    mr->modified = 1;
114    mr->node_type = KV_NODE;
115
116    return mr;
117}
118
119couchstore_error_t mr_push_item(sized_buf *k, sized_buf *v, couchfile_modify_result *dst)
120{
121    nodelist *itm = NULL;
122    if(dst->arena_transient != NULL)
123    {
124        itm = make_nodelist(dst->arena_transient, 0);
125    } else {
126        itm = make_nodelist(dst->arena, 0);
127    }
128    if (!itm) {
129        return COUCHSTORE_ERROR_ALLOC_FAIL;
130    }
131    itm->key = *k;
132    itm->data = *v;
133    itm->pointer = NULL;
134    dst->values_end->next = itm;
135    dst->values_end = itm;
136    //Encoded size (see flush_mr)
137    dst->node_len += k->size + v->size + sizeof(raw_kv_length);
138    dst->count++;
139    return maybe_flush(dst);
140}
141
142static nodelist *encode_pointer(arena* a, node_pointer *ptr)
143{
144    nodelist *pel = make_nodelist(a, sizeof(raw_node_pointer) + ptr->reduce_value.size);
145    if (!pel) {
146        return NULL;
147    }
148    raw_node_pointer *raw = (raw_node_pointer*)pel->data.buf;
149    encode_raw48(ptr->pointer, &raw->pointer);
150    encode_raw48(ptr->subtreesize, &raw->subtreesize);
151    raw->reduce_value_size = encode_raw16((uint16_t)ptr->reduce_value.size);
152    memcpy(raw + 1, ptr->reduce_value.buf, ptr->reduce_value.size);
153    pel->pointer = ptr;
154    pel->key = ptr->key;
155    return pel;
156}
157
158static couchstore_error_t mr_push_pointerinfo(node_pointer *ptr,
159                                              couchfile_modify_result *dst)
160{
161    nodelist *pel = encode_pointer(dst->arena, ptr);
162    if (!pel) {
163        return COUCHSTORE_ERROR_ALLOC_FAIL;
164    }
165    dst->values_end->next = pel;
166    dst->values_end = pel;
167    dst->node_len += pel->key.size + pel->data.size + sizeof(raw_kv_length);
168    dst->count++;
169    return maybe_flush(dst);
170}
171
172node_pointer *read_pointer(arena* a, sized_buf *key, char *buf)
173{
174    //Parse KP pair into a node_pointer {K, {ptr, reduce_value, subtreesize}}
175    node_pointer *p = (node_pointer *) arena_alloc(a, sizeof(node_pointer));
176    if (!p) {
177        return NULL;
178    }
179    const raw_node_pointer *raw = (const raw_node_pointer*)buf;
180    p->pointer = decode_raw48(raw->pointer);
181    p->subtreesize = decode_raw48(raw->subtreesize);
182    p->reduce_value.size = decode_raw16(raw->reduce_value_size);
183    p->reduce_value.buf = buf + sizeof(*raw);
184    p->key = *key;
185    return p;
186}
187
188//Write the current contents of the values list to disk as a node
189//and add the resulting pointer to the pointers list.
190static couchstore_error_t flush_mr(couchfile_modify_result *res)
191{
192    return flush_mr_partial(res, res->node_len);
193}
194
195//Write a node using enough items from the values list to create a node
196//with uncompressed size of at least mr_quota
197static couchstore_error_t flush_mr_partial(couchfile_modify_result *res, size_t mr_quota)
198{
199    char *dst;
200    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
201    int itmcount = 0;
202    char *nodebuf = NULL;
203    sized_buf writebuf;
204    char reducebuf[MAX_REDUCTION_SIZE];
205    size_t reducesize = 0;
206    uint64_t subtreesize = 0;
207    cs_off_t diskpos;
208    size_t disk_size;
209    sized_buf final_key = {NULL, 0};
210
211    if (res->values_end == res->values || ! res->modified) {
212        //Empty
213        return COUCHSTORE_SUCCESS;
214    }
215
216    // nodebuf/writebuf is very short-lived and can be large, so use regular malloc heap for it:
217    nodebuf = static_cast<char*>(malloc(res->node_len + 1));
218    if (!nodebuf) {
219        return COUCHSTORE_ERROR_ALLOC_FAIL;
220    }
221
222    writebuf.buf = nodebuf;
223
224    dst = nodebuf;
225    *(dst++) = (char) res->node_type;
226
227    nodelist *i = res->values->next;
228    //We don't care that we've reached mr_quota if we haven't written out
229    //at least two items and we're not writing a leaf node.
230    while (i != NULL && (mr_quota > 0 || (itmcount < 2 && res->node_type == KP_NODE))) {
231        dst = static_cast<char*>(write_kv(dst, i->key, i->data));
232        if (i->pointer) {
233            subtreesize += i->pointer->subtreesize;
234        }
235        mr_quota -= i->key.size + i->data.size + sizeof(raw_kv_length);
236        final_key = i->key;
237        i = i->next;
238        res->count--;
239        itmcount++;
240    }
241
242    writebuf.size = dst - nodebuf;
243
244    errcode = static_cast<couchstore_error_t>(db_write_buf_compressed(res->rq->file, &writebuf, &diskpos, &disk_size));
245    free(nodebuf);  // here endeth the nodebuf.
246    if (errcode != COUCHSTORE_SUCCESS) {
247        return errcode;
248    }
249
250    if (res->node_type == KV_NODE && res->rq->reduce) {
251        errcode = res->rq->reduce(reducebuf, &reducesize, res->values->next, itmcount, res->rq->user_reduce_ctx);
252        if (errcode != COUCHSTORE_SUCCESS) {
253            return errcode;
254        }
255        assert(reducesize <= sizeof(reducebuf));
256    }
257
258    if (res->node_type == KP_NODE && res->rq->rereduce) {
259        errcode = res->rq->rereduce(reducebuf, &reducesize, res->values->next, itmcount, res->rq->user_reduce_ctx);
260        if (errcode != COUCHSTORE_SUCCESS) {
261            return errcode;
262        }
263        assert(reducesize <= sizeof(reducebuf));
264    }
265
266    node_pointer *ptr = (node_pointer *) arena_alloc(res->arena, sizeof(node_pointer) + final_key.size + reducesize);
267    if (!ptr) {
268        return COUCHSTORE_ERROR_ALLOC_FAIL;
269    }
270
271    ptr->key.buf = ((char *)ptr) + sizeof(node_pointer);
272    ptr->reduce_value.buf = ((char *)ptr) + sizeof(node_pointer) + final_key.size;
273
274    ptr->key.size = final_key.size;
275    ptr->reduce_value.size = reducesize;
276
277    memcpy(ptr->key.buf, final_key.buf, final_key.size);
278    memcpy(ptr->reduce_value.buf, reducebuf, reducesize);
279
280    ptr->subtreesize = subtreesize + disk_size;
281    ptr->pointer = diskpos;
282
283    nodelist *pel = encode_pointer(res->arena, ptr);
284    if (!pel) {
285        return COUCHSTORE_ERROR_ALLOC_FAIL;
286    }
287
288    res->pointers_end->next = pel;
289    res->pointers_end = pel;
290
291    res->node_len -= (writebuf.size - 1);
292
293    res->values->next = i;
294    if(i == NULL) {
295        res->values_end = res->values;
296    }
297
298    return COUCHSTORE_SUCCESS;
299}
300
301//Move this node's pointers list to dst node's values list.
302static couchstore_error_t mr_move_pointers(couchfile_modify_result *src,
303                                           couchfile_modify_result *dst)
304{
305    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
306    if (src->pointers_end == src->pointers) {
307        return COUCHSTORE_SUCCESS;
308    }
309
310    nodelist *ptr = src->pointers->next;
311    nodelist *next = ptr;
312    while (ptr != NULL && errcode == 0) {
313        dst->node_len += ptr->data.size + ptr->key.size + sizeof(raw_kv_length);
314        dst->count++;
315
316        next = ptr->next;
317        ptr->next = NULL;
318
319        dst->values_end->next = ptr;
320        dst->values_end = ptr;
321        ptr = next;
322        error_pass(maybe_flush(dst));
323    }
324
325cleanup:
326    src->pointers->next = next;
327    src->pointers_end = src->pointers;
328    return errcode;
329}
330
331// Perform purging for a kv-node if it qualifies for purging
332static couchstore_error_t maybe_purgekv(couchfile_modify_request *rq,
333                                            sized_buf *key,
334                                            sized_buf *val,
335                                            couchfile_modify_result *result)
336{
337    int action;
338    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
339
340    if (rq->enable_purging && rq->purge_kv) {
341        action = rq->purge_kv(key, val, rq->guided_purge_ctx);
342        if (action < 0) {
343            return (couchstore_error_t) action;
344        }
345    } else {
346        action = PURGE_KEEP;
347    }
348
349    switch (action) {
350    case PURGE_ITEM:
351        result->modified = 1;
352        break;
353
354    case PURGE_STOP:
355        rq->enable_purging = 0;
356
357    case PURGE_KEEP:
358        errcode = mr_push_item(key, val, result);
359        break;
360    }
361
362    return errcode;
363}
364
365// Perform purging for a kp-node if it qualifies for purging
366static couchstore_error_t maybe_purgekp(couchfile_modify_request *rq, node_pointer *node,
367                                        couchfile_modify_result *result)
368{
369    int action;
370    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
371
372    if (rq->enable_purging && rq->purge_kp) {
373        action = rq->purge_kp(node, rq->guided_purge_ctx);
374        if (action < 0) {
375            return (couchstore_error_t) action;
376        }
377    } else {
378        action = PURGE_KEEP;
379    }
380
381    switch (action) {
382    case PURGE_ITEM:
383        result->modified = 1;
384        break;
385
386    case PURGE_PARTIAL:
387        errcode = purge_node(rq, node, result);
388        break;
389
390    case PURGE_STOP:
391        rq->enable_purging = 0;
392
393    case PURGE_KEEP:
394        errcode = mr_push_pointerinfo(node, result);
395        break;
396    }
397
398    return errcode;
399}
400
401static couchstore_error_t modify_node(couchfile_modify_request *rq,
402                                      node_pointer *nptr,
403                                      int start, int end,
404                                      couchfile_modify_result *dst)
405{
406    char *nodebuf = NULL;  // FYI, nodebuf is a malloced block, not in the arena
407    int bufpos = 1;
408    int nodebuflen = 0;
409    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
410    couchfile_modify_result *local_result = NULL;
411
412    if (start == end) {
413        return COUCHSTORE_SUCCESS;
414    }
415
416    if (nptr) {
417        if ((nodebuflen = pread_compressed(rq->file, nptr->pointer, (char **) &nodebuf)) < 0) {
418            error_pass(COUCHSTORE_ERROR_READ);
419        }
420    }
421
422    local_result = make_modres(dst->arena, rq);
423    error_unless(local_result, COUCHSTORE_ERROR_ALLOC_FAIL);
424
425    if (nptr == NULL || nodebuf[0] == 1) { //KV Node
426        local_result->node_type = KV_NODE;
427        while (bufpos < nodebuflen) {
428            sized_buf cmp_key, val_buf;
429            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
430            int advance = 0;
431            while (!advance && start < end) {
432                advance = 1;
433                int cmp_val = rq->cmp.compare(&cmp_key, rq->actions[start].key);
434
435                if (cmp_val < 0) { //Key less than action key
436                    errcode = maybe_purgekv(rq, &cmp_key, &val_buf, local_result);
437                    if (errcode != COUCHSTORE_SUCCESS) {
438                        goto cleanup;
439                    }
440                } else if (cmp_val > 0) { //Key greater than action key
441                    switch (rq->actions[start].type) {
442                    case ACTION_INSERT:
443                        local_result->modified = 1;
444                        mr_push_item(rq->actions[start].key, rq->actions[start].value.data, local_result);
445                        break;
446
447                    case ACTION_REMOVE:
448                        local_result->modified = 1;
449                        break;
450
451                    case ACTION_FETCH:
452                        if (rq->fetch_callback) {
453                            //not found
454                            (*rq->fetch_callback)(rq, rq->actions[start].key, NULL, rq->actions[start].value.arg);
455                        }
456                    }
457                    start++;
458                    //Do next action on same item in the node, as our action was
459                    //not >= it.
460                    advance = 0;
461                } else if (cmp_val == 0) { //Node key is equal to action key
462                    switch (rq->actions[start].type) {
463                    case ACTION_INSERT:
464                        local_result->modified = 1;
465                        mr_push_item(rq->actions[start].key, rq->actions[start].value.data, local_result);
466                        break;
467
468                    case ACTION_REMOVE:
469                        local_result->modified = 1;
470                        break;
471
472                    case ACTION_FETCH:
473                        if (rq->fetch_callback) {
474                            (*rq->fetch_callback)(rq, rq->actions[start].key, &val_buf, rq->actions[start].value.arg);
475                        }
476                        //Do next action on same item in the node, as our action was a fetch
477                        //and there may be an equivalent insert or remove
478                        //following.
479                        advance = 0;
480                    }
481                    start++;
482                }
483            }
484            if (start == end && !advance) {
485                //If we've exhausted actions then just keep this key
486                errcode = maybe_purgekv(rq, &cmp_key, &val_buf, local_result);
487                if (errcode != COUCHSTORE_SUCCESS) {
488                    goto cleanup;
489                }
490            }
491        }
492        while (start < end) {
493            //We're at the end of a leaf node.
494            switch (rq->actions[start].type) {
495            case ACTION_INSERT:
496                local_result->modified = 1;
497                mr_push_item(rq->actions[start].key, rq->actions[start].value.data, local_result);
498                break;
499
500            case ACTION_REMOVE:
501                local_result->modified = 1;
502                break;
503
504            case ACTION_FETCH:
505                if (rq->fetch_callback) {
506                    //not found
507                    (*rq->fetch_callback)(rq, rq->actions[start].key, NULL, rq->actions[start].value.arg);
508                }
509                break;
510            }
511            start++;
512        }
513    } else if (nodebuf[0] == 0) { //KP Node
514        local_result->node_type = KP_NODE;
515        while (bufpos < nodebuflen && start < end) {
516            sized_buf cmp_key, val_buf;
517            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
518            int cmp_val = rq->cmp.compare(&cmp_key, rq->actions[start].key);
519            if (bufpos == nodebuflen) {
520                //We're at the last item in the kpnode, must apply all our
521                //actions here.
522                node_pointer *desc = read_pointer(dst->arena, &cmp_key, val_buf.buf);
523                if (!desc) {
524                    errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
525                    goto cleanup;
526                }
527
528                errcode = modify_node(rq, desc, start, end, local_result);
529                if (errcode != COUCHSTORE_SUCCESS) {
530                    goto cleanup;
531                }
532                break;
533            }
534
535            if (cmp_val < 0) {
536                //Key in node item less than action item and not at end
537                //position, so just add it and continue.
538                node_pointer *add = read_pointer(dst->arena, &cmp_key, val_buf.buf);
539                if (!add) {
540                    errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
541                    goto cleanup;
542                }
543
544                errcode = maybe_purgekp(rq, add, local_result);
545                if (errcode != COUCHSTORE_SUCCESS) {
546                    goto cleanup;
547                }
548            } else if (cmp_val >= 0) {
549                //Found a key in the node greater than the one in the current
550                //action. Descend into the pointed node with as many actions as
551                //are less than the key here.
552                int range_end = start;
553                while (range_end < end &&
554                        rq->cmp.compare(rq->actions[range_end].key, &cmp_key) <= 0) {
555                    range_end++;
556                }
557
558                node_pointer *desc = read_pointer(dst->arena, &cmp_key, val_buf.buf);
559                if (!desc) {
560                    errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
561                    goto cleanup;
562                }
563
564                errcode = modify_node(rq, desc, start, range_end, local_result);
565                start = range_end;
566                if (errcode != COUCHSTORE_SUCCESS) {
567                    goto cleanup;
568                }
569            }
570        }
571        while (bufpos < nodebuflen) {
572            sized_buf cmp_key, val_buf;
573            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
574            node_pointer *add = read_pointer(dst->arena, &cmp_key, val_buf.buf);
575            if (!add) {
576                errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
577                goto cleanup;
578            }
579
580            errcode = maybe_purgekp(rq, add, local_result);
581            if (errcode != COUCHSTORE_SUCCESS) {
582                goto cleanup;
583            }
584        }
585    } else {
586        errcode = COUCHSTORE_ERROR_CORRUPT;
587        goto cleanup;
588    }
589    //If we've done modifications, write out the last leaf node.
590    error_pass(flush_mr(local_result));
591    if (!local_result->modified && nptr != NULL) {
592        //If we didn't do anything, give back the pointer to the original
593        mr_push_pointerinfo(nptr, dst);
594    } else {
595        //Otherwise, give back the pointers to the nodes we've created.
596        dst->modified = 1;
597        error_pass(mr_move_pointers(local_result, dst));
598    }
599cleanup:
600    if (nodebuf) {
601        free(nodebuf);
602    }
603
604    return errcode;
605}
606
607static node_pointer *finish_root(couchfile_modify_request *rq,
608                                 couchfile_modify_result *root_result,
609                                 couchstore_error_t *errcode)
610{
611    node_pointer *ret_ptr = NULL;
612    couchfile_modify_result *collector = make_modres(root_result->arena, rq);
613    if (!collector) {
614        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
615        return NULL;
616    }
617    collector->modified = 1;
618    collector->node_type = KP_NODE;
619    flush_mr(root_result);
620    while (1) {
621        if (root_result->pointers_end == root_result->pointers->next) {
622            //The root result split into exactly one kp_node.
623            //Return the pointer to it.
624            ret_ptr = root_result->pointers_end->pointer;
625            break;
626        } else {
627            //The root result split into more than one kp_node.
628            //Move the pointer list to the value list and write out the new node.
629            *errcode = mr_move_pointers(root_result, collector);
630
631            if (*errcode < 0) {
632                goto cleanup;
633            }
634
635            *errcode = flush_mr(collector);
636            if (*errcode < 0) {
637                goto cleanup;
638            }
639            //Swap root_result and collector mr's.
640            couchfile_modify_result *tmp = root_result;
641            root_result = collector;
642            collector = tmp;
643        }
644    }
645cleanup:
646    return ret_ptr;
647}
648
649// Copies a node_pointer and its values to the malloc heap.
650node_pointer* copy_node_pointer(node_pointer* ptr)
651{
652    if (!ptr) {
653        return NULL;
654    }
655    node_pointer* ret_ptr;
656    ret_ptr = static_cast<node_pointer*>(malloc(sizeof(node_pointer) + ptr->key.size + ptr->reduce_value.size));
657    if (!ret_ptr) {
658        return NULL;
659    }
660    *ret_ptr = *ptr;
661    ret_ptr->key.buf = (char*)ret_ptr + sizeof(node_pointer);
662    memcpy(ret_ptr->key.buf, ptr->key.buf, ptr->key.size);
663    ret_ptr->reduce_value.buf = (char*)ret_ptr->key.buf + ptr->key.size;
664    memcpy(ret_ptr->reduce_value.buf, ptr->reduce_value.buf, ptr->reduce_value.size);
665    return ret_ptr;
666}
667
668// Finished creating a new b-tree (from compaction), build pointers and get a
669// root node.
670
671node_pointer* complete_new_btree(couchfile_modify_result* mr, couchstore_error_t *errcode)
672{
673    *errcode = flush_mr(mr);
674    if(*errcode != COUCHSTORE_SUCCESS) {
675        return NULL;
676    }
677
678    couchfile_modify_result* targ_mr = make_modres(mr->arena, mr->rq);
679    if (!targ_mr) {
680        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
681        return NULL;
682    }
683    targ_mr->modified = 1;
684    targ_mr->node_type = KP_NODE;
685
686    *errcode = mr_move_pointers(mr, targ_mr);
687    if(*errcode != COUCHSTORE_SUCCESS) {
688        return NULL;
689    }
690
691    node_pointer* ret_ptr;
692    if(targ_mr->count > 1 || targ_mr->pointers != targ_mr->pointers_end) {
693        ret_ptr = finish_root(mr->rq, targ_mr, errcode);
694    } else {
695        ret_ptr = targ_mr->values_end->pointer;
696    }
697
698    if (*errcode != COUCHSTORE_SUCCESS || ret_ptr == NULL) {
699        return NULL;
700    }
701
702    ret_ptr = copy_node_pointer(ret_ptr);
703    if (ret_ptr == NULL) {
704        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
705    }
706    return ret_ptr;
707}
708
709node_pointer *modify_btree(couchfile_modify_request *rq,
710                           node_pointer *root,
711                           couchstore_error_t *errcode)
712{
713    arena* a = new_arena(0);
714    node_pointer *ret_ptr = root;
715    couchfile_modify_result *root_result = make_modres(a, rq);
716    if (!root_result) {
717        delete_arena(a);
718        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
719        return root;
720    }
721    root_result->node_type = KP_NODE;
722    *errcode = modify_node(rq, root, 0, rq->num_actions, root_result);
723    if (*errcode < 0) {
724        delete_arena(a);
725        return NULL;
726    }
727
728    if (root_result->values_end->pointer == root) {
729        //If we got the root pointer back, remove it from the list
730        //so we don't try to free it.
731        root_result->values_end->pointer = NULL;
732    }
733
734    if (root_result->modified) {
735        if (root_result->count > 1 || root_result->pointers != root_result->pointers_end) {
736            //The root was split
737            //Write it to disk and return the pointer to it.
738            ret_ptr = finish_root(rq, root_result, errcode);
739            if (*errcode < 0) {
740                ret_ptr = NULL;
741            }
742        } else {
743            ret_ptr = root_result->values_end->pointer;
744        }
745    }
746    if (ret_ptr != root) {
747        ret_ptr = copy_node_pointer(ret_ptr);
748    }
749    delete_arena(a);
750    return ret_ptr;
751}
752
753static couchstore_error_t purge_node(couchfile_modify_request *rq,
754                                     node_pointer *nptr,
755                                     couchfile_modify_result *dst)
756{
757    char *nodebuf = NULL;  // FYI, nodebuf is a malloced block, not in the arena
758    int bufpos = 1;
759    int nodebuflen = 0;
760    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
761    couchfile_modify_result *local_result = NULL;
762
763    if (nptr == NULL) {
764        return errcode;
765    }
766
767    // noop: add back current node to destination
768    if (!rq->enable_purging) {
769        dst->modified = 1;
770        return mr_push_pointerinfo(nptr, dst);
771    }
772
773    if ((nodebuflen = pread_compressed(rq->file, nptr->pointer, (char **) &nodebuf)) < 0) {
774        error_pass(COUCHSTORE_ERROR_READ);
775    }
776
777    local_result = make_modres(dst->arena, rq);
778    error_unless(local_result, COUCHSTORE_ERROR_ALLOC_FAIL);
779
780    if (nodebuf[0] == 1) { //KV Node
781        local_result->node_type = KV_NODE;
782        while (bufpos < nodebuflen) {
783            sized_buf cmp_key, val_buf;
784            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
785
786            errcode = maybe_purgekv(rq, &cmp_key, &val_buf, local_result);
787            if (errcode != COUCHSTORE_SUCCESS) {
788                goto cleanup;
789            }
790        }
791    } else if (nodebuf[0] == 0) { //KP Node
792        local_result->node_type = KP_NODE;
793        while (bufpos < nodebuflen) {
794            sized_buf cmp_key, val_buf;
795            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
796
797            node_pointer *desc = read_pointer(dst->arena, &cmp_key, val_buf.buf);
798            if (!desc) {
799                errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
800                goto cleanup;
801            }
802
803            errcode = maybe_purgekp(rq, desc, local_result);
804            if (errcode != COUCHSTORE_SUCCESS) {
805                goto cleanup;
806            }
807        }
808    } else {
809        errcode = COUCHSTORE_ERROR_CORRUPT;
810        goto cleanup;
811    }
812
813    // Write out changes and add node back to parent
814    if (local_result->modified) {
815        error_pass(flush_mr(local_result));
816        dst->modified = 1;
817        error_pass(mr_move_pointers(local_result, dst));
818    }
819
820cleanup:
821    free(nodebuf);
822    return errcode;
823}
824
825node_pointer *guided_purge_btree(couchfile_modify_request *rq, node_pointer *root,
826                                                couchstore_error_t *errcode)
827{
828    rq->enable_purging = 1;
829    arena* a = new_arena(0);
830    if (!a) {
831        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
832        return NULL;
833    }
834
835    node_pointer *ret_ptr = root;
836    couchfile_modify_result *root_result = make_modres(a, rq);
837    if (!root_result) {
838        delete_arena(a);
839        *errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
840        return NULL;
841    }
842
843    root_result->node_type = KP_NODE;
844    *errcode = purge_node(rq, root, root_result);
845    if (*errcode < 0) {
846        delete_arena(a);
847        return NULL;
848    }
849
850    if (root_result->modified) {
851        ret_ptr = root_result->values_end->pointer;
852    }
853
854    if (ret_ptr && ret_ptr != root) {
855        ret_ptr = copy_node_pointer(ret_ptr);
856    }
857
858    delete_arena(a);
859    return ret_ptr;
860}
861