xref: /6.0.3/couchstore/src/couch_db.cc (revision 6a2d3d10)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3
4#include <cstddef>
5#include <fcntl.h>
6#include <platform/cb_malloc.h>
7#include <string.h>
8#include <stdlib.h>
9#include <assert.h>
10#include <stdio.h>
11
12#include "internal.h"
13#include "node_types.h"
14#include "couch_btree.h"
15#include "bitfield.h"
16#include "reduces.h"
17#include "util.h"
18
19#include "couch_latency_internal.h"
20
21#define ROOT_BASE_SIZE 12
22#define HEADER_BASE_SIZE 25
23
24// Initializes one of the db's root node pointers from data in the file header
25static couchstore_error_t read_db_root(Db *db, node_pointer **root,
26                                       void *root_data, int root_size)
27{
28    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
29    if (root_size > 0) {
30        error_unless(root_size >= ROOT_BASE_SIZE, COUCHSTORE_ERROR_CORRUPT);
31        *root = read_root(root_data, root_size);
32        error_unless(*root, COUCHSTORE_ERROR_ALLOC_FAIL);
33        error_unless((*root)->pointer < db->header.position, COUCHSTORE_ERROR_CORRUPT);
34    } else {
35        *root = NULL;
36    }
37cleanup:
38    return errcode;
39}
40
41// Attempts to initialize the database from a header at the given file position
42static couchstore_error_t find_header_at_pos(Db *db, cs_off_t pos)
43{
44    int seqrootsize;
45    int idrootsize;
46    int localrootsize;
47    char *root_data;
48    int header_len;
49    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
50    union {
51        raw_file_header *raw;
52        char *buf;
53    } header_buf = { NULL };
54    uint8_t buf[2];
55    ssize_t readsize;
56    {
57        // Speculative read looking for header, mark as Empty.
58        ScopedFileTag tag(db->file.ops, db->file.handle, FileTag::Empty);
59        readsize = db->file.ops->pread(
60                &db->file.lastError, db->file.handle, buf, 2, pos);
61    }
62    error_unless(readsize == 2, COUCHSTORE_ERROR_READ);
63    if (buf[0] == 0) {
64        return COUCHSTORE_ERROR_NO_HEADER;
65    } else if (buf[0] != 1) {
66        return COUCHSTORE_ERROR_CORRUPT;
67    }
68
69    header_len = pread_header(&db->file, pos, &header_buf.buf, MAX_DB_HEADER_SIZE);
70    if (header_len < 0) {
71        error_pass(static_cast<couchstore_error_t>(header_len));
72    }
73
74    db->header.position = pos;
75    db->header.disk_version = decode_raw08(header_buf.raw->version);
76
77    // Only 12 and 11 are valid
78    error_unless(db->header.disk_version == COUCH_DISK_VERSION ||
79                 db->header.disk_version == COUCH_DISK_VERSION_11,
80                 COUCHSTORE_ERROR_HEADER_VERSION);
81    db->header.update_seq = decode_raw48(header_buf.raw->update_seq);
82    db->header.purge_seq = decode_raw48(header_buf.raw->purge_seq);
83    db->header.purge_ptr = decode_raw48(header_buf.raw->purge_ptr);
84    error_unless(db->header.purge_ptr <= db->header.position, COUCHSTORE_ERROR_CORRUPT);
85    seqrootsize = decode_raw16(header_buf.raw->seqrootsize);
86    idrootsize = decode_raw16(header_buf.raw->idrootsize);
87    localrootsize = decode_raw16(header_buf.raw->localrootsize);
88    error_unless(header_len == HEADER_BASE_SIZE + seqrootsize + idrootsize + localrootsize,
89                 COUCHSTORE_ERROR_CORRUPT);
90
91    root_data = (char*) (header_buf.raw + 1);  // i.e. just past *header_buf
92    error_pass(read_db_root(db, &db->header.by_seq_root, root_data, seqrootsize));
93    root_data += seqrootsize;
94    error_pass(read_db_root(db, &db->header.by_id_root, root_data, idrootsize));
95    root_data += idrootsize;
96    error_pass(read_db_root(db, &db->header.local_docs_root, root_data, localrootsize));
97
98cleanup:
99    cb_free(header_buf.raw);
100    return errcode;
101}
102
103// Finds the database header by scanning back from the end of the file at 4k boundaries
104static couchstore_error_t find_header(Db *db, int64_t start_pos)
105{
106    couchstore_error_t last_header_errcode = COUCHSTORE_ERROR_NO_HEADER;
107    int64_t pos = start_pos;
108    pos -= pos % COUCH_BLOCK_SIZE;
109    for (; pos >= 0; pos -= COUCH_BLOCK_SIZE) {
110        couchstore_error_t errcode = find_header_at_pos(db, pos);
111        switch(errcode) {
112            case COUCHSTORE_SUCCESS:
113                // Found it!
114                return COUCHSTORE_SUCCESS;
115            case COUCHSTORE_ERROR_NO_HEADER:
116                // No header here, so keep going
117                break;
118            case COUCHSTORE_ERROR_ALLOC_FAIL:
119                // Fatal error
120                return errcode;
121            default:
122                // Invalid header; continue, but remember the last error
123                last_header_errcode = errcode;
124                break;
125        }
126    }
127    return last_header_errcode;
128}
129
130/**
131 * Calculates how large in bytes the current header will be
132 * when written to disk.
133 *
134 * The seqrootsize, idrootsize and localrootsize params are
135 * used to return the respective sizes in this header if
136 * needed.
137 */
138size_t calculate_header_size(Db *db, size_t& seqrootsize,
139                             size_t& idrootsize, size_t& localrootsize)
140{
141    seqrootsize = idrootsize = localrootsize = 0;
142
143    if (db->header.by_seq_root) {
144        seqrootsize = ROOT_BASE_SIZE + db->header.by_seq_root->reduce_value.size;
145    }
146    if (db->header.by_id_root) {
147        idrootsize = ROOT_BASE_SIZE + db->header.by_id_root->reduce_value.size;
148    }
149    if (db->header.local_docs_root) {
150        localrootsize = ROOT_BASE_SIZE + db->header.local_docs_root->reduce_value.size;
151    }
152    return sizeof(raw_file_header) + seqrootsize + idrootsize + localrootsize;
153}
154
155couchstore_error_t db_write_header(Db *db)
156{
157    sized_buf writebuf;
158    size_t seqrootsize, idrootsize, localrootsize;
159    writebuf.size = calculate_header_size(db, seqrootsize,
160                                          idrootsize, localrootsize);
161    writebuf.buf = (char *) cb_calloc(1, writebuf.size);
162    raw_file_header* header = (raw_file_header*)writebuf.buf;
163    header->version = encode_raw08(db->header.disk_version);
164    encode_raw48(db->header.update_seq, &header->update_seq);
165    encode_raw48(db->header.purge_seq, &header->purge_seq);
166    encode_raw48(db->header.purge_ptr, &header->purge_ptr);
167    header->seqrootsize = encode_raw16((uint16_t)seqrootsize);
168    header->idrootsize = encode_raw16((uint16_t)idrootsize);
169    header->localrootsize = encode_raw16((uint16_t)localrootsize);
170    uint8_t *root = (uint8_t*)(header + 1);
171    encode_root(root, db->header.by_seq_root);
172    root += seqrootsize;
173    encode_root(root, db->header.by_id_root);
174    root += idrootsize;
175    encode_root(root, db->header.local_docs_root);
176    cs_off_t pos;
177    couchstore_error_t errcode = write_header(&db->file, &writebuf, &pos);
178    if (errcode == COUCHSTORE_SUCCESS) {
179        db->header.position = pos;
180    }
181    cb_free(writebuf.buf);
182    return errcode;
183}
184
185static couchstore_error_t create_header(Db *db)
186{
187    // Select the version based upon selected CRC
188    if (db->file.crc_mode == CRC32) {
189        // user is creating down-level files
190        db->header.disk_version = COUCH_DISK_VERSION_11;
191    } else {
192        // user is using latest
193        db->header.disk_version = COUCH_DISK_VERSION;
194    }
195    db->header.update_seq = 0;
196    db->header.by_id_root = NULL;
197    db->header.by_seq_root = NULL;
198    db->header.local_docs_root = NULL;
199    db->header.purge_seq = 0;
200    db->header.purge_ptr = 0;
201    db->header.position = 0;
202    return db_write_header(db);
203}
204
205LIBCOUCHSTORE_API
206uint64_t couchstore_get_header_position(Db *db)
207{
208    return db->header.position;
209}
210
211/**
212 * Precommit should occur before writing a header, it has two
213 * purposes. Firstly it ensures data is written before we attempt
214 * to write the header. This means it's impossible for the header
215 * to be written before the data. This is accomplished through
216 * a sync.
217 *
218 * The second purpose is to extend the file to be large enough
219 * to include the subsequently written header. This is done so
220 * the fdatasync performed by writing a header doesn't have to
221 * do an additional (expensive) modified metadata flush on top
222 * of the one we're already doing.
223 */
224couchstore_error_t precommit(Db *db)
225{
226    cs_off_t curpos = db->file.pos;
227
228    db->file.pos = align_to_next_block(db->file.pos);
229    sized_buf zerobyte = { const_cast<char*>("\0"), 1};
230
231    size_t seqrootsize, idrootsize, localrootsize;
232    db->file.pos += calculate_header_size(db, seqrootsize,
233                                          idrootsize, localrootsize);
234
235    //Extend file size to where end of header will land before we do first sync
236    couchstore_error_t errcode = static_cast<couchstore_error_t>(
237        db_write_buf(&db->file, &zerobyte, NULL, NULL));
238
239    if (errcode == COUCHSTORE_SUCCESS) {
240        errcode = db->file.ops->sync(&db->file.lastError, db->file.handle);
241    }
242    // Move cursor back to where it was
243    db->file.pos = curpos;
244    return errcode;
245}
246
247LIBCOUCHSTORE_API
248couchstore_error_t couchstore_commit(Db *db)
249{
250    COLLECT_LATENCY();
251
252    couchstore_error_t errcode = precommit(db);
253
254    if (errcode == COUCHSTORE_SUCCESS) {
255        errcode = db_write_header(db);
256    }
257
258    if (errcode == COUCHSTORE_SUCCESS) {
259        errcode = db->file.ops->sync(&db->file.lastError, db->file.handle);
260    }
261
262    return errcode;
263}
264
265static tree_file_options get_tree_file_options_from_flags(couchstore_open_flags flags)
266{
267    tree_file_options options;
268
269    if (flags & COUCHSTORE_OPEN_FLAG_UNBUFFERED) {
270        options.buf_io_enabled = false;
271    } else if (flags & COUCHSTORE_OPEN_WITH_CUSTOM_BUFFER) {
272        // Buffered IO with custom buffer settings.
273        //  * First 4 bits [15:12]: read buffer capacity
274        //  * Next  4 bits [11:08]: max read buffer count
275
276        uint32_t unit_index = (flags >> 12) & 0xf;
277        if (unit_index) {
278            // unit_index    1     2     3     4     ...   15
279            // unit size     1KB   2KB   4KB   8KB   ...   16MB
280            options.buf_io_read_unit_size = 1024 * (1 << (unit_index -1));
281        }
282        uint32_t count_index = (flags >> 8) & 0xf;
283        if (count_index) {
284            // count_index   1     2     3     4     ...   15
285            // # buffers     8     16    32    64    ...   128K
286            options.buf_io_read_buffers = 8 * (1 << (count_index-1));
287        }
288    }
289
290    // Set default value first.
291    options.kp_nodesize = DB_KP_CHUNK_THRESHOLD;
292    options.kv_nodesize = DB_KV_CHUNK_THRESHOLD;
293    if (flags & COUCHSTORE_OPEN_WITH_CUSTOM_NODESIZE) {
294        // B+tree custom node size settings.
295        //  * First 4 bits [23:20]: KP node size
296        //  * Next  4 bits [19:16]: KV node size
297        uint32_t kp_flag = (flags >> 20) & 0xf;
298        if (kp_flag) {
299            options.kp_nodesize = kp_flag * 1024;
300        }
301        uint32_t kv_flag = (flags >> 16) & 0xf;
302        if (kv_flag) {
303            options.kv_nodesize = kv_flag * 1024;
304        }
305    }
306
307    if (flags & COUCHSTORE_OPEN_WITH_PERIODIC_SYNC) {
308        // Automatic sync() every N bytes written.
309        //  * 5 bits [28-24]: power-of-2 * 1kB
310        uint64_t sync_flag = (flags >> 24) & 0x1f;
311        options.periodic_sync_bytes = uint64_t(1024) << (sync_flag - 1);
312    }
313
314    return options;
315}
316
317LIBCOUCHSTORE_API
318couchstore_open_flags couchstore_encode_periodic_sync_flags(uint64_t bytes) {
319    // Convert to encoding supported by couchstore_open_flags - KB power-of-2
320    // value.
321    // Round up to whole kilobyte units.
322    const uint64_t kilobytes = (bytes + 1023) / 1024;
323    // Calculate the shift amount (what is the log2 power)
324    uint64_t shiftAmount = std::log2(kilobytes);
325    // Saturate if the user specified more than the encodable amount.
326    shiftAmount = std::min(shiftAmount, uint64_t(30));
327    // Finally, encode in couchstore_open flags
328    return ((shiftAmount + 1)) << 24;
329}
330
331LIBCOUCHSTORE_API
332couchstore_error_t couchstore_open_db(const char *filename,
333                                      couchstore_open_flags flags,
334                                      Db **pDb)
335{
336    return couchstore_open_db_ex(filename, flags,
337                                 couchstore_get_default_file_ops(), pDb);
338}
339
340LIBCOUCHSTORE_API
341couchstore_error_t couchstore_open_db_ex(const char *filename,
342                                         couchstore_open_flags flags,
343                                         FileOpsInterface* ops,
344                                         Db **pDb)
345{
346    COLLECT_LATENCY();
347
348    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
349    Db *db;
350    int openflags;
351    cs_off_t pos;
352
353    /* Sanity check input parameters */
354    if ((flags & COUCHSTORE_OPEN_FLAG_RDONLY) &&
355        (flags & COUCHSTORE_OPEN_FLAG_CREATE)) {
356        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
357    }
358
359    if ((db = static_cast<Db*>(cb_calloc(1, sizeof(Db)))) == NULL) {
360        return COUCHSTORE_ERROR_ALLOC_FAIL;
361    }
362
363    if (flags & COUCHSTORE_OPEN_FLAG_RDONLY) {
364        openflags = O_RDONLY;
365    } else {
366        openflags = O_RDWR;
367    }
368
369    if (flags & COUCHSTORE_OPEN_FLAG_CREATE) {
370        openflags |= O_CREAT;
371    }
372
373    // open with CRC unknown, CRC will be selected when header is read/or not found.
374    error_pass(tree_file_open(&db->file, filename, openflags, CRC_UNKNOWN, ops,
375                              get_tree_file_options_from_flags(flags)));
376
377    pos = db->file.ops->goto_eof(&db->file.lastError, db->file.handle);
378    db->file.pos = pos;
379    if (pos == 0) {
380        /* This is an empty file. Create a new fileheader unless the
381         * user wanted a read-only version of the file
382         */
383
384        if (flags & COUCHSTORE_OPEN_FLAG_RDONLY) {
385            error_pass(COUCHSTORE_ERROR_NO_HEADER);
386        } else {
387
388            // Select the CRC to use on this new file
389            if (flags & COUCHSTORE_OPEN_WITH_LEGACY_CRC) {
390                db->file.crc_mode = CRC32;
391            } else {
392                db->file.crc_mode = CRC32C;
393            }
394
395            error_pass(create_header(db));
396        }
397    } else if (pos > 0) {
398        error_pass(find_header(db, db->file.pos - 2));
399
400        if (db->header.disk_version <= COUCH_DISK_VERSION_11) {
401            db->file.crc_mode = CRC32;
402        } else {
403            cb_assert(db->header.disk_version >= COUCH_DISK_VERSION_12);
404            db->file.crc_mode = CRC32C;
405        }
406
407        // Not allowed. Can't request legacy_crc but be opening non legacy CRC files.
408        if (db->file.crc_mode == CRC32C && flags & COUCHSTORE_OPEN_WITH_LEGACY_CRC) {
409            errcode = COUCHSTORE_ERROR_INVALID_ARGUMENTS;
410            goto cleanup;
411        }
412    } else {
413        error_pass(static_cast<couchstore_error_t>(db->file.pos));
414    }
415
416    *pDb = db;
417    db->dropped = 0;
418
419cleanup:
420    if(errcode != COUCHSTORE_SUCCESS) {
421        couchstore_close_file(db);
422        couchstore_free_db(db);
423    }
424
425    return errcode;
426}
427
428LIBCOUCHSTORE_API
429couchstore_error_t couchstore_close_file(Db* db)
430{
431    COLLECT_LATENCY();
432
433    if(db->dropped) {
434        return COUCHSTORE_SUCCESS;
435    }
436    couchstore_error_t error = tree_file_close(&db->file);
437    db->dropped = 1;
438    return error;
439}
440
441LIBCOUCHSTORE_API
442couchstore_error_t couchstore_rewind_db_header(Db *db)
443{
444    COLLECT_LATENCY();
445
446    couchstore_error_t errcode;
447    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
448    // free current header guts
449    cb_free(db->header.by_id_root);
450    cb_free(db->header.by_seq_root);
451    cb_free(db->header.local_docs_root);
452    db->header.by_id_root = NULL;
453    db->header.by_seq_root = NULL;
454    db->header.local_docs_root = NULL;
455
456    error_unless(db->header.position != 0, COUCHSTORE_ERROR_DB_NO_LONGER_VALID);
457    // find older header
458    error_pass(find_header(db, db->header.position - 2));
459
460cleanup:
461    // if we failed, free the handle and return an error
462    if(errcode != COUCHSTORE_SUCCESS) {
463        couchstore_close_file(db);
464        couchstore_free_db(db);
465        errcode = COUCHSTORE_ERROR_DB_NO_LONGER_VALID;
466    }
467    return errcode;
468}
469
470LIBCOUCHSTORE_API
471couchstore_error_t couchstore_free_db(Db* db)
472{
473    COLLECT_LATENCY();
474
475    if(!db) {
476        return COUCHSTORE_SUCCESS;
477    }
478
479    if(!db->dropped) {
480        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
481    }
482
483    cb_free(db->header.by_id_root);
484    cb_free(db->header.by_seq_root);
485    cb_free(db->header.local_docs_root);
486    db->header.by_id_root = NULL;
487    db->header.by_seq_root = NULL;
488    db->header.local_docs_root = NULL;
489
490    memset(db, 0xa5, sizeof(*db));
491    cb_free(db);
492
493    return COUCHSTORE_SUCCESS;
494}
495
496LIBCOUCHSTORE_API
497const char* couchstore_get_db_filename(Db *db) {
498    return db->file.path;
499}
500
501LIBCOUCHSTORE_API
502FileOpsInterface::FHStats* couchstore_get_db_filestats(Db* db) {
503    return db->file.ops->get_stats(db->file.handle);
504}
505
506DocInfo* couchstore_alloc_docinfo(const sized_buf *id, const sized_buf *rev_meta) {
507    size_t size = sizeof(DocInfo);
508    if (id) {
509        size += id->size;
510    }
511    if (rev_meta) {
512        size += rev_meta->size;
513    }
514    DocInfo* docInfo = static_cast<DocInfo*>(cb_malloc(size));
515    if (!docInfo) {
516        return NULL;
517    }
518    memset(docInfo, 0, sizeof(DocInfo));
519    char *extra = (char *)docInfo + sizeof(DocInfo);
520    if (id) {
521        memcpy(extra, id->buf, id->size);
522        docInfo->id.buf = extra;
523        docInfo->id.size = id->size;
524        extra += id->size;
525    }
526    if (rev_meta) {
527        memcpy(extra, rev_meta->buf, rev_meta->size);
528        docInfo->rev_meta.buf = extra;
529        docInfo->rev_meta.size = rev_meta->size;
530    }
531    return docInfo;
532}
533
534LIBCOUCHSTORE_API
535void couchstore_free_docinfo(DocInfo *docinfo)
536{
537    cb_free(docinfo);
538}
539
540LIBCOUCHSTORE_API
541void couchstore_free_document(Doc *doc)
542{
543    if (doc) {
544        size_t offset = offsetof(fatbuf, buf);
545        fatbuf_free((fatbuf *) ((char *)doc - (char *)offset));
546    }
547}
548
549couchstore_error_t by_seq_read_docinfo(DocInfo **pInfo,
550                                       const sized_buf *k,
551                                       const sized_buf *v)
552{
553    const raw_seq_index_value *raw = (const raw_seq_index_value*)v->buf;
554    ssize_t extraSize = v->size - sizeof(*raw);
555    if (extraSize < 0) {
556        return COUCHSTORE_ERROR_CORRUPT;
557    }
558
559    uint32_t idsize, datasize;
560    decode_kv_length(&raw->sizes, &idsize, &datasize);
561    uint64_t bp = decode_raw48(raw->bp);
562    int deleted = (bp & BP_DELETED_FLAG) != 0;
563    bp &= ~BP_DELETED_FLAG;
564    uint8_t content_meta = decode_raw08(raw->content_meta);
565    uint64_t rev_seq = decode_raw48(raw->rev_seq);
566    uint64_t db_seq = decode_sequence_key(k);
567
568    sized_buf id = {v->buf + sizeof(*raw), idsize};
569    sized_buf rev_meta = {id.buf + idsize, extraSize - id.size};
570    DocInfo* docInfo = couchstore_alloc_docinfo(&id, &rev_meta);
571    if (!docInfo) {
572        return COUCHSTORE_ERROR_ALLOC_FAIL;
573    }
574
575    docInfo->db_seq = db_seq;
576    docInfo->rev_seq = rev_seq;
577    docInfo->deleted = deleted;
578    docInfo->bp = bp;
579    docInfo->size = datasize;
580    docInfo->content_meta = content_meta;
581    *pInfo = docInfo;
582    return COUCHSTORE_SUCCESS;
583}
584
585static couchstore_error_t by_id_read_docinfo(DocInfo **pInfo,
586                                             const sized_buf *k,
587                                             const sized_buf *v)
588{
589    const raw_id_index_value *raw = (const raw_id_index_value*)v->buf;
590    ssize_t revMetaSize = v->size - sizeof(*raw);
591    if (revMetaSize < 0) {
592        return COUCHSTORE_ERROR_CORRUPT;
593    }
594
595    uint32_t datasize, deleted;
596    uint8_t content_meta;
597    uint64_t bp, seq, revnum;
598
599    seq = decode_raw48(raw->db_seq);
600    datasize = decode_raw32(raw->size);
601    bp = decode_raw48(raw->bp);
602    deleted = (bp & BP_DELETED_FLAG) != 0;
603    bp &= ~BP_DELETED_FLAG;
604    content_meta = decode_raw08(raw->content_meta);
605    revnum = decode_raw48(raw->rev_seq);
606
607    sized_buf rev_meta = {v->buf + sizeof(*raw), static_cast<size_t>(revMetaSize)};
608    DocInfo* docInfo = couchstore_alloc_docinfo(k, &rev_meta);
609    if (!docInfo) {
610        return COUCHSTORE_ERROR_ALLOC_FAIL;
611    }
612
613    docInfo->db_seq = seq;
614    docInfo->rev_seq = revnum;
615    docInfo->deleted = deleted;
616    docInfo->bp = bp;
617    docInfo->size = datasize;
618    docInfo->content_meta = content_meta;
619    *pInfo = docInfo;
620    return COUCHSTORE_SUCCESS;
621}
622
623//Fill in doc from reading file.
624static couchstore_error_t bp_to_doc(Doc **pDoc, Db *db, cs_off_t bp, couchstore_open_options options)
625{
626    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
627    int bodylen = 0;
628    char *docbody = NULL;
629    fatbuf *docbuf = NULL;
630    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
631
632    if (options & DECOMPRESS_DOC_BODIES) {
633        bodylen = pread_compressed(&db->file, bp, &docbody);
634    } else {
635        bodylen = pread_bin(&db->file, bp, &docbody);
636    }
637
638    error_unless(bodylen >= 0, static_cast<couchstore_error_t>(bodylen));    // if bodylen is negative it's an error code
639    error_unless(docbody || bodylen == 0, COUCHSTORE_ERROR_READ);
640
641    error_unless(docbuf = fatbuf_alloc(sizeof(Doc) + bodylen), COUCHSTORE_ERROR_ALLOC_FAIL);
642    *pDoc = (Doc *) fatbuf_get(docbuf, sizeof(Doc));
643
644    if (bodylen == 0) { //Empty doc
645        (*pDoc)->data.buf = NULL;
646        (*pDoc)->data.size = 0;
647        cb_free(docbody);
648        return COUCHSTORE_SUCCESS;
649    }
650
651    (*pDoc)->data.buf = (char *) fatbuf_get(docbuf, bodylen);
652    (*pDoc)->data.size = bodylen;
653    memcpy((*pDoc)->data.buf, docbody, bodylen);
654
655cleanup:
656    cb_free(docbody);
657    if (errcode < 0) {
658        fatbuf_free(docbuf);
659    }
660    return errcode;
661}
662
663static couchstore_error_t docinfo_fetch_by_id(couchfile_lookup_request *rq,
664                                              const sized_buf *k,
665                                              const sized_buf *v)
666{
667    DocInfo **pInfo = (DocInfo **) rq->callback_ctx;
668    if (v == NULL) {
669        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
670    }
671    return by_id_read_docinfo(pInfo, k, v);
672}
673
674static couchstore_error_t docinfo_fetch_by_seq(couchfile_lookup_request *rq,
675                                               const sized_buf *k,
676                                               const sized_buf *v)
677{
678    DocInfo **pInfo = (DocInfo **) rq->callback_ctx;
679    if (v == NULL) {
680        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
681    }
682    return by_seq_read_docinfo(pInfo, k, v);
683}
684
685LIBCOUCHSTORE_API
686couchstore_error_t couchstore_docinfo_by_id(Db *db,
687                                            const void *id,
688                                            size_t idlen,
689                                            DocInfo **pInfo)
690{
691    COLLECT_LATENCY();
692
693    sized_buf key;
694    sized_buf *keylist = &key;
695    couchfile_lookup_request rq;
696    couchstore_error_t errcode;
697    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
698
699    if (db->header.by_id_root == NULL) {
700        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
701    }
702
703    key.buf = (char *) id;
704    key.size = idlen;
705
706    rq.cmp.compare = ebin_cmp;
707    rq.file = &db->file;
708    rq.num_keys = 1;
709    rq.keys = &keylist;
710    rq.callback_ctx = pInfo;
711    rq.fetch_callback = docinfo_fetch_by_id;
712    rq.node_callback = NULL;
713    rq.fold = 0;
714
715    errcode = btree_lookup(&rq, db->header.by_id_root->pointer);
716    if (errcode == COUCHSTORE_SUCCESS) {
717        if (*pInfo == NULL) {
718            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
719        }
720    }
721cleanup:
722    return errcode;
723}
724
725LIBCOUCHSTORE_API
726couchstore_error_t couchstore_docinfo_by_sequence(Db *db,
727                                                  uint64_t sequence,
728                                                  DocInfo **pInfo)
729{
730    COLLECT_LATENCY();
731
732    sized_buf key;
733    sized_buf *keylist = &key;
734    couchfile_lookup_request rq;
735    couchstore_error_t errcode;
736    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
737
738    if (db->header.by_id_root == NULL) {
739        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
740    }
741
742    sequence = htonll(sequence);
743    key.buf = (char *)&sequence + 2;
744    key.size = 6;
745
746    rq.cmp.compare = seq_cmp;
747    rq.file = &db->file;
748    rq.num_keys = 1;
749    rq.keys = &keylist;
750    rq.callback_ctx = pInfo;
751    rq.fetch_callback = docinfo_fetch_by_seq;
752    rq.node_callback = NULL;
753    rq.fold = 0;
754
755    errcode = btree_lookup(&rq, db->header.by_seq_root->pointer);
756    if (errcode == COUCHSTORE_SUCCESS) {
757        if (*pInfo == NULL) {
758            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
759        }
760    }
761cleanup:
762    return errcode;
763}
764
765LIBCOUCHSTORE_API
766couchstore_error_t couchstore_open_doc_with_docinfo(Db *db,
767                                                    const DocInfo *docinfo,
768                                                    Doc **pDoc,
769                                                    couchstore_open_options options)
770{
771    COLLECT_LATENCY();
772
773    couchstore_error_t errcode;
774
775    *pDoc = NULL;
776    if (docinfo->bp == 0) {
777        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
778    }
779
780    if (!(docinfo->content_meta & COUCH_DOC_IS_COMPRESSED)) {
781        options &= ~DECOMPRESS_DOC_BODIES;
782    }
783
784    errcode = bp_to_doc(pDoc, db, docinfo->bp, options);
785    if (errcode == COUCHSTORE_SUCCESS) {
786        (*pDoc)->id.buf = docinfo->id.buf;
787        (*pDoc)->id.size = docinfo->id.size;
788    }
789
790    return errcode;
791}
792
793LIBCOUCHSTORE_API
794couchstore_error_t couchstore_open_document(Db *db,
795                                            const void *id,
796                                            size_t idlen,
797                                            Doc **pDoc,
798                                            couchstore_open_options options)
799{
800    COLLECT_LATENCY();
801
802    couchstore_error_t errcode;
803    DocInfo *info;
804    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
805    *pDoc = NULL;
806    errcode = couchstore_docinfo_by_id(db, id, idlen, &info);
807    if (errcode == COUCHSTORE_SUCCESS) {
808        errcode = couchstore_open_doc_with_docinfo(db, info, pDoc, options);
809        if (errcode == COUCHSTORE_SUCCESS) {
810            (*pDoc)->id.buf = (char *) id;
811            (*pDoc)->id.size = idlen;
812        }
813
814        couchstore_free_docinfo(info);
815    }
816cleanup:
817    return errcode;
818}
819
// Context info passed to lookup_callback via btree_lookup
// (stored in couchfile_lookup_request::callback_ctx).
typedef struct {
    // Database handle handed to the user callbacks.
    Db *db;
    // Filter/behaviour flags (COUCHSTORE_DELETES_ONLY, COUCHSTORE_NO_DELETES,
    // COUCHSTORE_TOLERATE_CORRUPTION) consulted by lookup_callback.
    couchstore_docinfos_options options;
    // Per-document callback; used when walk_callback is NULL.
    couchstore_changes_callback_fn callback;
    // Opaque pointer passed through to whichever callback is invoked.
    void* callback_context;
    // Non-zero: entries are by-id index values; zero: by-sequence values.
    int by_id;
    // Depth value forwarded to walk_callback.
    int depth;
    // Tree-walk callback; when non-NULL it is invoked instead of callback.
    couchstore_walk_tree_callback_fn walk_callback;
} lookup_context;
830
831// btree_lookup callback, called while iterating keys
832static couchstore_error_t lookup_callback(couchfile_lookup_request *rq,
833                                          const sized_buf *k,
834                                          const sized_buf *v)
835{
836    if (v == NULL) {
837        return COUCHSTORE_SUCCESS;
838    }
839
840    const lookup_context *context = static_cast<const lookup_context *>(rq->callback_ctx);
841    DocInfo *docinfo = NULL;
842    couchstore_error_t errcode;
843    if (context->by_id) {
844        errcode = by_id_read_docinfo(&docinfo, k, v);
845    } else {
846        errcode = by_seq_read_docinfo(&docinfo, k, v);
847    }
848    if (errcode == COUCHSTORE_ERROR_CORRUPT &&
849        (context->options & COUCHSTORE_TOLERATE_CORRUPTION)) {
850        // Invoke callback even if doc info is corrupted/unreadable, if magic flag is set
851        docinfo = static_cast<DocInfo*>(cb_calloc(sizeof(DocInfo), 1));
852        docinfo->id = *k;
853        docinfo->rev_meta = *v;
854    } else if (errcode) {
855        return errcode;
856    }
857
858    if ((context->options & COUCHSTORE_DELETES_ONLY) && docinfo->deleted == 0) {
859        couchstore_free_docinfo(docinfo);
860        return COUCHSTORE_SUCCESS;
861    }
862
863    if ((context->options & COUCHSTORE_NO_DELETES) && docinfo->deleted == 1) {
864        couchstore_free_docinfo(docinfo);
865        return COUCHSTORE_SUCCESS;
866    }
867
868    if (context->walk_callback) {
869        errcode = static_cast<couchstore_error_t>(context->walk_callback(context->db,
870                                                                         context->depth,
871                                                                         docinfo,
872                                                                         0,
873                                                                         NULL,
874                                                                         context->callback_context));
875    } else {
876        errcode = static_cast<couchstore_error_t>(context->callback(context->db,
877                                                                    docinfo,
878                                                                    context->callback_context));
879    }
880    if (errcode <= 0) {
881        couchstore_free_docinfo(docinfo);
882    } else {
883        // User requested docinfo not be freed, don't free it, return success
884        return COUCHSTORE_SUCCESS;
885    }
886    return errcode;
887}
888
889LIBCOUCHSTORE_API
890couchstore_error_t couchstore_changes_since(Db *db,
891                                            uint64_t since,
892                                            couchstore_docinfos_options options,
893                                            couchstore_changes_callback_fn callback,
894                                            void *ctx)
895{
896    COLLECT_LATENCY();
897
898    char since_termbuf[6];
899    sized_buf since_term;
900    sized_buf *keylist = &since_term;
901    lookup_context cbctx = {db, options, callback, ctx, 0, 0, NULL};
902    couchfile_lookup_request rq;
903    couchstore_error_t errcode;
904
905    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
906    if (db->header.by_seq_root == NULL) {
907        return COUCHSTORE_SUCCESS;
908    }
909
910    since_term.buf = since_termbuf;
911    since_term.size = 6;
912    encode_raw48(since, (raw_48*)since_term.buf);
913
914    rq.cmp.compare = seq_cmp;
915    rq.file = &db->file;
916    rq.num_keys = 1;
917    rq.keys = &keylist;
918    rq.callback_ctx = &cbctx;
919    rq.fetch_callback = lookup_callback;
920    rq.node_callback = NULL;
921    rq.fold = 1;
922    rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
923
924    errcode = btree_lookup(&rq, db->header.by_seq_root->pointer);
925cleanup:
926    return errcode;
927}
928
929LIBCOUCHSTORE_API
930couchstore_error_t couchstore_all_docs(Db *db,
931                                       const sized_buf* startKeyPtr,
932                                       couchstore_docinfos_options options,
933                                       couchstore_changes_callback_fn callback,
934                                       void *ctx)
935{
936    COLLECT_LATENCY();
937
938    sized_buf startKey = {NULL, 0};
939    sized_buf *keylist = &startKey;
940    lookup_context cbctx = {db, options, callback, ctx, 1, 0, NULL};
941    couchfile_lookup_request rq;
942    couchstore_error_t errcode;
943
944    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
945    if (db->header.by_id_root == NULL) {
946        return COUCHSTORE_SUCCESS;
947    }
948
949    if (startKeyPtr) {
950        startKey = *startKeyPtr;
951    }
952
953    rq.cmp.compare = ebin_cmp;
954    rq.file = &db->file;
955    rq.num_keys = 1;
956    rq.keys = &keylist;
957    rq.callback_ctx = &cbctx;
958    rq.fetch_callback = lookup_callback;
959    rq.node_callback = NULL;
960    rq.fold = 1;
961    rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
962
963    errcode = btree_lookup(&rq, db->header.by_id_root->pointer);
964cleanup:
965    return errcode;
966}
967
968static couchstore_error_t walk_node_callback(struct couchfile_lookup_request *rq,
969                                                 uint64_t subtreeSize,
970                                                 const sized_buf *reduceValue)
971{
972    lookup_context* context = static_cast<lookup_context*>(rq->callback_ctx);
973    if (reduceValue) {
974        int result = context->walk_callback(context->db,
975                                            context->depth,
976                                            NULL,
977                                            subtreeSize,
978                                            reduceValue,
979                                            context->callback_context);
980        context->depth++;
981        if (result < 0)
982            return static_cast<couchstore_error_t>(result);
983    } else {
984        context->depth--;
985    }
986    return COUCHSTORE_SUCCESS;
987}
988
989static
990couchstore_error_t couchstore_walk_tree(Db *db,
991                                        int by_id,
992                                        const node_pointer* root,
993                                        const sized_buf* startKeyPtr,
994                                        couchstore_docinfos_options options,
995                                        int (*compare)(const sized_buf *k1, const sized_buf *k2),
996                                        couchstore_walk_tree_callback_fn callback,
997                                        void *ctx)
998{
999    couchstore_error_t errcode;
1000    sized_buf startKey = {NULL, 0};
1001    sized_buf *keylist;
1002    couchfile_lookup_request rq;
1003
1004    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1005    if (root == NULL) {
1006        return COUCHSTORE_SUCCESS;
1007    }
1008
1009    // Invoke the callback on the root node:
1010    errcode = static_cast<couchstore_error_t>(callback(db, 0, NULL,
1011                                                       root->subtreesize,
1012                                                       &root->reduce_value,
1013                                                       ctx));
1014    if (errcode < 0) {
1015        return errcode;
1016    }
1017
1018    if (startKeyPtr) {
1019        startKey = *startKeyPtr;
1020    }
1021    keylist = &startKey;
1022
1023    {
1024        // Create a new scope here just to mute the warning from the
1025        // compiler that the goto in the macro error_unless
1026        // skips the initialization of lookup_ctx..
1027        lookup_context lookup_ctx = {db, options, NULL, ctx, by_id, 1, callback};
1028
1029        rq.cmp.compare = compare;
1030        rq.file = &db->file;
1031        rq.num_keys = 1;
1032        rq.keys = &keylist;
1033        rq.callback_ctx = &lookup_ctx;
1034        rq.fetch_callback = lookup_callback;
1035        rq.node_callback = walk_node_callback;
1036        rq.fold = 1;
1037        rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
1038
1039        error_pass(btree_lookup(&rq, root->pointer));
1040    }
1041cleanup:
1042    return errcode;
1043}
1044
1045LIBCOUCHSTORE_API
1046couchstore_error_t couchstore_walk_id_tree(Db *db,
1047                                           const sized_buf* startDocID,
1048                                           couchstore_docinfos_options options,
1049                                           couchstore_walk_tree_callback_fn callback,
1050                                           void *ctx)
1051{
1052    COLLECT_LATENCY();
1053
1054    return couchstore_walk_tree(db, 1, db->header.by_id_root, startDocID,
1055                                options, ebin_cmp, callback, ctx);
1056}
1057
1058LIBCOUCHSTORE_API
1059couchstore_error_t couchstore_walk_seq_tree(Db *db,
1060                                           uint64_t startSequence,
1061                                           couchstore_docinfos_options options,
1062                                           couchstore_walk_tree_callback_fn callback,
1063                                           void *ctx)
1064{
1065    COLLECT_LATENCY();
1066
1067    raw_48 start_termbuf;
1068    encode_raw48(startSequence, &start_termbuf);
1069    sized_buf start_term = {(char*)&start_termbuf, 6};
1070
1071    return couchstore_walk_tree(db, 0, db->header.by_seq_root, &start_term,
1072                                options, seq_cmp, callback, ctx);
1073}
1074
1075static int id_ptr_cmp(const void *a, const void *b)
1076{
1077    sized_buf **buf1 = (sized_buf**) a;
1078    sized_buf **buf2 = (sized_buf**) b;
1079    return ebin_cmp(*buf1, *buf2);
1080}
1081
1082static int seq_ptr_cmp(const void *a, const void *b)
1083{
1084    sized_buf **buf1 = (sized_buf**) a;
1085    sized_buf **buf2 = (sized_buf**) b;
1086    return seq_cmp(*buf1, *buf2);
1087}
1088
1089// Common subroutine of couchstore_docinfos_by_{ids, sequence}
1090static couchstore_error_t iterate_docinfos(Db *db,
1091                                           const sized_buf keys[],
1092                                           unsigned numDocs,
1093                                           node_pointer *tree,
1094                                           int (*key_ptr_compare)(const void *, const void *),
1095                                           int (*key_compare)(const sized_buf *k1, const sized_buf *k2),
1096                                           couchstore_changes_callback_fn callback,
1097                                           int fold,
1098                                           int tolerate_corruption,
1099                                           void *ctx)
1100{
1101    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1102    const sized_buf **keyptrs = NULL;
1103    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1104    // Nothing to do if the tree is empty
1105    if (tree == NULL) {
1106        return COUCHSTORE_SUCCESS;
1107    }
1108
1109    if(numDocs <= 0) {
1110        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1111    }
1112
1113    // Create an array of *pointers to* sized_bufs, which is what btree_lookup wants:
1114    keyptrs = static_cast<const sized_buf**>(cb_malloc(numDocs * sizeof(sized_buf*)));
1115    error_unless(keyptrs, COUCHSTORE_ERROR_ALLOC_FAIL);
1116
1117    {
1118        unsigned i;
1119        for (i = 0; i< numDocs; ++i) {
1120            keyptrs[i] = &keys[i];
1121        }
1122        if (!fold) {
1123            // Sort the key pointers:
1124            qsort(keyptrs, numDocs, sizeof(keyptrs[0]), key_ptr_compare);
1125        }
1126
1127        // Construct the lookup request:
1128        lookup_context cbctx = {db, 0, callback, ctx, (tree == db->header.by_id_root), 0, NULL};
1129        couchfile_lookup_request rq;
1130        rq.cmp.compare = key_compare;
1131        rq.file = &db->file;
1132        rq.num_keys = numDocs;
1133        rq.keys = (sized_buf**) keyptrs;
1134        rq.callback_ctx = &cbctx;
1135        rq.fetch_callback = lookup_callback;
1136        rq.node_callback = NULL;
1137        rq.fold = fold;
1138        rq.tolerate_corruption = tolerate_corruption;
1139
1140        // Go!
1141        error_pass(btree_lookup(&rq, tree->pointer));
1142    }
1143cleanup:
1144    cb_free(keyptrs);
1145    return errcode;
1146}
1147
1148LIBCOUCHSTORE_API
1149couchstore_error_t couchstore_docinfos_by_id(Db *db,
1150                                             const sized_buf ids[],
1151                                             unsigned numDocs,
1152                                             couchstore_docinfos_options options,
1153                                             couchstore_changes_callback_fn callback,
1154                                             void *ctx)
1155{
1156    COLLECT_LATENCY();
1157
1158    return iterate_docinfos(db, ids, numDocs,
1159                            db->header.by_id_root, id_ptr_cmp, ebin_cmp,
1160                            callback,
1161                            (options & RANGES) != 0,
1162                            (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0,
1163                            ctx);
1164}
1165
1166LIBCOUCHSTORE_API
1167couchstore_error_t couchstore_docinfos_by_sequence(Db *db,
1168                                                   const uint64_t sequence[],
1169                                                   unsigned numDocs,
1170                                                   couchstore_docinfos_options options,
1171                                                   couchstore_changes_callback_fn callback,
1172                                                   void *ctx)
1173{
1174    COLLECT_LATENCY();
1175
1176    // Create the array of keys:
1177    sized_buf *keylist = static_cast<sized_buf*>(cb_malloc(numDocs * sizeof(sized_buf)));
1178    raw_by_seq_key *keyvalues = static_cast<raw_by_seq_key*>(cb_malloc(numDocs * sizeof(raw_by_seq_key)));
1179    couchstore_error_t errcode;
1180    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1181    error_unless(keylist && keyvalues, COUCHSTORE_ERROR_ALLOC_FAIL);
1182    unsigned i;
1183    for (i = 0; i< numDocs; ++i) {
1184        encode_raw48(sequence[i], &keyvalues[i].sequence);
1185        keylist[i].buf = static_cast<char*>((void*) &keyvalues[i]);
1186        keylist[i].size = sizeof(keyvalues[i]);
1187    }
1188
1189    error_pass(iterate_docinfos(db, keylist, numDocs,
1190                                db->header.by_seq_root, seq_ptr_cmp, seq_cmp,
1191                                callback,
1192                                (options & RANGES) != 0,
1193                                (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0,
1194                                ctx));
1195cleanup:
1196    cb_free(keylist);
1197    cb_free(keyvalues);
1198    return errcode;
1199}
1200
1201LIBCOUCHSTORE_API
1202couchstore_error_t couchstore_db_info(Db *db, DbInfo* dbinfo) {
1203    if (db == NULL || dbinfo == NULL) {
1204        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1205    }
1206    const node_pointer *id_root = db->header.by_id_root;
1207    const node_pointer *seq_root = db->header.by_seq_root;
1208    const node_pointer *local_root = db->header.local_docs_root;
1209    dbinfo->filename = db->file.path;
1210    dbinfo->header_position = db->header.position;
1211    dbinfo->last_sequence = db->header.update_seq;
1212    dbinfo->purge_seq = db->header.purge_seq;
1213    dbinfo->deleted_count = dbinfo->doc_count = dbinfo->space_used = 0;
1214    dbinfo->file_size = db->file.pos;
1215    if (id_root) {
1216        raw_by_id_reduce* id_reduce = (raw_by_id_reduce*) id_root->reduce_value.buf;
1217        dbinfo->doc_count = decode_raw40(id_reduce->notdeleted);
1218        dbinfo->deleted_count = decode_raw40(id_reduce->deleted);
1219        dbinfo->space_used = decode_raw48(id_reduce->size);
1220        dbinfo->space_used += id_root->subtreesize;
1221    }
1222    if(seq_root) {
1223        dbinfo->space_used += seq_root->subtreesize;
1224    }
1225    if(local_root) {
1226        dbinfo->space_used += local_root->subtreesize;
1227    }
1228    return COUCHSTORE_SUCCESS;
1229}
1230
1231static couchstore_error_t local_doc_fetch(couchfile_lookup_request *rq,
1232                                          const sized_buf *k,
1233                                          const sized_buf *v)
1234{
1235    LocalDoc **lDoc = (LocalDoc **) rq->callback_ctx;
1236    LocalDoc *dp;
1237
1238    if (!v) {
1239        *lDoc = NULL;
1240        return COUCHSTORE_SUCCESS;
1241    }
1242    fatbuf *ldbuf = fatbuf_alloc(sizeof(LocalDoc) + k->size + v->size);
1243    if (ldbuf == NULL) {
1244        return COUCHSTORE_ERROR_ALLOC_FAIL;
1245    }
1246
1247    dp = *lDoc = (LocalDoc *) fatbuf_get(ldbuf, sizeof(LocalDoc));
1248    dp->id.buf = (char *) fatbuf_get(ldbuf, k->size);
1249    dp->id.size = k->size;
1250
1251    dp->json.buf = (char *) fatbuf_get(ldbuf, v->size);
1252    dp->json.size = v->size;
1253
1254    dp->deleted = 0;
1255
1256    memcpy(dp->id.buf, k->buf, k->size);
1257    memcpy(dp->json.buf, v->buf, v->size);
1258
1259    return COUCHSTORE_SUCCESS;
1260}
1261
1262LIBCOUCHSTORE_API
1263couchstore_error_t couchstore_open_local_document(Db *db,
1264                                                  const void *id,
1265                                                  size_t idlen,
1266                                                  LocalDoc **pDoc)
1267{
1268    sized_buf key;
1269    sized_buf *keylist = &key;
1270    couchfile_lookup_request rq;
1271    couchstore_error_t errcode;
1272    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1273    if (db->header.local_docs_root == NULL) {
1274        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
1275    }
1276
1277    key.buf = (char *) id;
1278    key.size = idlen;
1279
1280    rq.cmp.compare = ebin_cmp;
1281    rq.file = &db->file;
1282    rq.num_keys = 1;
1283    rq.keys = &keylist;
1284    rq.callback_ctx = pDoc;
1285    rq.fetch_callback = local_doc_fetch;
1286    rq.node_callback = NULL;
1287    rq.fold = 0;
1288
1289    errcode = btree_lookup(&rq, db->header.local_docs_root->pointer);
1290    if (errcode == COUCHSTORE_SUCCESS) {
1291        if (*pDoc == NULL) {
1292            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
1293        }
1294    }
1295cleanup:
1296    return errcode;
1297}
1298
1299LIBCOUCHSTORE_API
1300couchstore_error_t couchstore_save_local_document(Db *db, LocalDoc *lDoc)
1301{
1302    couchstore_error_t errcode;
1303    couchfile_modify_action ldupdate;
1304    node_pointer *nroot = NULL;
1305    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1306
1307    if (lDoc->deleted) {
1308        ldupdate.type = ACTION_REMOVE;
1309    } else {
1310        ldupdate.type = ACTION_INSERT;
1311    }
1312
1313    ldupdate.key = &lDoc->id;
1314    ldupdate.value.data = &lDoc->json;
1315
1316    couchfile_modify_request rq;
1317    rq.cmp.compare = ebin_cmp;
1318    rq.num_actions = 1;
1319    rq.actions = &ldupdate;
1320    rq.fetch_callback = NULL;
1321    rq.reduce = NULL;
1322    rq.rereduce = NULL;
1323    rq.file = &db->file;
1324    rq.enable_purging = false;
1325    rq.purge_kp = NULL;
1326    rq.purge_kv = NULL;
1327    rq.compacting = 0;
1328    rq.kv_chunk_threshold = db->file.options.kv_nodesize;
1329    rq.kp_chunk_threshold = db->file.options.kp_nodesize;
1330
1331    nroot = modify_btree(&rq, db->header.local_docs_root, &errcode);
1332    if (errcode == COUCHSTORE_SUCCESS && nroot != db->header.local_docs_root) {
1333        cb_free(db->header.local_docs_root);
1334        db->header.local_docs_root = nroot;
1335    }
1336
1337cleanup:
1338    return errcode;
1339}
1340
1341LIBCOUCHSTORE_API
1342void couchstore_free_local_document(LocalDoc *lDoc)
1343{
1344    if (lDoc) {
1345        size_t offset = offsetof(fatbuf, buf);
1346        fatbuf_free((fatbuf *) ((char *)lDoc - (char *)offset));
1347    }
1348}
1349
1350LIBCOUCHSTORE_API
1351couchstore_error_t couchstore_last_os_error(const Db *db,
1352                                            char* buf,
1353                                            size_t size) {
1354    if (db == NULL || buf == nullptr || size == 0) {
1355        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1356    }
1357    const couchstore_error_info_t *err = &db->file.lastError;
1358
1359    int nw;
1360
1361#ifdef WIN32
1362    char* win_msg = NULL;
1363    FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
1364                   FORMAT_MESSAGE_FROM_SYSTEM |
1365                   FORMAT_MESSAGE_IGNORE_INSERTS,
1366                   NULL, err->error,
1367                   MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
1368                   (LPTSTR) &win_msg,
1369                   0, NULL);
1370    nw = _snprintf(buf, size, "WINAPI error = %d: '%s'", err->error, win_msg);
1371    LocalFree(win_msg);
1372#else
1373    nw = snprintf(buf, size, "errno = %d: '%s'",
1374                      err->error, strerror(err->error));
1375#endif
1376
1377    if (nw < 0) {
1378        return COUCHSTORE_ERROR_ALLOC_FAIL;
1379    } if (size_t(nw) >= size) {
1380        /* Truncate the error message */
1381        buf[size - 1] = '\0';
1382    }
1383
1384    return COUCHSTORE_SUCCESS;
1385}
1386
1387static couchstore_error_t btree_eval_seq_reduce(Db *db,
1388                                                uint64_t *accum,
1389                                                sized_buf *left,
1390                                                sized_buf *right,
1391                                                bool past_left_edge,
1392                                                uint64_t diskpos) {
1393    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1394    int bufpos = 1, nodebuflen = 0;
1395    int node_type;
1396    char *nodebuf = NULL;
1397    nodebuflen = pread_compressed(&db->file, diskpos, &nodebuf);
1398    error_unless(nodebuflen >= 0, (static_cast<couchstore_error_t>(nodebuflen)));  // if negative, it's an error code
1399
1400    node_type = nodebuf[0];
1401    while(bufpos < nodebuflen) {
1402        sized_buf k, v;
1403        bufpos += read_kv(nodebuf + bufpos, &k, &v);
1404        int left_cmp = seq_cmp(&k, left);
1405        int right_cmp = seq_cmp(&k, right);
1406        if(left_cmp < 0) {
1407            continue;
1408        }
1409        if(node_type == KP_NODE) {
1410            // In-range Item in a KP Node
1411            const raw_node_pointer *raw = (const raw_node_pointer*)v.buf;
1412            const raw_by_seq_reduce *rawreduce = (const raw_by_seq_reduce*) (v.buf + sizeof(raw_node_pointer));
1413            uint64_t subcount = decode_raw40(rawreduce->count);
1414            uint64_t pointer = decode_raw48(raw->pointer);
1415            if((left_cmp >= 0 && !past_left_edge) || right_cmp >= 0) {
1416                error_pass(btree_eval_seq_reduce(db, accum, left, right, past_left_edge, pointer));
1417                if(right_cmp >= 0) {
1418                    break;
1419                } else {
1420                    past_left_edge = true;
1421                }
1422            } else {
1423                *accum += subcount;
1424            }
1425        } else {
1426            if(right_cmp > 0) {
1427                break;
1428            }
1429            // In-range Item in a KV Node
1430            *accum += 1;
1431        }
1432    }
1433cleanup:
1434    if (nodebuf) {
1435        cb_free(nodebuf);
1436    }
1437    return errcode;
1438}
1439
1440LIBCOUCHSTORE_API
1441couchstore_error_t couchstore_changes_count(Db* db,
1442                                            uint64_t min_seq,
1443                                            uint64_t max_seq,
1444                                            uint64_t *count) {
1445    COLLECT_LATENCY();
1446
1447    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1448    raw_48 leftkr, rightkr;
1449    sized_buf leftk, rightk;
1450    leftk.buf = (char*) &leftkr;
1451    rightk.buf = (char*) &rightkr;
1452    leftk.size = 6;
1453    rightk.size = 6;
1454    encode_raw48(min_seq, &leftkr);
1455    encode_raw48(max_seq, &rightkr);
1456
1457    *count = 0;
1458    if(db->header.by_seq_root) {
1459        error_pass(btree_eval_seq_reduce(db, count, &leftk, &rightk, false,
1460                                         db->header.by_seq_root->pointer));
1461    }
1462cleanup:
1463    return errcode;
1464}
1465