xref: /6.0.3/couchstore/src/couch_db.cc (revision 2b2ba193)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3
4#include <cstddef>
5#include <fcntl.h>
6#include <platform/cb_malloc.h>
7#include <string.h>
8#include <stdlib.h>
9#include <assert.h>
10#include <stdio.h>
11
12#include "internal.h"
13#include "node_types.h"
14#include "couch_btree.h"
15#include "bitfield.h"
16#include "reduces.h"
17#include "util.h"
18
19#include "couch_latency_internal.h"
20
// Size in bytes of the fixed part of an encoded root pointer; the root's
// reduce value follows it (see calculate_header_size() and read_db_root()).
#define ROOT_BASE_SIZE 12
// Size in bytes of the on-disk header excluding the three encoded roots
// (see the length check in find_header_at_pos()).
#define HEADER_BASE_SIZE 25

// Per-thread character buffer of MAX_ERR_STR_LEN bytes.
// NOTE(review): presumably holds the text of the calling thread's most recent
// error; it is not referenced in the visible part of this file - confirm.
thread_local char internal_error_string[MAX_ERR_STR_LEN];
25
26// Initializes one of the db's root node pointers from data in the file header
27static couchstore_error_t read_db_root(Db *db, node_pointer **root,
28                                       void *root_data, int root_size)
29{
30    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
31    if (root_size > 0) {
32        error_unless(root_size >= ROOT_BASE_SIZE, COUCHSTORE_ERROR_CORRUPT);
33        *root = read_root(root_data, root_size);
34        error_unless(*root, COUCHSTORE_ERROR_ALLOC_FAIL);
35        error_unless((*root)->pointer < db->header.position, COUCHSTORE_ERROR_CORRUPT);
36    } else {
37        *root = NULL;
38    }
39cleanup:
40    return errcode;
41}
42
43// Attempts to initialize the database from a header at the given file position
44static couchstore_error_t find_header_at_pos(Db *db, cs_off_t pos)
45{
46    int seqrootsize;
47    int idrootsize;
48    int localrootsize;
49    char *root_data;
50    int header_len;
51    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
52    union {
53        raw_file_header *raw;
54        char *buf;
55    } header_buf = { NULL };
56    uint8_t buf[2];
57    ssize_t readsize;
58    {
59        // Speculative read looking for header, mark as Empty.
60        ScopedFileTag tag(db->file.ops, db->file.handle, FileTag::Empty);
61        readsize = db->file.ops->pread(
62                &db->file.lastError, db->file.handle, buf, 2, pos);
63    }
64    error_unless(readsize == 2, COUCHSTORE_ERROR_READ);
65    if (buf[0] == 0) {
66        return COUCHSTORE_ERROR_NO_HEADER;
67    } else if (buf[0] != 1) {
68        return COUCHSTORE_ERROR_CORRUPT;
69    }
70
71    header_len = pread_header(&db->file, pos, &header_buf.buf, MAX_DB_HEADER_SIZE);
72    if (header_len < 0) {
73        error_pass(static_cast<couchstore_error_t>(header_len));
74    }
75
76    db->header.position = pos;
77    db->header.disk_version = decode_raw08(header_buf.raw->version);
78
79    // Only 12 and 11 are valid
80    error_unless(db->header.disk_version == COUCH_DISK_VERSION ||
81                 db->header.disk_version == COUCH_DISK_VERSION_11,
82                 COUCHSTORE_ERROR_HEADER_VERSION);
83    db->header.update_seq = decode_raw48(header_buf.raw->update_seq);
84    db->header.purge_seq = decode_raw48(header_buf.raw->purge_seq);
85    db->header.purge_ptr = decode_raw48(header_buf.raw->purge_ptr);
86    error_unless(db->header.purge_ptr <= db->header.position, COUCHSTORE_ERROR_CORRUPT);
87    seqrootsize = decode_raw16(header_buf.raw->seqrootsize);
88    idrootsize = decode_raw16(header_buf.raw->idrootsize);
89    localrootsize = decode_raw16(header_buf.raw->localrootsize);
90    error_unless(header_len == HEADER_BASE_SIZE + seqrootsize + idrootsize + localrootsize,
91                 COUCHSTORE_ERROR_CORRUPT);
92
93    root_data = (char*) (header_buf.raw + 1);  // i.e. just past *header_buf
94    error_pass(read_db_root(db, &db->header.by_seq_root, root_data, seqrootsize));
95    root_data += seqrootsize;
96    error_pass(read_db_root(db, &db->header.by_id_root, root_data, idrootsize));
97    root_data += idrootsize;
98    error_pass(read_db_root(db, &db->header.local_docs_root, root_data, localrootsize));
99
100cleanup:
101    cb_free(header_buf.raw);
102    return errcode;
103}
104
105// Finds the database header by scanning back from the end of the file at 4k boundaries
106static couchstore_error_t find_header(Db *db, int64_t start_pos)
107{
108    couchstore_error_t last_header_errcode = COUCHSTORE_ERROR_NO_HEADER;
109    int64_t pos = start_pos;
110    pos -= pos % COUCH_BLOCK_SIZE;
111    for (; pos >= 0; pos -= COUCH_BLOCK_SIZE) {
112        couchstore_error_t errcode = find_header_at_pos(db, pos);
113        switch(errcode) {
114            case COUCHSTORE_SUCCESS:
115                // Found it!
116                return COUCHSTORE_SUCCESS;
117            case COUCHSTORE_ERROR_NO_HEADER:
118                // No header here, so keep going
119                break;
120            case COUCHSTORE_ERROR_ALLOC_FAIL:
121                // Fatal error
122                return errcode;
123            default:
124                // Invalid header; continue, but remember the last error
125                last_header_errcode = errcode;
126                break;
127        }
128    }
129    return last_header_errcode;
130}
131
132/**
133 * Calculates how large in bytes the current header will be
134 * when written to disk.
135 *
136 * The seqrootsize, idrootsize and localrootsize params are
137 * used to return the respective sizes in this header if
138 * needed.
139 */
140size_t calculate_header_size(Db *db, size_t& seqrootsize,
141                             size_t& idrootsize, size_t& localrootsize)
142{
143    seqrootsize = idrootsize = localrootsize = 0;
144
145    if (db->header.by_seq_root) {
146        seqrootsize = ROOT_BASE_SIZE + db->header.by_seq_root->reduce_value.size;
147    }
148    if (db->header.by_id_root) {
149        idrootsize = ROOT_BASE_SIZE + db->header.by_id_root->reduce_value.size;
150    }
151    if (db->header.local_docs_root) {
152        localrootsize = ROOT_BASE_SIZE + db->header.local_docs_root->reduce_value.size;
153    }
154    return sizeof(raw_file_header) + seqrootsize + idrootsize + localrootsize;
155}
156
157couchstore_error_t db_write_header(Db *db)
158{
159    sized_buf writebuf;
160    size_t seqrootsize, idrootsize, localrootsize;
161    writebuf.size = calculate_header_size(db, seqrootsize,
162                                          idrootsize, localrootsize);
163    writebuf.buf = (char *) cb_malloc(writebuf.size);
164    raw_file_header* header = (raw_file_header*)writebuf.buf;
165    header->version = encode_raw08(db->header.disk_version);
166    encode_raw48(db->header.update_seq, &header->update_seq);
167    encode_raw48(db->header.purge_seq, &header->purge_seq);
168    encode_raw48(db->header.purge_ptr, &header->purge_ptr);
169    header->seqrootsize = encode_raw16((uint16_t)seqrootsize);
170    header->idrootsize = encode_raw16((uint16_t)idrootsize);
171    header->localrootsize = encode_raw16((uint16_t)localrootsize);
172    uint8_t *root = (uint8_t*)(header + 1);
173    encode_root(root, db->header.by_seq_root);
174    root += seqrootsize;
175    encode_root(root, db->header.by_id_root);
176    root += idrootsize;
177    encode_root(root, db->header.local_docs_root);
178    cs_off_t pos;
179    couchstore_error_t errcode = write_header(&db->file, &writebuf, &pos);
180    if (errcode == COUCHSTORE_SUCCESS) {
181        db->header.position = pos;
182    }
183    cb_free(writebuf.buf);
184    return errcode;
185}
186
187static couchstore_error_t create_header(Db *db)
188{
189    // Select the version based upon selected CRC
190    if (db->file.crc_mode == CRC32) {
191        // user is creating down-level files
192        db->header.disk_version = COUCH_DISK_VERSION_11;
193    } else {
194        // user is using latest
195        db->header.disk_version = COUCH_DISK_VERSION;
196    }
197    db->header.update_seq = 0;
198    db->header.by_id_root = NULL;
199    db->header.by_seq_root = NULL;
200    db->header.local_docs_root = NULL;
201    db->header.purge_seq = 0;
202    db->header.purge_ptr = 0;
203    db->header.position = 0;
204    return db_write_header(db);
205}
206
207LIBCOUCHSTORE_API
208uint64_t couchstore_get_header_position(Db *db)
209{
210    return db->header.position;
211}
212
213/**
214 * Precommit should occur before writing a header, it has two
215 * purposes. Firstly it ensures data is written before we attempt
216 * to write the header. This means it's impossible for the header
217 * to be written before the data. This is accomplished through
218 * a sync.
219 *
220 * The second purpose is to extend the file to be large enough
221 * to include the subsequently written header. This is done so
222 * the fdatasync performed by writing a header doesn't have to
223 * do an additional (expensive) modified metadata flush on top
224 * of the one we're already doing.
225 */
226couchstore_error_t precommit(Db *db)
227{
228    cs_off_t curpos = db->file.pos;
229
230    db->file.pos = align_to_next_block(db->file.pos);
231    sized_buf zerobyte = { const_cast<char*>("\0"), 1};
232
233    size_t seqrootsize, idrootsize, localrootsize;
234    db->file.pos += calculate_header_size(db, seqrootsize,
235                                          idrootsize, localrootsize);
236
237    //Extend file size to where end of header will land before we do first sync
238    couchstore_error_t errcode = static_cast<couchstore_error_t>(
239        db_write_buf(&db->file, &zerobyte, NULL, NULL));
240
241    if (errcode == COUCHSTORE_SUCCESS) {
242        errcode = db->file.ops->sync(&db->file.lastError, db->file.handle);
243    }
244    // Move cursor back to where it was
245    db->file.pos = curpos;
246    return errcode;
247}
248
249LIBCOUCHSTORE_API
250couchstore_error_t couchstore_commit(Db *db)
251{
252    COLLECT_LATENCY();
253
254    couchstore_error_t errcode = precommit(db);
255
256    if (errcode == COUCHSTORE_SUCCESS) {
257        errcode = db_write_header(db);
258    }
259
260    if (errcode == COUCHSTORE_SUCCESS) {
261        errcode = db->file.ops->sync(&db->file.lastError, db->file.handle);
262    }
263
264    return errcode;
265}
266
267static tree_file_options get_tree_file_options_from_flags(couchstore_open_flags flags)
268{
269    tree_file_options options;
270
271    if (flags & COUCHSTORE_OPEN_FLAG_UNBUFFERED) {
272        options.buf_io_enabled = false;
273    } else if (flags & COUCHSTORE_OPEN_WITH_CUSTOM_BUFFER) {
274        // Buffered IO with custom buffer settings.
275        //  * First 4 bits [15:12]: read buffer capacity
276        //  * Next  4 bits [11:08]: max read buffer count
277
278        uint32_t unit_index = (flags >> 12) & 0xf;
279        if (unit_index) {
280            // unit_index    1     2     3     4     ...   15
281            // unit size     1KB   2KB   4KB   8KB   ...   16MB
282            options.buf_io_read_unit_size = 1024 * (1 << (unit_index -1));
283        }
284        uint32_t count_index = (flags >> 8) & 0xf;
285        if (count_index) {
286            // count_index   1     2     3     4     ...   15
287            // # buffers     8     16    32    64    ...   128K
288            options.buf_io_read_buffers = 8 * (1 << (count_index-1));
289        }
290    }
291
292    // Set default value first.
293    options.kp_nodesize = DB_KP_CHUNK_THRESHOLD;
294    options.kv_nodesize = DB_KV_CHUNK_THRESHOLD;
295    if (flags & COUCHSTORE_OPEN_WITH_CUSTOM_NODESIZE) {
296        // B+tree custom node size settings.
297        //  * First 4 bits [23:20]: KP node size
298        //  * Next  4 bits [19:16]: KV node size
299        uint32_t kp_flag = (flags >> 20) & 0xf;
300        if (kp_flag) {
301            options.kp_nodesize = kp_flag * 1024;
302        }
303        uint32_t kv_flag = (flags >> 16) & 0xf;
304        if (kv_flag) {
305            options.kv_nodesize = kv_flag * 1024;
306        }
307    }
308
309    if (flags & COUCHSTORE_OPEN_WITH_PERIODIC_SYNC) {
310        // Automatic sync() every N bytes written.
311        //  * 5 bits [28-24]: power-of-2 * 1kB
312        uint64_t sync_flag = (flags >> 24) & 0x1f;
313        options.periodic_sync_bytes = uint64_t(1024) << (sync_flag - 1);
314    }
315
316    return options;
317}
318
319LIBCOUCHSTORE_API
320couchstore_open_flags couchstore_encode_periodic_sync_flags(uint64_t bytes) {
321    // Convert to encoding supported by couchstore_open_flags - KB power-of-2
322    // value.
323    // Round up to whole kilobyte units.
324    const uint64_t kilobytes = (bytes + 1023) / 1024;
325    // Calculate the shift amount (what is the log2 power)
326    uint64_t shiftAmount = std::log2(kilobytes);
327    // Saturate if the user specified more than the encodable amount.
328    shiftAmount = std::min(shiftAmount, uint64_t(30));
329    // Finally, encode in couchstore_open flags
330    return ((shiftAmount + 1)) << 24;
331}
332
333LIBCOUCHSTORE_API
334couchstore_error_t couchstore_open_db(const char *filename,
335                                      couchstore_open_flags flags,
336                                      Db **pDb)
337{
338    return couchstore_open_db_ex(filename, flags,
339                                 couchstore_get_default_file_ops(), pDb);
340}
341
342LIBCOUCHSTORE_API
343couchstore_error_t couchstore_open_db_ex(const char *filename,
344                                         couchstore_open_flags flags,
345                                         FileOpsInterface* ops,
346                                         Db **pDb)
347{
348    COLLECT_LATENCY();
349
350    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
351    Db *db;
352    int openflags;
353    cs_off_t pos;
354
355    /* Sanity check input parameters */
356    if ((flags & COUCHSTORE_OPEN_FLAG_RDONLY) &&
357        (flags & COUCHSTORE_OPEN_FLAG_CREATE)) {
358        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
359    }
360
361    if ((db = static_cast<Db*>(cb_calloc(1, sizeof(Db)))) == NULL) {
362        return COUCHSTORE_ERROR_ALLOC_FAIL;
363    }
364
365    if (flags & COUCHSTORE_OPEN_FLAG_RDONLY) {
366        openflags = O_RDONLY;
367    } else {
368        openflags = O_RDWR;
369    }
370
371    if (flags & COUCHSTORE_OPEN_FLAG_CREATE) {
372        openflags |= O_CREAT;
373    }
374
375    // open with CRC unknown, CRC will be selected when header is read/or not found.
376    error_pass(tree_file_open(&db->file, filename, openflags, CRC_UNKNOWN, ops,
377                              get_tree_file_options_from_flags(flags)));
378
379    pos = db->file.ops->goto_eof(&db->file.lastError, db->file.handle);
380    db->file.pos = pos;
381    if (pos == 0) {
382        /* This is an empty file. Create a new fileheader unless the
383         * user wanted a read-only version of the file
384         */
385
386        if (flags & COUCHSTORE_OPEN_FLAG_RDONLY) {
387            error_pass(COUCHSTORE_ERROR_NO_HEADER);
388        } else {
389
390            // Select the CRC to use on this new file
391            if (flags & COUCHSTORE_OPEN_WITH_LEGACY_CRC) {
392                db->file.crc_mode = CRC32;
393            } else {
394                db->file.crc_mode = CRC32C;
395            }
396
397            error_pass(create_header(db));
398        }
399    } else if (pos > 0) {
400        error_pass(find_header(db, db->file.pos - 2));
401
402        if (db->header.disk_version <= COUCH_DISK_VERSION_11) {
403            db->file.crc_mode = CRC32;
404        } else {
405            cb_assert(db->header.disk_version >= COUCH_DISK_VERSION_12);
406            db->file.crc_mode = CRC32C;
407        }
408
409        // Not allowed. Can't request legacy_crc but be opening non legacy CRC files.
410        if (db->file.crc_mode == CRC32C && flags & COUCHSTORE_OPEN_WITH_LEGACY_CRC) {
411            errcode = COUCHSTORE_ERROR_INVALID_ARGUMENTS;
412            goto cleanup;
413        }
414    } else {
415        error_pass(static_cast<couchstore_error_t>(db->file.pos));
416    }
417
418    *pDb = db;
419    db->dropped = 0;
420
421cleanup:
422    if(errcode != COUCHSTORE_SUCCESS) {
423        couchstore_close_file(db);
424        couchstore_free_db(db);
425    }
426
427    return errcode;
428}
429
430LIBCOUCHSTORE_API
431couchstore_error_t couchstore_close_file(Db* db)
432{
433    COLLECT_LATENCY();
434
435    if(db->dropped) {
436        return COUCHSTORE_SUCCESS;
437    }
438    couchstore_error_t error = tree_file_close(&db->file);
439    db->dropped = 1;
440    return error;
441}
442
443LIBCOUCHSTORE_API
444couchstore_error_t couchstore_rewind_db_header(Db *db)
445{
446    COLLECT_LATENCY();
447
448    couchstore_error_t errcode;
449    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
450    // free current header guts
451    cb_free(db->header.by_id_root);
452    cb_free(db->header.by_seq_root);
453    cb_free(db->header.local_docs_root);
454    db->header.by_id_root = NULL;
455    db->header.by_seq_root = NULL;
456    db->header.local_docs_root = NULL;
457
458    error_unless(db->header.position != 0, COUCHSTORE_ERROR_DB_NO_LONGER_VALID);
459    // find older header
460    error_pass(find_header(db, db->header.position - 2));
461
462cleanup:
463    // if we failed, free the handle and return an error
464    if(errcode != COUCHSTORE_SUCCESS) {
465        couchstore_close_file(db);
466        couchstore_free_db(db);
467        errcode = COUCHSTORE_ERROR_DB_NO_LONGER_VALID;
468    }
469    return errcode;
470}
471
472LIBCOUCHSTORE_API
473couchstore_error_t couchstore_free_db(Db* db)
474{
475    COLLECT_LATENCY();
476
477    if(!db) {
478        return COUCHSTORE_SUCCESS;
479    }
480
481    if(!db->dropped) {
482        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
483    }
484
485    cb_free(db->header.by_id_root);
486    cb_free(db->header.by_seq_root);
487    cb_free(db->header.local_docs_root);
488    db->header.by_id_root = NULL;
489    db->header.by_seq_root = NULL;
490    db->header.local_docs_root = NULL;
491
492    memset(db, 0xa5, sizeof(*db));
493    cb_free(db);
494
495    return COUCHSTORE_SUCCESS;
496}
497
498LIBCOUCHSTORE_API
499const char* couchstore_get_db_filename(Db *db) {
500    return db->file.path;
501}
502
503LIBCOUCHSTORE_API
504FileOpsInterface::FHStats* couchstore_get_db_filestats(Db* db) {
505    return db->file.ops->get_stats(db->file.handle);
506}
507
508DocInfo* couchstore_alloc_docinfo(const sized_buf *id, const sized_buf *rev_meta) {
509    size_t size = sizeof(DocInfo);
510    if (id) {
511        size += id->size;
512    }
513    if (rev_meta) {
514        size += rev_meta->size;
515    }
516    DocInfo* docInfo = static_cast<DocInfo*>(cb_malloc(size));
517    if (!docInfo) {
518        return NULL;
519    }
520    memset(docInfo, 0, sizeof(DocInfo));
521    char *extra = (char *)docInfo + sizeof(DocInfo);
522    if (id) {
523        memcpy(extra, id->buf, id->size);
524        docInfo->id.buf = extra;
525        docInfo->id.size = id->size;
526        extra += id->size;
527    }
528    if (rev_meta) {
529        memcpy(extra, rev_meta->buf, rev_meta->size);
530        docInfo->rev_meta.buf = extra;
531        docInfo->rev_meta.size = rev_meta->size;
532    }
533    return docInfo;
534}
535
536LIBCOUCHSTORE_API
537void couchstore_free_docinfo(DocInfo *docinfo)
538{
539    cb_free(docinfo);
540}
541
542LIBCOUCHSTORE_API
543void couchstore_free_document(Doc *doc)
544{
545    if (doc) {
546        size_t offset = offsetof(fatbuf, buf);
547        fatbuf_free((fatbuf *) ((char *)doc - (char *)offset));
548    }
549}
550
551couchstore_error_t by_seq_read_docinfo(DocInfo **pInfo,
552                                       const sized_buf *k,
553                                       const sized_buf *v)
554{
555    const raw_seq_index_value *raw = (const raw_seq_index_value*)v->buf;
556    ssize_t extraSize = v->size - sizeof(*raw);
557    if (extraSize < 0) {
558        return COUCHSTORE_ERROR_CORRUPT;
559    }
560
561    uint32_t idsize, datasize;
562    decode_kv_length(&raw->sizes, &idsize, &datasize);
563    uint64_t bp = decode_raw48(raw->bp);
564    int deleted = (bp & BP_DELETED_FLAG) != 0;
565    bp &= ~BP_DELETED_FLAG;
566    uint8_t content_meta = decode_raw08(raw->content_meta);
567    uint64_t rev_seq = decode_raw48(raw->rev_seq);
568    uint64_t db_seq = decode_sequence_key(k);
569
570    sized_buf id = {v->buf + sizeof(*raw), idsize};
571    sized_buf rev_meta = {id.buf + idsize, extraSize - id.size};
572    DocInfo* docInfo = couchstore_alloc_docinfo(&id, &rev_meta);
573    if (!docInfo) {
574        return COUCHSTORE_ERROR_ALLOC_FAIL;
575    }
576
577    docInfo->db_seq = db_seq;
578    docInfo->rev_seq = rev_seq;
579    docInfo->deleted = deleted;
580    docInfo->bp = bp;
581    docInfo->size = datasize;
582    docInfo->content_meta = content_meta;
583    *pInfo = docInfo;
584    return COUCHSTORE_SUCCESS;
585}
586
587static couchstore_error_t by_id_read_docinfo(DocInfo **pInfo,
588                                             const sized_buf *k,
589                                             const sized_buf *v)
590{
591    const raw_id_index_value *raw = (const raw_id_index_value*)v->buf;
592    ssize_t revMetaSize = v->size - sizeof(*raw);
593    if (revMetaSize < 0) {
594        return COUCHSTORE_ERROR_CORRUPT;
595    }
596
597    uint32_t datasize, deleted;
598    uint8_t content_meta;
599    uint64_t bp, seq, revnum;
600
601    seq = decode_raw48(raw->db_seq);
602    datasize = decode_raw32(raw->size);
603    bp = decode_raw48(raw->bp);
604    deleted = (bp & BP_DELETED_FLAG) != 0;
605    bp &= ~BP_DELETED_FLAG;
606    content_meta = decode_raw08(raw->content_meta);
607    revnum = decode_raw48(raw->rev_seq);
608
609    sized_buf rev_meta = {v->buf + sizeof(*raw), static_cast<size_t>(revMetaSize)};
610    DocInfo* docInfo = couchstore_alloc_docinfo(k, &rev_meta);
611    if (!docInfo) {
612        return COUCHSTORE_ERROR_ALLOC_FAIL;
613    }
614
615    docInfo->db_seq = seq;
616    docInfo->rev_seq = revnum;
617    docInfo->deleted = deleted;
618    docInfo->bp = bp;
619    docInfo->size = datasize;
620    docInfo->content_meta = content_meta;
621    *pInfo = docInfo;
622    return COUCHSTORE_SUCCESS;
623}
624
625//Fill in doc from reading file.
626static couchstore_error_t bp_to_doc(Doc **pDoc, Db *db, cs_off_t bp, couchstore_open_options options)
627{
628    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
629    int bodylen = 0;
630    char *docbody = NULL;
631    fatbuf *docbuf = NULL;
632    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
633
634    if (options & DECOMPRESS_DOC_BODIES) {
635        bodylen = pread_compressed(&db->file, bp, &docbody);
636    } else {
637        bodylen = pread_bin(&db->file, bp, &docbody);
638    }
639
640    error_unless(bodylen >= 0, static_cast<couchstore_error_t>(bodylen));    // if bodylen is negative it's an error code
641    error_unless(docbody || bodylen == 0, COUCHSTORE_ERROR_READ);
642
643    error_unless(docbuf = fatbuf_alloc(sizeof(Doc) + bodylen), COUCHSTORE_ERROR_ALLOC_FAIL);
644    *pDoc = (Doc *) fatbuf_get(docbuf, sizeof(Doc));
645
646    if (bodylen == 0) { //Empty doc
647        (*pDoc)->data.buf = NULL;
648        (*pDoc)->data.size = 0;
649        cb_free(docbody);
650        return COUCHSTORE_SUCCESS;
651    }
652
653    (*pDoc)->data.buf = (char *) fatbuf_get(docbuf, bodylen);
654    (*pDoc)->data.size = bodylen;
655    memcpy((*pDoc)->data.buf, docbody, bodylen);
656
657cleanup:
658    cb_free(docbody);
659    if (errcode < 0) {
660        fatbuf_free(docbuf);
661    }
662    return errcode;
663}
664
665static couchstore_error_t docinfo_fetch_by_id(couchfile_lookup_request *rq,
666                                              const sized_buf *k,
667                                              const sized_buf *v)
668{
669    DocInfo **pInfo = (DocInfo **) rq->callback_ctx;
670    if (v == NULL) {
671        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
672    }
673    return by_id_read_docinfo(pInfo, k, v);
674}
675
676static couchstore_error_t docinfo_fetch_by_seq(couchfile_lookup_request *rq,
677                                               const sized_buf *k,
678                                               const sized_buf *v)
679{
680    DocInfo **pInfo = (DocInfo **) rq->callback_ctx;
681    if (v == NULL) {
682        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
683    }
684    return by_seq_read_docinfo(pInfo, k, v);
685}
686
687LIBCOUCHSTORE_API
688couchstore_error_t couchstore_docinfo_by_id(Db *db,
689                                            const void *id,
690                                            size_t idlen,
691                                            DocInfo **pInfo)
692{
693    COLLECT_LATENCY();
694
695    sized_buf key;
696    sized_buf *keylist = &key;
697    couchfile_lookup_request rq;
698    couchstore_error_t errcode;
699    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
700
701    if (db->header.by_id_root == NULL) {
702        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
703    }
704
705    key.buf = (char *) id;
706    key.size = idlen;
707
708    rq.cmp.compare = ebin_cmp;
709    rq.file = &db->file;
710    rq.num_keys = 1;
711    rq.keys = &keylist;
712    rq.callback_ctx = pInfo;
713    rq.fetch_callback = docinfo_fetch_by_id;
714    rq.node_callback = NULL;
715    rq.fold = 0;
716
717    errcode = btree_lookup(&rq, db->header.by_id_root->pointer);
718    if (errcode == COUCHSTORE_SUCCESS) {
719        if (*pInfo == NULL) {
720            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
721        }
722    }
723cleanup:
724    return errcode;
725}
726
727LIBCOUCHSTORE_API
728couchstore_error_t couchstore_docinfo_by_sequence(Db *db,
729                                                  uint64_t sequence,
730                                                  DocInfo **pInfo)
731{
732    COLLECT_LATENCY();
733
734    sized_buf key;
735    sized_buf *keylist = &key;
736    couchfile_lookup_request rq;
737    couchstore_error_t errcode;
738    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
739
740    if (db->header.by_id_root == NULL) {
741        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
742    }
743
744    sequence = htonll(sequence);
745    key.buf = (char *)&sequence + 2;
746    key.size = 6;
747
748    rq.cmp.compare = seq_cmp;
749    rq.file = &db->file;
750    rq.num_keys = 1;
751    rq.keys = &keylist;
752    rq.callback_ctx = pInfo;
753    rq.fetch_callback = docinfo_fetch_by_seq;
754    rq.node_callback = NULL;
755    rq.fold = 0;
756
757    errcode = btree_lookup(&rq, db->header.by_seq_root->pointer);
758    if (errcode == COUCHSTORE_SUCCESS) {
759        if (*pInfo == NULL) {
760            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
761        }
762    }
763cleanup:
764    return errcode;
765}
766
767LIBCOUCHSTORE_API
768couchstore_error_t couchstore_open_doc_with_docinfo(Db *db,
769                                                    const DocInfo *docinfo,
770                                                    Doc **pDoc,
771                                                    couchstore_open_options options)
772{
773    COLLECT_LATENCY();
774
775    couchstore_error_t errcode;
776
777    *pDoc = NULL;
778    if (docinfo->bp == 0) {
779        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
780    }
781
782    if (!(docinfo->content_meta & COUCH_DOC_IS_COMPRESSED)) {
783        options &= ~DECOMPRESS_DOC_BODIES;
784    }
785
786    errcode = bp_to_doc(pDoc, db, docinfo->bp, options);
787    if (errcode == COUCHSTORE_SUCCESS) {
788        (*pDoc)->id.buf = docinfo->id.buf;
789        (*pDoc)->id.size = docinfo->id.size;
790    }
791
792    return errcode;
793}
794
795LIBCOUCHSTORE_API
796couchstore_error_t couchstore_open_document(Db *db,
797                                            const void *id,
798                                            size_t idlen,
799                                            Doc **pDoc,
800                                            couchstore_open_options options)
801{
802    COLLECT_LATENCY();
803
804    couchstore_error_t errcode;
805    DocInfo *info;
806    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
807    *pDoc = NULL;
808    errcode = couchstore_docinfo_by_id(db, id, idlen, &info);
809    if (errcode == COUCHSTORE_SUCCESS) {
810        errcode = couchstore_open_doc_with_docinfo(db, info, pDoc, options);
811        if (errcode == COUCHSTORE_SUCCESS) {
812            (*pDoc)->id.buf = (char *) id;
813            (*pDoc)->id.size = idlen;
814        }
815
816        couchstore_free_docinfo(info);
817    }
818cleanup:
819    return errcode;
820}
821
822// context info passed to lookup_callback via btree_lookup
823typedef struct {
824    Db *db;
825    couchstore_docinfos_options options;
826    couchstore_changes_callback_fn callback;
827    void* callback_context;
828    int by_id;
829    int depth;
830    couchstore_walk_tree_callback_fn walk_callback;
831} lookup_context;
832
833// btree_lookup callback, called while iterating keys
834static couchstore_error_t lookup_callback(couchfile_lookup_request *rq,
835                                          const sized_buf *k,
836                                          const sized_buf *v)
837{
838    if (v == NULL) {
839        return COUCHSTORE_SUCCESS;
840    }
841
842    const lookup_context *context = static_cast<const lookup_context *>(rq->callback_ctx);
843    DocInfo *docinfo = NULL;
844    couchstore_error_t errcode;
845    if (context->by_id) {
846        errcode = by_id_read_docinfo(&docinfo, k, v);
847    } else {
848        errcode = by_seq_read_docinfo(&docinfo, k, v);
849    }
850    if (errcode == COUCHSTORE_ERROR_CORRUPT &&
851        (context->options & COUCHSTORE_TOLERATE_CORRUPTION)) {
852        // Invoke callback even if doc info is corrupted/unreadable, if magic flag is set
853        docinfo = static_cast<DocInfo*>(cb_calloc(sizeof(DocInfo), 1));
854        docinfo->id = *k;
855        docinfo->rev_meta = *v;
856    } else if (errcode) {
857        return errcode;
858    }
859
860    if ((context->options & COUCHSTORE_DELETES_ONLY) && docinfo->deleted == 0) {
861        couchstore_free_docinfo(docinfo);
862        return COUCHSTORE_SUCCESS;
863    }
864
865    if ((context->options & COUCHSTORE_NO_DELETES) && docinfo->deleted == 1) {
866        couchstore_free_docinfo(docinfo);
867        return COUCHSTORE_SUCCESS;
868    }
869
870    if (context->walk_callback) {
871        errcode = static_cast<couchstore_error_t>(context->walk_callback(context->db,
872                                                                         context->depth,
873                                                                         docinfo,
874                                                                         0,
875                                                                         NULL,
876                                                                         context->callback_context));
877    } else {
878        errcode = static_cast<couchstore_error_t>(context->callback(context->db,
879                                                                    docinfo,
880                                                                    context->callback_context));
881    }
882    if (errcode <= 0) {
883        couchstore_free_docinfo(docinfo);
884    } else {
885        // User requested docinfo not be freed, don't free it, return success
886        return COUCHSTORE_SUCCESS;
887    }
888    return errcode;
889}
890
891LIBCOUCHSTORE_API
892couchstore_error_t couchstore_changes_since(Db *db,
893                                            uint64_t since,
894                                            couchstore_docinfos_options options,
895                                            couchstore_changes_callback_fn callback,
896                                            void *ctx)
897{
898    COLLECT_LATENCY();
899
900    char since_termbuf[6];
901    sized_buf since_term;
902    sized_buf *keylist = &since_term;
903    lookup_context cbctx = {db, options, callback, ctx, 0, 0, NULL};
904    couchfile_lookup_request rq;
905    couchstore_error_t errcode;
906
907    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
908    if (db->header.by_seq_root == NULL) {
909        return COUCHSTORE_SUCCESS;
910    }
911
912    since_term.buf = since_termbuf;
913    since_term.size = 6;
914    encode_raw48(since, (raw_48*)since_term.buf);
915
916    rq.cmp.compare = seq_cmp;
917    rq.file = &db->file;
918    rq.num_keys = 1;
919    rq.keys = &keylist;
920    rq.callback_ctx = &cbctx;
921    rq.fetch_callback = lookup_callback;
922    rq.node_callback = NULL;
923    rq.fold = 1;
924    rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
925
926    errcode = btree_lookup(&rq, db->header.by_seq_root->pointer);
927cleanup:
928    return errcode;
929}
930
931LIBCOUCHSTORE_API
932couchstore_error_t couchstore_all_docs(Db *db,
933                                       const sized_buf* startKeyPtr,
934                                       couchstore_docinfos_options options,
935                                       couchstore_changes_callback_fn callback,
936                                       void *ctx)
937{
938    COLLECT_LATENCY();
939
940    sized_buf startKey = {NULL, 0};
941    sized_buf *keylist = &startKey;
942    lookup_context cbctx = {db, options, callback, ctx, 1, 0, NULL};
943    couchfile_lookup_request rq;
944    couchstore_error_t errcode;
945
946    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
947    if (db->header.by_id_root == NULL) {
948        return COUCHSTORE_SUCCESS;
949    }
950
951    if (startKeyPtr) {
952        startKey = *startKeyPtr;
953    }
954
955    rq.cmp.compare = ebin_cmp;
956    rq.file = &db->file;
957    rq.num_keys = 1;
958    rq.keys = &keylist;
959    rq.callback_ctx = &cbctx;
960    rq.fetch_callback = lookup_callback;
961    rq.node_callback = NULL;
962    rq.fold = 1;
963    rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
964
965    errcode = btree_lookup(&rq, db->header.by_id_root->pointer);
966cleanup:
967    return errcode;
968}
969
970static couchstore_error_t walk_node_callback(struct couchfile_lookup_request *rq,
971                                                 uint64_t subtreeSize,
972                                                 const sized_buf *reduceValue)
973{
974    lookup_context* context = static_cast<lookup_context*>(rq->callback_ctx);
975    if (reduceValue) {
976        int result = context->walk_callback(context->db,
977                                            context->depth,
978                                            NULL,
979                                            subtreeSize,
980                                            reduceValue,
981                                            context->callback_context);
982        context->depth++;
983        if (result < 0)
984            return static_cast<couchstore_error_t>(result);
985    } else {
986        context->depth--;
987    }
988    return COUCHSTORE_SUCCESS;
989}
990
991static
992couchstore_error_t couchstore_walk_tree(Db *db,
993                                        int by_id,
994                                        const node_pointer* root,
995                                        const sized_buf* startKeyPtr,
996                                        couchstore_docinfos_options options,
997                                        int (*compare)(const sized_buf *k1, const sized_buf *k2),
998                                        couchstore_walk_tree_callback_fn callback,
999                                        void *ctx)
1000{
1001    couchstore_error_t errcode;
1002    sized_buf startKey = {NULL, 0};
1003    sized_buf *keylist;
1004    couchfile_lookup_request rq;
1005
1006    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1007    if (root == NULL) {
1008        return COUCHSTORE_SUCCESS;
1009    }
1010
1011    // Invoke the callback on the root node:
1012    errcode = static_cast<couchstore_error_t>(callback(db, 0, NULL,
1013                                                       root->subtreesize,
1014                                                       &root->reduce_value,
1015                                                       ctx));
1016    if (errcode < 0) {
1017        return errcode;
1018    }
1019
1020    if (startKeyPtr) {
1021        startKey = *startKeyPtr;
1022    }
1023    keylist = &startKey;
1024
1025    {
1026        // Create a new scope here just to mute the warning from the
1027        // compiler that the goto in the macro error_unless
1028        // skips the initialization of lookup_ctx..
1029        lookup_context lookup_ctx = {db, options, NULL, ctx, by_id, 1, callback};
1030
1031        rq.cmp.compare = compare;
1032        rq.file = &db->file;
1033        rq.num_keys = 1;
1034        rq.keys = &keylist;
1035        rq.callback_ctx = &lookup_ctx;
1036        rq.fetch_callback = lookup_callback;
1037        rq.node_callback = walk_node_callback;
1038        rq.fold = 1;
1039        rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
1040
1041        error_pass(btree_lookup(&rq, root->pointer));
1042    }
1043cleanup:
1044    return errcode;
1045}
1046
1047LIBCOUCHSTORE_API
1048couchstore_error_t couchstore_walk_id_tree(Db *db,
1049                                           const sized_buf* startDocID,
1050                                           couchstore_docinfos_options options,
1051                                           couchstore_walk_tree_callback_fn callback,
1052                                           void *ctx)
1053{
1054    COLLECT_LATENCY();
1055
1056    return couchstore_walk_tree(db, 1, db->header.by_id_root, startDocID,
1057                                options, ebin_cmp, callback, ctx);
1058}
1059
1060LIBCOUCHSTORE_API
1061couchstore_error_t couchstore_walk_seq_tree(Db *db,
1062                                           uint64_t startSequence,
1063                                           couchstore_docinfos_options options,
1064                                           couchstore_walk_tree_callback_fn callback,
1065                                           void *ctx)
1066{
1067    COLLECT_LATENCY();
1068
1069    raw_48 start_termbuf;
1070    encode_raw48(startSequence, &start_termbuf);
1071    sized_buf start_term = {(char*)&start_termbuf, 6};
1072
1073    return couchstore_walk_tree(db, 0, db->header.by_seq_root, &start_term,
1074                                options, seq_cmp, callback, ctx);
1075}
1076
1077static int id_ptr_cmp(const void *a, const void *b)
1078{
1079    sized_buf **buf1 = (sized_buf**) a;
1080    sized_buf **buf2 = (sized_buf**) b;
1081    return ebin_cmp(*buf1, *buf2);
1082}
1083
1084static int seq_ptr_cmp(const void *a, const void *b)
1085{
1086    sized_buf **buf1 = (sized_buf**) a;
1087    sized_buf **buf2 = (sized_buf**) b;
1088    return seq_cmp(*buf1, *buf2);
1089}
1090
1091// Common subroutine of couchstore_docinfos_by_{ids, sequence}
1092static couchstore_error_t iterate_docinfos(Db *db,
1093                                           const sized_buf keys[],
1094                                           unsigned numDocs,
1095                                           node_pointer *tree,
1096                                           int (*key_ptr_compare)(const void *, const void *),
1097                                           int (*key_compare)(const sized_buf *k1, const sized_buf *k2),
1098                                           couchstore_changes_callback_fn callback,
1099                                           int fold,
1100                                           int tolerate_corruption,
1101                                           void *ctx)
1102{
1103    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1104    const sized_buf **keyptrs = NULL;
1105    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1106    // Nothing to do if the tree is empty
1107    if (tree == NULL) {
1108        return COUCHSTORE_SUCCESS;
1109    }
1110
1111    if(numDocs <= 0) {
1112        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1113    }
1114
1115    // Create an array of *pointers to* sized_bufs, which is what btree_lookup wants:
1116    keyptrs = static_cast<const sized_buf**>(cb_malloc(numDocs * sizeof(sized_buf*)));
1117    error_unless(keyptrs, COUCHSTORE_ERROR_ALLOC_FAIL);
1118
1119    {
1120        unsigned i;
1121        for (i = 0; i< numDocs; ++i) {
1122            keyptrs[i] = &keys[i];
1123        }
1124        if (!fold) {
1125            // Sort the key pointers:
1126            qsort(keyptrs, numDocs, sizeof(keyptrs[0]), key_ptr_compare);
1127        }
1128
1129        // Construct the lookup request:
1130        lookup_context cbctx = {db, 0, callback, ctx, (tree == db->header.by_id_root), 0, NULL};
1131        couchfile_lookup_request rq;
1132        rq.cmp.compare = key_compare;
1133        rq.file = &db->file;
1134        rq.num_keys = numDocs;
1135        rq.keys = (sized_buf**) keyptrs;
1136        rq.callback_ctx = &cbctx;
1137        rq.fetch_callback = lookup_callback;
1138        rq.node_callback = NULL;
1139        rq.fold = fold;
1140        rq.tolerate_corruption = tolerate_corruption;
1141
1142        // Go!
1143        error_pass(btree_lookup(&rq, tree->pointer));
1144    }
1145cleanup:
1146    cb_free(keyptrs);
1147    return errcode;
1148}
1149
1150LIBCOUCHSTORE_API
1151couchstore_error_t couchstore_docinfos_by_id(Db *db,
1152                                             const sized_buf ids[],
1153                                             unsigned numDocs,
1154                                             couchstore_docinfos_options options,
1155                                             couchstore_changes_callback_fn callback,
1156                                             void *ctx)
1157{
1158    COLLECT_LATENCY();
1159
1160    return iterate_docinfos(db, ids, numDocs,
1161                            db->header.by_id_root, id_ptr_cmp, ebin_cmp,
1162                            callback,
1163                            (options & RANGES) != 0,
1164                            (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0,
1165                            ctx);
1166}
1167
1168LIBCOUCHSTORE_API
1169couchstore_error_t couchstore_docinfos_by_sequence(Db *db,
1170                                                   const uint64_t sequence[],
1171                                                   unsigned numDocs,
1172                                                   couchstore_docinfos_options options,
1173                                                   couchstore_changes_callback_fn callback,
1174                                                   void *ctx)
1175{
1176    COLLECT_LATENCY();
1177
1178    // Create the array of keys:
1179    sized_buf *keylist = static_cast<sized_buf*>(cb_malloc(numDocs * sizeof(sized_buf)));
1180    raw_by_seq_key *keyvalues = static_cast<raw_by_seq_key*>(cb_malloc(numDocs * sizeof(raw_by_seq_key)));
1181    couchstore_error_t errcode;
1182    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1183    error_unless(keylist && keyvalues, COUCHSTORE_ERROR_ALLOC_FAIL);
1184    unsigned i;
1185    for (i = 0; i< numDocs; ++i) {
1186        encode_raw48(sequence[i], &keyvalues[i].sequence);
1187        keylist[i].buf = static_cast<char*>((void*) &keyvalues[i]);
1188        keylist[i].size = sizeof(keyvalues[i]);
1189    }
1190
1191    error_pass(iterate_docinfos(db, keylist, numDocs,
1192                                db->header.by_seq_root, seq_ptr_cmp, seq_cmp,
1193                                callback,
1194                                (options & RANGES) != 0,
1195                                (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0,
1196                                ctx));
1197cleanup:
1198    cb_free(keylist);
1199    cb_free(keyvalues);
1200    return errcode;
1201}
1202
1203LIBCOUCHSTORE_API
1204couchstore_error_t couchstore_db_info(Db *db, DbInfo* dbinfo) {
1205    if (db == NULL || dbinfo == NULL) {
1206        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1207    }
1208    const node_pointer *id_root = db->header.by_id_root;
1209    const node_pointer *seq_root = db->header.by_seq_root;
1210    const node_pointer *local_root = db->header.local_docs_root;
1211    dbinfo->filename = db->file.path;
1212    dbinfo->header_position = db->header.position;
1213    dbinfo->last_sequence = db->header.update_seq;
1214    dbinfo->purge_seq = db->header.purge_seq;
1215    dbinfo->deleted_count = dbinfo->doc_count = dbinfo->space_used = 0;
1216    dbinfo->file_size = db->file.pos;
1217    if (id_root) {
1218        raw_by_id_reduce* id_reduce = (raw_by_id_reduce*) id_root->reduce_value.buf;
1219        dbinfo->doc_count = decode_raw40(id_reduce->notdeleted);
1220        dbinfo->deleted_count = decode_raw40(id_reduce->deleted);
1221        dbinfo->space_used = decode_raw48(id_reduce->size);
1222        dbinfo->space_used += id_root->subtreesize;
1223    }
1224    if(seq_root) {
1225        dbinfo->space_used += seq_root->subtreesize;
1226    }
1227    if(local_root) {
1228        dbinfo->space_used += local_root->subtreesize;
1229    }
1230    return COUCHSTORE_SUCCESS;
1231}
1232
1233static couchstore_error_t local_doc_fetch(couchfile_lookup_request *rq,
1234                                          const sized_buf *k,
1235                                          const sized_buf *v)
1236{
1237    LocalDoc **lDoc = (LocalDoc **) rq->callback_ctx;
1238    LocalDoc *dp;
1239
1240    if (!v) {
1241        *lDoc = NULL;
1242        return COUCHSTORE_SUCCESS;
1243    }
1244    fatbuf *ldbuf = fatbuf_alloc(sizeof(LocalDoc) + k->size + v->size);
1245    if (ldbuf == NULL) {
1246        return COUCHSTORE_ERROR_ALLOC_FAIL;
1247    }
1248
1249    dp = *lDoc = (LocalDoc *) fatbuf_get(ldbuf, sizeof(LocalDoc));
1250    dp->id.buf = (char *) fatbuf_get(ldbuf, k->size);
1251    dp->id.size = k->size;
1252
1253    dp->json.buf = (char *) fatbuf_get(ldbuf, v->size);
1254    dp->json.size = v->size;
1255
1256    dp->deleted = 0;
1257
1258    memcpy(dp->id.buf, k->buf, k->size);
1259    memcpy(dp->json.buf, v->buf, v->size);
1260
1261    return COUCHSTORE_SUCCESS;
1262}
1263
1264LIBCOUCHSTORE_API
1265couchstore_error_t couchstore_open_local_document(Db *db,
1266                                                  const void *id,
1267                                                  size_t idlen,
1268                                                  LocalDoc **pDoc)
1269{
1270    sized_buf key;
1271    sized_buf *keylist = &key;
1272    couchfile_lookup_request rq;
1273    couchstore_error_t errcode;
1274    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1275    if (db->header.local_docs_root == NULL) {
1276        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
1277    }
1278
1279    key.buf = (char *) id;
1280    key.size = idlen;
1281
1282    rq.cmp.compare = ebin_cmp;
1283    rq.file = &db->file;
1284    rq.num_keys = 1;
1285    rq.keys = &keylist;
1286    rq.callback_ctx = pDoc;
1287    rq.fetch_callback = local_doc_fetch;
1288    rq.node_callback = NULL;
1289    rq.fold = 0;
1290
1291    errcode = btree_lookup(&rq, db->header.local_docs_root->pointer);
1292    if (errcode == COUCHSTORE_SUCCESS) {
1293        if (*pDoc == NULL) {
1294            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
1295        }
1296    }
1297cleanup:
1298    return errcode;
1299}
1300
1301LIBCOUCHSTORE_API
1302couchstore_error_t couchstore_save_local_document(Db *db, LocalDoc *lDoc)
1303{
1304    couchstore_error_t errcode;
1305    couchfile_modify_action ldupdate;
1306    node_pointer *nroot = NULL;
1307    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1308
1309    if (lDoc->deleted) {
1310        ldupdate.type = ACTION_REMOVE;
1311    } else {
1312        ldupdate.type = ACTION_INSERT;
1313    }
1314
1315    ldupdate.key = &lDoc->id;
1316    ldupdate.value.data = &lDoc->json;
1317
1318    couchfile_modify_request rq;
1319    rq.cmp.compare = ebin_cmp;
1320    rq.num_actions = 1;
1321    rq.actions = &ldupdate;
1322    rq.fetch_callback = NULL;
1323    rq.reduce = NULL;
1324    rq.rereduce = NULL;
1325    rq.file = &db->file;
1326    rq.enable_purging = false;
1327    rq.purge_kp = NULL;
1328    rq.purge_kv = NULL;
1329    rq.compacting = 0;
1330    rq.kv_chunk_threshold = db->file.options.kv_nodesize;
1331    rq.kp_chunk_threshold = db->file.options.kp_nodesize;
1332
1333    nroot = modify_btree(&rq, db->header.local_docs_root, &errcode);
1334    if (errcode == COUCHSTORE_SUCCESS && nroot != db->header.local_docs_root) {
1335        cb_free(db->header.local_docs_root);
1336        db->header.local_docs_root = nroot;
1337    }
1338
1339cleanup:
1340    return errcode;
1341}
1342
1343LIBCOUCHSTORE_API
1344void couchstore_free_local_document(LocalDoc *lDoc)
1345{
1346    if (lDoc) {
1347        size_t offset = offsetof(fatbuf, buf);
1348        fatbuf_free((fatbuf *) ((char *)lDoc - (char *)offset));
1349    }
1350}
1351
1352LIBCOUCHSTORE_API
1353couchstore_error_t couchstore_last_os_error(const Db *db,
1354                                            char* buf,
1355                                            size_t size) {
1356    if (db == NULL || buf == nullptr || size == 0) {
1357        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1358    }
1359    const couchstore_error_info_t *err = &db->file.lastError;
1360
1361    int nw;
1362
1363#ifdef WIN32
1364    char* win_msg = NULL;
1365    FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
1366                   FORMAT_MESSAGE_FROM_SYSTEM |
1367                   FORMAT_MESSAGE_IGNORE_INSERTS,
1368                   NULL, err->error,
1369                   MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
1370                   (LPTSTR) &win_msg,
1371                   0, NULL);
1372    nw = _snprintf(buf, size, "WINAPI error = %d: '%s'", err->error, win_msg);
1373    LocalFree(win_msg);
1374#else
1375    nw = snprintf(buf, size, "errno = %d: '%s'",
1376                      err->error, strerror(err->error));
1377#endif
1378
1379    if (nw < 0) {
1380        return COUCHSTORE_ERROR_ALLOC_FAIL;
1381    } if (size_t(nw) >= size) {
1382        /* Truncate the error message */
1383        buf[size - 1] = '\0';
1384    }
1385
1386    return COUCHSTORE_SUCCESS;
1387}
1388
1389LIBCOUCHSTORE_API
1390couchstore_error_t couchstore_last_internal_error(const Db *db,
1391                                                  char* buf,
1392                                                  size_t size) {
1393    if (db == NULL || buf == nullptr || size == 0) {
1394        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1395    }
1396
1397    int nw;
1398
1399    nw = snprintf(buf, size, "'%s'", internal_error_string);
1400    if (nw < 0) {
1401        return COUCHSTORE_ERROR_ALLOC_FAIL;
1402    }
1403    return COUCHSTORE_SUCCESS;
1404}
1405
1406static couchstore_error_t btree_eval_seq_reduce(Db *db,
1407                                                uint64_t *accum,
1408                                                sized_buf *left,
1409                                                sized_buf *right,
1410                                                bool past_left_edge,
1411                                                uint64_t diskpos) {
1412    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1413    int bufpos = 1, nodebuflen = 0;
1414    int node_type;
1415    char *nodebuf = NULL;
1416    nodebuflen = pread_compressed(&db->file, diskpos, &nodebuf);
1417    error_unless(nodebuflen >= 0, (static_cast<couchstore_error_t>(nodebuflen)));  // if negative, it's an error code
1418
1419    node_type = nodebuf[0];
1420    while(bufpos < nodebuflen) {
1421        sized_buf k, v;
1422        bufpos += read_kv(nodebuf + bufpos, &k, &v);
1423        int left_cmp = seq_cmp(&k, left);
1424        int right_cmp = seq_cmp(&k, right);
1425        if(left_cmp < 0) {
1426            continue;
1427        }
1428        if(node_type == KP_NODE) {
1429            // In-range Item in a KP Node
1430            const raw_node_pointer *raw = (const raw_node_pointer*)v.buf;
1431            const raw_by_seq_reduce *rawreduce = (const raw_by_seq_reduce*) (v.buf + sizeof(raw_node_pointer));
1432            uint64_t subcount = decode_raw40(rawreduce->count);
1433            uint64_t pointer = decode_raw48(raw->pointer);
1434            if((left_cmp >= 0 && !past_left_edge) || right_cmp >= 0) {
1435                error_pass(btree_eval_seq_reduce(db, accum, left, right, past_left_edge, pointer));
1436                if(right_cmp >= 0) {
1437                    break;
1438                } else {
1439                    past_left_edge = true;
1440                }
1441            } else {
1442                *accum += subcount;
1443            }
1444        } else {
1445            if(right_cmp > 0) {
1446                break;
1447            }
1448            // In-range Item in a KV Node
1449            *accum += 1;
1450        }
1451    }
1452cleanup:
1453    if (nodebuf) {
1454        cb_free(nodebuf);
1455    }
1456    return errcode;
1457}
1458
1459LIBCOUCHSTORE_API
1460couchstore_error_t couchstore_changes_count(Db* db,
1461                                            uint64_t min_seq,
1462                                            uint64_t max_seq,
1463                                            uint64_t *count) {
1464    COLLECT_LATENCY();
1465
1466    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1467    raw_48 leftkr, rightkr;
1468    sized_buf leftk, rightk;
1469    leftk.buf = (char*) &leftkr;
1470    rightk.buf = (char*) &rightkr;
1471    leftk.size = 6;
1472    rightk.size = 6;
1473    encode_raw48(min_seq, &leftkr);
1474    encode_raw48(max_seq, &rightkr);
1475
1476    *count = 0;
1477    if(db->header.by_seq_root) {
1478        error_pass(btree_eval_seq_reduce(db, count, &leftk, &rightk, false,
1479                                         db->header.by_seq_root->pointer));
1480    }
1481cleanup:
1482    return errcode;
1483}
1484