xref: /6.0.3/forestdb/src/hbtrie.cc (revision bb32a279)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21
22#include "hbtrie.h"
23#include "list.h"
24#include "btree.h"
25#include "btree_kv.h"
26#include "btree_fast_str_kv.h"
27#include "internal_types.h"
28
29#include "memleak.h"
30
31#ifdef __DEBUG
32#ifndef __DEBUG_HBTRIE
33    #undef DBG
34    #undef DBGCMD
35    #undef DBGSW
36    #define DBG(...)
37    #define DBGCMD(...)
38    #define DBGSW(n, ...)
39#endif
40#endif
41
42#define HBTRIE_EOK (0xF0)
43
44#define CHUNK_FLAG (0x8000)
45typedef uint16_t chunkno_t;
46struct hbtrie_meta {
47    chunkno_t chunkno;
48    uint16_t prefix_len;
49    void *value;
50    void *prefix;
51};
52
53#define _l2c(trie, len) (( (len) + ((trie)->chunksize-1) ) / (trie)->chunksize)
54
55// MUST return same value to '_get_nchunk(_hbtrie_reform_key(RAWKEY))'
56INLINE int _get_nchunk_raw(struct hbtrie *trie, void *rawkey, int rawkeylen)
57{
58    return _l2c(trie, rawkeylen) + 1;
59}
60
61INLINE int _get_nchunk(struct hbtrie *trie, void *key, int keylen)
62{
63    return (keylen-1) / trie->chunksize + 1;
64}
65
// Convert a raw key into the trie's internal "reformed" layout: the raw
// bytes zero-padded to a chunk boundary, followed by one extra chunk whose
// final byte records how many raw bytes occupy the last (possibly partial)
// raw chunk. Returns the reformed key length (always a multiple of the
// chunk size). 'outkey' must be large enough for the reformed key.
int _hbtrie_reform_key(struct hbtrie *trie, void *rawkey,
                       int rawkeylen, void *outkey)
{
    int outkeylen;
    int nchunk;
    int i;
    uint8_t rsize;
    size_t csize = trie->chunksize;

    nchunk = _get_nchunk_raw(trie, rawkey, rawkeylen);
    outkeylen = nchunk * csize;

    if (nchunk > 2) {
        // copy chunk[0] ~ chunk[nchunk-2]
        // rsize: number of raw bytes in the last raw chunk (1 ~ chunksize)
        rsize = rawkeylen - ((nchunk - 2) * csize);
    } else {
        rsize = rawkeylen;
    }
    fdb_assert(rsize && rsize <= trie->chunksize, rsize, trie);
    memcpy((uint8_t*)outkey, (uint8_t*)rawkey, rawkeylen);

    if (rsize < csize) {
        // zero-fill rest space
        // (rest of the partial chunk plus the whole marker chunk)
        i = nchunk - 2;
        memset((uint8_t*)outkey + (i*csize) + rsize, 0x0, 2*csize - rsize);
    } else {
        // zero-fill the last chunk
        // (raw key is chunk-aligned; only the marker chunk needs zeroing)
        i = nchunk - 1;
        memset((uint8_t*)outkey + i * csize, 0x0, csize);
    }

    // assign rsize at the end of the outkey
    *((uint8_t*)outkey + outkeylen - 1) = rsize;

    return outkeylen;
}
102
103// this function only returns (raw) key length
104static int _hbtrie_reform_key_reverse(struct hbtrie *trie,
105                                      void *key,
106                                      int keylen)
107{
108    uint8_t rsize;
109    rsize = *((uint8_t*)key + keylen - 1);
110    fdb_assert(rsize, rsize, trie);
111
112    if (rsize == trie->chunksize) {
113        return keylen - trie->chunksize;
114    } else {
115        // rsize: 1 ~ chunksize-1
116        return keylen - (trie->chunksize * 2) + rsize;
117    }
118}
119
120#define _get_leaf_kv_ops btree_fast_str_kv_get_kb64_vb64
121#define _get_leaf_key btree_fast_str_kv_get_key
122#define _set_leaf_key btree_fast_str_kv_set_key
123#define _set_leaf_inf_key btree_fast_str_kv_set_inf_key
124#define _free_leaf_key btree_fast_str_kv_free_key
125
126void hbtrie_init(struct hbtrie *trie, int chunksize, int valuelen,
127                 int btree_nodesize, bid_t root_bid, void *btreeblk_handle,
128                 struct btree_blk_ops *btree_blk_ops, void *doc_handle,
129                 hbtrie_func_readkey *readkey)
130{
131    struct btree_kv_ops *btree_kv_ops, *btree_leaf_kv_ops;
132
133    trie->chunksize = chunksize;
134    trie->valuelen = valuelen;
135    trie->btree_nodesize = btree_nodesize;
136    trie->btree_blk_ops = btree_blk_ops;
137    trie->btreeblk_handle = btreeblk_handle;
138    trie->doc_handle = doc_handle;
139    trie->root_bid = root_bid;
140    trie->flag = 0x0;
141    trie->leaf_height_limit = 0;
142    trie->cmp_args.chunksize = chunksize;
143    trie->cmp_args.aux = NULL;
144    trie->aux = &trie->cmp_args;
145
146    // assign key-value operations
147    btree_kv_ops = (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
148    btree_leaf_kv_ops = (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
149
150    fdb_assert(valuelen == 8, valuelen, trie);
151    fdb_assert((size_t)chunksize >= sizeof(void *), chunksize, trie);
152
153    if (chunksize == 8 && valuelen == 8){
154        btree_kv_ops = btree_kv_get_kb64_vb64(btree_kv_ops);
155        btree_leaf_kv_ops = _get_leaf_kv_ops(btree_leaf_kv_ops);
156    } else if (chunksize == 4 && valuelen == 8) {
157        btree_kv_ops = btree_kv_get_kb32_vb64(btree_kv_ops);
158        btree_leaf_kv_ops = _get_leaf_kv_ops(btree_leaf_kv_ops);
159    } else {
160        btree_kv_ops = btree_kv_get_kbn_vb64(btree_kv_ops);
161        btree_leaf_kv_ops = _get_leaf_kv_ops(btree_leaf_kv_ops);
162    }
163
164    trie->btree_kv_ops = btree_kv_ops;
165    trie->btree_leaf_kv_ops = btree_leaf_kv_ops;
166    trie->readkey = readkey;
167    trie->map = NULL;
168    trie->last_map_chunk = (void *)malloc(chunksize);
169    memset(trie->last_map_chunk, 0xff, chunksize); // set 0xffff...
170}
171
172void hbtrie_free(struct hbtrie *trie)
173{
174    free(trie->btree_kv_ops);
175    free(trie->btree_leaf_kv_ops);
176    free(trie->last_map_chunk);
177}
178
179void hbtrie_set_flag(struct hbtrie *trie, uint8_t flag)
180{
181    trie->flag = flag;
182    if (trie->leaf_height_limit == 0) {
183        trie->leaf_height_limit = 1;
184    }
185}
186
187void hbtrie_set_leaf_height_limit(struct hbtrie *trie, uint8_t limit)
188{
189    trie->leaf_height_limit = limit;
190}
191
192void hbtrie_set_leaf_cmp(struct hbtrie *trie, btree_cmp_func *cmp)
193{
194    trie->btree_leaf_kv_ops->cmp = cmp;
195}
196
197void hbtrie_set_map_function(struct hbtrie *trie,
198                             hbtrie_cmp_map *map_func)
199{
200    trie->map = map_func;
201}
202
203// IMPORTANT: hbmeta doesn't have own allocated memory space (pointers only)
// Parse a raw b-tree meta section ('buf' of 'metasize' bytes) into 'hbmeta'.
// Layout (see the comment above _hbtrie_store_meta): chunk number, value
// length, optional value, optional prefix. The resulting 'hbmeta' holds
// pointers INTO 'buf' (no copies), so 'buf' must outlive 'hbmeta'.
static void _hbtrie_fetch_meta(struct hbtrie *trie, int metasize,
                               struct hbtrie_meta *hbmeta, void *buf)
{
    // read hbmeta from buf
    int offset = 0;
    uint32_t valuelen = 0;

    memcpy(&hbmeta->chunkno, buf, sizeof(hbmeta->chunkno));
    hbmeta->chunkno = _endian_decode(hbmeta->chunkno);
    offset += sizeof(hbmeta->chunkno);

    // value-length field is stored with the same width as trie->valuelen;
    // only that many bytes of 'valuelen' are filled (rest stay zero)
    memcpy(&valuelen, (uint8_t*)buf+offset, sizeof(trie->valuelen));
    offset += sizeof(trie->valuelen);

    if (valuelen > 0) {
        // value present: point at it in place
        hbmeta->value = (uint8_t*)buf + offset;
        offset += trie->valuelen;
    } else {
        hbmeta->value = NULL;
    }

    if (metasize - offset > 0) {
        // remaining bytes form the skipped-chunk prefix (pointer only)
        //memcpy(hbmeta->prefix, buf+offset, metasize - offset);
        hbmeta->prefix = (uint8_t*)buf + offset;
        hbmeta->prefix_len = metasize - offset;
    } else {
        hbmeta->prefix = NULL;
        hbmeta->prefix_len = 0;
    }
}
234
// Option for _hbtrie_store_meta(): HBMETA_LEAF marks the meta as belonging
// to a leaf b-tree (sets CHUNK_FLAG in the stored chunk number).
typedef enum {
    HBMETA_NORMAL,
    HBMETA_LEAF,
} hbmeta_opt;
239/* << raw hbtrie meta structure >>
240 * [Total meta length]: 2 bytes
241 * [Chunk number]:      2 bytes
242 * [Value length]:      1 bytes
243 * [Value (optional)]:  x bytes
244 * [Prefix (optional)]: y bytes
245 */
// Serialize hb+trie meta into 'buf' following the raw layout documented
// above: chunk number (with CHUNK_FLAG set for leaf b-trees), value length,
// optional value, optional skipped-chunk prefix. On return, *metasize_out
// holds the number of bytes written.
static void _hbtrie_store_meta(struct hbtrie *trie,
                               metasize_t *metasize_out,
                               chunkno_t chunkno,
                               hbmeta_opt opt,
                               void *prefix,
                               int prefixlen,
                               void *value,
                               void *buf)
{
    chunkno_t _chunkno;

    // write hbmeta to buf
    *metasize_out = 0;

    if (opt == HBMETA_LEAF) {
        // mark this b-tree as a leaf in the stored chunk number
        chunkno |= CHUNK_FLAG;
    }

    _chunkno = _endian_encode(chunkno);
    memcpy(buf, &_chunkno, sizeof(chunkno));
    *metasize_out += sizeof(chunkno);

    if (value) {
        // value length followed by the value itself
        memcpy((uint8_t*)buf + *metasize_out,
               &trie->valuelen, sizeof(trie->valuelen));
        *metasize_out += sizeof(trie->valuelen);
        memcpy((uint8_t*)buf + *metasize_out,
               value, trie->valuelen);
        *metasize_out += trie->valuelen;
    } else {
        // zero value length means "no value stored"
        memset((uint8_t*)buf + *metasize_out, 0x0, sizeof(trie->valuelen));
        *metasize_out += sizeof(trie->valuelen);
    }

    if (prefixlen > 0) {
        memcpy((uint8_t*)buf + *metasize_out, prefix, prefixlen);
        *metasize_out += prefixlen;
    }
}
285
286INLINE int _hbtrie_find_diff_chunk(struct hbtrie *trie,
287                                   void *key1,
288                                   void *key2,
289                                   int start_chunk,
290                                   int end_chunk)
291{
292    int i;
293    for (i=start_chunk; i < end_chunk; ++i) {
294        if (memcmp((uint8_t*)key1 + trie->chunksize*i,
295                   (uint8_t*)key2 + trie->chunksize*i,
296                   trie->chunksize)) {
297             return i;
298        }
299    }
300    return i;
301}
302
303//3 ASSUMPTION: 'VALUE' should be based on same endian to hb+trie
304
305#if defined(__ENDIAN_SAFE) || defined(_BIG_ENDIAN)
306// endian safe option is turned on, OR,
307// the architecture is based on big endian
308INLINE void _hbtrie_set_msb(struct hbtrie *trie, void *value)
309{
310    *((uint8_t*)value) |= (uint8_t)0x80;
311}
312INLINE void _hbtrie_clear_msb(struct hbtrie *trie, void *value)
313{
314    *((uint8_t*)value) &= ~((uint8_t)0x80);
315}
316INLINE int _hbtrie_is_msb_set(struct hbtrie *trie, void *value)
317{
318    return *((uint8_t*)value) & ((uint8_t)0x80);
319}
320#else
321// little endian
322INLINE void _hbtrie_set_msb(struct hbtrie *trie, void *value)
323{
324    *((uint8_t*)value + (trie->valuelen-1)) |= (uint8_t)0x80;
325}
326INLINE void _hbtrie_clear_msb(struct hbtrie *trie, void *value)
327{
328    *((uint8_t*)value + (trie->valuelen-1)) &= ~((uint8_t)0x80);
329}
330INLINE int _hbtrie_is_msb_set(struct hbtrie *trie, void *value)
331{
332    return *((uint8_t*)value + (trie->valuelen-1)) & ((uint8_t)0x80);
333}
334#endif
335
// One level of a root-to-leaf b-tree path used during insert/remove.
struct btreelist_item {
    struct btree btree;      // b-tree handle for this trie level
    chunkno_t chunkno;       // chunk number this b-tree indexes
    bid_t child_rootbid;     // root BID of the child b-tree below this level
    struct list_elem e;      // linkage in the per-operation path list
    uint8_t leaf;            // non-zero if this is a leaf b-tree
};
343
// One level of the iterator's b-tree stack (one entry per trie level).
struct btreeit_item {
    struct btree_iterator btree_it; // iterator over this level's b-tree
    chunkno_t chunkno;              // chunk number this b-tree indexes
    struct list_elem le;            // linkage in hbtrie_iterator::btreeit_list
    uint8_t leaf;                   // non-zero if this is a leaf b-tree
};
350
351#define _is_leaf_btree(chunkno) ((chunkno) & CHUNK_FLAG)
352#define _get_chunkno(chunkno) ((chunkno) & (~(CHUNK_FLAG)))
353
354hbtrie_result hbtrie_iterator_init(struct hbtrie *trie,
355                                   struct hbtrie_iterator *it,
356                                   void *initial_key,
357                                   size_t keylen)
358{
359    it->trie = *trie;
360
361    // MUST NOT affect the original trie due to sharing the same memory segment
362    it->trie.last_map_chunk = (void *)malloc(it->trie.chunksize);
363    memset(it->trie.last_map_chunk, 0xff, it->trie.chunksize);
364
365    it->curkey = (void *)malloc(HBTRIE_MAX_KEYLEN);
366
367    if (initial_key) {
368        it->keylen = _hbtrie_reform_key(trie, initial_key, keylen, it->curkey);
369        if (it->keylen >= HBTRIE_MAX_KEYLEN) {
370            free(it->curkey);
371            DBG("Error: HBTrie iterator init fails because the init key length %d is "
372                "greater than the max key length %d\n", it->keylen, HBTRIE_MAX_KEYLEN);
373            return HBTRIE_RESULT_FAIL;
374        }
375        memset((uint8_t*)it->curkey + it->keylen, 0, trie->chunksize);
376    }else{
377        it->keylen = 0;
378        memset(it->curkey, 0, trie->chunksize);
379    }
380    list_init(&it->btreeit_list);
381    it->flags = 0;
382
383    return HBTRIE_RESULT_SUCCESS;
384}
385
386hbtrie_result hbtrie_iterator_free(struct hbtrie_iterator *it)
387{
388    struct list_elem *e;
389    struct btreeit_item *item;
390    e = list_begin(&it->btreeit_list);
391    while(e){
392        item = _get_entry(e, struct btreeit_item, le);
393        e = list_remove(&it->btreeit_list, e);
394        btree_iterator_free(&item->btree_it);
395        mempool_free(item);
396    }
397    free(it->trie.last_map_chunk);
398    if (it->curkey) free(it->curkey);
399    return HBTRIE_RESULT_SUCCESS;
400}
401
402// move iterator's cursor to the end of the key range.
403// hbtrie_prev() call after hbtrie_last() will return the last key.
404hbtrie_result hbtrie_last(struct hbtrie_iterator *it)
405{
406    struct hbtrie_iterator temp;
407
408    temp = *it;
409    hbtrie_iterator_free(it);
410
411    it->trie = temp.trie;
412    // MUST NOT affect the original trie due to sharing the same memory segment
413    it->trie.last_map_chunk = (void *)malloc(it->trie.chunksize);
414    memset(it->trie.last_map_chunk, 0xff, it->trie.chunksize);
415
416    it->curkey = (void *)malloc(HBTRIE_MAX_KEYLEN);
417    // init with the infinite (0xff..) key without reforming
418    memset(it->curkey, 0xff, it->trie.chunksize);
419    it->keylen = it->trie.chunksize;
420
421    list_init(&it->btreeit_list);
422    it->flags = 0;
423
424    return HBTRIE_RESULT_SUCCESS;
425}
426
427// recursive function
428#define HBTRIE_PREFIX_MATCH_ONLY (0x1)
429#define HBTRIE_PARTIAL_MATCH (0x2)
430static hbtrie_result _hbtrie_prev(struct hbtrie_iterator *it,
431                                  struct btreeit_item *item,
432                                  void *key_buf,
433                                  size_t *keylen,
434                                  void *value_buf,
435                                  uint8_t flag)
436{
437    struct hbtrie *trie = &it->trie;
438    struct list_elem *e;
439    struct btreeit_item *item_new;
440    struct btree btree;
441    hbtrie_result hr = HBTRIE_RESULT_FAIL;
442    btree_result br;
443    struct hbtrie_meta hbmeta;
444    struct btree_meta bmeta;
445    void *chunk;
446    uint8_t *k = alca(uint8_t, trie->chunksize);
447    uint8_t *v = alca(uint8_t, trie->valuelen);
448    memset(k, 0, trie->chunksize);
449    memset(k, 0, trie->valuelen);
450    bid_t bid;
451    uint64_t offset;
452
453    if (item == NULL) {
454        // this happens only when first call
455        // create iterator for root b-tree
456        if (it->trie.root_bid == BLK_NOT_FOUND) return HBTRIE_RESULT_FAIL;
457        // set current chunk (key for b-tree)
458        chunk = it->curkey;
459        // load b-tree
460        btree_init_from_bid(
461            &btree, trie->btreeblk_handle, trie->btree_blk_ops,
462            trie->btree_kv_ops,
463            trie->btree_nodesize, trie->root_bid);
464        btree.aux = trie->aux;
465        if (btree.ksize != trie->chunksize || btree.vsize != trie->valuelen) {
466            if (((trie->chunksize << 4) | trie->valuelen) == btree.ksize) {
467                // this is an old meta format
468                return HBTRIE_RESULT_INDEX_VERSION_NOT_SUPPORTED;
469            }
470            // B+tree root node is corrupted.
471            return HBTRIE_RESULT_INDEX_CORRUPTED;
472        }
473
474        item = (struct btreeit_item *)mempool_alloc(sizeof(
475                                                    struct btreeit_item));
476        item->chunkno = 0;
477        item->leaf = 0;
478
479        br = btree_iterator_init(&btree, &item->btree_it, chunk);
480        if (br == BTREE_RESULT_FAIL) return HBTRIE_RESULT_FAIL;
481
482        list_push_back(&it->btreeit_list, &item->le);
483    }
484
485    e = list_next(&item->le);
486    if (e) {
487        // if prev sub b-tree exists
488        item_new = _get_entry(e, struct btreeit_item, le);
489        hr = _hbtrie_prev(it, item_new, key_buf, keylen, value_buf, flag);
490        if (hr == HBTRIE_RESULT_SUCCESS) return hr;
491        it->keylen = (item->chunkno+1) * trie->chunksize;
492    }
493
494    while (hr != HBTRIE_RESULT_SUCCESS) {
495        // get key-value from current b-tree iterator
496        memset(k, 0, trie->chunksize);
497        br = btree_prev(&item->btree_it, k, v);
498        if (item->leaf) {
499            _free_leaf_key(k);
500        } else {
501            chunk = (uint8_t*)it->curkey + item->chunkno * trie->chunksize;
502            if (item->btree_it.btree.kv_ops->cmp(k, chunk,
503                    item->btree_it.btree.aux) != 0) {
504                // not exact match key .. the rest of string is not necessary anymore
505                it->keylen = (item->chunkno+1) * trie->chunksize;
506                HBTRIE_ITR_SET_MOVED(it);
507            }
508        }
509
510        if (br == BTREE_RESULT_FAIL) {
511            // no more KV pair in the b-tree
512            btree_iterator_free(&item->btree_it);
513            list_remove(&it->btreeit_list, &item->le);
514            mempool_free(item);
515            return HBTRIE_RESULT_FAIL;
516        }
517
518        // check whether v points to doc or sub b-tree
519        if (_hbtrie_is_msb_set(trie, v)) {
520            // MSB is set -> sub b-tree
521
522            // load sub b-tree and create new iterator for the b-tree
523            _hbtrie_clear_msb(trie, v);
524            bid = trie->btree_kv_ops->value2bid(v);
525            bid = _endian_decode(bid);
526            btree_init_from_bid(&btree, trie->btreeblk_handle,
527                                trie->btree_blk_ops, trie->btree_kv_ops,
528                                trie->btree_nodesize, bid);
529
530            // get sub b-tree's chunk number
531            bmeta.data = (void *)mempool_alloc(trie->btree_nodesize);
532            bmeta.size = btree_read_meta(&btree, bmeta.data);
533            _hbtrie_fetch_meta(trie, bmeta.size, &hbmeta, bmeta.data);
534
535            item_new = (struct btreeit_item *)
536                       mempool_alloc(sizeof(struct btreeit_item));
537            if (_is_leaf_btree(hbmeta.chunkno)) {
538                hbtrie_cmp_func *void_cmp;
539
540                if (trie->map) { // custom cmp functions exist
541                    if (!memcmp(trie->last_map_chunk, it->curkey, trie->chunksize)) {
542                        // same custom function was used in the last call ..
543                        // do nothing
544                    } else {
545                        // get cmp function corresponding to the key
546                        void_cmp = trie->map(it->curkey, (void *)trie);
547                        if (void_cmp) {
548                            memcpy(trie->last_map_chunk, it->curkey, trie->chunksize);
549                            // set aux for _fdb_custom_cmp_wrap()
550                            trie->cmp_args.aux = void_cmp;
551                            trie->aux = &trie->cmp_args;
552                        }
553                    }
554                }
555
556                btree.kv_ops = trie->btree_leaf_kv_ops;
557                item_new->leaf = 1;
558            } else {
559                item_new->leaf = 0;
560            }
561            btree.aux = trie->aux;
562            hbmeta.chunkno = _get_chunkno(hbmeta.chunkno);
563            item_new->chunkno = hbmeta.chunkno;
564
565            // Note: if user's key is exactly aligned to chunk size, then the
566            //       dummy chunk will be a zero-filled value, and it is used
567            //       as a key in the next level of B+tree. Hence, there will be
568            //       no problem to assign the dummy chunk to the 'chunk' variable.
569            if ( (unsigned)((item_new->chunkno+1) * trie->chunksize) <=
570                 it->keylen) {
571                // happen only once for the first call (for each level of b-trees)
572                chunk = (uint8_t*)it->curkey +
573                        item_new->chunkno*trie->chunksize;
574                if (item->chunkno+1 < item_new->chunkno) {
575                    // skipped prefix exists
576                    // Note: all skipped chunks should be compared using the default
577                    //       cmp function
578                    int i, offset_meta = 0, offset_key = 0, chunkcmp = 0;
579                    for (i=item->chunkno+1; i<item_new->chunkno; ++i) {
580                        offset_meta = trie->chunksize * (i - (item->chunkno+1));
581                        offset_key = trie->chunksize * i;
582                        chunkcmp = trie->btree_kv_ops->cmp(
583                            (uint8_t*)it->curkey + offset_key,
584                            (uint8_t*)hbmeta.prefix + offset_meta,
585                            trie->aux);
586                        if (chunkcmp < 0) {
587                            // start_key's prefix is smaller than the skipped prefix
588                            // we have to go back to parent B+tree and pick prev entry
589                            mempool_free(bmeta.data);
590                            mempool_free(item_new);
591                            it->keylen = offset_key;
592                            hr = HBTRIE_RESULT_FAIL;
593                            HBTRIE_ITR_SET_MOVED(it);
594                            break;
595                        } else if (chunkcmp > 0 && trie->chunksize > 0) {
596                            // start_key's prefix is gerater than the skipped prefix
597                            // set largest key for next B+tree
598                            chunk = alca(uint8_t, trie->chunksize);
599                            memset(chunk, 0xff, trie->chunksize);
600                            break;
601                        }
602                    }
603                    if (chunkcmp < 0) {
604                        // go back to parent B+tree
605                        continue;
606                    }
607                }
608
609            } else {
610                // chunk number of the b-tree is shorter than current iterator's key
611                if (!HBTRIE_ITR_IS_MOVED(it)) {
612                    // The first prev call right after iterator init call.
613                    // This means that the init key is smaller than
614                    // the smallest key of the current tree, and larger than
615                    // the largest key of the previous tree.
616                    // So we have to go back to the parent tree, and
617                    // return the largest key of the previous tree.
618                    mempool_free(bmeta.data);
619                    mempool_free(item_new);
620                    it->keylen = (item->chunkno+1) * trie->chunksize;
621                    HBTRIE_ITR_SET_MOVED(it);
622                    continue;
623                }
624                // set largest key
625                chunk = alca(uint8_t, trie->chunksize);
626                memset(chunk, 0xff, trie->chunksize);
627            }
628
629            if (item_new->leaf && chunk && trie->chunksize > 0) {
630                uint8_t *k_temp = alca(uint8_t, trie->chunksize);
631                size_t _leaf_keylen, _leaf_keylen_raw = 0;
632
633                _leaf_keylen = it->keylen - (item_new->chunkno * trie->chunksize);
634                if (_leaf_keylen) {
635                    _leaf_keylen_raw = _hbtrie_reform_key_reverse(
636                                           trie, chunk, _leaf_keylen);
637                    _set_leaf_key(k_temp, chunk, _leaf_keylen_raw);
638                    if (_leaf_keylen_raw) {
639                        btree_iterator_init(&btree, &item_new->btree_it, k_temp);
640                    } else {
641                        btree_iterator_init(&btree, &item_new->btree_it, NULL);
642                    }
643                } else {
644                    // set initial key as the largest key
645                    // for reverse scan from the end of the B+tree
646                    _set_leaf_inf_key(k_temp);
647                    btree_iterator_init(&btree, &item_new->btree_it, k_temp);
648                }
649                _free_leaf_key(k_temp);
650            } else {
651                btree_iterator_init(&btree, &item_new->btree_it, chunk);
652            }
653            list_push_back(&it->btreeit_list, &item_new->le);
654
655            if (hbmeta.value && chunk == NULL) {
656                // NULL key exists .. the smallest key in this tree .. return first
657                offset = trie->btree_kv_ops->value2bid(hbmeta.value);
658                if (!(flag & HBTRIE_PREFIX_MATCH_ONLY)) {
659                    *keylen = trie->readkey(trie->doc_handle, offset, key_buf);
660                    it->keylen = _hbtrie_reform_key(trie, key_buf, *keylen,
661                                                    it->curkey);
662                }
663                memcpy(value_buf, &offset, trie->valuelen);
664                hr = HBTRIE_RESULT_SUCCESS;
665            } else {
666                hr = _hbtrie_prev(it, item_new, key_buf, keylen, value_buf,
667                                  flag);
668            }
669            mempool_free(bmeta.data);
670            if (hr == HBTRIE_RESULT_SUCCESS)
671                return hr;
672
673            // fail searching .. get back to parent tree
674            // (this happens when the initial key is smaller than
675            // the smallest key in the current tree (ITEM_NEW) ..
676            // so return back to ITEM and retrieve next child)
677            it->keylen = (item->chunkno+1) * trie->chunksize;
678            HBTRIE_ITR_SET_MOVED(it);
679
680        } else {
681            // MSB is not set -> doc
682            // read entire key and return the doc offset
683            offset = trie->btree_kv_ops->value2bid(v);
684            if (!(flag & HBTRIE_PREFIX_MATCH_ONLY)) {
685                *keylen = trie->readkey(trie->doc_handle, offset, key_buf);
686                it->keylen = _hbtrie_reform_key(trie, key_buf, *keylen, it->curkey);
687            }
688            memcpy(value_buf, &offset, trie->valuelen);
689
690            return HBTRIE_RESULT_SUCCESS;
691        }
692    }
693    return HBTRIE_RESULT_FAIL;
694}
695
696hbtrie_result hbtrie_prev(struct hbtrie_iterator *it,
697                          void *key_buf,
698                          size_t *keylen,
699                          void *value_buf)
700{
701    hbtrie_result hr;
702
703    if (HBTRIE_ITR_IS_REV(it) && HBTRIE_ITR_IS_FAILED(it)) {
704        return HBTRIE_RESULT_FAIL;
705    }
706
707    struct list_elem *e = list_begin(&it->btreeit_list);
708    struct btreeit_item *item = NULL;
709    if (e) item = _get_entry(e, struct btreeit_item, le);
710
711    hr = _hbtrie_prev(it, item, key_buf, keylen, value_buf, 0x0);
712    HBTRIE_ITR_SET_REV(it);
713    if (hr == HBTRIE_RESULT_SUCCESS) {
714        HBTRIE_ITR_CLR_FAILED(it);
715        HBTRIE_ITR_SET_MOVED(it);
716    } else {
717        HBTRIE_ITR_SET_FAILED(it);
718    }
719    return hr;
720}
721
722// recursive function
723static hbtrie_result _hbtrie_next(struct hbtrie_iterator *it,
724                                  struct btreeit_item *item,
725                                  void *key_buf,
726                                  size_t *keylen,
727                                  void *value_buf,
728                                  uint8_t flag)
729{
730    struct hbtrie *trie = &it->trie;
731    struct list_elem *e;
732    struct btreeit_item *item_new;
733    struct btree btree;
734    hbtrie_result hr = HBTRIE_RESULT_FAIL;
735    btree_result br;
736    struct hbtrie_meta hbmeta;
737    struct btree_meta bmeta;
738    void *chunk;
739    uint8_t *k = alca(uint8_t, trie->chunksize);
740    uint8_t *v = alca(uint8_t, trie->valuelen);
741    bid_t bid;
742    uint64_t offset;
743
744    if (item == NULL) {
745        // this happens only when first call
746        // create iterator for root b-tree
747        if (it->trie.root_bid == BLK_NOT_FOUND) return HBTRIE_RESULT_FAIL;
748        // set current chunk (key for b-tree)
749        chunk = it->curkey;
750        // load b-tree
751        btree_init_from_bid(
752            &btree, trie->btreeblk_handle, trie->btree_blk_ops, trie->btree_kv_ops,
753            trie->btree_nodesize, trie->root_bid);
754        btree.aux = trie->aux;
755        if (btree.ksize != trie->chunksize || btree.vsize != trie->valuelen) {
756            if (((trie->chunksize << 4) | trie->valuelen) == btree.ksize) {
757                // this is an old meta format
758                return HBTRIE_RESULT_INDEX_VERSION_NOT_SUPPORTED;
759            }
760            // B+tree root node is corrupted.
761            return HBTRIE_RESULT_INDEX_CORRUPTED;
762        }
763
764        item = (struct btreeit_item *)mempool_alloc(sizeof(struct btreeit_item));
765        item->chunkno = 0;
766        item->leaf = 0;
767
768        br = btree_iterator_init(&btree, &item->btree_it, chunk);
769        if (br == BTREE_RESULT_FAIL) return HBTRIE_RESULT_FAIL;
770
771        list_push_back(&it->btreeit_list, &item->le);
772    }
773
774    e = list_next(&item->le);
775    if (e) {
776        // if next sub b-tree exists
777        item_new = _get_entry(e, struct btreeit_item, le);
778        hr = _hbtrie_next(it, item_new, key_buf, keylen, value_buf, flag);
779        if (hr != HBTRIE_RESULT_SUCCESS) {
780            it->keylen = (item->chunkno+1) * trie->chunksize;
781        }
782    }
783
784    while (hr != HBTRIE_RESULT_SUCCESS) {
785        // get key-value from current b-tree iterator
786        memset(k, 0, trie->chunksize);
787        br = btree_next(&item->btree_it, k, v);
788        if (item->leaf) {
789            _free_leaf_key(k);
790        } else {
791            chunk = (uint8_t*)it->curkey + item->chunkno * trie->chunksize;
792            if (item->btree_it.btree.kv_ops->cmp(k, chunk,
793                    item->btree_it.btree.aux) != 0) {
794                // not exact match key .. the rest of string is not necessary anymore
795                it->keylen = (item->chunkno+1) * trie->chunksize;
796                HBTRIE_ITR_SET_MOVED(it);
797            }
798        }
799
800        if (br == BTREE_RESULT_FAIL) {
801            // no more KV pair in the b-tree
802            btree_iterator_free(&item->btree_it);
803            list_remove(&it->btreeit_list, &item->le);
804            mempool_free(item);
805            return HBTRIE_RESULT_FAIL;
806        }
807
808        if (flag & HBTRIE_PARTIAL_MATCH) {
809            // in partial match mode, we don't read actual doc key,
810            // and just store & return indexed part of key.
811            memcpy((uint8_t*)it->curkey + item->chunkno * trie->chunksize,
812                   k, trie->chunksize);
813        }
814
815        // check whether v points to doc or sub b-tree
816        if (_hbtrie_is_msb_set(trie, v)) {
817            // MSB is set -> sub b-tree
818
819            // load sub b-tree and create new iterator for the b-tree
820            _hbtrie_clear_msb(trie, v);
821            bid = trie->btree_kv_ops->value2bid(v);
822            bid = _endian_decode(bid);
823            btree_init_from_bid(&btree, trie->btreeblk_handle,
824                                trie->btree_blk_ops, trie->btree_kv_ops,
825                                trie->btree_nodesize, bid);
826
827            // get sub b-tree's chunk number
828            bmeta.data = (void *)mempool_alloc(trie->btree_nodesize);
829            bmeta.size = btree_read_meta(&btree, bmeta.data);
830            _hbtrie_fetch_meta(trie, bmeta.size, &hbmeta, bmeta.data);
831
832            item_new = (struct btreeit_item *)
833                       mempool_alloc(sizeof(struct btreeit_item));
834            if (_is_leaf_btree(hbmeta.chunkno)) {
835                hbtrie_cmp_func *void_cmp;
836
837                if (trie->map) { // custom cmp functions exist
838                    if (!memcmp(trie->last_map_chunk, it->curkey, trie->chunksize)) {
839                        // same custom function was used in the last call ..
840                        // do nothing
841                    } else {
842                        // get cmp function corresponding to the key
843                        void_cmp = trie->map(it->curkey, (void *)trie);
844                        if (void_cmp) {
845                            memcpy(trie->last_map_chunk, it->curkey, trie->chunksize);
846                            // set aux for _fdb_custom_cmp_wrap()
847                            trie->cmp_args.aux = void_cmp;
848                            trie->aux = &trie->cmp_args;
849                        }
850                    }
851                }
852
853                btree.kv_ops = trie->btree_leaf_kv_ops;
854                item_new->leaf = 1;
855            } else {
856                item_new->leaf = 0;
857            }
858            btree.aux = trie->aux;
859            hbmeta.chunkno = _get_chunkno(hbmeta.chunkno);
860            item_new->chunkno = hbmeta.chunkno;
861
862            // Note: if user's key is exactly aligned to chunk size, then the
863            //       dummy chunk will be a zero-filled value, and it is used
864            //       as a key in the next level of B+tree. Hence, there will be
865            //       no problem to assign the dummy chunk to the 'chunk' variable.
866            if ( (unsigned)((item_new->chunkno+1) * trie->chunksize)
867                 <= it->keylen) {
868                // happen only once for the first call (for each level of b-trees)
869                chunk = (uint8_t*)it->curkey +
870                        item_new->chunkno*trie->chunksize;
871                if (item->chunkno+1 < item_new->chunkno) {
872                    // skipped prefix exists
873                    // Note: all skipped chunks should be compared using the default
874                    //       cmp function
875                    int i, offset_meta = 0, offset_key = 0, chunkcmp = 0;
876                    for (i=item->chunkno+1; i<item_new->chunkno; ++i) {
877                        offset_meta = trie->chunksize * (i - (item->chunkno+1));
878                        offset_key = trie->chunksize * i;
879                        chunkcmp = trie->btree_kv_ops->cmp(
880                            (uint8_t*)it->curkey + offset_key,
881                            (uint8_t*)hbmeta.prefix + offset_meta,
882                            trie->aux);
883                        if (chunkcmp < 0) {
884                            // start_key's prefix is smaller than the skipped prefix
885                            // set smallest key for next B+tree
886                            it->keylen = offset_key;
887                            chunk = NULL;
888                            break;
889                        } else if (chunkcmp > 0) {
890                            // start_key's prefix is gerater than the skipped prefix
891                            // we have to go back to parent B+tree and pick next entry
892                            mempool_free(bmeta.data);
893                            mempool_free(item_new);
894                            it->keylen = offset_key;
895                            hr = HBTRIE_RESULT_FAIL;
896                            HBTRIE_ITR_SET_MOVED(it);
897                            break;
898                        }
899                    }
900                    if (chunkcmp > 0) {
901                        // go back to parent B+tree
902                        continue;
903                    }
904                }
905            } else {
906                // chunk number of the b-tree is longer than current iterator's key
907                // set smallest key
908                chunk = NULL;
909            }
910
911            if (item_new->leaf && chunk && trie->chunksize > 0) {
912                uint8_t *k_temp = alca(uint8_t, trie->chunksize);
913                memset(k_temp, 0, trie->chunksize * sizeof(uint8_t));
914                size_t _leaf_keylen, _leaf_keylen_raw = 0;
915
916                _leaf_keylen = it->keylen - (item_new->chunkno * trie->chunksize);
917                if (_leaf_keylen > 0) {
918                    _leaf_keylen_raw = _hbtrie_reform_key_reverse(
919                                           trie, chunk, _leaf_keylen);
920                }
921                if (_leaf_keylen_raw) {
922                    _set_leaf_key(k_temp, chunk, _leaf_keylen_raw);
923                    btree_iterator_init(&btree, &item_new->btree_it, k_temp);
924                    _free_leaf_key(k_temp);
925                } else {
926                    btree_iterator_init(&btree, &item_new->btree_it, NULL);
927                }
928            } else {
929                bool null_btree_init_key = false;
930                if (!HBTRIE_ITR_IS_MOVED(it) && chunk && trie->chunksize > 0 &&
931                    ((uint64_t)item_new->chunkno+1) * trie->chunksize == it->keylen) {
932                    // Next chunk is the last chunk of the current iterator key
933                    // (happens only on iterator_init(), it internally calls next()).
934                    uint8_t *k_temp = alca(uint8_t, trie->chunksize);
935                    memset(k_temp, 0x0, trie->chunksize);
936                    k_temp[trie->chunksize - 1] = trie->chunksize;
937                    if (!memcmp(k_temp, chunk, trie->chunksize)) {
938                        // Extra chunk is same to the specific pattern
939                        // ([0x0] [0x0] ... [trie->chunksize])
940                        // which means that given iterator key is exactly aligned
941                        // to chunk size and shorter than the position of the
942                        // next chunk.
943                        // To guarantee lexicographical order between
944                        // NULL and zero-filled key (NULL < 0x0000...),
945                        // we should init btree iterator with NULL key.
946                        null_btree_init_key = true;
947                    }
948                }
949                if (null_btree_init_key) {
950                    btree_iterator_init(&btree, &item_new->btree_it, NULL);
951                } else {
952                    btree_iterator_init(&btree, &item_new->btree_it, chunk);
953                }
954            }
955            list_push_back(&it->btreeit_list, &item_new->le);
956
957            if (hbmeta.value && chunk == NULL) {
958                // NULL key exists .. the smallest key in this tree .. return first
959                offset = trie->btree_kv_ops->value2bid(hbmeta.value);
960                if (flag & HBTRIE_PARTIAL_MATCH) {
961                    // return indexed key part only
962                    *keylen = (item->chunkno+1) * trie->chunksize;
963                    memcpy(key_buf, it->curkey, *keylen);
964                } else if (!(flag & HBTRIE_PREFIX_MATCH_ONLY)) {
965                    // read entire key from doc's meta
966                    *keylen = trie->readkey(trie->doc_handle, offset, key_buf);
967                    it->keylen = _hbtrie_reform_key(trie, key_buf, *keylen, it->curkey);
968                }
969                memcpy(value_buf, &offset, trie->valuelen);
970                hr = HBTRIE_RESULT_SUCCESS;
971            } else {
972                hr = _hbtrie_next(it, item_new, key_buf, keylen, value_buf, flag);
973            }
974            mempool_free(bmeta.data);
975            if (hr == HBTRIE_RESULT_SUCCESS) {
976                return hr;
977            }
978
979            // fail searching .. get back to parent tree
980            // (this happens when the initial key is greater than
981            // the largest key in the current tree (ITEM_NEW) ..
982            // so return back to ITEM and retrieve next child)
983            it->keylen = (item->chunkno+1) * trie->chunksize;
984
985        } else {
986            // MSB is not set -> doc
987            // read entire key and return the doc offset
988            offset = trie->btree_kv_ops->value2bid(v);
989            if (flag & HBTRIE_PARTIAL_MATCH) {
990                // return indexed key part only
991                *keylen = (item->chunkno+1) * trie->chunksize;
992                memcpy(key_buf, it->curkey, *keylen);
993            } else if (!(flag & HBTRIE_PREFIX_MATCH_ONLY)) {
994                // read entire key from doc's meta
995                *keylen = trie->readkey(trie->doc_handle, offset, key_buf);
996                it->keylen = _hbtrie_reform_key(trie, key_buf, *keylen, it->curkey);
997            }
998            memcpy(value_buf, &offset, trie->valuelen);
999
1000            return HBTRIE_RESULT_SUCCESS;
1001        }
1002    }
1003
1004    return hr;
1005}
1006
1007hbtrie_result hbtrie_next(struct hbtrie_iterator *it,
1008                          void *key_buf,
1009                          size_t *keylen,
1010                          void *value_buf)
1011{
1012    hbtrie_result hr;
1013
1014    if (HBTRIE_ITR_IS_FWD(it) && HBTRIE_ITR_IS_FAILED(it)) {
1015        return HBTRIE_RESULT_FAIL;
1016    }
1017
1018    struct list_elem *e = list_begin(&it->btreeit_list);
1019    struct btreeit_item *item = NULL;
1020    if (e) item = _get_entry(e, struct btreeit_item, le);
1021
1022    hr = _hbtrie_next(it, item, key_buf, keylen, value_buf, 0x0);
1023    HBTRIE_ITR_SET_FWD(it);
1024    if (hr == HBTRIE_RESULT_SUCCESS) {
1025        HBTRIE_ITR_CLR_FAILED(it);
1026        HBTRIE_ITR_SET_MOVED(it);
1027    } else {
1028        HBTRIE_ITR_SET_FAILED(it);
1029    }
1030    return hr;
1031}
1032
1033hbtrie_result hbtrie_next_partial(struct hbtrie_iterator *it,
1034                                  void *key_buf,
1035                                  size_t *keylen,
1036                                  void *value_buf)
1037{
1038    hbtrie_result hr;
1039
1040    if (HBTRIE_ITR_IS_FWD(it) && HBTRIE_ITR_IS_FAILED(it)) {
1041        return HBTRIE_RESULT_FAIL;
1042    }
1043
1044    struct list_elem *e = list_begin(&it->btreeit_list);
1045    struct btreeit_item *item = NULL;
1046    if (e) item = _get_entry(e, struct btreeit_item, le);
1047
1048    hr = _hbtrie_next(it, item, key_buf, keylen, value_buf, HBTRIE_PARTIAL_MATCH);
1049    HBTRIE_ITR_SET_FWD(it);
1050    if (hr == HBTRIE_RESULT_SUCCESS) {
1051        HBTRIE_ITR_CLR_FAILED(it);
1052        HBTRIE_ITR_SET_MOVED(it);
1053    } else {
1054        HBTRIE_ITR_SET_FAILED(it);
1055    }
1056    return hr;
1057}
1058
1059hbtrie_result hbtrie_next_value_only(struct hbtrie_iterator *it,
1060                                     void *value_buf)
1061{
1062    hbtrie_result hr;
1063
1064    if (it->curkey == NULL) return HBTRIE_RESULT_FAIL;
1065
1066    struct list_elem *e = list_begin(&it->btreeit_list);
1067    struct btreeit_item *item = NULL;
1068    if (e) item = _get_entry(e, struct btreeit_item, le);
1069
1070    hr = _hbtrie_next(it, item, NULL, 0, value_buf, HBTRIE_PREFIX_MATCH_ONLY);
1071    if (hr != HBTRIE_RESULT_SUCCESS) {
1072        // this iterator reaches the end of hb-trie
1073        free(it->curkey);
1074        it->curkey = NULL;
1075    }
1076    return hr;
1077}
1078
1079static void _hbtrie_free_btreelist(struct list *btreelist)
1080{
1081    struct btreelist_item *btreeitem;
1082    struct list_elem *e;
1083
1084    // free all items on list
1085    e = list_begin(btreelist);
1086    while(e) {
1087        btreeitem = _get_entry(e, struct btreelist_item, e);
1088        e = list_remove(btreelist, e);
1089        mempool_free(btreeitem);
1090    }
1091}
1092
/**
 * Propagate b-tree root-node relocations from leaf to root along the
 * traversal path recorded in BTREELIST (ordered root..leaf).
 *
 * When a child b-tree's root moved to a new block, the parent entry that
 * points at it (the chunk of KEY at the parent's chunk number) is
 * re-inserted with the new BID (MSB set to mark it as a sub-tree pointer).
 * Finally the trie's own root BID is refreshed from the topmost list item.
 *
 * @param trie      hb-trie handle.
 * @param btreelist path of visited b-trees, root first, leaf last.
 * @param key       chunk-aligned key used for the original traversal.
 * @param free_opt  non-zero: free BTREELIST items before returning.
 */
static void _hbtrie_btree_cascaded_update(struct hbtrie *trie,
                                          struct list *btreelist,
                                          void *key,
                                          int free_opt)
{
    bid_t bid_new, _bid;
    struct btreelist_item *btreeitem, *btreeitem_child;
    struct list_elem *e, *e_child;

    e = e_child = NULL;

    //3 cascaded update of each b-tree from leaf to root
    // walk backwards: e_child = current (lower) tree, e = its parent
    e_child = list_end(btreelist);
    if (e_child) e = list_prev(e_child);

    while(e && e_child) {
        btreeitem = _get_entry(e, struct btreelist_item, e);
        btreeitem_child = _get_entry(e_child, struct btreelist_item, e);

        if (btreeitem->child_rootbid != btreeitem_child->btree.root_bid) {
            // root node of child sub-tree has been moved to another block
            // update parent sub-tree
            bid_new = btreeitem_child->btree.root_bid;
            _bid = _endian_encode(bid_new);
            // MSB marks the value as a sub-tree BID rather than a doc offset
            _hbtrie_set_msb(trie, (void *)&_bid);
            btree_insert(&btreeitem->btree,
                    (uint8_t*)key + btreeitem->chunkno * trie->chunksize,
                    (void *)&_bid);
        }
        e_child = e;
        e = list_prev(e);
    }

    // update trie root bid
    if (e) {
        // NOTE(review): the loop above exits with e == NULL, so this branch
        // looks unreachable and appears to be defensive only — confirm.
        btreeitem = _get_entry(e, struct btreelist_item, e);
        trie->root_bid = btreeitem->btree.root_bid;
    }else if (e_child) {
        // e_child ends up pointing at the first (root-level) list item
        btreeitem = _get_entry(e_child, struct btreelist_item, e);
        trie->root_bid = btreeitem->btree.root_bid;
    }else {
        // empty path list should never happen
        fdb_assert(0, trie, e_child);
    }

    if (free_opt) {
        _hbtrie_free_btreelist(btreelist);
    }
}
1141
/**
 * Core lookup: descend from the root b-tree, consuming KEY one chunk at a
 * time, until a document offset is found or the search fails.
 *
 * @param trie      hb-trie handle.
 * @param key       chunk-aligned (reformed) key; see _hbtrie_reform_key().
 * @param keylen    length of the reformed key in bytes.
 * @param valuebuf  [out] receives trie->valuelen bytes on success.
 * @param btreelist optional; when non-NULL, every b-tree visited on the
 *                  path is recorded here (for later cascaded update /
 *                  removal by the caller). Caller owns freeing the list.
 * @param flag      0x0, HBTRIE_PARTIAL_MATCH, or HBTRIE_PREFIX_MATCH_ONLY.
 * @return HBTRIE_RESULT_SUCCESS / FAIL, or an index-version/corruption code.
 */
static hbtrie_result _hbtrie_find(struct hbtrie *trie, void *key, int keylen,
                                  void *valuebuf, struct list *btreelist, uint8_t flag)
{
    int nchunk;
    int rawkeylen;
    int prevchunkno, curchunkno, cpt_node = 0; // cpt_node: current tree is a leaf b-tree
    struct btree *btree = NULL;
    struct btree btree_static;
    btree_result r;
    struct hbtrie_meta hbmeta;
    struct btree_meta meta;
    struct btreelist_item *btreeitem = NULL;
    uint8_t *k = alca(uint8_t, trie->chunksize);
    uint8_t *buf = alca(uint8_t, trie->btree_nodesize);
    uint8_t *btree_value = alca(uint8_t, trie->valuelen);
    void *chunk = NULL;
    hbtrie_cmp_func *void_cmp;
    bid_t bid_new;
    nchunk = _get_nchunk(trie, key, keylen);

    meta.data = buf;
    curchunkno = 0;

    if (trie->map) { // custom cmp functions exist
        if (!memcmp(trie->last_map_chunk, key, trie->chunksize)) {
            // same custom function was used in the last call .. do nothing
        } else {
            // get cmp function corresponding to the key
            void_cmp = trie->map(key, (void *)trie);
            if (void_cmp) { // custom cmp function matches .. turn on leaf b+tree mode
                memcpy(trie->last_map_chunk, key, trie->chunksize);
                // set aux for _fdb_custom_cmp_wrap()
                trie->cmp_args.aux = void_cmp;
                trie->aux = &trie->cmp_args;
            }
        }
    }

    if (btreelist) {
        // caller wants the traversal path recorded
        list_init(btreelist);
        btreeitem = (struct btreelist_item *)mempool_alloc(sizeof(struct btreelist_item));
        list_push_back(btreelist, &btreeitem->e);
        btree = &btreeitem->btree;
    } else {
        // no path recording .. use a stack-local b-tree handle
        btree = &btree_static;
    }

    if (trie->root_bid == BLK_NOT_FOUND) {
        // retrieval fail
        return HBTRIE_RESULT_FAIL;
    } else {
        // read from root_bid
        r = btree_init_from_bid(btree, trie->btreeblk_handle, trie->btree_blk_ops,
                                trie->btree_kv_ops, trie->btree_nodesize,
                                trie->root_bid);
        if (r != BTREE_RESULT_SUCCESS) {
            return HBTRIE_RESULT_FAIL;
        }
        btree->aux = trie->aux;
        // sanity-check the root node's key/value sizes against the trie config
        if (btree->ksize != trie->chunksize || btree->vsize != trie->valuelen) {
            if (((trie->chunksize << 4) | trie->valuelen) == btree->ksize) {
                // this is an old meta format
                return HBTRIE_RESULT_INDEX_VERSION_NOT_SUPPORTED;
            }
            // B+tree root node is corrupted.
            return HBTRIE_RESULT_INDEX_CORRUPTED;
        }
    }

    while (curchunkno < nchunk) {
        // get current chunk number
        meta.size = btree_read_meta(btree, meta.data);
        _hbtrie_fetch_meta(trie, meta.size, &hbmeta, meta.data);
        prevchunkno = curchunkno;
        if (_is_leaf_btree(hbmeta.chunkno)) {
            // leaf b-tree: switch to variable-length-key kv-ops
            cpt_node = 1;
            hbmeta.chunkno = _get_chunkno(hbmeta.chunkno);
            btree->kv_ops = trie->btree_leaf_kv_ops;
        }
        curchunkno = hbmeta.chunkno;

        if (btreelist) {
            btreeitem->chunkno = curchunkno;
            btreeitem->leaf = cpt_node;
        }

        //3 check whether there are skipped prefixes.
        if (curchunkno - prevchunkno > 1) {
            fdb_assert(hbmeta.prefix != NULL, hbmeta.prefix, trie);
            // prefix comparison (find the first different chunk)
            int diffchunkno = _hbtrie_find_diff_chunk(
                trie, hbmeta.prefix,
                (uint8_t*)key + trie->chunksize * (prevchunkno+1),
                0, curchunkno - (prevchunkno+1));
            if (diffchunkno < curchunkno - (prevchunkno+1)) {
                // prefix does not match -> retrieval fail
                return HBTRIE_RESULT_FAIL;
            }
        }

        //3 search b-tree using current chunk (or postfix)
        rawkeylen = _hbtrie_reform_key_reverse(trie, key, keylen);
        if ((cpt_node && rawkeylen == curchunkno * trie->chunksize) ||
            (!cpt_node && nchunk == curchunkno)) {
            // KEY is exactly same as tree's prefix .. return value in metasection
            if (hbmeta.value && trie->valuelen > 0) {
                memcpy(valuebuf, hbmeta.value, trie->valuelen);
            }
            return HBTRIE_RESULT_SUCCESS;
        } else {
            chunk = (uint8_t*)key + curchunkno*trie->chunksize;
            if (cpt_node) {
                // leaf b-tree: key is the remaining raw postfix, not one chunk
                size_t rawchunklen =
                    _hbtrie_reform_key_reverse(trie, chunk,
                    (nchunk-curchunkno)*trie->chunksize);

                _set_leaf_key(k, chunk, rawchunklen);
                r = btree_find(btree, k, btree_value);
                _free_leaf_key(k);
            } else {
                r = btree_find(btree, chunk, btree_value);
            }
        }

        if (r == BTREE_RESULT_FAIL) {
            // retrieval fail
            return HBTRIE_RESULT_FAIL;
        } else {
            // same chunk exists
            if (flag & HBTRIE_PARTIAL_MATCH &&
                curchunkno + 1 == nchunk - 1) {
                // partial match mode & the last meaningful chunk
                // return btree value
                memcpy(valuebuf, btree_value, trie->valuelen);
                return HBTRIE_RESULT_SUCCESS;
            }

            // check whether the value points to sub-tree or document
            // check MSB
            if (_hbtrie_is_msb_set(trie, btree_value)) {
                // this is BID of b-tree node (by clearing MSB)
                _hbtrie_clear_msb(trie, btree_value);
                bid_new = trie->btree_kv_ops->value2bid(btree_value);
                bid_new = _endian_decode(bid_new);

                if (btreelist) {
                    // extend the recorded path with the child b-tree
                    btreeitem->child_rootbid = bid_new;
                    btreeitem = (struct btreelist_item *)
                                mempool_alloc(sizeof(struct btreelist_item));
                    list_push_back(btreelist, &btreeitem->e);
                    btree = &btreeitem->btree;
                }

                // fetch sub-tree
                r = btree_init_from_bid(btree, trie->btreeblk_handle, trie->btree_blk_ops,
                                        trie->btree_kv_ops, trie->btree_nodesize, bid_new);
                if (r != BTREE_RESULT_SUCCESS) {
                    return HBTRIE_RESULT_FAIL;
                }
                btree->aux = trie->aux;
            } else {
                // this is offset of document (as it is)
                // read entire key
                uint8_t *docrawkey = alca(uint8_t, HBTRIE_MAX_KEYLEN);
                uint8_t *dockey = alca(uint8_t, HBTRIE_MAX_KEYLEN);
                uint32_t docrawkeylen, dockeylen;
                uint64_t offset;
                int docnchunk, diffchunkno;

                // get offset value from btree_value
                offset = trie->btree_kv_ops->value2bid(btree_value);
                if (!(flag & HBTRIE_PREFIX_MATCH_ONLY)) {
                    // read entire key from the document and verify it matches
                    docrawkeylen = trie->readkey(trie->doc_handle, offset, docrawkey);
                    dockeylen = _hbtrie_reform_key(trie, docrawkey, docrawkeylen, dockey);

                    // find first different chunk
                    docnchunk = _get_nchunk(trie, dockey, dockeylen);

                    if (docnchunk == nchunk) {
                        diffchunkno = _hbtrie_find_diff_chunk(trie, key,
                                            dockey, curchunkno, nchunk);
                        if (diffchunkno == nchunk) {
                            // success
                            memcpy(valuebuf, btree_value, trie->valuelen);
                            return HBTRIE_RESULT_SUCCESS;
                        }
                    }
                    return HBTRIE_RESULT_FAIL;
                } else {
                    // just return value
                    memcpy(valuebuf, btree_value, trie->valuelen);
                    return HBTRIE_RESULT_SUCCESS;
                }
            }
        }
    }

    return HBTRIE_RESULT_FAIL;
}
1343
1344hbtrie_result hbtrie_find(struct hbtrie *trie, void *rawkey,
1345                          int rawkeylen, void *valuebuf)
1346{
1347    int nchunk = _get_nchunk_raw(trie, rawkey, rawkeylen);
1348    uint8_t *key = alca(uint8_t, nchunk * trie->chunksize);
1349    int keylen;
1350
1351    keylen = _hbtrie_reform_key(trie, rawkey, rawkeylen, key);
1352    return _hbtrie_find(trie, key, keylen, valuebuf, NULL, 0x0);
1353}
1354
1355hbtrie_result hbtrie_find_offset(struct hbtrie *trie, void *rawkey,
1356                                 int rawkeylen, void *valuebuf)
1357{
1358    int nchunk = _get_nchunk_raw(trie, rawkey, rawkeylen);
1359    uint8_t *key = alca(uint8_t, nchunk * trie->chunksize);
1360    int keylen;
1361
1362    keylen = _hbtrie_reform_key(trie, rawkey, rawkeylen, key);
1363    return _hbtrie_find(trie, key, keylen, valuebuf, NULL,
1364                        HBTRIE_PREFIX_MATCH_ONLY);
1365}
1366
1367hbtrie_result hbtrie_find_partial(struct hbtrie *trie, void *rawkey,
1368                                  int rawkeylen, void *valuebuf)
1369{
1370    int nchunk = _get_nchunk_raw(trie, rawkey, rawkeylen);
1371    uint8_t *key = alca(uint8_t, nchunk * trie->chunksize);
1372    int keylen;
1373
1374    keylen = _hbtrie_reform_key(trie, rawkey, rawkeylen, key);
1375    return _hbtrie_find(trie, key, keylen, valuebuf, NULL,
1376                        HBTRIE_PARTIAL_MATCH);
1377}
1378
/**
 * Remove RAWKEY from the hb-trie.
 *
 * First locates the key with _hbtrie_find(), recording the traversal path
 * in BTREELIST. Depending on where the key lives, the entry is removed
 * either from the metasection of the last b-tree on the path (when the key
 * equals that tree's prefix) or from the tree itself via btree_remove().
 * Any resulting root-node relocations are then propagated upward with
 * _hbtrie_btree_cascaded_update(), which also frees the path list.
 *
 * @param trie      hb-trie handle.
 * @param rawkey    raw (unaligned) key to remove.
 * @param rawkeylen length of the raw key in bytes.
 * @param flag      0x0 or HBTRIE_PARTIAL_MATCH (passed through to find).
 */
INLINE hbtrie_result _hbtrie_remove(struct hbtrie *trie,
                                    void *rawkey, int rawkeylen,
                                    uint8_t flag)
{
    int nchunk = _get_nchunk_raw(trie, rawkey, rawkeylen);
    int keylen;
    uint8_t *key = alca(uint8_t, nchunk * trie->chunksize);
    uint8_t *valuebuf = alca(uint8_t, trie->valuelen);
    hbtrie_result r;
    btree_result br = BTREE_RESULT_SUCCESS;
    struct list btreelist;
    struct btreelist_item *btreeitem;
    struct list_elem *e;

    keylen = _hbtrie_reform_key(trie, rawkey, rawkeylen, key);

    // locate the key; BTREELIST records every b-tree on the path
    r = _hbtrie_find(trie, key, keylen, valuebuf, &btreelist, flag);

    if (r == HBTRIE_RESULT_SUCCESS) {
        // the last item on the path is the tree that holds the key
        e = list_end(&btreelist);
        fdb_assert(e, trie, flag);

        btreeitem = _get_entry(e, struct btreelist_item, e);
        if (btreeitem &&
            ((btreeitem->leaf && rawkeylen == btreeitem->chunkno * trie->chunksize) ||
             (!(btreeitem->leaf) && nchunk == btreeitem->chunkno)) ) {
            // key is exactly same as b-tree's prefix .. remove from metasection
            struct hbtrie_meta hbmeta;
            struct btree_meta meta;
            hbmeta_opt opt;
            uint8_t *buf = alca(uint8_t, trie->btree_nodesize);

            meta.data = buf;
            meta.size = btree_read_meta(&btreeitem->btree, meta.data);
            _hbtrie_fetch_meta(trie, meta.size, &hbmeta, meta.data);

            // keep the tree's leaf/normal mode unchanged
            opt = (_is_leaf_btree(hbmeta.chunkno))?(HBMETA_LEAF):(HBMETA_NORMAL);

            // remove value from metasection
            // (re-store the metadata with a NULL value field)
            _hbtrie_store_meta(
                    trie, &meta.size, _get_chunkno(hbmeta.chunkno), opt,
                    hbmeta.prefix, hbmeta.prefix_len, NULL, buf);
            btree_update_meta(&btreeitem->btree, &meta);
        } else {
            if (btreeitem && btreeitem->leaf) {
                // leaf b-tree: key is the raw postfix past this tree's chunk
                uint8_t *k = alca(uint8_t, trie->chunksize);
                _set_leaf_key(k, key + btreeitem->chunkno * trie->chunksize,
                    rawkeylen - btreeitem->chunkno * trie->chunksize);
                br = btree_remove(&btreeitem->btree, k);
                _free_leaf_key(k);
            } else if (btreeitem) {
                // normal b-tree
                br = btree_remove(&btreeitem->btree,
                                  key + trie->chunksize * btreeitem->chunkno);
            }
            if (br == BTREE_RESULT_FAIL) {
                r = HBTRIE_RESULT_FAIL;
            }
        }
        // propagate any root-bid changes upward; frees BTREELIST (free_opt=1)
        _hbtrie_btree_cascaded_update(trie, &btreelist, key, 1);
    } else {
        // key (to be removed) not found
        // no update occurred .. we don't need to update b-trees on the path
        // just free the btreelist
        _hbtrie_free_btreelist(&btreelist);
    }

    return r;
}
1449
1450hbtrie_result hbtrie_remove(struct hbtrie *trie,
1451                            void *rawkey,
1452                            int rawkeylen)
1453{
1454    return _hbtrie_remove(trie, rawkey, rawkeylen, 0x0);
1455}
1456
1457hbtrie_result hbtrie_remove_partial(struct hbtrie *trie,
1458                                    void *rawkey,
1459                                    int rawkeylen)
1460{
1461    return _hbtrie_remove(trie, rawkey, rawkeylen,
1462                          HBTRIE_PARTIAL_MATCH);
1463}
1464
// Temporary holder for one key-value pair pulled out of a leaf b-tree
// while it is being converted into a normal (chunk-aligned) b-tree
// (see _hbtrie_extend_leaf_tree). All pointers are malloc'd copies owned
// by this struct and freed after reinsertion.
struct _key_item {
    size_t keylen;         // length of 'key' in bytes
    void *key;             // raw key copy; NULL for the tree's own prefix entry
    void *value;           // value copy, trie->valuelen bytes
    struct list_elem le;   // linkage into the temporary 'keys' list
};
1471
/**
 * Convert a leaf (variable-length-key) b-tree into a normal chunk-aligned
 * b-tree, extending the trie structure beneath it.
 *
 * All key-value pairs are first drained from the leaf tree into a temporary
 * list. The common chunk-prefix of the drained keys (if any) is folded into
 * the new tree's metasection, the BTREEITEM descriptor is re-initialized as
 * a normal b-tree, parent pointers are fixed up via cascaded update, and
 * finally every drained pair is re-inserted through hbtrie_insert().
 *
 * @param trie        hb-trie handle.
 * @param btreelist   traversal path ending at BTREEITEM (consumed/freed by
 *                    the cascaded update call, free_opt=1).
 * @param btreeitem   descriptor of the leaf b-tree being converted; reused
 *                    in place for the new normal b-tree.
 * @param pre_str     key prefix (chunks above this tree) of all keys inside.
 * @param pre_str_len length of pre_str in bytes.
 */
static void _hbtrie_extend_leaf_tree(struct hbtrie *trie,
                                     struct list *btreelist,
                                     struct btreelist_item *btreeitem,
                                     void *pre_str,
                                     size_t pre_str_len)
{
    struct list keys;
    struct list_elem *e;
    struct _key_item *item, *smallest = NULL;
    struct btree_iterator it;
    struct btree new_btree;
    struct btree_meta meta;
    struct hbtrie_meta hbmeta;
    btree_result br;
    void *prefix = NULL, *meta_value = NULL;
    uint8_t *key_str = alca(uint8_t, HBTRIE_MAX_KEYLEN);
    uint8_t *key_buf = alca(uint8_t, trie->chunksize);
    uint8_t *value_buf = alca(uint8_t, trie->valuelen);
    uint8_t *buf = alca(uint8_t, trie->btree_nodesize);
    size_t keylen, minchunkno = 0, chunksize;

    chunksize = trie->chunksize;

    // fetch metadata
    meta.data = buf;
    meta.size = btree_read_meta(&btreeitem->btree, meta.data);
    _hbtrie_fetch_meta(trie, meta.size, &hbmeta, meta.data);

    // scan all keys
    list_init(&keys);
    memset(key_buf, 0, chunksize);
    minchunkno = 0;

    br = btree_iterator_init(&btreeitem->btree, &it, NULL);
    while (br == BTREE_RESULT_SUCCESS) {
        // get key
        if ((br = btree_next(&it, key_buf, value_buf)) == BTREE_RESULT_FAIL) {
            break;
        }

        _get_leaf_key(key_buf, key_str, &keylen);
        _free_leaf_key(key_buf);

        // insert into list
        // (copies of key and value are heap-allocated; freed at the end)
        item = (struct _key_item *)malloc(sizeof(struct _key_item));

        item->key = (void *)malloc(keylen);
        item->keylen = keylen;
        memcpy(item->key, key_str, keylen);

        item->value = (void *)malloc(trie->valuelen);
        memcpy(item->value, value_buf, trie->valuelen);

        list_push_back(&keys, &item->le);

        if (hbmeta.value == NULL) {
            // check common prefix
            if (prefix == NULL) {
                // initialize
                // (prefix aliases item->key; no separate allocation)
                prefix = item->key;
                minchunkno = _l2c(trie, item->keylen);
            } else {
                // update the length of common prefix
                minchunkno = _hbtrie_find_diff_chunk(
                    trie, prefix, item->key, 0,
                    MIN(_l2c(trie, item->keylen), minchunkno));
            }

            // update smallest (shortest) key
            if (smallest == NULL) {
                smallest = item;
            } else {
                if (item->keylen < smallest->keylen)
                    smallest = item;
            }
        }
    }
    btree_iterator_free(&it);

    // construct new (non-leaf) b-tree
    if (hbmeta.value) {
        // insert tree's prefix into the list
        item = (struct _key_item *)malloc(sizeof(struct _key_item));

        item->key = NULL;
        item->keylen = 0;

        item->value = (void *)malloc(trie->valuelen);
        memcpy(item->value, hbmeta.value, trie->valuelen);

        list_push_back(&keys, &item->le);

        meta_value = smallest = NULL;
    } else {
        if (smallest) {
            // promote the shortest key's value into the new metasection if
            // that key is exactly the common prefix
            if (minchunkno > 0 &&
               (size_t) _get_nchunk_raw(trie, smallest->key, smallest->keylen) ==
                minchunkno) {
                meta_value = smallest->value;
            } else {
                smallest = NULL;
            }
        }
    }
    _hbtrie_store_meta(
            trie, &meta.size, _get_chunkno(hbmeta.chunkno) + minchunkno,
            HBMETA_NORMAL, prefix, minchunkno * chunksize, meta_value, buf);

    btree_init(&new_btree, trie->btreeblk_handle, trie->btree_blk_ops,
        trie->btree_kv_ops, trie->btree_nodesize, chunksize, trie->valuelen,
        0x0, &meta);
    new_btree.aux = trie->aux;

    // reset BTREEITEM
    // (descriptor now refers to the new normal b-tree)
    btreeitem->btree = new_btree;
    btreeitem->chunkno = _get_chunkno(hbmeta.chunkno) + minchunkno;
    btreeitem->leaf = 0;

    // fix up parent pointers; frees BTREELIST (free_opt=1)
    _hbtrie_btree_cascaded_update(trie, btreelist, pre_str, 1);

    // insert all keys
    // (full key = PRE_STR prefix + raw key; 'smallest' was already placed
    //  into the metasection above, so it is skipped here)
    memcpy(key_str, pre_str, pre_str_len);
    e = list_begin(&keys);
    while (e) {
        item = _get_entry(e, struct _key_item, le);
        if (item != smallest) {
            if (item->keylen > 0) {
                memcpy(key_str + pre_str_len, item->key, item->keylen);
            }
            hbtrie_insert(trie, key_str, pre_str_len + item->keylen,
                item->value, value_buf);
        }

        e = list_remove(&keys, e);
        if (item->key) {
            free(item->key);
        }
        free(item->value);
        free(item);
    }

}
1614
1615// suppose that VALUE and OLDVALUE_OUT are based on the same endian in hb+trie
1616#define HBTRIE_PARTIAL_UPDATE (0x1)
/**
 * Core insert/update routine shared by hbtrie_insert() and
 * hbtrie_insert_partial().
 *
 * Walks the trie chunk by chunk starting from the root b-tree, descending
 * into sub-trees (a stored value with its MSB set is an endian-encoded BID
 * of a child b-tree root), and creates intermediate b-trees where prefixes
 * diverge. All b-trees touched along the path are collected in 'btreelist'
 * and their root BIDs are propagated upward at the end via
 * _hbtrie_btree_cascaded_update().
 *
 * @param trie         HB+trie handle.
 * @param rawkey       Raw (application-side) key.
 * @param rawkeylen    Length of 'rawkey' in bytes.
 * @param value        Value to store (trie->valuelen bytes).
 * @param oldvalue_out If non-NULL, receives the previous value for the key;
 *                     pre-filled with 0xff bytes when no old value exists.
 * @param flag         0x0 for a normal insert, or HBTRIE_PARTIAL_UPDATE to
 *                     only update an existing entry (insertion of a new key
 *                     fails in partial-update mode).
 * @return HBTRIE_RESULT_SUCCESS on success, HBTRIE_RESULT_FAIL on b-tree
 *         operation failure, or an index version/corruption code when the
 *         root b-tree node is unreadable.
 */
INLINE hbtrie_result _hbtrie_insert(struct hbtrie *trie,
                                    void *rawkey, int rawkeylen,
                                    void *value, void *oldvalue_out,
                                    uint8_t flag)
{
    /*
    <insertion cases>
    1. normal insert: there is no creation of new b-tree
    2. replacing doc by new b-tree: a doc (which has same prefix) already exists
        2-1. a new b-tree that has file offset to a doc in its metadata
             is created, and the other doc is inserted into the tree
        2-2. two docs are inserted into the new b-tree
    3. create new b-tree between existing b-trees: when prefix mismatches
    */

    int nchunk;
    int keylen;
    int prevchunkno, curchunkno;
    int cpt_node = 0;  // non-zero while the current node is a leaf (compact) b+tree
    int leaf_cond = 0; // non-zero when a custom cmp function applies to this key
    uint8_t *k = alca(uint8_t, trie->chunksize);

    struct list btreelist;
    //struct btree btree, btree_new;
    struct btreelist_item *btreeitem, *btreeitem_new;
    hbtrie_result ret_result = HBTRIE_RESULT_SUCCESS;
    btree_result r;
    struct btree_kv_ops *kv_ops;

    struct hbtrie_meta hbmeta;
    struct btree_meta meta;
    hbmeta_opt opt;
    hbtrie_cmp_func *void_cmp;

    nchunk = _get_nchunk_raw(trie, rawkey, rawkeylen);

    uint8_t *key = alca(uint8_t, nchunk * trie->chunksize);
    uint8_t *buf = alca(uint8_t, trie->btree_nodesize);
    uint8_t *btree_value = alca(uint8_t, trie->valuelen);
    void *chunk, *chunk_new;
    bid_t bid_new, _bid;

    meta.data = buf;
    curchunkno = 0;
    // normalize the raw key into fixed-size chunk representation
    keylen = _hbtrie_reform_key(trie, rawkey, rawkeylen, key);
    (void)keylen;

    if (trie->map) { // custom cmp functions exist
        if (!memcmp(trie->last_map_chunk, key, trie->chunksize)) {
            // same custom function was used in the last call .. leaf b+tree
            leaf_cond = 1;
        } else {
            // get cmp function corresponding to the key
            void_cmp = trie->map(key, (void *)trie);
            if (void_cmp) {
                // custom cmp function matches .. turn on leaf b+tree mode
                leaf_cond = 1;
                memcpy(trie->last_map_chunk, key, trie->chunksize);
                // set aux for _fdb_custom_cmp_wrap()
                trie->cmp_args.aux = void_cmp;
                trie->aux = &trie->cmp_args;
            }
        }
    }

    list_init(&btreelist);
    // btreeitem for root btree
    btreeitem = (struct btreelist_item*)
                mempool_alloc(sizeof(struct btreelist_item));
    list_push_back(&btreelist, &btreeitem->e);

    if (trie->root_bid == BLK_NOT_FOUND) {
        // create root b-tree
        _hbtrie_store_meta(trie, &meta.size, 0, HBMETA_NORMAL,
                           NULL, 0, NULL, buf);
        r = btree_init(&btreeitem->btree, trie->btreeblk_handle,
                       trie->btree_blk_ops, trie->btree_kv_ops,
                       trie->btree_nodesize, trie->chunksize,
                       trie->valuelen, 0x0, &meta);
        if (r != BTREE_RESULT_SUCCESS) {
            return HBTRIE_RESULT_FAIL;
        }
    } else {
        // read from root_bid
        r = btree_init_from_bid(&btreeitem->btree, trie->btreeblk_handle,
                                trie->btree_blk_ops, trie->btree_kv_ops,
                                trie->btree_nodesize, trie->root_bid);
        if (r != BTREE_RESULT_SUCCESS) {
            return HBTRIE_RESULT_FAIL;
        }
        // sanity check: root node's key/value sizes must match the handle
        if (btreeitem->btree.ksize != trie->chunksize ||
            btreeitem->btree.vsize != trie->valuelen) {
            if (((trie->chunksize << 4) | trie->valuelen) == btreeitem->btree.ksize) {
                // this is an old meta format
                return HBTRIE_RESULT_INDEX_VERSION_NOT_SUPPORTED;
            }
            // B+tree root node is corrupted.
            return HBTRIE_RESULT_INDEX_CORRUPTED;
        }
    }
    btreeitem->btree.aux = trie->aux;

    // set 'oldvalue_out' to 0xff.. (sentinel meaning "no previous value")
    if (oldvalue_out) {
        memset(oldvalue_out, 0xff, trie->valuelen);
    }

    // main descent loop: one iteration per b-tree level visited
    while(curchunkno < nchunk){
        // get current chunk number
        meta.size = btree_read_meta(&btreeitem->btree, meta.data);
        _hbtrie_fetch_meta(trie, meta.size, &hbmeta, meta.data);
        prevchunkno = curchunkno;
        if (_is_leaf_btree(hbmeta.chunkno)) {
            // leaf b+tree: switch to variable-length-key kv_ops
            cpt_node = 1;
            hbmeta.chunkno = _get_chunkno(hbmeta.chunkno);
            btreeitem->btree.kv_ops = trie->btree_leaf_kv_ops;
        }
        btreeitem->chunkno = curchunkno = hbmeta.chunkno;

        //3 check whether there is skipped prefix
        if (curchunkno - prevchunkno > 1) {
            // prefix comparison (find the first different chunk)
            int diffchunkno = _hbtrie_find_diff_chunk(trie, hbmeta.prefix,
                                  key + trie->chunksize * (prevchunkno+1),
                                  0, curchunkno - (prevchunkno+1));
            if (diffchunkno < curchunkno - (prevchunkno+1)) {
                //3 3. create sub-tree between parent and child tree

                // metadata (prefix) update in btreeitem->btree
                int new_prefixlen = trie->chunksize *
                                    (curchunkno - (prevchunkno+1) -
                                        (diffchunkno+1));
                // backup old prefix
                int old_prefixlen = hbmeta.prefix_len;
                uint8_t *old_prefix = alca(uint8_t, old_prefixlen);
                memcpy(old_prefix, hbmeta.prefix, old_prefixlen);

                if (new_prefixlen > 0) {
                    // existing tree keeps the suffix of its old prefix
                    uint8_t *new_prefix = alca(uint8_t, new_prefixlen);
                    memcpy(new_prefix,
                           (uint8_t*)hbmeta.prefix +
                               trie->chunksize * (diffchunkno + 1),
                           new_prefixlen);
                    _hbtrie_store_meta(trie, &meta.size, curchunkno,
                                       HBMETA_NORMAL, new_prefix,
                                       new_prefixlen, hbmeta.value, buf);
                } else {
                    _hbtrie_store_meta(trie, &meta.size, curchunkno,
                                       HBMETA_NORMAL, NULL, 0,
                                       hbmeta.value, buf);
                }
                // update metadata for old b-tree
                btree_update_meta(&btreeitem->btree, &meta);

                // split prefix and create new sub-tree
                _hbtrie_store_meta(trie, &meta.size,
                                   prevchunkno + diffchunkno + 1,
                                   HBMETA_NORMAL, old_prefix,
                                   diffchunkno * trie->chunksize, NULL, buf);

                // create new b-tree
                btreeitem_new = (struct btreelist_item *)
                                mempool_alloc(sizeof(struct btreelist_item));
                btreeitem_new->chunkno = prevchunkno + diffchunkno + 1;
                r = btree_init(&btreeitem_new->btree, trie->btreeblk_handle,
                               trie->btree_blk_ops, trie->btree_kv_ops,
                               trie->btree_nodesize, trie->chunksize,
                               trie->valuelen, 0x0, &meta);
                if (r != BTREE_RESULT_SUCCESS) {
                    return HBTRIE_RESULT_FAIL;
                }
                btreeitem_new->btree.aux = trie->aux;
                // new tree goes BETWEEN parent and the existing child tree
                list_insert_before(&btreelist, &btreeitem->e,
                                   &btreeitem_new->e);

                // insert chunk for 'key'
                chunk_new = key + (prevchunkno + diffchunkno + 1) *
                                  trie->chunksize;
                r = btree_insert(&btreeitem_new->btree, chunk_new, value);
                if (r == BTREE_RESULT_FAIL) {
                    return HBTRIE_RESULT_FAIL;
                }
                // insert chunk for existing btree
                chunk_new = (uint8_t*)old_prefix + diffchunkno *
                                                   trie->chunksize;
                bid_new = btreeitem->btree.root_bid;
                btreeitem_new->child_rootbid = bid_new;
                // set MSB (marks the value as a child b-tree BID, not a doc offset)
                _bid = _endian_encode(bid_new);
                _hbtrie_set_msb(trie, (void*)&_bid);
                r = btree_insert(&btreeitem_new->btree,
                                 chunk_new, (void*)&_bid);
                if (r == BTREE_RESULT_FAIL) {
                    return HBTRIE_RESULT_FAIL;
                }

                break;
            }
        }

        //3 search b-tree using current chunk
        if ((cpt_node && rawkeylen == curchunkno * trie->chunksize) ||
            (!cpt_node && nchunk == curchunkno)) {
            // KEY is exactly same as tree's prefix .. insert into metasection
            _hbtrie_store_meta(trie, &meta.size, curchunkno,
                               (cpt_node)?(HBMETA_LEAF):(HBMETA_NORMAL),
                               hbmeta.prefix,
                               (curchunkno-prevchunkno - 1) * trie->chunksize,
                               value, buf);
            btree_update_meta(&btreeitem->btree, &meta);
            break;
        } else {
            chunk = key + curchunkno*trie->chunksize;
            if (cpt_node) {
                // leaf b-tree: look up using a variable-length leaf key
                _set_leaf_key(k, chunk,
                              rawkeylen - curchunkno*trie->chunksize);
                r = btree_find(&btreeitem->btree, k, btree_value);
                _free_leaf_key(k);
            } else {
                r = btree_find(&btreeitem->btree, chunk, btree_value);
            }
        }

        if (r == BTREE_RESULT_FAIL) {
            //3 1. normal insert: same chunk does not exist -> just insert
            if (flag & HBTRIE_PARTIAL_UPDATE) {
                // partial update doesn't allow inserting a new key
                ret_result = HBTRIE_RESULT_FAIL;
                break; // while loop
            }

            if (cpt_node) {
                // leaf b-tree
                _set_leaf_key(k, chunk,
                              rawkeylen - curchunkno*trie->chunksize);
                r = btree_insert(&btreeitem->btree, k, value);
                if (r == BTREE_RESULT_FAIL) {
                    _free_leaf_key(k);
                    ret_result = HBTRIE_RESULT_FAIL;
                    break; // while loop
                }
                _free_leaf_key(k);

                if (btreeitem->btree.height > trie->leaf_height_limit) {
                    // height growth .. extend!
                    // (converts this leaf b+tree into a normal sub-trie;
                    //  it performs its own cascaded update and returns)
                    _hbtrie_extend_leaf_tree(trie, &btreelist, btreeitem,
                        key, curchunkno * trie->chunksize);
                    return ret_result;
                }
            } else {
                r = btree_insert(&btreeitem->btree, chunk, value);
                if (r == BTREE_RESULT_FAIL) {
                    ret_result = HBTRIE_RESULT_FAIL;
                }
            }
            break; // while loop
        }

        // same chunk already exists
        if (flag & HBTRIE_PARTIAL_UPDATE &&
            curchunkno + 1 == nchunk - 1) {
            // partial update mode & the last meaningful chunk
            // update the local btree value
            if (oldvalue_out) {
                memcpy(oldvalue_out, btree_value, trie->valuelen);
            }
            // assume that always normal b-tree
            r = btree_insert(&btreeitem->btree, chunk, value);
            if (r == BTREE_RESULT_FAIL) {
                ret_result = HBTRIE_RESULT_FAIL;
            } else {
                ret_result = HBTRIE_RESULT_SUCCESS;
            }
            break;
        }

        // check whether the value points to sub-tree or document
        // check MSB
        if (_hbtrie_is_msb_set(trie, btree_value)) {
            // this is BID of b-tree node (by clearing MSB)
            _hbtrie_clear_msb(trie, btree_value);
            bid_new = trie->btree_kv_ops->value2bid(btree_value);
            bid_new = _endian_decode(bid_new);
            btreeitem->child_rootbid = bid_new;
            //3 traverse to the sub-tree
            // fetch sub-tree
            btreeitem = (struct btreelist_item*)
                        mempool_alloc(sizeof(struct btreelist_item));

            r = btree_init_from_bid(&btreeitem->btree,
                                    trie->btreeblk_handle,
                                    trie->btree_blk_ops,
                                    trie->btree_kv_ops,
                                    trie->btree_nodesize, bid_new);
            if (r == BTREE_RESULT_FAIL) {
                ret_result = HBTRIE_RESULT_FAIL;
            }
            // NOTE(review): when btree_init_from_bid fails above, the loop
            // still continues and uses the partially-initialized btree on
            // the next iteration — confirm whether the descent should be
            // aborted here instead.
            btreeitem->btree.aux = trie->aux;
            list_push_back(&btreelist, &btreeitem->e);
            continue;
        }

        // this is offset of document (as it is)
        // create new sub-tree

        // read entire key
        uint8_t *docrawkey = alca(uint8_t, HBTRIE_MAX_KEYLEN);
        uint8_t *dockey = alca(uint8_t, HBTRIE_MAX_KEYLEN);
        uint32_t docrawkeylen, dockeylen, minrawkeylen;
        uint64_t offset;
        int docnchunk, minchunkno, newchunkno, diffchunkno;

        // get offset value from btree_value
        offset = trie->btree_kv_ops->value2bid(btree_value);

        // read entire key
        docrawkeylen = trie->readkey(trie->doc_handle, offset, docrawkey);
        dockeylen = _hbtrie_reform_key(trie, docrawkey, docrawkeylen, dockey);

        // find first different chunk
        docnchunk = _get_nchunk(trie, dockey, dockeylen);

        if (trie->flag & HBTRIE_FLAG_COMPACT || leaf_cond) {
            // optimization mode
            // Note: custom cmp function doesn't support key
            //       longer than a block size.

            // newchunkno doesn't matter to leaf B+tree,
            // since leaf B+tree can't create sub-tree.
            newchunkno = curchunkno+1;
            minchunkno = MIN(_l2c(trie, rawkeylen),
                             _l2c(trie, (int)docrawkeylen));
            minrawkeylen = MIN(rawkeylen, (int)docrawkeylen);

            if (curchunkno == 0) {
                // root B+tree
                diffchunkno = _hbtrie_find_diff_chunk(trie, rawkey, docrawkey,
                    curchunkno, minchunkno -
                                ((minrawkeylen%trie->chunksize == 0)?(0):(1)));
                if (rawkeylen == (int)docrawkeylen && diffchunkno+1 == minchunkno) {
                    if (!memcmp(rawkey, docrawkey, rawkeylen)) {
                        // same key
                        diffchunkno = minchunkno;
                    }
                }
            } else {
                // diffchunkno also doesn't matter to leaf B+tree,
                // since leaf B+tree is not based on a lexicographical key order.
                // Hence, we set diffchunkno to minchunkno iff two keys are
                // identified as the same key by the custom compare function.
                // Otherwise, diffchunkno is always set to curchunkno.
                uint8_t *k_doc = alca(uint8_t, trie->chunksize);
                _set_leaf_key(k, chunk,
                              rawkeylen - curchunkno*trie->chunksize);
                _set_leaf_key(k_doc, (uint8_t*)docrawkey + curchunkno*trie->chunksize,
                              docrawkeylen - curchunkno*trie->chunksize);
                if (trie->btree_leaf_kv_ops->cmp(k, k_doc, trie->aux) == 0) {
                    // same key
                    diffchunkno = minchunkno;
                    docnchunk = nchunk;
                } else {
                    // different key
                    diffchunkno = curchunkno;
                }
                _free_leaf_key(k);
                _free_leaf_key(k_doc);
            }
            opt = HBMETA_LEAF;
            kv_ops = trie->btree_leaf_kv_ops;
        } else {
            // original mode
            minchunkno = MIN(nchunk, docnchunk);
            newchunkno = diffchunkno =
                _hbtrie_find_diff_chunk(trie, key, dockey,
                                        curchunkno, minchunkno);
            opt = HBMETA_NORMAL;
            kv_ops = trie->btree_kv_ops;
        }

        // one key is substring of the other key
        if (minchunkno == diffchunkno && docnchunk == nchunk) {
            //3 same key!! .. update the value
            if (oldvalue_out) {
                memcpy(oldvalue_out, btree_value, trie->valuelen);
            }
            if (cpt_node) {
                // leaf b-tree
                _set_leaf_key(k, chunk,
                              rawkeylen - curchunkno*trie->chunksize);
                r = btree_insert(&btreeitem->btree, k, value);
                _free_leaf_key(k);
            } else {
                // normal b-tree
                r = btree_insert(&btreeitem->btree, chunk, value);
            }
            if (r == BTREE_RESULT_FAIL) {
                ret_result = HBTRIE_RESULT_FAIL;
            } else {
                ret_result = HBTRIE_RESULT_SUCCESS;
            }
            break;
        }

        // different key
        // if the shared prefix is too long to fit in one meta section,
        // create a chain of intermediate b-trees, each holding as much of
        // the prefix as a node can carry
        while (trie->btree_nodesize > HBTRIE_HEADROOM &&
               (newchunkno - curchunkno) * trie->chunksize >
                   (int)trie->btree_nodesize - HBTRIE_HEADROOM) {
            // prefix is too long .. we have to split it
            fdb_assert(opt == HBMETA_NORMAL, opt, trie);
            int midchunkno;
            midchunkno = curchunkno +
                        (trie->btree_nodesize - HBTRIE_HEADROOM) /
                            trie->chunksize;
            _hbtrie_store_meta(trie, &meta.size, midchunkno, opt,
                               key + trie->chunksize * (curchunkno+1),
                               (midchunkno - (curchunkno+1)) *
                                   trie->chunksize,
                               NULL, buf);

            btreeitem_new = (struct btreelist_item *)
                            mempool_alloc(sizeof(struct btreelist_item));
            btreeitem_new->chunkno = midchunkno;
            r = btree_init(&btreeitem_new->btree,
                           trie->btreeblk_handle,
                           trie->btree_blk_ops, kv_ops,
                           trie->btree_nodesize, trie->chunksize,
                           trie->valuelen, 0x0, &meta);
            if (r == BTREE_RESULT_FAIL) {
                return HBTRIE_RESULT_FAIL;
            }
            btreeitem_new->btree.aux = trie->aux;
            btreeitem_new->child_rootbid = BLK_NOT_FOUND;
            list_push_back(&btreelist, &btreeitem_new->e);

            // insert new btree's bid into the previous btree
            bid_new = btreeitem_new->btree.root_bid;
            btreeitem->child_rootbid = bid_new;
            _bid = _endian_encode(bid_new);
            _hbtrie_set_msb(trie, (void *)&_bid);
            r = btree_insert(&btreeitem->btree, chunk, &_bid);
            if (r == BTREE_RESULT_FAIL) {
                ret_result = HBTRIE_RESULT_FAIL;
                break;
            }

            // switch & go to the next tree
            chunk = (uint8_t*)key +
                    midchunkno * trie->chunksize;
            curchunkno = midchunkno;
            btreeitem = btreeitem_new;
        }
        if (ret_result != HBTRIE_RESULT_SUCCESS) {
            break;
        }

        if (minchunkno == diffchunkno && minchunkno == newchunkno) {
            //3 2-1. create sub-tree
            // that containing file offset of one doc (sub-string)
            // in its meta section, and insert the other doc
            // (super-string) into the tree

            void *key_long, *value_long;
            void *key_short, *value_short;
            size_t nchunk_long, rawkeylen_long;

            if (docnchunk < nchunk) {
                // dockey is substring of key
                key_short = dockey;
                value_short = btree_value;

                key_long = key;
                value_long = value;

                nchunk_long = nchunk;
                rawkeylen_long = rawkeylen;
            } else {
                // key is substring of dockey
                key_short = key;
                value_short = value;

                key_long = dockey;
                value_long = btree_value;

                nchunk_long = docnchunk;
                rawkeylen_long = docrawkeylen;
            }
            (void)key_short;
            (void)nchunk_long;

            // shorter key's value is embedded in the new tree's metadata
            _hbtrie_store_meta(trie, &meta.size, newchunkno, opt,
                               key + trie->chunksize * (curchunkno+1),
                               (newchunkno - (curchunkno+1)) *
                                   trie->chunksize,
                               value_short, buf);

            btreeitem_new = (struct btreelist_item *)
                            mempool_alloc(sizeof(struct btreelist_item));
            btreeitem_new->chunkno = newchunkno;
            r = btree_init(&btreeitem_new->btree,
                           trie->btreeblk_handle,
                           trie->btree_blk_ops, kv_ops,
                           trie->btree_nodesize, trie->chunksize,
                           trie->valuelen, 0x0, &meta);
            if (r == BTREE_RESULT_FAIL) {
                ret_result = HBTRIE_RESULT_FAIL;
                break;
            }
            btreeitem_new->btree.aux = trie->aux;

            list_push_back(&btreelist, &btreeitem_new->e);

            chunk_new = (uint8_t*)key_long +
                        newchunkno * trie->chunksize;

            if (opt == HBMETA_LEAF) {
                // optimization mode
                _set_leaf_key(k, chunk_new, rawkeylen_long -
                                  newchunkno*trie->chunksize);
                r = btree_insert(&btreeitem_new->btree, k, value_long);
                if (r == BTREE_RESULT_FAIL) {
                    ret_result = HBTRIE_RESULT_FAIL;
                }
                _free_leaf_key(k);
            } else {
                // normal mode
                r = btree_insert(&btreeitem_new->btree,
                                 chunk_new, value_long);
                if (r == BTREE_RESULT_FAIL) {
                    ret_result = HBTRIE_RESULT_FAIL;
                }
            }

        } else {
            //3 2-2. create sub-tree
            // and insert two docs into it
            _hbtrie_store_meta(trie, &meta.size, newchunkno, opt,
                               key + trie->chunksize * (curchunkno+1),
                               (newchunkno - (curchunkno+1)) * trie->chunksize,
                               NULL, buf);

            btreeitem_new = (struct btreelist_item *)
                            mempool_alloc(sizeof(struct btreelist_item));
            btreeitem_new->chunkno = newchunkno;
            r = btree_init(&btreeitem_new->btree, trie->btreeblk_handle,
                           trie->btree_blk_ops, kv_ops,
                           trie->btree_nodesize, trie->chunksize,
                           trie->valuelen, 0x0, &meta);
            if (r == BTREE_RESULT_FAIL) {
                ret_result = HBTRIE_RESULT_FAIL;
            }
            btreeitem_new->btree.aux = trie->aux;

            list_push_back(&btreelist, &btreeitem_new->e);
            // insert KEY
            chunk_new = key + newchunkno * trie->chunksize;
            if (opt == HBMETA_LEAF) {
                // optimization mode
                _set_leaf_key(k, chunk_new,
                              rawkeylen - newchunkno*trie->chunksize);
                r = btree_insert(&btreeitem_new->btree, k, value);
                _free_leaf_key(k);
            } else {
                r = btree_insert(&btreeitem_new->btree, chunk_new, value);
            }
            if (r == BTREE_RESULT_FAIL) {
                ret_result = HBTRIE_RESULT_FAIL;
            }

            // insert the original DOCKEY
            chunk_new = dockey + newchunkno * trie->chunksize;
            if (opt == HBMETA_LEAF) {
                // optimization mode
                _set_leaf_key(k, chunk_new,
                              docrawkeylen - newchunkno*trie->chunksize);
                r = btree_insert(&btreeitem_new->btree, k, btree_value);
                _free_leaf_key(k);
            } else {
                r = btree_insert(&btreeitem_new->btree,
                                 chunk_new, btree_value);
            }
            if (r == BTREE_RESULT_FAIL) {
                ret_result = HBTRIE_RESULT_FAIL;
            }
        }

        // update previous (parent) b-tree
        bid_new = btreeitem_new->btree.root_bid;
        btreeitem->child_rootbid = bid_new;

        // set MSB
        _bid = _endian_encode(bid_new);
        _hbtrie_set_msb(trie, (void *)&_bid);
        // ASSUMPTION: parent b-tree always MUST be non-leaf b-tree
        r = btree_insert(&btreeitem->btree, chunk, (void*)&_bid);
        if (r == BTREE_RESULT_FAIL) {
            ret_result = HBTRIE_RESULT_FAIL;
        }

        break;
    } // while

    // propagate (possibly changed) root BIDs of all visited b-trees upward,
    // updating trie->root_bid if the root tree moved
    _hbtrie_btree_cascaded_update(trie, &btreelist, key, 1);

    return ret_result;
}
2223
2224hbtrie_result hbtrie_insert(struct hbtrie *trie,
2225                            void *rawkey, int rawkeylen,
2226                            void *value, void *oldvalue_out) {
2227    return _hbtrie_insert(trie, rawkey, rawkeylen, value, oldvalue_out, 0x0);
2228}
2229
2230hbtrie_result hbtrie_insert_partial(struct hbtrie *trie,
2231                                    void *rawkey, int rawkeylen,
2232                                    void *value, void *oldvalue_out) {
2233    return _hbtrie_insert(trie, rawkey, rawkeylen,
2234                          value, oldvalue_out, HBTRIE_PARTIAL_UPDATE);
2235}
2236