xref: /4.6.0/forestdb/src/hbtrie.cc (revision 7af74b21)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21
22#include "hbtrie.h"
23#include "list.h"
24#include "btree.h"
25#include "btree_kv.h"
26#include "btree_fast_str_kv.h"
27#include "internal_types.h"
28
29#include "memleak.h"
30
31#ifdef __DEBUG
32#ifndef __DEBUG_HBTRIE
33    #undef DBG
34    #undef DBGCMD
35    #undef DBGSW
36    #define DBG(...)
37    #define DBGCMD(...)
38    #define DBGSW(n, ...)
39#endif
40#endif
41
42#define HBTRIE_EOK (0xF0)
43
44#define CHUNK_FLAG (0x8000)
45typedef uint16_t chunkno_t;
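// In-memory view of a sub-b-tree's meta section. The pointers reference the
// raw meta buffer directly (see _hbtrie_fetch_meta()); nothing is copied.
// The MSB of 'chunkno' (CHUNK_FLAG) marks the b-tree as a leaf (compact) b-tree.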
46struct hbtrie_meta {
47    chunkno_t chunkno;
48    uint16_t prefix_len;
49    void *value;
50    void *prefix;
51};
52
53#define _l2c(trie, len) (( (len) + ((trie)->chunksize-1) ) / (trie)->chunksize)
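// _l2c(): ceil(len / chunksize), i.e., the number of chunks needed to cover 'len' bytes.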
54
55// MUST return the same value as '_get_nchunk(_hbtrie_reform_key(RAWKEY))'
56INLINE int _get_nchunk_raw(struct hbtrie *trie, void *rawkey, int rawkeylen)
57{
58    return _l2c(trie, rawkeylen) + 1;
59}
60
61INLINE int _get_nchunk(struct hbtrie *trie, void *key, int keylen)
62{
63    return (keylen-1) / trie->chunksize + 1;
64}
65
66int _hbtrie_reform_key(struct hbtrie *trie, void *rawkey,
67                       int rawkeylen, void *outkey)
68{
69    int outkeylen;
70    int nchunk;
71    int i;
72    uint8_t rsize;
73    size_t csize = trie->chunksize;
74
75    nchunk = _get_nchunk_raw(trie, rawkey, rawkeylen);
76    outkeylen = nchunk * csize;
77
78    if (nchunk > 2) {
79        // copy chunk[0] ~ chunk[nchunk-2]
80        rsize = rawkeylen - ((nchunk - 2) * csize);
81    } else {
82        rsize = rawkeylen;
83    }
84    fdb_assert(rsize && rsize <= trie->chunksize, rsize, trie);
85    memcpy((uint8_t*)outkey, (uint8_t*)rawkey, rawkeylen);
86
87    if (rsize < csize) {
88        // zero-fill the remaining space
89        i = nchunk - 2;
90        memset((uint8_t*)outkey + (i*csize) + rsize, 0x0, 2*csize - rsize);
91    } else {
92        // zero-fill the last chunk
93        i = nchunk - 1;
94        memset((uint8_t*)outkey + i * csize, 0x0, csize);
95    }
96
97    // assign rsize at the end of the outkey
98    *((uint8_t*)outkey + outkeylen - 1) = rsize;
99
100    return outkeylen;
101}
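// Illustrative example (assuming chunksize = 8): for the 10-byte raw key
// "abcdefghij", _get_nchunk_raw() returns 3, so the reformed key is 24 bytes:
//   chunk 0: "abcdefgh"
//   chunk 1: "ij" followed by 6 zero bytes
//   chunk 2: 7 zero bytes followed by rsize = 0x02 (the number of raw key
//            bytes in the last non-dummy chunk)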
102
103// this function only returns (raw) key length
104static int _hbtrie_reform_key_reverse(struct hbtrie *trie,
105                                      void *key,
106                                      int keylen)
107{
108    uint8_t rsize;
109    rsize = *((uint8_t*)key + keylen - 1);
110    fdb_assert(rsize, rsize, trie);
111
112    if (rsize == trie->chunksize) {
113        return keylen - trie->chunksize;
114    } else {
115        // rsize: 1 ~ chunksize-1
116        return keylen - (trie->chunksize * 2) + rsize;
117    }
118}
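// Continuing the example above: the reformed 24-byte key ends with rsize = 0x02,
// so this function returns 24 - (2 * 8) + 2 = 10, the original raw key length.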
119
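// Leaf (compact) b-trees index variable-length key postfixes rather than
// fixed-size chunks; these macros alias the btree_fast_str_kv_* helpers used
// for that purpose.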
120#define _get_leaf_kv_ops btree_fast_str_kv_get_kb64_vb64
121#define _get_leaf_key btree_fast_str_kv_get_key
122#define _set_leaf_key btree_fast_str_kv_set_key
123#define _set_leaf_inf_key btree_fast_str_kv_set_inf_key
124#define _free_leaf_key btree_fast_str_kv_free_key
125
126void hbtrie_init(struct hbtrie *trie, int chunksize, int valuelen,
127                 int btree_nodesize, bid_t root_bid, void *btreeblk_handle,
128                 struct btree_blk_ops *btree_blk_ops, void *doc_handle,
129                 hbtrie_func_readkey *readkey)
130{
131    struct btree_kv_ops *btree_kv_ops, *btree_leaf_kv_ops;
132
133    trie->chunksize = chunksize;
134    trie->valuelen = valuelen;
135    trie->btree_nodesize = btree_nodesize;
136    trie->btree_blk_ops = btree_blk_ops;
137    trie->btreeblk_handle = btreeblk_handle;
138    trie->doc_handle = doc_handle;
139    trie->root_bid = root_bid;
140    trie->flag = 0x0;
141    trie->leaf_height_limit = 0;
142    trie->cmp_args.chunksize = chunksize;
143    trie->cmp_args.aux = NULL;
144    trie->aux = &trie->cmp_args;
145
146    // assign key-value operations
147    btree_kv_ops = (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
148    btree_leaf_kv_ops = (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
149
150    fdb_assert(valuelen == 8, valuelen, trie);
151    fdb_assert((size_t)chunksize >= sizeof(void *), chunksize, trie);
152
153    if (chunksize == 8 && valuelen == 8){
154        btree_kv_ops = btree_kv_get_kb64_vb64(btree_kv_ops);
155        btree_leaf_kv_ops = _get_leaf_kv_ops(btree_leaf_kv_ops);
156    } else if (chunksize == 4 && valuelen == 8) {
157        btree_kv_ops = btree_kv_get_kb32_vb64(btree_kv_ops);
158        btree_leaf_kv_ops = _get_leaf_kv_ops(btree_leaf_kv_ops);
159    } else {
160        btree_kv_ops = btree_kv_get_kbn_vb64(btree_kv_ops);
161        btree_leaf_kv_ops = _get_leaf_kv_ops(btree_leaf_kv_ops);
162    }
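    // chunk sizes of 8 and 4 use fixed-size key operations; any other chunk
    // size falls back to the generic variable-size operations. Leaf b-trees
    // always use the fast_str_kv operations regardless of chunk size.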
163
164    trie->btree_kv_ops = btree_kv_ops;
165    trie->btree_leaf_kv_ops = btree_leaf_kv_ops;
166    trie->readkey = readkey;
167    trie->map = NULL;
168    trie->last_map_chunk = (void *)malloc(chunksize);
169    memset(trie->last_map_chunk, 0xff, chunksize); // set 0xffff...
170}
171
172void hbtrie_free(struct hbtrie *trie)
173{
174    free(trie->btree_kv_ops);
175    free(trie->btree_leaf_kv_ops);
176    free(trie->last_map_chunk);
177}
178
179void hbtrie_set_flag(struct hbtrie *trie, uint8_t flag)
180{
181    trie->flag = flag;
182    if (trie->leaf_height_limit == 0) {
183        trie->leaf_height_limit = 1;
184    }
185}
186
187void hbtrie_set_leaf_height_limit(struct hbtrie *trie, uint8_t limit)
188{
189    trie->leaf_height_limit = limit;
190}
191
192void hbtrie_set_leaf_cmp(struct hbtrie *trie, btree_cmp_func *cmp)
193{
194    trie->btree_leaf_kv_ops->cmp = cmp;
195}
196
197void hbtrie_set_map_function(struct hbtrie *trie,
198                             hbtrie_cmp_map *map_func)
199{
200    trie->map = map_func;
201}
202
203// IMPORTANT: hbmeta doesn't have its own allocated memory space (pointers only)
204static void _hbtrie_fetch_meta(struct hbtrie *trie, int metasize,
205                               struct hbtrie_meta *hbmeta, void *buf)
206{
207    // read hbmeta from buf
208    int offset = 0;
209    uint32_t valuelen = 0;
210
211    memcpy(&hbmeta->chunkno, buf, sizeof(hbmeta->chunkno));
212    hbmeta->chunkno = _endian_decode(hbmeta->chunkno);
213    offset += sizeof(hbmeta->chunkno);
214
215    memcpy(&valuelen, (uint8_t*)buf+offset, sizeof(trie->valuelen));
216    offset += sizeof(trie->valuelen);
217
218    if (valuelen > 0) {
219        hbmeta->value = (uint8_t*)buf + offset;
220        offset += trie->valuelen;
221    } else {
222        hbmeta->value = NULL;
223    }
224
225    if (metasize - offset > 0) {
226        //memcpy(hbmeta->prefix, buf+offset, metasize - offset);
227        hbmeta->prefix = (uint8_t*)buf + offset;
228        hbmeta->prefix_len = metasize - offset;
229    } else {
230        hbmeta->prefix = NULL;
231        hbmeta->prefix_len = 0;
232    }
233}
234
235typedef enum {
236    HBMETA_NORMAL,
237    HBMETA_LEAF,
238} hbmeta_opt;
239/* << raw hbtrie meta structure >>
240 * [Total meta length]: 2 bytes
241 * [Chunk number]:      2 bytes
242 * [Value length]:      1 byte
243 * [Value (optional)]:  x bytes
244 * [Prefix (optional)]: y bytes
245 */
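// Illustrative example (assuming chunksize = 8): a non-leaf sub-b-tree rooted
// at chunk number 3 whose parent covers chunk 0 skips chunks 1-2, so
// _hbtrie_store_meta() writes [chunk number 3 (2 bytes)] [value length 0x00
// (1 byte)] [16-byte skipped prefix]. For a leaf b-tree, CHUNK_FLAG (0x8000)
// is OR'd into the stored chunk number; the total-length field is maintained
// by the b-tree meta layer, not here.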
246static void _hbtrie_store_meta(struct hbtrie *trie,
247                               metasize_t *metasize_out,
248                               chunkno_t chunkno,
249                               hbmeta_opt opt,
250                               void *prefix,
251                               int prefixlen,
252                               void *value,
253                               void *buf)
254{
255    chunkno_t _chunkno;
256
257    // write hbmeta to buf
258    *metasize_out = 0;
259
260    if (opt == HBMETA_LEAF) {
261        chunkno |= CHUNK_FLAG;
262    }
263
264    _chunkno = _endian_encode(chunkno);
265    memcpy(buf, &_chunkno, sizeof(chunkno));
266    *metasize_out += sizeof(chunkno);
267
268    if (value) {
269        memcpy((uint8_t*)buf + *metasize_out,
270               &trie->valuelen, sizeof(trie->valuelen));
271        *metasize_out += sizeof(trie->valuelen);
272        memcpy((uint8_t*)buf + *metasize_out,
273               value, trie->valuelen);
274        *metasize_out += trie->valuelen;
275    } else {
276        memset((uint8_t*)buf + *metasize_out, 0x0, sizeof(trie->valuelen));
277        *metasize_out += sizeof(trie->valuelen);
278    }
279
280    if (prefixlen > 0) {
281        memcpy((uint8_t*)buf + *metasize_out, prefix, prefixlen);
282        *metasize_out += prefixlen;
283    }
284}
285
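// Returns the index of the first chunk in [start_chunk, end_chunk) at which
// key1 and key2 differ, or end_chunk if all compared chunks are equal.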
286INLINE int _hbtrie_find_diff_chunk(struct hbtrie *trie,
287                                   void *key1,
288                                   void *key2,
289                                   int start_chunk,
290                                   int end_chunk)
291{
292    int i;
293    for (i=start_chunk; i < end_chunk; ++i) {
294        if (memcmp((uint8_t*)key1 + trie->chunksize*i,
295                   (uint8_t*)key2 + trie->chunksize*i,
296                   trie->chunksize)) {
297             return i;
298        }
299    }
300    return i;
301}
302
303//3 ASSUMPTION: 'VALUE' must use the same endianness as the hb+trie
304
305#if defined(__ENDIAN_SAFE) || defined(_BIG_ENDIAN)
306// endian safe option is turned on, OR,
307// the architecture is based on big endian
308INLINE void _hbtrie_set_msb(struct hbtrie *trie, void *value)
309{
310    *((uint8_t*)value) |= (uint8_t)0x80;
311}
312INLINE void _hbtrie_clear_msb(struct hbtrie *trie, void *value)
313{
314    *((uint8_t*)value) &= ~((uint8_t)0x80);
315}
316INLINE int _hbtrie_is_msb_set(struct hbtrie *trie, void *value)
317{
318    return *((uint8_t*)value) & ((uint8_t)0x80);
319}
320#else
321// little endian
322INLINE void _hbtrie_set_msb(struct hbtrie *trie, void *value)
323{
324    *((uint8_t*)value + (trie->valuelen-1)) |= (uint8_t)0x80;
325}
326INLINE void _hbtrie_clear_msb(struct hbtrie *trie, void *value)
327{
328    *((uint8_t*)value + (trie->valuelen-1)) &= ~((uint8_t)0x80);
329}
330INLINE int _hbtrie_is_msb_set(struct hbtrie *trie, void *value)
331{
332    return *((uint8_t*)value + (trie->valuelen-1)) & ((uint8_t)0x80);
333}
334#endif
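// A b-tree value is either a document offset (MSB clear) or the BID of a child
// sub-b-tree's root node (MSB set); the helpers above set/clear/test that flag
// on the most significant byte of the 8-byte value.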
335
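// btreelist_item: one b-tree on the path from the root b-tree down to the
// target, collected by _hbtrie_find() so that write paths can propagate moved
// root BIDs upward via _hbtrie_btree_cascaded_update().
// btreeit_item: per-level b-tree iterator state kept in the iterator's
// btreeit_list, ordered from the root b-tree down to the deepest sub-b-tree.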
336struct btreelist_item {
337    struct btree btree;
338    chunkno_t chunkno;
339    bid_t child_rootbid;
340    struct list_elem e;
341    uint8_t leaf;
342};
343
344struct btreeit_item {
345    struct btree_iterator btree_it;
346    chunkno_t chunkno;
347    struct list_elem le;
348    uint8_t leaf;
349};
350
351#define _is_leaf_btree(chunkno) ((chunkno) & CHUNK_FLAG)
352#define _get_chunkno(chunkno) ((chunkno) & (~(CHUNK_FLAG)))
353
354hbtrie_result hbtrie_iterator_init(struct hbtrie *trie,
355                                   struct hbtrie_iterator *it,
356                                   void *initial_key,
357                                   size_t keylen)
358{
359    it->trie = *trie;
360
361    // MUST NOT affect the original trie, as the struct copy above shares the same memory
362    it->trie.last_map_chunk = (void *)malloc(it->trie.chunksize);
363    memset(it->trie.last_map_chunk, 0xff, it->trie.chunksize);
364
365    it->curkey = (void *)malloc(HBTRIE_MAX_KEYLEN);
366
367    if (initial_key) {
368        it->keylen = _hbtrie_reform_key(trie, initial_key, keylen, it->curkey);
369        if (it->keylen >= HBTRIE_MAX_KEYLEN) {
370            free(it->curkey);
371            DBG("Error: HBTrie iterator init fails because the init key length %d is "
372                "greater than the max key length %d\n", it->keylen, HBTRIE_MAX_KEYLEN);
373            return HBTRIE_RESULT_FAIL;
374        }
375        memset((uint8_t*)it->curkey + it->keylen, 0, trie->chunksize);
376    }else{
377        it->keylen = 0;
378        memset(it->curkey, 0, trie->chunksize);
379    }
380    list_init(&it->btreeit_list);
381    it->flags = 0;
382
383    return HBTRIE_RESULT_SUCCESS;
384}
385
386hbtrie_result hbtrie_iterator_free(struct hbtrie_iterator *it)
387{
388    struct list_elem *e;
389    struct btreeit_item *item;
390    e = list_begin(&it->btreeit_list);
391    while(e){
392        item = _get_entry(e, struct btreeit_item, le);
393        e = list_remove(&it->btreeit_list, e);
394        btree_iterator_free(&item->btree_it);
395        mempool_free(item);
396    }
397    free(it->trie.last_map_chunk);
398    if (it->curkey) free(it->curkey);
399    return HBTRIE_RESULT_SUCCESS;
400}
401
402// move iterator's cursor to the end of the key range.
403// hbtrie_prev() call after hbtrie_last() will return the last key.
404hbtrie_result hbtrie_last(struct hbtrie_iterator *it)
405{
406    struct hbtrie_iterator temp;
407
408    temp = *it;
409    hbtrie_iterator_free(it);
410
411    it->trie = temp.trie;
412    // MUST NOT affect the original trie, as the struct copy above shares the same memory
413    it->trie.last_map_chunk = (void *)malloc(it->trie.chunksize);
414    memset(it->trie.last_map_chunk, 0xff, it->trie.chunksize);
415
416    it->curkey = (void *)malloc(HBTRIE_MAX_KEYLEN);
417    // init with the infinite (0xff..) key without reforming
418    memset(it->curkey, 0xff, it->trie.chunksize);
419    it->keylen = it->trie.chunksize;
420
421    list_init(&it->btreeit_list);
422    it->flags = 0;
423
424    return HBTRIE_RESULT_SUCCESS;
425}
426
427// recursive function
428#define HBTRIE_PREFIX_MATCH_ONLY (0x1)
429#define HBTRIE_PARTIAL_MATCH (0x2)
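// HBTRIE_PREFIX_MATCH_ONLY: return the value only; do not read the full key
// back from the document.
// HBTRIE_PARTIAL_MATCH: match and return only the indexed (chunk-aligned)
// part of the key instead of the full document key.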
430static hbtrie_result _hbtrie_prev(struct hbtrie_iterator *it,
431                                  struct btreeit_item *item,
432                                  void *key_buf,
433                                  size_t *keylen,
434                                  void *value_buf,
435                                  uint8_t flag)
436{
437    struct hbtrie *trie = &it->trie;
438    struct list_elem *e;
439    struct btreeit_item *item_new;
440    struct btree btree;
441    hbtrie_result hr = HBTRIE_RESULT_FAIL;
442    btree_result br;
443    struct hbtrie_meta hbmeta;
444    struct btree_meta bmeta;
445    void *chunk;
446    uint8_t *k = alca(uint8_t, trie->chunksize);
447    uint8_t *v = alca(uint8_t, trie->valuelen);
448    memset(k, 0, trie->chunksize);
449    memset(v, 0, trie->valuelen);
450    bid_t bid;
451    uint64_t offset;
452
453    if (item == NULL) {
454        // this happens only on the first call
455        // create iterator for root b-tree
456        if (it->trie.root_bid == BLK_NOT_FOUND) return HBTRIE_RESULT_FAIL;
457        // set current chunk (key for b-tree)
458        chunk = it->curkey;
459        // load b-tree
460        btree_init_from_bid(
461            &btree, trie->btreeblk_handle, trie->btree_blk_ops,
462            trie->btree_kv_ops,
463            trie->btree_nodesize, trie->root_bid);
464        btree.aux = trie->aux;
465        if (btree.ksize != trie->chunksize || btree.vsize != trie->valuelen) {
466            if (((trie->chunksize << 4) | trie->valuelen) == btree.ksize) {
467                // this is an old meta format
468                return HBTRIE_RESULT_INDEX_VERSION_NOT_SUPPORTED;
469            }
470            // B+tree root node is corrupted.
471            return HBTRIE_RESULT_INDEX_CORRUPTED;
472        }
473
474        item = (struct btreeit_item *)mempool_alloc(sizeof(
475                                                    struct btreeit_item));
476        item->chunkno = 0;
477        item->leaf = 0;
478
479        br = btree_iterator_init(&btree, &item->btree_it, chunk);
480        if (br == BTREE_RESULT_FAIL) return HBTRIE_RESULT_FAIL;
481
482        list_push_back(&it->btreeit_list, &item->le);
483    }
484
485    e = list_next(&item->le);
486    if (e) {
487        // if prev sub b-tree exists
488        item_new = _get_entry(e, struct btreeit_item, le);
489        hr = _hbtrie_prev(it, item_new, key_buf, keylen, value_buf, flag);
490        if (hr == HBTRIE_RESULT_SUCCESS) return hr;
491        it->keylen = (item->chunkno+1) * trie->chunksize;
492    }
493
494    while (hr != HBTRIE_RESULT_SUCCESS) {
495        // get key-value from current b-tree iterator
496        memset(k, 0, trie->chunksize);
497        br = btree_prev(&item->btree_it, k, v);
498        if (item->leaf) {
499            _free_leaf_key(k);
500        } else {
501            chunk = (uint8_t*)it->curkey + item->chunkno * trie->chunksize;
502            if (item->btree_it.btree.kv_ops->cmp(k, chunk,
503                    item->btree_it.btree.aux) != 0) {
504                // not an exact match .. the rest of the key string is not needed anymore
505                it->keylen = (item->chunkno+1) * trie->chunksize;
506                HBTRIE_ITR_SET_MOVED(it);
507            }
508        }
509
510        if (br == BTREE_RESULT_FAIL) {
511            // no more KV pair in the b-tree
512            btree_iterator_free(&item->btree_it);
513            list_remove(&it->btreeit_list, &item->le);
514            mempool_free(item);
515            return HBTRIE_RESULT_FAIL;
516        }
517
518        // check whether v points to doc or sub b-tree
519        if (_hbtrie_is_msb_set(trie, v)) {
520            // MSB is set -> sub b-tree
521
522            // load sub b-tree and create new iterator for the b-tree
523            _hbtrie_clear_msb(trie, v);
524            bid = trie->btree_kv_ops->value2bid(v);
525            bid = _endian_decode(bid);
526            btree_init_from_bid(&btree, trie->btreeblk_handle,
527                                trie->btree_blk_ops, trie->btree_kv_ops,
528                                trie->btree_nodesize, bid);
529
530            // get sub b-tree's chunk number
531            bmeta.data = (void *)mempool_alloc(trie->btree_nodesize);
532            bmeta.size = btree_read_meta(&btree, bmeta.data);
533            _hbtrie_fetch_meta(trie, bmeta.size, &hbmeta, bmeta.data);
534
535            item_new = (struct btreeit_item *)
536                       mempool_alloc(sizeof(struct btreeit_item));
537            if (_is_leaf_btree(hbmeta.chunkno)) {
538                hbtrie_cmp_func *void_cmp;
539
540                if (trie->map) { // custom cmp functions exist
541                    if (!memcmp(trie->last_map_chunk, it->curkey, trie->chunksize)) {
542                        // same custom function was used in the last call ..
543                        // do nothing
544                    } else {
545                        // get cmp function corresponding to the key
546                        void_cmp = trie->map(it->curkey, (void *)trie);
547                        if (void_cmp) {
548                            memcpy(trie->last_map_chunk, it->curkey, trie->chunksize);
549                            // set aux for _fdb_custom_cmp_wrap()
550                            trie->cmp_args.aux = void_cmp;
551                            trie->aux = &trie->cmp_args;
552                        }
553                    }
554                }
555
556                btree.kv_ops = trie->btree_leaf_kv_ops;
557                item_new->leaf = 1;
558            } else {
559                item_new->leaf = 0;
560            }
561            btree.aux = trie->aux;
562            hbmeta.chunkno = _get_chunkno(hbmeta.chunkno);
563            item_new->chunkno = hbmeta.chunkno;
564
565            // Note: if user's key is exactly aligned to chunk size, then the
566            //       dummy chunk will be a zero-filled value, and it is used
567            //       as a key in the next level of B+tree. Hence, it is safe
568            //       to assign the dummy chunk to the 'chunk' variable.
569            if ( (unsigned)((item_new->chunkno+1) * trie->chunksize) <=
570                 it->keylen) {
571                // happens only once, on the first call (for each level of b-trees)
572                chunk = (uint8_t*)it->curkey +
573                        item_new->chunkno*trie->chunksize;
574                if (item->chunkno+1 < item_new->chunkno) {
575                    // skipped prefix exists
576                    // Note: all skipped chunks should be compared using the default
577                    //       cmp function
578                    int i, offset_meta = 0, offset_key = 0, chunkcmp = 0;
579                    for (i=item->chunkno+1; i<item_new->chunkno; ++i) {
580                        offset_meta = trie->chunksize * (i - (item->chunkno+1));
581                        offset_key = trie->chunksize * i;
582                        chunkcmp = trie->btree_kv_ops->cmp(
583                            (uint8_t*)it->curkey + offset_key,
584                            (uint8_t*)hbmeta.prefix + offset_meta,
585                            trie->aux);
586                        if (chunkcmp < 0) {
587                            // start_key's prefix is smaller than the skipped prefix
588                            // we have to go back to parent B+tree and pick prev entry
589                            mempool_free(bmeta.data);
590                            mempool_free(item_new);
591                            it->keylen = offset_key;
592                            hr = HBTRIE_RESULT_FAIL;
593                            HBTRIE_ITR_SET_MOVED(it);
594                            break;
595                        } else if (chunkcmp > 0 && trie->chunksize > 0) {
596                            // start_key's prefix is greater than the skipped prefix
597                            // set largest key for next B+tree
598                            chunk = alca(uint8_t, trie->chunksize);
599                            memset(chunk, 0xff, trie->chunksize);
600                            break;
601                        }
602                    }
603                    if (chunkcmp < 0) {
604                        // go back to parent B+tree
605                        continue;
606                    }
607                }
608
609            } else {
610                // chunk number of the b-tree is longer than current iterator's key
611                if (!HBTRIE_ITR_IS_MOVED(it)) {
612                    // The first prev call right after iterator init call.
613                    // This means that the init key is smaller than
614                    // the smallest key of the current tree, and larger than
615                    // the largest key of the previous tree.
616                    // So we have to go back to the parent tree, and
617                    // return the largest key of the previous tree.
618                    mempool_free(bmeta.data);
619                    mempool_free(item_new);
620                    it->keylen = (item->chunkno+1) * trie->chunksize;
621                    HBTRIE_ITR_SET_MOVED(it);
622                    continue;
623                }
624                // set largest key
625                chunk = alca(uint8_t, trie->chunksize);
626                memset(chunk, 0xff, trie->chunksize);
627            }
628
629            if (item_new->leaf && chunk && trie->chunksize > 0) {
630                uint8_t *k_temp = alca(uint8_t, trie->chunksize);
631                size_t _leaf_keylen, _leaf_keylen_raw = 0;
632
633                _leaf_keylen = it->keylen - (item_new->chunkno * trie->chunksize);
634                if (_leaf_keylen) {
635                    _leaf_keylen_raw = _hbtrie_reform_key_reverse(
636                                           trie, chunk, _leaf_keylen);
637                    _set_leaf_key(k_temp, chunk, _leaf_keylen_raw);
638                    if (_leaf_keylen_raw) {
639                        btree_iterator_init(&btree, &item_new->btree_it, k_temp);
640                    } else {
641                        btree_iterator_init(&btree, &item_new->btree_it, NULL);
642                    }
643                } else {
644                    // set initial key as the largest key
645                    // for reverse scan from the end of the B+tree
646                    _set_leaf_inf_key(k_temp);
647                    btree_iterator_init(&btree, &item_new->btree_it, k_temp);
648                }
649                _free_leaf_key(k_temp);
650            } else {
651                btree_iterator_init(&btree, &item_new->btree_it, chunk);
652            }
653            list_push_back(&it->btreeit_list, &item_new->le);
654
655            if (hbmeta.value && chunk == NULL) {
656                // NULL key exists .. the smallest key in this tree .. return first
657                offset = trie->btree_kv_ops->value2bid(hbmeta.value);
658                if (!(flag & HBTRIE_PREFIX_MATCH_ONLY)) {
659                    *keylen = trie->readkey(trie->doc_handle, offset, key_buf);
660                    it->keylen = _hbtrie_reform_key(trie, key_buf, *keylen,
661                                                    it->curkey);
662                }
663                memcpy(value_buf, &offset, trie->valuelen);
664                hr = HBTRIE_RESULT_SUCCESS;
665            } else {
666                hr = _hbtrie_prev(it, item_new, key_buf, keylen, value_buf,
667                                  flag);
668            }
669            mempool_free(bmeta.data);
670            if (hr == HBTRIE_RESULT_SUCCESS)
671                return hr;
672
673            // search failed .. go back to the parent tree
674            // (this happens when the initial key is smaller than
675            // the smallest key in the current tree (ITEM_NEW) ..
676            // so go back to ITEM and retrieve the previous child)
677            it->keylen = (item->chunkno+1) * trie->chunksize;
678            HBTRIE_ITR_SET_MOVED(it);
679
680        } else {
681            // MSB is not set -> doc
682            // read entire key and return the doc offset
683            offset = trie->btree_kv_ops->value2bid(v);
684            if (!(flag & HBTRIE_PREFIX_MATCH_ONLY)) {
685                *keylen = trie->readkey(trie->doc_handle, offset, key_buf);
686                it->keylen = _hbtrie_reform_key(trie, key_buf, *keylen, it->curkey);
687            }
688            memcpy(value_buf, &offset, trie->valuelen);
689
690            return HBTRIE_RESULT_SUCCESS;
691        }
692    }
693    return HBTRIE_RESULT_FAIL;
694}
695
696hbtrie_result hbtrie_prev(struct hbtrie_iterator *it,
697                          void *key_buf,
698                          size_t *keylen,
699                          void *value_buf)
700{
701    hbtrie_result hr;
702
703    if (HBTRIE_ITR_IS_REV(it) && HBTRIE_ITR_IS_FAILED(it)) {
704        return HBTRIE_RESULT_FAIL;
705    }
706
707    struct list_elem *e = list_begin(&it->btreeit_list);
708    struct btreeit_item *item = NULL;
709    if (e) item = _get_entry(e, struct btreeit_item, le);
710
711    hr = _hbtrie_prev(it, item, key_buf, keylen, value_buf, 0x0);
712    HBTRIE_ITR_SET_REV(it);
713    if (hr == HBTRIE_RESULT_SUCCESS) {
714        HBTRIE_ITR_CLR_FAILED(it);
715        HBTRIE_ITR_SET_MOVED(it);
716    } else {
717        HBTRIE_ITR_SET_FAILED(it);
718    }
719    return hr;
720}
721
722// recursive function
723static hbtrie_result _hbtrie_next(struct hbtrie_iterator *it,
724                                  struct btreeit_item *item,
725                                  void *key_buf,
726                                  size_t *keylen,
727                                  void *value_buf,
728                                  uint8_t flag)
729{
730    struct hbtrie *trie = &it->trie;
731    struct list_elem *e;
732    struct btreeit_item *item_new;
733    struct btree btree;
734    hbtrie_result hr = HBTRIE_RESULT_FAIL;
735    btree_result br;
736    struct hbtrie_meta hbmeta;
737    struct btree_meta bmeta;
738    void *chunk;
739    uint8_t *k = alca(uint8_t, trie->chunksize);
740    uint8_t *v = alca(uint8_t, trie->valuelen);
741    bid_t bid;
742    uint64_t offset;
743
744    if (item == NULL) {
745        // this happens only on the first call
746        // create iterator for root b-tree
747        if (it->trie.root_bid == BLK_NOT_FOUND) return HBTRIE_RESULT_FAIL;
748        // set current chunk (key for b-tree)
749        chunk = it->curkey;
750        // load b-tree
751        btree_init_from_bid(
752            &btree, trie->btreeblk_handle, trie->btree_blk_ops, trie->btree_kv_ops,
753            trie->btree_nodesize, trie->root_bid);
754        btree.aux = trie->aux;
755        if (btree.ksize != trie->chunksize || btree.vsize != trie->valuelen) {
756            if (((trie->chunksize << 4) | trie->valuelen) == btree.ksize) {
757                // this is an old meta format
758                return HBTRIE_RESULT_INDEX_VERSION_NOT_SUPPORTED;
759            }
760            // B+tree root node is corrupted.
761            return HBTRIE_RESULT_INDEX_CORRUPTED;
762        }
763
764        item = (struct btreeit_item *)mempool_alloc(sizeof(struct btreeit_item));
765        item->chunkno = 0;
766        item->leaf = 0;
767
768        br = btree_iterator_init(&btree, &item->btree_it, chunk);
769        if (br == BTREE_RESULT_FAIL) return HBTRIE_RESULT_FAIL;
770
771        list_push_back(&it->btreeit_list, &item->le);
772    }
773
774    e = list_next(&item->le);
775    if (e) {
776        // if next sub b-tree exists
777        item_new = _get_entry(e, struct btreeit_item, le);
778        hr = _hbtrie_next(it, item_new, key_buf, keylen, value_buf, flag);
779        if (hr != HBTRIE_RESULT_SUCCESS) {
780            it->keylen = (item->chunkno+1) * trie->chunksize;
781        }
782    }
783
784    while (hr != HBTRIE_RESULT_SUCCESS) {
785        // get key-value from current b-tree iterator
786        memset(k, 0, trie->chunksize);
787        br = btree_next(&item->btree_it, k, v);
788        if (item->leaf) {
789            _free_leaf_key(k);
790        } else {
791            chunk = (uint8_t*)it->curkey + item->chunkno * trie->chunksize;
792            if (item->btree_it.btree.kv_ops->cmp(k, chunk,
793                    item->btree_it.btree.aux) != 0) {
794                // not an exact match .. the rest of the key string is not needed anymore
795                it->keylen = (item->chunkno+1) * trie->chunksize;
796                HBTRIE_ITR_SET_MOVED(it);
797            }
798        }
799
800        if (br == BTREE_RESULT_FAIL) {
801            // no more KV pair in the b-tree
802            btree_iterator_free(&item->btree_it);
803            list_remove(&it->btreeit_list, &item->le);
804            mempool_free(item);
805            return HBTRIE_RESULT_FAIL;
806        }
807
808        if (flag & HBTRIE_PARTIAL_MATCH) {
809            // in partial match mode, we don't read the actual doc key,
810            // and just store & return the indexed part of the key.
811            memcpy((uint8_t*)it->curkey + item->chunkno * trie->chunksize,
812                   k, trie->chunksize);
813        }
814
815        // check whether v points to doc or sub b-tree
816        if (_hbtrie_is_msb_set(trie, v)) {
817            // MSB is set -> sub b-tree
818
819            // load sub b-tree and create new iterator for the b-tree
820            _hbtrie_clear_msb(trie, v);
821            bid = trie->btree_kv_ops->value2bid(v);
822            bid = _endian_decode(bid);
823            btree_init_from_bid(&btree, trie->btreeblk_handle,
824                                trie->btree_blk_ops, trie->btree_kv_ops,
825                                trie->btree_nodesize, bid);
826
827            // get sub b-tree's chunk number
828            bmeta.data = (void *)mempool_alloc(trie->btree_nodesize);
829            bmeta.size = btree_read_meta(&btree, bmeta.data);
830            _hbtrie_fetch_meta(trie, bmeta.size, &hbmeta, bmeta.data);
831
832            item_new = (struct btreeit_item *)
833                       mempool_alloc(sizeof(struct btreeit_item));
834            if (_is_leaf_btree(hbmeta.chunkno)) {
835                hbtrie_cmp_func *void_cmp;
836
837                if (trie->map) { // custom cmp functions exist
838                    if (!memcmp(trie->last_map_chunk, it->curkey, trie->chunksize)) {
839                        // same custom function was used in the last call ..
840                        // do nothing
841                    } else {
842                        // get cmp function corresponding to the key
843                        void_cmp = trie->map(it->curkey, (void *)trie);
844                        if (void_cmp) {
845                            memcpy(trie->last_map_chunk, it->curkey, trie->chunksize);
846                            // set aux for _fdb_custom_cmp_wrap()
847                            trie->cmp_args.aux = void_cmp;
848                            trie->aux = &trie->cmp_args;
849                        }
850                    }
851                }
852
853                btree.kv_ops = trie->btree_leaf_kv_ops;
854                item_new->leaf = 1;
855            } else {
856                item_new->leaf = 0;
857            }
858            btree.aux = trie->aux;
859            hbmeta.chunkno = _get_chunkno(hbmeta.chunkno);
860            item_new->chunkno = hbmeta.chunkno;
861
862            // Note: if user's key is exactly aligned to chunk size, then the
863            //       dummy chunk will be a zero-filled value, and it is used
864            //       as a key in the next level of B+tree. Hence, it is safe
865            //       to assign the dummy chunk to the 'chunk' variable.
866            if ( (unsigned)((item_new->chunkno+1) * trie->chunksize)
867                 <= it->keylen) {
868                // happens only once, on the first call (for each level of b-trees)
869                chunk = (uint8_t*)it->curkey +
870                        item_new->chunkno*trie->chunksize;
871                if (item->chunkno+1 < item_new->chunkno) {
872                    // skipped prefix exists
873                    // Note: all skipped chunks should be compared using the default
874                    //       cmp function
875                    int i, offset_meta = 0, offset_key = 0, chunkcmp = 0;
876                    for (i=item->chunkno+1; i<item_new->chunkno; ++i) {
877                        offset_meta = trie->chunksize * (i - (item->chunkno+1));
878                        offset_key = trie->chunksize * i;
879                        chunkcmp = trie->btree_kv_ops->cmp(
880                            (uint8_t*)it->curkey + offset_key,
881                            (uint8_t*)hbmeta.prefix + offset_meta,
882                            trie->aux);
883                        if (chunkcmp < 0) {
884                            // start_key's prefix is smaller than the skipped prefix
885                            // set smallest key for next B+tree
886                            it->keylen = offset_key;
887                            chunk = NULL;
888                            break;
889                        } else if (chunkcmp > 0) {
890                            // start_key's prefix is greater than the skipped prefix
891                            // we have to go back to parent B+tree and pick next entry
892                            mempool_free(bmeta.data);
893                            mempool_free(item_new);
894                            it->keylen = offset_key;
895                            hr = HBTRIE_RESULT_FAIL;
896                            HBTRIE_ITR_SET_MOVED(it);
897                            break;
898                        }
899                    }
900                    if (chunkcmp > 0) {
901                        // go back to parent B+tree
902                        continue;
903                    }
904                }
905            } else {
906                // chunk number of the b-tree is longer than current iterator's key
907                // set smallest key
908                chunk = NULL;
909            }
910
911            if (item_new->leaf && chunk && trie->chunksize > 0) {
912                uint8_t *k_temp = alca(uint8_t, trie->chunksize);
913                memset(k_temp, 0, trie->chunksize * sizeof(uint8_t));
914                size_t _leaf_keylen, _leaf_keylen_raw = 0;
915
916                _leaf_keylen = it->keylen - (item_new->chunkno * trie->chunksize);
917                if (_leaf_keylen > 0) {
918                    _leaf_keylen_raw = _hbtrie_reform_key_reverse(
919                                           trie, chunk, _leaf_keylen);
920                }
921                if (_leaf_keylen_raw) {
922                    _set_leaf_key(k_temp, chunk, _leaf_keylen_raw);
923                    btree_iterator_init(&btree, &item_new->btree_it, k_temp);
924                    _free_leaf_key(k_temp);
925                } else {
926                    btree_iterator_init(&btree, &item_new->btree_it, NULL);
927                }
928            } else {
929                bool null_btree_init_key = false;
930                if (!HBTRIE_ITR_IS_MOVED(it) && chunk && trie->chunksize > 0 &&
931                    ((uint64_t)item_new->chunkno+1) * trie->chunksize == it->keylen) {
932                    // Next chunk is the last chunk of the current iterator key
933                    // (happens only on iterator_init(), it internally calls next()).
934                    uint8_t *k_temp = alca(uint8_t, trie->chunksize);
935                    memset(k_temp, 0x0, trie->chunksize);
936                    k_temp[trie->chunksize - 1] = trie->chunksize;
937                    if (!memcmp(k_temp, chunk, trie->chunksize)) {
938                        // Extra chunk is the same as the specific pattern
939                        // ([0x0] [0x0] ... [trie->chunksize])
940                        // which means that given iterator key is exactly aligned
941                        // to chunk size and shorter than the position of the
942                        // next chunk.
943                        // To guarantee lexicographical order between
944                        // NULL and zero-filled key (NULL < 0x0000...),
945                        // we should init btree iterator with NULL key.
946                        null_btree_init_key = true;
947                    }
948                }
949                if (null_btree_init_key) {
950                    btree_iterator_init(&btree, &item_new->btree_it, NULL);
951                } else {
952                    btree_iterator_init(&btree, &item_new->btree_it, chunk);
953                }
954            }
955            list_push_back(&it->btreeit_list, &item_new->le);
956
957            if (hbmeta.value && chunk == NULL) {
958                // NULL key exists .. the smallest key in this tree .. return first
959                offset = trie->btree_kv_ops->value2bid(hbmeta.value);
960                if (flag & HBTRIE_PARTIAL_MATCH) {
961                    // return indexed key part only
962                    *keylen = (item->chunkno+1) * trie->chunksize;
963                    memcpy(key_buf, it->curkey, *keylen);
964                } else if (!(flag & HBTRIE_PREFIX_MATCH_ONLY)) {
965                    // read entire key from doc's meta
966                    *keylen = trie->readkey(trie->doc_handle, offset, key_buf);
967                    it->keylen = _hbtrie_reform_key(trie, key_buf, *keylen, it->curkey);
968                }
969                memcpy(value_buf, &offset, trie->valuelen);
970                hr = HBTRIE_RESULT_SUCCESS;
971            } else {
972                hr = _hbtrie_next(it, item_new, key_buf, keylen, value_buf, flag);
973            }
974            mempool_free(bmeta.data);
975            if (hr == HBTRIE_RESULT_SUCCESS) {
976                return hr;
977            }
978
979            // search failed .. go back to the parent tree
980            // (this happens when the initial key is greater than
981            // the largest key in the current tree (ITEM_NEW) ..
982            // so go back to ITEM and retrieve the next child)
983            it->keylen = (item->chunkno+1) * trie->chunksize;
984
985        } else {
986            // MSB is not set -> doc
987            // read entire key and return the doc offset
988            offset = trie->btree_kv_ops->value2bid(v);
989            if (flag & HBTRIE_PARTIAL_MATCH) {
990                // return indexed key part only
991                *keylen = (item->chunkno+1) * trie->chunksize;
992                memcpy(key_buf, it->curkey, *keylen);
993            } else if (!(flag & HBTRIE_PREFIX_MATCH_ONLY)) {
994                // read entire key from doc's meta
995                *keylen = trie->readkey(trie->doc_handle, offset, key_buf);
996                it->keylen = _hbtrie_reform_key(trie, key_buf, *keylen, it->curkey);
997            }
998            memcpy(value_buf, &offset, trie->valuelen);
999
1000            return HBTRIE_RESULT_SUCCESS;
1001        }
1002    }
1003
1004    return hr;
1005}
1006
1007hbtrie_result hbtrie_next(struct hbtrie_iterator *it,
1008                          void *key_buf,
1009                          size_t *keylen,
1010                          void *value_buf)
1011{
1012    hbtrie_result hr;
1013
1014    if (HBTRIE_ITR_IS_FWD(it) && HBTRIE_ITR_IS_FAILED(it)) {
1015        return HBTRIE_RESULT_FAIL;
1016    }
1017
1018    struct list_elem *e = list_begin(&it->btreeit_list);
1019    struct btreeit_item *item = NULL;
1020    if (e) item = _get_entry(e, struct btreeit_item, le);
1021
1022    hr = _hbtrie_next(it, item, key_buf, keylen, value_buf, 0x0);
1023    HBTRIE_ITR_SET_FWD(it);
1024    if (hr == HBTRIE_RESULT_SUCCESS) {
1025        HBTRIE_ITR_CLR_FAILED(it);
1026        HBTRIE_ITR_SET_MOVED(it);
1027    } else {
1028        HBTRIE_ITR_SET_FAILED(it);
1029    }
1030    return hr;
1031}
1032
1033hbtrie_result hbtrie_next_partial(struct hbtrie_iterator *it,
1034                                  void *key_buf,
1035                                  size_t *keylen,
1036                                  void *value_buf)
1037{
1038    hbtrie_result hr;
1039
1040    if (HBTRIE_ITR_IS_FWD(it) && HBTRIE_ITR_IS_FAILED(it)) {
1041        return HBTRIE_RESULT_FAIL;
1042    }
1043
1044    struct list_elem *e = list_begin(&it->btreeit_list);
1045    struct btreeit_item *item = NULL;
1046    if (e) item = _get_entry(e, struct btreeit_item, le);
1047
1048    hr = _hbtrie_next(it, item, key_buf, keylen, value_buf, HBTRIE_PARTIAL_MATCH);
1049    HBTRIE_ITR_SET_FWD(it);
1050    if (hr == HBTRIE_RESULT_SUCCESS) {
1051        HBTRIE_ITR_CLR_FAILED(it);
1052        HBTRIE_ITR_SET_MOVED(it);
1053    } else {
1054        HBTRIE_ITR_SET_FAILED(it);
1055    }
1056    return hr;
1057}
1058
1059hbtrie_result hbtrie_next_value_only(struct hbtrie_iterator *it,
1060                                     void *value_buf)
1061{
1062    hbtrie_result hr;
1063
1064    if (it->curkey == NULL) return HBTRIE_RESULT_FAIL;
1065
1066    struct list_elem *e = list_begin(&it->btreeit_list);
1067    struct btreeit_item *item = NULL;
1068    if (e) item = _get_entry(e, struct btreeit_item, le);
1069
1070    hr = _hbtrie_next(it, item, NULL, 0, value_buf, HBTRIE_PREFIX_MATCH_ONLY);
1071    if (hr != HBTRIE_RESULT_SUCCESS) {
1072        // this iterator reaches the end of hb-trie
1073        free(it->curkey);
1074        it->curkey = NULL;
1075    }
1076    return hr;
1077}
1078
1079static void _hbtrie_free_btreelist(struct list *btreelist)
1080{
1081    struct btreelist_item *btreeitem;
1082    struct list_elem *e;
1083
1084    // free all items on list
1085    e = list_begin(btreelist);
1086    while(e) {
1087        btreeitem = _get_entry(e, struct btreelist_item, e);
1088        e = list_remove(btreelist, e);
1089        mempool_free(btreeitem);
1090    }
1091}
1092
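// Walk the btreelist from the deepest sub-b-tree up to the root: whenever a
// child b-tree's root node has moved to a new block, re-insert the child's new
// root BID (with its MSB set) into the parent b-tree, and finally update
// trie->root_bid.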
1093static void _hbtrie_btree_cascaded_update(struct hbtrie *trie,
1094                                          struct list *btreelist,
1095                                          void *key,
1096                                          int free_opt)
1097{
1098    bid_t bid_new, _bid;
1099    struct btreelist_item *btreeitem, *btreeitem_child;
1100    struct list_elem *e, *e_child;
1101
1102    e = e_child = NULL;
1103
1104    //3 cascaded update of each b-tree from leaf to root
1105    e_child = list_end(btreelist);
1106    if (e_child) e = list_prev(e_child);
1107
1108    while(e && e_child) {
1109        btreeitem = _get_entry(e, struct btreelist_item, e);
1110        btreeitem_child = _get_entry(e_child, struct btreelist_item, e);
1111
1112        if (btreeitem->child_rootbid != btreeitem_child->btree.root_bid) {
1113            // root node of child sub-tree has been moved to another block
1114            // update parent sub-tree
1115            bid_new = btreeitem_child->btree.root_bid;
1116            _bid = _endian_encode(bid_new);
1117            _hbtrie_set_msb(trie, (void *)&_bid);
1118            btree_insert(&btreeitem->btree,
1119                    (uint8_t*)key + btreeitem->chunkno * trie->chunksize,
1120                    (void *)&_bid);
1121        }
1122        e_child = e;
1123        e = list_prev(e);
1124    }
1125
1126    // update trie root bid
1127    if (e) {
1128        btreeitem = _get_entry(e, struct btreelist_item, e);
1129        trie->root_bid = btreeitem->btree.root_bid;
1130    }else if (e_child) {
1131        btreeitem = _get_entry(e_child, struct btreelist_item, e);
1132        trie->root_bid = btreeitem->btree.root_bid;
1133    }else {
1134        fdb_assert(0, trie, e_child);
1135    }
1136
1137    if (free_opt) {
1138        _hbtrie_free_btreelist(btreelist);
1139    }
1140}
1141
1142static hbtrie_result _hbtrie_find(struct hbtrie *trie, void *key, int keylen,
1143                                  void *valuebuf, struct list *btreelist, uint8_t flag)
1144{
1145    int nchunk;
1146    int rawkeylen;
1147    int prevchunkno, curchunkno, cpt_node = 0;
1148    struct btree *btree = NULL;
1149    struct btree btree_static;
1150    btree_result r;
1151    struct hbtrie_meta hbmeta;
1152    struct btree_meta meta;
1153    struct btreelist_item *btreeitem = NULL;
1154    uint8_t *k = alca(uint8_t, trie->chunksize);
1155    uint8_t *buf = alca(uint8_t, trie->btree_nodesize);
1156    uint8_t *btree_value = alca(uint8_t, trie->valuelen);
1157    void *chunk = NULL;
1158    hbtrie_cmp_func *void_cmp;
1159    bid_t bid_new;
1160    nchunk = _get_nchunk(trie, key, keylen);
1161
1162    meta.data = buf;
1163    curchunkno = 0;
1164
1165    if (trie->map) { // custom cmp functions exist
1166        if (!memcmp(trie->last_map_chunk, key, trie->chunksize)) {
1167            // same custom function was used in the last call .. do nothing
1168        } else {
1169            // get cmp function corresponding to the key
1170            void_cmp = trie->map(key, (void *)trie);
1171            if (void_cmp) { // custom cmp function matches .. turn on leaf b+tree mode
1172                memcpy(trie->last_map_chunk, key, trie->chunksize);
1173                // set aux for _fdb_custom_cmp_wrap()
1174                trie->cmp_args.aux = void_cmp;
1175                trie->aux = &trie->cmp_args;
1176            }
1177        }
1178    }
1179
1180    if (btreelist) {
1181        list_init(btreelist);
1182        btreeitem = (struct btreelist_item *)mempool_alloc(sizeof(struct btreelist_item));
1183        list_push_back(btreelist, &btreeitem->e);
1184        btree = &btreeitem->btree;
1185    } else {
1186        btree = &btree_static;
1187    }
1188
1189    if (trie->root_bid == BLK_NOT_FOUND) {
1190        // retrieval fail
1191        return HBTRIE_RESULT_FAIL;
1192    } else {
1193        // read from root_bid
1194        r = btree_init_from_bid(btree, trie->btreeblk_handle, trie->btree_blk_ops,
1195                                trie->btree_kv_ops, trie->btree_nodesize,
1196                                trie->root_bid);
1197        if (r != BTREE_RESULT_SUCCESS) {
1198            return HBTRIE_RESULT_FAIL;
1199        }
1200        btree->aux = trie->aux;
1201        if (btree->ksize != trie->chunksize || btree->vsize != trie->valuelen) {
1202            if (((trie->chunksize << 4) | trie->valuelen) == btree->ksize) {
1203                // this is an old meta format
1204                return HBTRIE_RESULT_INDEX_VERSION_NOT_SUPPORTED;
1205            }
1206            // B+tree root node is corrupted.
1207            return HBTRIE_RESULT_INDEX_CORRUPTED;
1208        }
1209    }
1210
1211    while (curchunkno < nchunk) {
1212        // get current chunk number
1213        meta.size = btree_read_meta(btree, meta.data);
1214        _hbtrie_fetch_meta(trie, meta.size, &hbmeta, meta.data);
1215        prevchunkno = curchunkno;
1216        if (_is_leaf_btree(hbmeta.chunkno)) {
1217            cpt_node = 1;
1218            hbmeta.chunkno = _get_chunkno(hbmeta.chunkno);
1219            btree->kv_ops = trie->btree_leaf_kv_ops;
1220        }
1221        curchunkno = hbmeta.chunkno;
1222
1223        if (btreelist) {
1224            btreeitem->chunkno = curchunkno;
1225            btreeitem->leaf = cpt_node;
1226        }
1227
1228        //3 check whether there are skipped prefixes.
1229        if (curchunkno - prevchunkno > 1) {
1230            fdb_assert(hbmeta.prefix != NULL, hbmeta.prefix, trie);
1231            // prefix comparison (find the first different chunk)
1232            int diffchunkno = _hbtrie_find_diff_chunk(
1233                trie, hbmeta.prefix,
1234                (uint8_t*)key + trie->chunksize * (prevchunkno+1),
1235                0, curchunkno - (prevchunkno+1));
1236            if (diffchunkno < curchunkno - (prevchunkno+1)) {
1237                // prefix does not match -> retrieval fail
1238                return HBTRIE_RESULT_FAIL;
1239            }
1240        }
1241
1242        //3 search b-tree using current chunk (or postfix)
1243        rawkeylen = _hbtrie_reform_key_reverse(trie, key, keylen);
1244        if ((cpt_node && rawkeylen == curchunkno * trie->chunksize) ||
1245            (!cpt_node && nchunk == curchunkno)) {
1246            // KEY is exactly the same as the tree's prefix .. return the value in the meta section
1247            if (hbmeta.value && trie->valuelen > 0) {
1248                memcpy(valuebuf, hbmeta.value, trie->valuelen);
1249            }
1250            return HBTRIE_RESULT_SUCCESS;
1251        } else {
1252            chunk = (uint8_t*)key + curchunkno*trie->chunksize;
1253            if (cpt_node) {
1254                // leaf b-tree
1255                size_t rawchunklen =
1256                    _hbtrie_reform_key_reverse(trie, chunk,
1257                    (nchunk-curchunkno)*trie->chunksize);
1258
1259                _set_leaf_key(k, chunk, rawchunklen);
1260                r = btree_find(btree, k, btree_value);
1261                _free_leaf_key(k);
1262            } else {
1263                r = btree_find(btree, chunk, btree_value);
1264            }
1265        }
1266
1267        if (r == BTREE_RESULT_FAIL) {
1268            // retrieval fail
1269            return HBTRIE_RESULT_FAIL;
1270        } else {
1271            // same chunk exists
1272            if (flag & HBTRIE_PARTIAL_MATCH &&
1273                curchunkno + 1 == nchunk - 1) {
1274                // partial match mode & the last meaningful chunk
1275                // return btree value
1276                memcpy(valuebuf, btree_value, trie->valuelen);
1277                return HBTRIE_RESULT_SUCCESS;
1278            }
1279
1280            // check whether the value points to sub-tree or document
1281            // check MSB
1282            if (_hbtrie_is_msb_set(trie, btree_value)) {
1283                // this is BID of b-tree node (by clearing MSB)
1284                _hbtrie_clear_msb(trie, btree_value);
1285                bid_new = trie->btree_kv_ops->value2bid(btree_value);
1286                bid_new = _endian_decode(bid_new);
1287
1288                if (btreelist) {
1289                    btreeitem->child_rootbid = bid_new;
1290                    btreeitem = (struct btreelist_item *)
1291                                mempool_alloc(sizeof(struct btreelist_item));
1292                    list_push_back(btreelist, &btreeitem->e);
1293                    btree = &btreeitem->btree;
1294                }
1295
1296                // fetch sub-tree
1297                r = btree_init_from_bid(btree, trie->btreeblk_handle, trie->btree_blk_ops,
1298                                        trie->btree_kv_ops, trie->btree_nodesize, bid_new);
1299                if (r != BTREE_RESULT_SUCCESS) {
1300                    return HBTRIE_RESULT_FAIL;
1301                }
1302                btree->aux = trie->aux;
1303            } else {
1304                // this is offset of document (as it is)
1305                // read entire key
1306                uint8_t *docrawkey = (uint8_t *) malloc(HBTRIE_MAX_KEYLEN);
1307                uint8_t *dockey = (uint8_t *) malloc(HBTRIE_MAX_KEYLEN);
1308                uint32_t docrawkeylen, dockeylen;
1309                uint64_t offset;
1310                int docnchunk, diffchunkno;
1311
1312                hbtrie_result result = HBTRIE_RESULT_SUCCESS;
1313
1314                // get offset value from btree_value
1315                offset = trie->btree_kv_ops->value2bid(btree_value);
1316                if (!(flag & HBTRIE_PREFIX_MATCH_ONLY)) {
1317                    // read entire key
1318                    docrawkeylen = trie->readkey(trie->doc_handle, offset, docrawkey);
1319                    dockeylen = _hbtrie_reform_key(trie, docrawkey, docrawkeylen, dockey);
1320
1321                    // find first different chunk
1322                    docnchunk = _get_nchunk(trie, dockey, dockeylen);
1323
1324                    if (docnchunk == nchunk) {
1325                        diffchunkno = _hbtrie_find_diff_chunk(trie, key,
1326                                            dockey, curchunkno, nchunk);
1327                        if (diffchunkno == nchunk) {
1328                            // success
1329                            memcpy(valuebuf, btree_value, trie->valuelen);
1330                        } else {
1331                            result = HBTRIE_RESULT_FAIL;
1332                        }
1333                    } else {
1334                        result = HBTRIE_RESULT_FAIL;
1335                    }
1336                } else {
1337                    // just return value
1338                    memcpy(valuebuf, btree_value, trie->valuelen);
1339                }
1340
1341                // Clear docrawkey, dockey
1342                free(docrawkey);
1343                free(dockey);
1344                return result;
1345            }
1346        }
1347    }
1348
1349    return HBTRIE_RESULT_FAIL;
1350}
1351
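// hbtrie_find(): exact-match lookup (the full key is read back from the
// document and verified). hbtrie_find_offset(): returns the stored value
// without verifying the full key. hbtrie_find_partial(): matches only the
// indexed chunk prefix of the key.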
1352hbtrie_result hbtrie_find(struct hbtrie *trie, void *rawkey,
1353                          int rawkeylen, void *valuebuf)
1354{
1355    int nchunk = _get_nchunk_raw(trie, rawkey, rawkeylen);
1356    uint8_t *key = alca(uint8_t, nchunk * trie->chunksize);
1357    int keylen;
1358
1359    keylen = _hbtrie_reform_key(trie, rawkey, rawkeylen, key);
1360    return _hbtrie_find(trie, key, keylen, valuebuf, NULL, 0x0);
1361}
1362
1363hbtrie_result hbtrie_find_offset(struct hbtrie *trie, void *rawkey,
1364                                 int rawkeylen, void *valuebuf)
1365{
1366    int nchunk = _get_nchunk_raw(trie, rawkey, rawkeylen);
1367    uint8_t *key = alca(uint8_t, nchunk * trie->chunksize);
1368    int keylen;
1369
1370    keylen = _hbtrie_reform_key(trie, rawkey, rawkeylen, key);
1371    return _hbtrie_find(trie, key, keylen, valuebuf, NULL,
1372                        HBTRIE_PREFIX_MATCH_ONLY);
1373}
1374
1375hbtrie_result hbtrie_find_partial(struct hbtrie *trie, void *rawkey,
1376                                  int rawkeylen, void *valuebuf)
1377{
1378    int nchunk = _get_nchunk_raw(trie, rawkey, rawkeylen);
1379    uint8_t *key = alca(uint8_t, nchunk * trie->chunksize);
1380    int keylen;
1381
1382    keylen = _hbtrie_reform_key(trie, rawkey, rawkeylen, key);
1383    return _hbtrie_find(trie, key, keylen, valuebuf, NULL,
1384                        HBTRIE_PARTIAL_MATCH);
1385}
1386
1387INLINE hbtrie_result _hbtrie_remove(struct hbtrie *trie,
1388                                    void *rawkey, int rawkeylen,
1389                                    uint8_t flag)
1390{
1391    int nchunk = _get_nchunk_raw(trie, rawkey, rawkeylen);
1392    int keylen;
1393    uint8_t *key = alca(uint8_t, nchunk * trie->chunksize);
1394    uint8_t *valuebuf = alca(uint8_t, trie->valuelen);
1395    hbtrie_result r;
1396    btree_result br = BTREE_RESULT_SUCCESS;
1397    struct list btreelist;
1398    struct btreelist_item *btreeitem;
1399    struct list_elem *e;
1400
1401    keylen = _hbtrie_reform_key(trie, rawkey, rawkeylen, key);
1402
1403    r = _hbtrie_find(trie, key, keylen, valuebuf, &btreelist, flag);
1404
1405    if (r == HBTRIE_RESULT_SUCCESS) {
1406        e = list_end(&btreelist);
1407        fdb_assert(e, trie, flag);
1408
1409        btreeitem = _get_entry(e, struct btreelist_item, e);
1410        if (btreeitem &&
1411            ((btreeitem->leaf && rawkeylen == btreeitem->chunkno * trie->chunksize) ||
1412             (!(btreeitem->leaf) && nchunk == btreeitem->chunkno)) ) {
1413            // key is exactly the same as the b-tree's prefix .. remove it from the metasection
1414            struct hbtrie_meta hbmeta;
1415            struct btree_meta meta;
1416            hbmeta_opt opt;
1417            uint8_t *buf = alca(uint8_t, trie->btree_nodesize);
1418
1419            meta.data = buf;
1420            meta.size = btree_read_meta(&btreeitem->btree, meta.data);
1421            _hbtrie_fetch_meta(trie, meta.size, &hbmeta, meta.data);
1422
1423            opt = (_is_leaf_btree(hbmeta.chunkno))?(HBMETA_LEAF):(HBMETA_NORMAL);
1424
1425            // remove value from metasection
1426            _hbtrie_store_meta(
1427                    trie, &meta.size, _get_chunkno(hbmeta.chunkno), opt,
1428                    hbmeta.prefix, hbmeta.prefix_len, NULL, buf);
1429            btree_update_meta(&btreeitem->btree, &meta);
1430        } else {
1431            if (btreeitem && btreeitem->leaf) {
1432                // leaf b-tree
1433                uint8_t *k = alca(uint8_t, trie->chunksize);
1434                _set_leaf_key(k, key + btreeitem->chunkno * trie->chunksize,
1435                    rawkeylen - btreeitem->chunkno * trie->chunksize);
1436                br = btree_remove(&btreeitem->btree, k);
1437                _free_leaf_key(k);
1438            } else if (btreeitem) {
1439                // normal b-tree
1440                br = btree_remove(&btreeitem->btree,
1441                                  key + trie->chunksize * btreeitem->chunkno);
1442            }
1443            if (br == BTREE_RESULT_FAIL) {
1444                r = HBTRIE_RESULT_FAIL;
1445            }
1446        }
1447        _hbtrie_btree_cascaded_update(trie, &btreelist, key, 1);
1448    } else {
1449        // key (to be removed) not found
1450        // no update occurred .. we don't need to update b-trees on the path
1451        // just free the btreelist
1452        _hbtrie_free_btreelist(&btreelist);
1453    }
1454
1455    return r;
1456}
1457
1458hbtrie_result hbtrie_remove(struct hbtrie *trie,
1459                            void *rawkey,
1460                            int rawkeylen)
1461{
1462    return _hbtrie_remove(trie, rawkey, rawkeylen, 0x0);
1463}
1464
1465hbtrie_result hbtrie_remove_partial(struct hbtrie *trie,
1466                                    void *rawkey,
1467                                    int rawkeylen)
1468{
1469    return _hbtrie_remove(trie, rawkey, rawkeylen,
1470                          HBTRIE_PARTIAL_MATCH);
1471}
1472
1473struct _key_item {
1474    size_t keylen;
1475    void *key;
1476    void *value;
1477    struct list_elem le;
1478};
1479
1480static void _hbtrie_extend_leaf_tree(struct hbtrie *trie,
1481                                     struct list *btreelist,
1482                                     struct btreelist_item *btreeitem,
1483                                     void *pre_str,
1484                                     size_t pre_str_len)
1485{
1486    struct list keys;
1487    struct list_elem *e;
1488    struct _key_item *item, *smallest = NULL;
1489    struct btree_iterator it;
1490    struct btree new_btree;
1491    struct btree_meta meta;
1492    struct hbtrie_meta hbmeta;
1493    btree_result br;
1494    void *prefix = NULL, *meta_value = NULL;
1495    uint8_t *key_str = alca(uint8_t, HBTRIE_MAX_KEYLEN);
1496    uint8_t *key_buf = alca(uint8_t, trie->chunksize);
1497    uint8_t *value_buf = alca(uint8_t, trie->valuelen);
1498    uint8_t *buf = alca(uint8_t, trie->btree_nodesize);
1499    size_t keylen, minchunkno = 0, chunksize;
1500
1501    chunksize = trie->chunksize;
1502
1503    // fetch metadata
1504    meta.data = buf;
1505    meta.size = btree_read_meta(&btreeitem->btree, meta.data);
1506    _hbtrie_fetch_meta(trie, meta.size, &hbmeta, meta.data);
1507
1508    // scan all keys
1509    list_init(&keys);
1510    memset(key_buf, 0, chunksize);
1511    minchunkno = 0;
1512
1513    br = btree_iterator_init(&btreeitem->btree, &it, NULL);
1514    while (br == BTREE_RESULT_SUCCESS) {
1515        // get key
1516        if ((br = btree_next(&it, key_buf, value_buf)) == BTREE_RESULT_FAIL) {
1517            break;
1518        }
1519
1520        _get_leaf_key(key_buf, key_str, &keylen);
1521        _free_leaf_key(key_buf);
1522
1523        // insert into list
1524        item = (struct _key_item *)malloc(sizeof(struct _key_item));
1525
1526        item->key = (void *)malloc(keylen);
1527        item->keylen = keylen;
1528        memcpy(item->key, key_str, keylen);
1529
1530        item->value = (void *)malloc(trie->valuelen);
1531        memcpy(item->value, value_buf, trie->valuelen);
1532
1533        list_push_back(&keys, &item->le);
1534
1535        if (hbmeta.value == NULL) {
1536            // check common prefix
1537            if (prefix == NULL) {
1538                // initialize
1539                prefix = item->key;
1540                minchunkno = _l2c(trie, item->keylen);
1541            } else {
1542                // update the length of common prefix
1543                minchunkno = _hbtrie_find_diff_chunk(
1544                    trie, prefix, item->key, 0,
1545                    MIN(_l2c(trie, item->keylen), minchunkno));
1546            }
1547
1548            // update smallest (shortest) key
1549            if (smallest == NULL) {
1550                smallest = item;
1551            } else {
1552                if (item->keylen < smallest->keylen)
1553                    smallest = item;
1554            }
1555        }
1556    }
1557    btree_iterator_free(&it);
1558
1559    // construct new (non-leaf) b-tree
1560    if (hbmeta.value) {
1561                // insert an item for the tree's own prefix (its meta value) into the list
1562        item = (struct _key_item *)malloc(sizeof(struct _key_item));
1563
1564        item->key = NULL;
1565        item->keylen = 0;
1566
1567        item->value = (void *)malloc(trie->valuelen);
1568        memcpy(item->value, hbmeta.value, trie->valuelen);
1569
1570        list_push_back(&keys, &item->le);
1571
1572        meta_value = smallest = NULL;
1573    } else {
1574        if (smallest) {
1575            if (minchunkno > 0 &&
1576               (size_t) _get_nchunk_raw(trie, smallest->key, smallest->keylen) ==
1577                minchunkno) {
1578                meta_value = smallest->value;
1579            } else {
1580                smallest = NULL;
1581            }
1582        }
1583    }
1584    _hbtrie_store_meta(
1585            trie, &meta.size, _get_chunkno(hbmeta.chunkno) + minchunkno,
1586            HBMETA_NORMAL, prefix, minchunkno * chunksize, meta_value, buf);
1587
1588    btree_init(&new_btree, trie->btreeblk_handle, trie->btree_blk_ops,
1589        trie->btree_kv_ops, trie->btree_nodesize, chunksize, trie->valuelen,
1590        0x0, &meta);
1591    new_btree.aux = trie->aux;
1592
1593    // reset BTREEITEM
1594    btreeitem->btree = new_btree;
1595    btreeitem->chunkno = _get_chunkno(hbmeta.chunkno) + minchunkno;
1596    btreeitem->leaf = 0;
1597
1598    _hbtrie_btree_cascaded_update(trie, btreelist, pre_str, 1);
1599
1600    // insert all keys
1601    memcpy(key_str, pre_str, pre_str_len);
1602    e = list_begin(&keys);
1603    while (e) {
1604        item = _get_entry(e, struct _key_item, le);
1605        if (item != smallest) {
1606            if (item->keylen > 0) {
1607                memcpy(key_str + pre_str_len, item->key, item->keylen);
1608            }
1609            hbtrie_insert(trie, key_str, pre_str_len + item->keylen,
1610                item->value, value_buf);
1611        }
1612
1613        e = list_remove(&keys, e);
1614        if (item->key) {
1615            free(item->key);
1616        }
1617        free(item->value);
1618        free(item);
1619    }
1620
1621}
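/*
 * Worked example for the common-prefix computation above (illustrative;
 * it assumes chunksize == 4): if the leaf b-tree holds the key suffixes
 * "applepie" and "applejam", _l2c() gives 2 chunks for each, and
 * _hbtrie_find_diff_chunk(trie, "applepie", "applejam", 0, 2) returns 1,
 * since chunk 0 ("appl") matches while chunk 1 ("epie" vs "ejam") differs.
 * minchunkno therefore becomes 1, the rebuilt normal b-tree is placed at
 * chunkno = old chunkno + 1 with the 4-byte common prefix "appl" in its
 * metadata, and both suffixes are re-inserted through hbtrie_insert()
 * with the original prefix string (pre_str) prepended.
 */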
1622
1623// assume that VALUE and OLDVALUE_OUT use the same endianness as the hb+trie
1624#define HBTRIE_PARTIAL_UPDATE (0x1)
1625INLINE hbtrie_result _hbtrie_insert(struct hbtrie *trie,
1626                                    void *rawkey, int rawkeylen,
1627                                    void *value, void *oldvalue_out,
1628                                    uint8_t flag)
1629{
1630    /*
1631    <insertion cases>
1632    1. normal insert: no new b-tree is created
1633    2. replacing a doc with a new b-tree: a doc sharing the same prefix already exists
1634        2-1. a new b-tree holding the file offset of one doc in its metadata
1635             is created, and the other doc is inserted into that tree
1636        2-2. both docs are inserted into the new b-tree
1637    3. create a new b-tree between existing b-trees: when the prefix mismatches
1638    */
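    /*
    Concrete illustration (chunksize == 8 assumed, keys shown unpadded):
    1. no entry for chunk 0 of "applepie01" exists in the current b-tree,
       so its doc offset is inserted there directly.
    2. "applepie01" is already stored as a doc offset and "applepie02" is
       inserted: both share chunk 0 ("applepie"), so a new b-tree indexing
       chunk 1 is created and both doc offsets are inserted into it (2-2);
       when one key is a sub-string of the other, the shorter doc's offset
       is kept in the new b-tree's metadata instead (2-1).
    3. see the worked example after the skipped-prefix block below.
    */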
1639
1640    int nchunk;
1641    int keylen;
1642    int prevchunkno, curchunkno;
1643    int cpt_node = 0;
1644    int leaf_cond = 0;
1645    uint8_t *k = alca(uint8_t, trie->chunksize);
1646
1647    struct list btreelist;
1648    //struct btree btree, btree_new;
1649    struct btreelist_item *btreeitem, *btreeitem_new;
1650    hbtrie_result ret_result = HBTRIE_RESULT_SUCCESS;
1651    btree_result r;
1652    struct btree_kv_ops *kv_ops;
1653
1654    struct hbtrie_meta hbmeta;
1655    struct btree_meta meta;
1656    hbmeta_opt opt;
1657    hbtrie_cmp_func *void_cmp;
1658
1659    nchunk = _get_nchunk_raw(trie, rawkey, rawkeylen);
1660
1661    uint8_t *key = alca(uint8_t, nchunk * trie->chunksize);
1662    uint8_t *buf = alca(uint8_t, trie->btree_nodesize);
1663    uint8_t *btree_value = alca(uint8_t, trie->valuelen);
1664    void *chunk, *chunk_new;
1665    bid_t bid_new, _bid;
1666
1667    meta.data = buf;
1668    curchunkno = 0;
1669    keylen = _hbtrie_reform_key(trie, rawkey, rawkeylen, key);
1670    (void)keylen;
1671
1672    if (trie->map) { // custom cmp functions exist
1673        if (!memcmp(trie->last_map_chunk, key, trie->chunksize)) {
1674            // the same custom function was used in the last call .. use leaf b+tree mode
1675            leaf_cond = 1;
1676        } else {
1677            // get cmp function corresponding to the key
1678            void_cmp = trie->map(key, (void *)trie);
1679            if (void_cmp) {
1680                // custom cmp function matches .. turn on leaf b+tree mode
1681                leaf_cond = 1;
1682                memcpy(trie->last_map_chunk, key, trie->chunksize);
1683                // set aux for _fdb_custom_cmp_wrap()
1684                trie->cmp_args.aux = void_cmp;
1685                trie->aux = &trie->cmp_args;
1686            }
1687        }
1688    }
1689
1690    list_init(&btreelist);
1691    // btreeitem for root btree
1692    btreeitem = (struct btreelist_item*)
1693                mempool_alloc(sizeof(struct btreelist_item));
1694    list_push_back(&btreelist, &btreeitem->e);
1695
1696    if (trie->root_bid == BLK_NOT_FOUND) {
1697        // create root b-tree
1698        _hbtrie_store_meta(trie, &meta.size, 0, HBMETA_NORMAL,
1699                           NULL, 0, NULL, buf);
1700        r = btree_init(&btreeitem->btree, trie->btreeblk_handle,
1701                       trie->btree_blk_ops, trie->btree_kv_ops,
1702                       trie->btree_nodesize, trie->chunksize,
1703                       trie->valuelen, 0x0, &meta);
1704        if (r != BTREE_RESULT_SUCCESS) {
1705            return HBTRIE_RESULT_FAIL;
1706        }
1707    } else {
1708        // read from root_bid
1709        r = btree_init_from_bid(&btreeitem->btree, trie->btreeblk_handle,
1710                                trie->btree_blk_ops, trie->btree_kv_ops,
1711                                trie->btree_nodesize, trie->root_bid);
1712        if (r != BTREE_RESULT_SUCCESS) {
1713            return HBTRIE_RESULT_FAIL;
1714        }
1715        if (btreeitem->btree.ksize != trie->chunksize ||
1716            btreeitem->btree.vsize != trie->valuelen) {
1717            if (((trie->chunksize << 4) | trie->valuelen) == btreeitem->btree.ksize) {
1718                // this is an old meta format
1719                return HBTRIE_RESULT_INDEX_VERSION_NOT_SUPPORTED;
1720            }
1721            // B+tree root node is corrupted.
1722            return HBTRIE_RESULT_INDEX_CORRUPTED;
1723        }
1724    }
1725    btreeitem->btree.aux = trie->aux;
1726
1727    // set 'oldvalue_out' to 0xff..
1728    if (oldvalue_out) {
1729        memset(oldvalue_out, 0xff, trie->valuelen);
1730    }
1731
1732    // Allocate room for entire key on heap
1733    uint8_t *docrawkey = (uint8_t *) malloc(HBTRIE_MAX_KEYLEN);
1734    uint8_t *dockey = (uint8_t *) malloc(HBTRIE_MAX_KEYLEN);
1735
1736    while (curchunkno < nchunk) {
1737        // get current chunk number
1738        meta.size = btree_read_meta(&btreeitem->btree, meta.data);
1739        _hbtrie_fetch_meta(trie, meta.size, &hbmeta, meta.data);
1740        prevchunkno = curchunkno;
1741        if (_is_leaf_btree(hbmeta.chunkno)) {
1742            cpt_node = 1;
1743            hbmeta.chunkno = _get_chunkno(hbmeta.chunkno);
1744            btreeitem->btree.kv_ops = trie->btree_leaf_kv_ops;
1745        }
1746        btreeitem->chunkno = curchunkno = hbmeta.chunkno;
1747
1748        //3 check whether there is skipped prefix
1749        if (curchunkno - prevchunkno > 1) {
1750            // prefix comparison (find the first different chunk)
1751            int diffchunkno = _hbtrie_find_diff_chunk(trie, hbmeta.prefix,
1752                                  key + trie->chunksize * (prevchunkno+1),
1753                                  0, curchunkno - (prevchunkno+1));
1754            if (diffchunkno < curchunkno - (prevchunkno+1)) {
1755                //3 3. create sub-tree between parent and child tree
1756
1757                // metadata (prefix) update in btreeitem->btree
1758                int new_prefixlen = trie->chunksize *
1759                                    (curchunkno - (prevchunkno+1) -
1760                                        (diffchunkno+1));
1761                // backup old prefix
1762                int old_prefixlen = hbmeta.prefix_len;
1763                uint8_t *old_prefix = alca(uint8_t, old_prefixlen);
1764                memcpy(old_prefix, hbmeta.prefix, old_prefixlen);
1765
1766                if (new_prefixlen > 0) {
1767                    uint8_t *new_prefix = alca(uint8_t, new_prefixlen);
1768                    memcpy(new_prefix,
1769                           (uint8_t*)hbmeta.prefix +
1770                               trie->chunksize * (diffchunkno + 1),
1771                           new_prefixlen);
1772                    _hbtrie_store_meta(trie, &meta.size, curchunkno,
1773                                       HBMETA_NORMAL, new_prefix,
1774                                       new_prefixlen, hbmeta.value, buf);
1775                } else {
1776                    _hbtrie_store_meta(trie, &meta.size, curchunkno,
1777                                       HBMETA_NORMAL, NULL, 0,
1778                                       hbmeta.value, buf);
1779                }
1780                // update metadata for old b-tree
1781                btree_update_meta(&btreeitem->btree, &meta);
1782
1783                // split prefix and create new sub-tree
1784                _hbtrie_store_meta(trie, &meta.size,
1785                                   prevchunkno + diffchunkno + 1,
1786                                   HBMETA_NORMAL, old_prefix,
1787                                   diffchunkno * trie->chunksize, NULL, buf);
1788
1789                // create new b-tree
1790                btreeitem_new = (struct btreelist_item *)
1791                                mempool_alloc(sizeof(struct btreelist_item));
1792                btreeitem_new->chunkno = prevchunkno + diffchunkno + 1;
1793                r = btree_init(&btreeitem_new->btree, trie->btreeblk_handle,
1794                               trie->btree_blk_ops, trie->btree_kv_ops,
1795                               trie->btree_nodesize, trie->chunksize,
1796                               trie->valuelen, 0x0, &meta);
1797                if (r != BTREE_RESULT_SUCCESS) {
1798                    // Clear docrawkey, dockey
1799                    free(docrawkey);
1800                    free(dockey);
1801                    return HBTRIE_RESULT_FAIL;
1802                }
1803                btreeitem_new->btree.aux = trie->aux;
1804                list_insert_before(&btreelist, &btreeitem->e,
1805                                   &btreeitem_new->e);
1806
1807                // insert chunk for 'key'
1808                chunk_new = key + (prevchunkno + diffchunkno + 1) *
1809                                  trie->chunksize;
1810                r = btree_insert(&btreeitem_new->btree, chunk_new, value);
1811                if (r == BTREE_RESULT_FAIL) {
1812                    // Clear docrawkey, dockey
1813                    free(docrawkey);
1814                    free(dockey);
1815                    return HBTRIE_RESULT_FAIL;
1816                }
1817                // insert chunk for existing btree
1818                chunk_new = (uint8_t*)old_prefix + diffchunkno *
1819                                                   trie->chunksize;
1820                bid_new = btreeitem->btree.root_bid;
1821                btreeitem_new->child_rootbid = bid_new;
1822                // set MSB
1823                _bid = _endian_encode(bid_new);
1824                _hbtrie_set_msb(trie, (void*)&_bid);
1825                r = btree_insert(&btreeitem_new->btree,
1826                                 chunk_new, (void*)&_bid);
1827                if (r == BTREE_RESULT_FAIL) {
1828                    // Clear docrawkey, dockey
1829                    free(docrawkey);
1830                    free(dockey);
1831                    return HBTRIE_RESULT_FAIL;
1832                }
1833
1834                break;
1835            }
1836        }
1837
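        /*
         * Worked example of the skipped-prefix split above (illustrative;
         * chunksize == 8): suppose the parent tree is at chunk 0 and the
         * child tree is at chunk 3 with chunks 1..2 stored as its skipped
         * prefix. If the key being inserted matches the prefix at chunk 1
         * but differs at chunk 2, diffchunkno becomes 1 (< 2), so a new
         * b-tree is created at chunk 2 (prevchunkno + diffchunkno + 1)
         * keeping chunk 1 as its prefix, the child's own prefix shrinks to
         * zero length, and the new tree receives two entries: the key's
         * chunk 2 -> 'value', and the old prefix's chunk 2 -> the child's
         * root BID with the MSB set.
         */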
1838        //3 search b-tree using current chunk
1839        if ((cpt_node && rawkeylen == curchunkno * trie->chunksize) ||
1840            (!cpt_node && nchunk == curchunkno)) {
1841            // KEY is exactly the same as the tree's prefix .. insert into the metasection
1842            _hbtrie_store_meta(trie, &meta.size, curchunkno,
1843                               (cpt_node)?(HBMETA_LEAF):(HBMETA_NORMAL),
1844                               hbmeta.prefix,
1845                               (curchunkno-prevchunkno - 1) * trie->chunksize,
1846                               value, buf);
1847            btree_update_meta(&btreeitem->btree, &meta);
1848            break;
1849        } else {
1850            chunk = key + curchunkno*trie->chunksize;
1851            if (cpt_node) {
1852                // leaf b-tree
1853                _set_leaf_key(k, chunk,
1854                              rawkeylen - curchunkno*trie->chunksize);
1855                r = btree_find(&btreeitem->btree, k, btree_value);
1856                _free_leaf_key(k);
1857            } else {
1858                r = btree_find(&btreeitem->btree, chunk, btree_value);
1859            }
1860        }
1861
1862        if (r == BTREE_RESULT_FAIL) {
1863            //3 1. normal insert: same chunk does not exist -> just insert
1864            if (flag & HBTRIE_PARTIAL_UPDATE) {
1865                // partial update doesn't allow inserting a new key
1866                ret_result = HBTRIE_RESULT_FAIL;
1867                break; // while loop
1868            }
1869
1870            if (cpt_node) {
1871                // leaf b-tree
1872                _set_leaf_key(k, chunk,
1873                              rawkeylen - curchunkno*trie->chunksize);
1874                r = btree_insert(&btreeitem->btree, k, value);
1875                if (r == BTREE_RESULT_FAIL) {
1876                    _free_leaf_key(k);
1877                    ret_result = HBTRIE_RESULT_FAIL;
1878                    break; // while loop
1879                }
1880                _free_leaf_key(k);
1881
1882                if (btreeitem->btree.height > trie->leaf_height_limit) {
1883                    // height growth .. extend!
1884                    _hbtrie_extend_leaf_tree(trie, &btreelist, btreeitem,
1885                        key, curchunkno * trie->chunksize);
1886                    // Clear docrawkey, dockey
1887                    free(docrawkey);
1888                    free(dockey);
1889                    return ret_result;
1890                }
1891            } else {
1892                r = btree_insert(&btreeitem->btree, chunk, value);
1893                if (r == BTREE_RESULT_FAIL) {
1894                    ret_result = HBTRIE_RESULT_FAIL;
1895                }
1896            }
1897            break; // while loop
1898        }
1899
1900        // same chunk already exists
1901        if (flag & HBTRIE_PARTIAL_UPDATE &&
1902            curchunkno + 1 == nchunk - 1) {
1903            // partial update mode & the last meaningful chunk
1904            // update the local btree value
1905            if (oldvalue_out) {
1906                memcpy(oldvalue_out, btree_value, trie->valuelen);
1907            }
1908            // assume that it is always a normal b-tree
1909            r = btree_insert(&btreeitem->btree, chunk, value);
1910            if (r == BTREE_RESULT_FAIL) {
1911                ret_result = HBTRIE_RESULT_FAIL;
1912            } else {
1913                ret_result = HBTRIE_RESULT_SUCCESS;
1914            }
1915            break;
1916        }
1917
1918        // check whether the value points to sub-tree or document
1919        // check MSB
1920        if (_hbtrie_is_msb_set(trie, btree_value)) {
1921            // this is the BID of a b-tree node (obtained by clearing the MSB)
1922            _hbtrie_clear_msb(trie, btree_value);
1923            bid_new = trie->btree_kv_ops->value2bid(btree_value);
1924            bid_new = _endian_decode(bid_new);
1925            btreeitem->child_rootbid = bid_new;
1926            //3 traverse to the sub-tree
1927            // fetch sub-tree
1928            btreeitem = (struct btreelist_item*)
1929                        mempool_alloc(sizeof(struct btreelist_item));
1930
1931            r = btree_init_from_bid(&btreeitem->btree,
1932                                    trie->btreeblk_handle,
1933                                    trie->btree_blk_ops,
1934                                    trie->btree_kv_ops,
1935                                    trie->btree_nodesize, bid_new);
1936            if (r == BTREE_RESULT_FAIL) {
1937                ret_result = HBTRIE_RESULT_FAIL;
1938            }
1939            btreeitem->btree.aux = trie->aux;
1940            list_push_back(&btreelist, &btreeitem->e);
1941            continue;
1942        }
1943
1944        // this is the offset of a document (stored as-is)
1945        // create new sub-tree
1946
1947        uint32_t docrawkeylen, dockeylen, minrawkeylen;
1948        uint64_t offset;
1949        int docnchunk, minchunkno, newchunkno, diffchunkno;
1950
1951        // get offset value from btree_value
1952        offset = trie->btree_kv_ops->value2bid(btree_value);
1953
1954        // read entire key
1955        docrawkeylen = trie->readkey(trie->doc_handle, offset, docrawkey);
1956        dockeylen = _hbtrie_reform_key(trie, docrawkey, docrawkeylen, dockey);
1957
1958        // find first different chunk
1959        docnchunk = _get_nchunk(trie, dockey, dockeylen);
1960
1961        if (trie->flag & HBTRIE_FLAG_COMPACT || leaf_cond) {
1962            // optimization mode
1963            // Note: a custom cmp function doesn't support keys
1964            //       longer than a block size.
1965
1966            // newchunkno doesn't matter to leaf B+tree,
1967            // since leaf B+tree can't create sub-tree.
1968            newchunkno = curchunkno+1;
1969            minchunkno = MIN(_l2c(trie, rawkeylen),
1970                             _l2c(trie, (int)docrawkeylen));
1971            minrawkeylen = MIN(rawkeylen, (int)docrawkeylen);
1972
1973            if (curchunkno == 0) {
1974                // root B+tree
1975                diffchunkno = _hbtrie_find_diff_chunk(trie, rawkey, docrawkey,
1976                    curchunkno, minchunkno -
1977                                ((minrawkeylen%trie->chunksize == 0)?(0):(1)));
1978                if (rawkeylen == (int)docrawkeylen && diffchunkno+1 == minchunkno) {
1979                    if (!memcmp(rawkey, docrawkey, rawkeylen)) {
1980                        // same key
1981                        diffchunkno = minchunkno;
1982                    }
1983                }
1984            } else {
1985                // diffchunkno also doesn't matter to leaf B+tree,
1986                // since leaf B+tree is not based on a lexicographical key order.
1987                // Hence, we set diffchunkno to minchunkno iff two keys are
1988                // identified as the same key by the custom compare function.
1989                // Otherwise, diffchunkno is always set to curchunkno.
1990                uint8_t *k_doc = alca(uint8_t, trie->chunksize);
1991                _set_leaf_key(k, chunk,
1992                              rawkeylen - curchunkno*trie->chunksize);
1993                _set_leaf_key(k_doc, (uint8_t*)docrawkey + curchunkno*trie->chunksize,
1994                              docrawkeylen - curchunkno*trie->chunksize);
1995                if (trie->btree_leaf_kv_ops->cmp(k, k_doc, trie->aux) == 0) {
1996                    // same key
1997                    diffchunkno = minchunkno;
1998                    docnchunk = nchunk;
1999                } else {
2000                    // different key
2001                    diffchunkno = curchunkno;
2002                }
2003                _free_leaf_key(k);
2004                _free_leaf_key(k_doc);
2005            }
2006            opt = HBMETA_LEAF;
2007            kv_ops = trie->btree_leaf_kv_ops;
2008        } else {
2009            // original mode
2010            minchunkno = MIN(nchunk, docnchunk);
2011            newchunkno = diffchunkno =
2012                _hbtrie_find_diff_chunk(trie, key, dockey,
2013                                        curchunkno, minchunkno);
2014            opt = HBMETA_NORMAL;
2015            kv_ops = trie->btree_kv_ops;
2016        }
2017
2018        // one key is substring of the other key
2019        if (minchunkno == diffchunkno && docnchunk == nchunk) {
2020            //3 same key!! .. update the value
2021            if (oldvalue_out) {
2022                memcpy(oldvalue_out, btree_value, trie->valuelen);
2023            }
2024            if (cpt_node) {
2025                // leaf b-tree
2026                _set_leaf_key(k, chunk,
2027                              rawkeylen - curchunkno*trie->chunksize);
2028                r = btree_insert(&btreeitem->btree, k, value);
2029                _free_leaf_key(k);
2030            } else {
2031                // normal b-tree
2032                r = btree_insert(&btreeitem->btree, chunk, value);
2033            }
2034            if (r == BTREE_RESULT_FAIL) {
2035                ret_result = HBTRIE_RESULT_FAIL;
2036            } else {
2037                ret_result = HBTRIE_RESULT_SUCCESS;
2038            }
2039
2040            break;
2041        }
2042
2043        // different key
2044        while (trie->btree_nodesize > HBTRIE_HEADROOM &&
2045               (newchunkno - curchunkno) * trie->chunksize >
2046                   (int)trie->btree_nodesize - HBTRIE_HEADROOM) {
2047            // prefix is too long .. we have to split it
2048            fdb_assert(opt == HBMETA_NORMAL, opt, trie);
2049            int midchunkno;
2050            midchunkno = curchunkno +
2051                        (trie->btree_nodesize - HBTRIE_HEADROOM) /
2052                            trie->chunksize;
2053            _hbtrie_store_meta(trie, &meta.size, midchunkno, opt,
2054                               key + trie->chunksize * (curchunkno+1),
2055                               (midchunkno - (curchunkno+1)) *
2056                                   trie->chunksize,
2057                               NULL, buf);
2058
2059            btreeitem_new = (struct btreelist_item *)
2060                            mempool_alloc(sizeof(struct btreelist_item));
2061            btreeitem_new->chunkno = midchunkno;
2062            r = btree_init(&btreeitem_new->btree,
2063                           trie->btreeblk_handle,
2064                           trie->btree_blk_ops, kv_ops,
2065                           trie->btree_nodesize, trie->chunksize,
2066                           trie->valuelen, 0x0, &meta);
2067            if (r == BTREE_RESULT_FAIL) {
2068                // Clear docrawkey, dockey
2069                free(docrawkey);
2070                free(dockey);
2071                return HBTRIE_RESULT_FAIL;
2072            }
2073
2074            btreeitem_new->btree.aux = trie->aux;
2075            btreeitem_new->child_rootbid = BLK_NOT_FOUND;
2076            list_push_back(&btreelist, &btreeitem_new->e);
2077
2078            // insert new btree's bid into the previous btree
2079            bid_new = btreeitem_new->btree.root_bid;
2080            btreeitem->child_rootbid = bid_new;
2081            _bid = _endian_encode(bid_new);
2082            _hbtrie_set_msb(trie, (void *)&_bid);
2083            r = btree_insert(&btreeitem->btree, chunk, &_bid);
2084            if (r == BTREE_RESULT_FAIL) {
2085                ret_result = HBTRIE_RESULT_FAIL;
2086                break;
2087            }
2088
2089            // switch & go to the next tree
2090            chunk = (uint8_t*)key +
2091                    midchunkno * trie->chunksize;
2092            curchunkno = midchunkno;
2093            btreeitem = btreeitem_new;
2094        } // 2nd while
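        /*
         * Arithmetic sketch for the split above (illustrative; the real
         * HBTRIE_HEADROOM value is defined elsewhere): with a 4096-byte
         * node, chunksize == 8, and a hypothetical headroom of 256 bytes,
         * each intermediate b-tree advances curchunkno by
         * (4096 - 256) / 8 = 480 chunks, so a longer shared prefix is
         * chained across several such trees before the final sub-tree is
         * created.
         */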
2095
2096        if (ret_result != HBTRIE_RESULT_SUCCESS) {
2097            break;
2098        }
2099
2100        if (minchunkno == diffchunkno && minchunkno == newchunkno) {
2101            //3 2-1. create a sub-tree
2102            // that contains the file offset of one doc (sub-string)
2103            // in its meta section, and insert the other doc
2104            // (super-string) into the tree
2105
2106            void *key_long, *value_long;
2107            void *key_short, *value_short;
2108            size_t nchunk_long, rawkeylen_long;
2109
2110            if (docnchunk < nchunk) {
2111                // dockey is substring of key
2112                key_short = dockey;
2113                value_short = btree_value;
2114
2115                key_long = key;
2116                value_long = value;
2117
2118                nchunk_long = nchunk;
2119                rawkeylen_long = rawkeylen;
2120            } else {
2121                // key is substring of dockey
2122                key_short = key;
2123                value_short = value;
2124
2125                key_long = dockey;
2126                value_long = btree_value;
2127
2128                nchunk_long = docnchunk;
2129                rawkeylen_long = docrawkeylen;
2130            }
2131            (void)key_short;
2132            (void)nchunk_long;
2133
2134            _hbtrie_store_meta(trie, &meta.size, newchunkno, opt,
2135                               key + trie->chunksize * (curchunkno+1),
2136                               (newchunkno - (curchunkno+1)) *
2137                                   trie->chunksize,
2138                               value_short, buf);
2139
2140            btreeitem_new = (struct btreelist_item *)
2141                            mempool_alloc(sizeof(struct btreelist_item));
2142            btreeitem_new->chunkno = newchunkno;
2143            r = btree_init(&btreeitem_new->btree,
2144                           trie->btreeblk_handle,
2145                           trie->btree_blk_ops, kv_ops,
2146                           trie->btree_nodesize, trie->chunksize,
2147                           trie->valuelen, 0x0, &meta);
2148            if (r == BTREE_RESULT_FAIL) {
2149                ret_result = HBTRIE_RESULT_FAIL;
2150                break;
2151            }
2152            btreeitem_new->btree.aux = trie->aux;
2153
2154            list_push_back(&btreelist, &btreeitem_new->e);
2155
2156            chunk_new = (uint8_t*)key_long +
2157                        newchunkno * trie->chunksize;
2158
2159            if (opt == HBMETA_LEAF) {
2160                // optimization mode
2161                _set_leaf_key(k, chunk_new, rawkeylen_long -
2162                                  newchunkno*trie->chunksize);
2163                r = btree_insert(&btreeitem_new->btree, k, value_long);
2164                if (r == BTREE_RESULT_FAIL) {
2165                    ret_result = HBTRIE_RESULT_FAIL;
2166                }
2167                _free_leaf_key(k);
2168            } else {
2169                // normal mode
2170                r = btree_insert(&btreeitem_new->btree,
2171                                 chunk_new, value_long);
2172                if (r == BTREE_RESULT_FAIL) {
2173                    ret_result = HBTRIE_RESULT_FAIL;
2174                }
2175            }
2176
2177        } else {
2178            //3 2-2. create sub-tree
2179            // and insert two docs into it
2180            _hbtrie_store_meta(trie, &meta.size, newchunkno, opt,
2181                               key + trie->chunksize * (curchunkno+1),
2182                               (newchunkno - (curchunkno+1)) * trie->chunksize,
2183                               NULL, buf);
2184
2185            btreeitem_new = (struct btreelist_item *)
2186                            mempool_alloc(sizeof(struct btreelist_item));
2187            btreeitem_new->chunkno = newchunkno;
2188            r = btree_init(&btreeitem_new->btree, trie->btreeblk_handle,
2189                           trie->btree_blk_ops, kv_ops,
2190                           trie->btree_nodesize, trie->chunksize,
2191                           trie->valuelen, 0x0, &meta);
2192            if (r == BTREE_RESULT_FAIL) {
2193                ret_result = HBTRIE_RESULT_FAIL;
2194            }
2195            btreeitem_new->btree.aux = trie->aux;
2196
2197            list_push_back(&btreelist, &btreeitem_new->e);
2198            // insert KEY
2199            chunk_new = key + newchunkno * trie->chunksize;
2200            if (opt == HBMETA_LEAF) {
2201                // optimization mode
2202                _set_leaf_key(k, chunk_new,
2203                              rawkeylen - newchunkno*trie->chunksize);
2204                r = btree_insert(&btreeitem_new->btree, k, value);
2205                _free_leaf_key(k);
2206            } else {
2207                r = btree_insert(&btreeitem_new->btree, chunk_new, value);
2208            }
2209            if (r == BTREE_RESULT_FAIL) {
2210                ret_result = HBTRIE_RESULT_FAIL;
2211            }
2212
2213            // insert the original DOCKEY
2214            chunk_new = dockey + newchunkno * trie->chunksize;
2215            if (opt == HBMETA_LEAF) {
2216                // optimization mode
2217                _set_leaf_key(k, chunk_new,
2218                              docrawkeylen - newchunkno*trie->chunksize);
2219                r = btree_insert(&btreeitem_new->btree, k, btree_value);
2220                _free_leaf_key(k);
2221            } else {
2222                r = btree_insert(&btreeitem_new->btree,
2223                                 chunk_new, btree_value);
2224            }
2225            if (r == BTREE_RESULT_FAIL) {
2226                ret_result = HBTRIE_RESULT_FAIL;
2227            }
2228        }
2229
2230        // update previous (parent) b-tree
2231        bid_new = btreeitem_new->btree.root_bid;
2232        btreeitem->child_rootbid = bid_new;
2233
2234        // set MSB
2235        _bid = _endian_encode(bid_new);
2236        _hbtrie_set_msb(trie, (void *)&_bid);
2237        // ASSUMPTION: the parent b-tree MUST always be a non-leaf b-tree
2238        r = btree_insert(&btreeitem->btree, chunk, (void*)&_bid);
2239        if (r == BTREE_RESULT_FAIL) {
2240            ret_result = HBTRIE_RESULT_FAIL;
2241        }
2242
2243        break;
2244    } // 1st while
2245
2246    // Clear docrawkey, dockey
2247    free(docrawkey);
2248    free(dockey);
2249
2250    _hbtrie_btree_cascaded_update(trie, &btreelist, key, 1);
2251
2252    return ret_result;
2253}
2254
2255hbtrie_result hbtrie_insert(struct hbtrie *trie,
2256                            void *rawkey, int rawkeylen,
2257                            void *value, void *oldvalue_out) {
2258    return _hbtrie_insert(trie, rawkey, rawkeylen, value, oldvalue_out, 0x0);
2259}
2260
2261hbtrie_result hbtrie_insert_partial(struct hbtrie *trie,
2262                                    void *rawkey, int rawkeylen,
2263                                    void *value, void *oldvalue_out) {
2264    return _hbtrie_insert(trie, rawkey, rawkeylen,
2265                          value, oldvalue_out, HBTRIE_PARTIAL_UPDATE);
2266}
2267
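/*
 * Usage sketch for the two insert variants above (illustrative; it assumes
 * trie->valuelen == 8 and a value buffer already encoded by the caller):
 *
 *   uint8_t value[8];     // encoded doc offset, prepared by the caller
 *   uint8_t oldvalue[8];
 *   if (hbtrie_insert(trie, (void*)"applepie", 8, value, oldvalue) ==
 *       HBTRIE_RESULT_SUCCESS) {
 *       uint8_t ff[8];
 *       memset(ff, 0xff, sizeof(ff));
 *       if (memcmp(oldvalue, ff, sizeof(ff)) != 0) {
 *           // the key already existed; 'oldvalue' holds its previous value
 *           // (oldvalue_out is pre-filled with 0xff by _hbtrie_insert())
 *       }
 *   }
 *
 * hbtrie_insert_partial() passes HBTRIE_PARTIAL_UPDATE, which does not
 * allow inserting a new key (see the partial-update branches above).
 */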