xref: /6.0.3/couchstore/src/btree_read.cc (revision 3b4c35d7)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3#include <platform/cb_malloc.h>
4#include <stdlib.h>
5#include "couch_btree.h"
6#include "util.h"
7#include "node_types.h"
8
9/* Helper function to handle lookup specific special cases */
10static int lookup_compare(couchfile_lookup_request *rq,
11                          const sized_buf *key1,
12                          const sized_buf *key2)
13{
14    /* For handling the case where low key is empty for full btree iteration */
15    if (!key1->size && !key2->size) {
16        return 0;
17    } else if (!key1->size) {
18        return -1;
19    } else if (!key2->size) {
20        return 1;
21    }
22
23    return rq->cmp.compare(key1, key2);
24}
25
26static couchstore_error_t btree_lookup_inner(couchfile_lookup_request *rq,
27                                             uint64_t diskpos,
28                                             int current,
29                                             int end)
30{
31    int bufpos = 1, nodebuflen = 0;
32
33    if (current == end) {
34        return COUCHSTORE_SUCCESS;
35    }
36    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
37
38    char *nodebuf = NULL;
39
40    {
41        ScopedFileTag tag(rq->file->ops, rq->file->handle, FileTag::BTree);
42        nodebuflen = pread_compressed(rq->file, diskpos, &nodebuf);
43    }
44    error_unless(nodebuflen >= 0, (static_cast<couchstore_error_t>(nodebuflen)));  // if negative, it's an error code
45
46    if (nodebuf[0] == 0) { //KP Node
47        while (bufpos < nodebuflen && current < end) {
48            sized_buf cmp_key, val_buf;
49            bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
50
51            if (lookup_compare(rq, &cmp_key, rq->keys[current]) >= 0) {
52                if (rq->fold) {
53                    rq->in_fold = 1;
54                }
55
56                uint64_t pointer = 0;
57                int last_item = current;
58                //Descend into the pointed to node.
59                //with all keys < item key.
60                do {
61                    last_item++;
62                } while (last_item < end && lookup_compare(rq, &cmp_key, rq->keys[last_item]) >= 0);
63
64                const raw_node_pointer *raw = (const raw_node_pointer*)val_buf.buf;
65                if(rq->node_callback) {
66                    uint64_t subtreeSize = decode_raw48(raw->subtreesize);
67                    sized_buf reduce_value =
68                    {val_buf.buf + sizeof(raw_node_pointer), decode_raw16(raw->reduce_value_size)};
69                    error_pass(rq->node_callback(rq, subtreeSize, &reduce_value));
70                }
71
72                pointer = decode_raw48(raw->pointer);
73
74                couchstore_error_t errcode_local =
75                        btree_lookup_inner(rq, pointer, current, last_item);
76                if (rq->tolerate_corruption) {
77                    error_tolerate(errcode_local);
78                } else {
79                    error_pass(errcode_local);
80                }
81
82                if (!rq->in_fold) {
83                    current = last_item;
84                }
85                if(rq->node_callback) {
86                    error_pass(rq->node_callback(rq, 0, NULL));
87                }
88            }
89        }
90    } else if (nodebuf[0] == 1) { //KV Node
91        sized_buf cmp_key, val_buf;
92        bool next_key = true;
93
94        // Try iterating whilst we have 'input' keys, i.e. keys we're looking up
95        while (current < end) {
96            // Only try and read the next-key if requested and we're still in
97            // the node length
98            if (next_key && bufpos < nodebuflen) {
99                bufpos += read_kv(nodebuf + bufpos, &cmp_key, &val_buf);
100            } else if (next_key) {
101                // else if next_key is true and we're out of buf space, break
102                break;
103            }
104            // else continue to evaluate cmp_key against rq->keys[current]
105
106            int cmp_val = lookup_compare(rq, &cmp_key, rq->keys[current]);
107            if (cmp_val >= 0 && rq->fold && !rq->in_fold) {
108                rq->in_fold = 1;
109            } else if (rq->in_fold && (current + 1) < end &&
110                       (lookup_compare(rq, &cmp_key, rq->keys[current + 1])) > 0) {
111                //We've hit a key past the end of our range.
112                rq->in_fold = 0;
113                rq->fold = 0;
114                current = end;
115            }
116
117            if (cmp_val >= 0) {
118                couchstore_error_t errcode_local;
119                if (cmp_val == 0 || rq->in_fold) { // Found
120                    errcode_local = rq->fetch_callback(rq, &cmp_key, &val_buf);
121                } else {
122                    errcode_local = rq->fetch_callback(
123                            rq, rq->keys[current], NULL);
124                }
125
126                if (rq->tolerate_corruption) {
127                    error_tolerate(errcode_local);
128                } else {
129                    error_pass(errcode_local);
130                }
131
132                if (!rq->in_fold) {
133                    ++current;
134                    next_key = cmp_val == 0;
135                } else {
136                    next_key = true;
137                }
138            } else {
139                next_key = true;
140            }
141        }
142    }
143
144    //Any remaining items are not found.
145    while (current < end && !rq->fold) {
146        error_pass(rq->fetch_callback(rq, rq->keys[current], NULL));
147        current++;
148    }
149
150cleanup:
151    cb_free(nodebuf);
152
153    return errcode;
154}
155
156couchstore_error_t btree_lookup(couchfile_lookup_request *rq,
157                                uint64_t root_pointer)
158{
159    rq->in_fold = 0;
160    return btree_lookup_inner(rq, root_pointer, 0, rq->num_keys);
161}
162