xref: /5.5.2/couchstore/src/couch_db.cc (revision 745729c4)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3#include <fcntl.h>
4#include <platform/cb_malloc.h>
5#include <string.h>
6#include <stdlib.h>
7#include <assert.h>
8#include <stdio.h>
9
10#include "internal.h"
11#include "node_types.h"
12#include "couch_btree.h"
13#include "bitfield.h"
14#include "reduces.h"
15#include "util.h"
16
17#include "couch_latency_internal.h"
18
19#define ROOT_BASE_SIZE 12
20#define HEADER_BASE_SIZE 25
21
22// Initializes one of the db's root node pointers from data in the file header
23static couchstore_error_t read_db_root(Db *db, node_pointer **root,
24                                       void *root_data, int root_size)
25{
26    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
27    if (root_size > 0) {
28        error_unless(root_size >= ROOT_BASE_SIZE, COUCHSTORE_ERROR_CORRUPT);
29        *root = read_root(root_data, root_size);
30        error_unless(*root, COUCHSTORE_ERROR_ALLOC_FAIL);
31        error_unless((*root)->pointer < db->header.position, COUCHSTORE_ERROR_CORRUPT);
32    } else {
33        *root = NULL;
34    }
35cleanup:
36    return errcode;
37}
38
39// Attempts to initialize the database from a header at the given file position
40static couchstore_error_t find_header_at_pos(Db *db, cs_off_t pos)
41{
42    int seqrootsize;
43    int idrootsize;
44    int localrootsize;
45    char *root_data;
46    int header_len;
47    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
48    union {
49        raw_file_header *raw;
50        char *buf;
51    } header_buf = { NULL };
52    uint8_t buf[2];
53    ssize_t readsize;
54    {
55        // Speculative read looking for header, mark as Empty.
56        ScopedFileTag tag(db->file.ops, db->file.handle, FileTag::Empty);
57        readsize = db->file.ops->pread(
58                &db->file.lastError, db->file.handle, buf, 2, pos);
59    }
60    error_unless(readsize == 2, COUCHSTORE_ERROR_READ);
61    if (buf[0] == 0) {
62        return COUCHSTORE_ERROR_NO_HEADER;
63    } else if (buf[0] != 1) {
64        return COUCHSTORE_ERROR_CORRUPT;
65    }
66
67    header_len = pread_header(&db->file, pos, &header_buf.buf, MAX_DB_HEADER_SIZE);
68    if (header_len < 0) {
69        error_pass(static_cast<couchstore_error_t>(header_len));
70    }
71
72    db->header.position = pos;
73    db->header.disk_version = decode_raw08(header_buf.raw->version);
74
75    // Only 12 and 11 are valid
76    error_unless(db->header.disk_version == COUCH_DISK_VERSION ||
77                 db->header.disk_version == COUCH_DISK_VERSION_11,
78                 COUCHSTORE_ERROR_HEADER_VERSION);
79    db->header.update_seq = decode_raw48(header_buf.raw->update_seq);
80    db->header.purge_seq = decode_raw48(header_buf.raw->purge_seq);
81    db->header.purge_ptr = decode_raw48(header_buf.raw->purge_ptr);
82    error_unless(db->header.purge_ptr <= db->header.position, COUCHSTORE_ERROR_CORRUPT);
83    seqrootsize = decode_raw16(header_buf.raw->seqrootsize);
84    idrootsize = decode_raw16(header_buf.raw->idrootsize);
85    localrootsize = decode_raw16(header_buf.raw->localrootsize);
86    error_unless(header_len == HEADER_BASE_SIZE + seqrootsize + idrootsize + localrootsize,
87                 COUCHSTORE_ERROR_CORRUPT);
88
89    root_data = (char*) (header_buf.raw + 1);  // i.e. just past *header_buf
90    error_pass(read_db_root(db, &db->header.by_seq_root, root_data, seqrootsize));
91    root_data += seqrootsize;
92    error_pass(read_db_root(db, &db->header.by_id_root, root_data, idrootsize));
93    root_data += idrootsize;
94    error_pass(read_db_root(db, &db->header.local_docs_root, root_data, localrootsize));
95
96cleanup:
97    cb_free(header_buf.raw);
98    return errcode;
99}
100
101// Finds the database header by scanning back from the end of the file at 4k boundaries
102static couchstore_error_t find_header(Db *db, int64_t start_pos)
103{
104    couchstore_error_t last_header_errcode = COUCHSTORE_ERROR_NO_HEADER;
105    int64_t pos = start_pos;
106    pos -= pos % COUCH_BLOCK_SIZE;
107    for (; pos >= 0; pos -= COUCH_BLOCK_SIZE) {
108        couchstore_error_t errcode = find_header_at_pos(db, pos);
109        switch(errcode) {
110            case COUCHSTORE_SUCCESS:
111                // Found it!
112                return COUCHSTORE_SUCCESS;
113            case COUCHSTORE_ERROR_NO_HEADER:
114                // No header here, so keep going
115                break;
116            case COUCHSTORE_ERROR_ALLOC_FAIL:
117                // Fatal error
118                return errcode;
119            default:
120                // Invalid header; continue, but remember the last error
121                last_header_errcode = errcode;
122                break;
123        }
124    }
125    return last_header_errcode;
126}
127
128/**
129 * Calculates how large in bytes the current header will be
130 * when written to disk.
131 *
132 * The seqrootsize, idrootsize and localrootsize params are
133 * used to return the respective sizes in this header if
134 * needed.
135 */
136size_t calculate_header_size(Db *db, size_t& seqrootsize,
137                             size_t& idrootsize, size_t& localrootsize)
138{
139    seqrootsize = idrootsize = localrootsize = 0;
140
141    if (db->header.by_seq_root) {
142        seqrootsize = ROOT_BASE_SIZE + db->header.by_seq_root->reduce_value.size;
143    }
144    if (db->header.by_id_root) {
145        idrootsize = ROOT_BASE_SIZE + db->header.by_id_root->reduce_value.size;
146    }
147    if (db->header.local_docs_root) {
148        localrootsize = ROOT_BASE_SIZE + db->header.local_docs_root->reduce_value.size;
149    }
150    return sizeof(raw_file_header) + seqrootsize + idrootsize + localrootsize;
151}
152
153couchstore_error_t db_write_header(Db *db)
154{
155    sized_buf writebuf;
156    size_t seqrootsize, idrootsize, localrootsize;
157    writebuf.size = calculate_header_size(db, seqrootsize,
158                                          idrootsize, localrootsize);
159    writebuf.buf = (char *) cb_calloc(1, writebuf.size);
160    raw_file_header* header = (raw_file_header*)writebuf.buf;
161    header->version = encode_raw08(db->header.disk_version);
162    encode_raw48(db->header.update_seq, &header->update_seq);
163    encode_raw48(db->header.purge_seq, &header->purge_seq);
164    encode_raw48(db->header.purge_ptr, &header->purge_ptr);
165    header->seqrootsize = encode_raw16((uint16_t)seqrootsize);
166    header->idrootsize = encode_raw16((uint16_t)idrootsize);
167    header->localrootsize = encode_raw16((uint16_t)localrootsize);
168    uint8_t *root = (uint8_t*)(header + 1);
169    encode_root(root, db->header.by_seq_root);
170    root += seqrootsize;
171    encode_root(root, db->header.by_id_root);
172    root += idrootsize;
173    encode_root(root, db->header.local_docs_root);
174    cs_off_t pos;
175    couchstore_error_t errcode = write_header(&db->file, &writebuf, &pos);
176    if (errcode == COUCHSTORE_SUCCESS) {
177        db->header.position = pos;
178    }
179    cb_free(writebuf.buf);
180    return errcode;
181}
182
183static couchstore_error_t create_header(Db *db)
184{
185    // Select the version based upon selected CRC
186    if (db->file.crc_mode == CRC32) {
187        // user is creating down-level files
188        db->header.disk_version = COUCH_DISK_VERSION_11;
189    } else {
190        // user is using latest
191        db->header.disk_version = COUCH_DISK_VERSION;
192    }
193    db->header.update_seq = 0;
194    db->header.by_id_root = NULL;
195    db->header.by_seq_root = NULL;
196    db->header.local_docs_root = NULL;
197    db->header.purge_seq = 0;
198    db->header.purge_ptr = 0;
199    db->header.position = 0;
200    return db_write_header(db);
201}
202
203LIBCOUCHSTORE_API
204uint64_t couchstore_get_header_position(Db *db)
205{
206    return db->header.position;
207}
208
209/**
210 * Precommit should occur before writing a header, it has two
211 * purposes. Firstly it ensures data is written before we attempt
212 * to write the header. This means it's impossible for the header
213 * to be written before the data. This is accomplished through
214 * a sync.
215 *
216 * The second purpose is to extend the file to be large enough
217 * to include the subsequently written header. This is done so
218 * the fdatasync performed by writing a header doesn't have to
219 * do an additional (expensive) modified metadata flush on top
220 * of the one we're already doing.
221 */
222couchstore_error_t precommit(Db *db)
223{
224    cs_off_t curpos = db->file.pos;
225
226    db->file.pos = align_to_next_block(db->file.pos);
227    sized_buf zerobyte = { const_cast<char*>("\0"), 1};
228
229    size_t seqrootsize, idrootsize, localrootsize;
230    db->file.pos += calculate_header_size(db, seqrootsize,
231                                          idrootsize, localrootsize);
232
233    //Extend file size to where end of header will land before we do first sync
234    couchstore_error_t errcode = static_cast<couchstore_error_t>(
235        db_write_buf(&db->file, &zerobyte, NULL, NULL));
236
237    if (errcode == COUCHSTORE_SUCCESS) {
238        errcode = db->file.ops->sync(&db->file.lastError, db->file.handle);
239    }
240    // Move cursor back to where it was
241    db->file.pos = curpos;
242    return errcode;
243}
244
245LIBCOUCHSTORE_API
246couchstore_error_t couchstore_commit(Db *db)
247{
248    COLLECT_LATENCY();
249
250    couchstore_error_t errcode = precommit(db);
251
252    if (errcode == COUCHSTORE_SUCCESS) {
253        errcode = db_write_header(db);
254    }
255
256    if (errcode == COUCHSTORE_SUCCESS) {
257        errcode = db->file.ops->sync(&db->file.lastError, db->file.handle);
258    }
259
260    return errcode;
261}
262
263static tree_file_options get_tree_file_options_from_flags(couchstore_open_flags flags)
264{
265    tree_file_options options;
266
267    if (flags & COUCHSTORE_OPEN_FLAG_UNBUFFERED) {
268        options.buf_io_enabled = false;
269    } else if (flags & COUCHSTORE_OPEN_WITH_CUSTOM_BUFFER) {
270        // Buffered IO with custom buffer settings.
271        //  * First 4 bits [15:12]: read buffer capacity
272        //  * Next  4 bits [11:08]: max read buffer count
273
274        uint32_t unit_index = (flags >> 12) & 0xf;
275        if (unit_index) {
276            // unit_index    1     2     3     4     ...   15
277            // unit size     1KB   2KB   4KB   8KB   ...   16MB
278            options.buf_io_read_unit_size = 1024 * (1 << (unit_index -1));
279        }
280        uint32_t count_index = (flags >> 8) & 0xf;
281        if (count_index) {
282            // count_index   1     2     3     4     ...   15
283            // # buffers     8     16    32    64    ...   128K
284            options.buf_io_read_buffers = 8 * (1 << (count_index-1));
285        }
286    }
287
288    // Set default value first.
289    options.kp_nodesize = DB_KP_CHUNK_THRESHOLD;
290    options.kv_nodesize = DB_KV_CHUNK_THRESHOLD;
291    if (flags & COUCHSTORE_OPEN_WITH_CUSTOM_NODESIZE) {
292        // B+tree custom node size settings.
293        //  * First 4 bits [23:20]: KP node size
294        //  * Next  4 bits [19:16]: KV node size
295        uint32_t kp_flag = (flags >> 20) & 0xf;
296        if (kp_flag) {
297            options.kp_nodesize = kp_flag * 1024;
298        }
299        uint32_t kv_flag = (flags >> 16) & 0xf;
300        if (kv_flag) {
301            options.kv_nodesize = kv_flag * 1024;
302        }
303    }
304
305    if (flags & COUCHSTORE_OPEN_WITH_PERIODIC_SYNC) {
306        // Automatic sync() every N bytes written.
307        //  * 5 bits [28-24]: power-of-2 * 1kB
308        uint64_t sync_flag = (flags >> 24) & 0x1f;
309        options.periodic_sync_bytes = uint64_t(1024) << (sync_flag - 1);
310    }
311
312    return options;
313}
314
315LIBCOUCHSTORE_API
316couchstore_open_flags couchstore_encode_periodic_sync_flags(uint64_t bytes) {
317    // Convert to encoding supported by couchstore_open_flags - KB power-of-2
318    // value.
319    // Round up to whole kilobyte units.
320    const uint64_t kilobytes = (bytes + 1023) / 1024;
321    // Calculate the shift amount (what is the log2 power)
322    uint64_t shiftAmount = std::log2(kilobytes);
323    // Saturate if the user specified more than the encodable amount.
324    shiftAmount = std::min(shiftAmount, uint64_t(30));
325    // Finally, encode in couchstore_open flags
326    return ((shiftAmount + 1)) << 24;
327}
328
329LIBCOUCHSTORE_API
330couchstore_error_t couchstore_open_db(const char *filename,
331                                      couchstore_open_flags flags,
332                                      Db **pDb)
333{
334    return couchstore_open_db_ex(filename, flags,
335                                 couchstore_get_default_file_ops(), pDb);
336}
337
338LIBCOUCHSTORE_API
339couchstore_error_t couchstore_open_db_ex(const char *filename,
340                                         couchstore_open_flags flags,
341                                         FileOpsInterface* ops,
342                                         Db **pDb)
343{
344    COLLECT_LATENCY();
345
346    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
347    Db *db;
348    int openflags;
349    cs_off_t pos;
350
351    /* Sanity check input parameters */
352    if ((flags & COUCHSTORE_OPEN_FLAG_RDONLY) &&
353        (flags & COUCHSTORE_OPEN_FLAG_CREATE)) {
354        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
355    }
356
357    if ((db = static_cast<Db*>(cb_calloc(1, sizeof(Db)))) == NULL) {
358        return COUCHSTORE_ERROR_ALLOC_FAIL;
359    }
360
361    if (flags & COUCHSTORE_OPEN_FLAG_RDONLY) {
362        openflags = O_RDONLY;
363    } else {
364        openflags = O_RDWR;
365    }
366
367    if (flags & COUCHSTORE_OPEN_FLAG_CREATE) {
368        openflags |= O_CREAT;
369    }
370
371    // open with CRC unknown, CRC will be selected when header is read/or not found.
372    error_pass(tree_file_open(&db->file, filename, openflags, CRC_UNKNOWN, ops,
373                              get_tree_file_options_from_flags(flags)));
374
375    pos = db->file.ops->goto_eof(&db->file.lastError, db->file.handle);
376    db->file.pos = pos;
377    if (pos == 0) {
378        /* This is an empty file. Create a new fileheader unless the
379         * user wanted a read-only version of the file
380         */
381
382        if (flags & COUCHSTORE_OPEN_FLAG_RDONLY) {
383            error_pass(COUCHSTORE_ERROR_NO_HEADER);
384        } else {
385
386            // Select the CRC to use on this new file
387            if (flags & COUCHSTORE_OPEN_WITH_LEGACY_CRC) {
388                db->file.crc_mode = CRC32;
389            } else {
390                db->file.crc_mode = CRC32C;
391            }
392
393            error_pass(create_header(db));
394        }
395    } else if (pos > 0) {
396        error_pass(find_header(db, db->file.pos - 2));
397
398        if (db->header.disk_version <= COUCH_DISK_VERSION_11) {
399            db->file.crc_mode = CRC32;
400        } else {
401            cb_assert(db->header.disk_version >= COUCH_DISK_VERSION_12);
402            db->file.crc_mode = CRC32C;
403        }
404
405        // Not allowed. Can't request legacy_crc but be opening non legacy CRC files.
406        if (db->file.crc_mode == CRC32C && flags & COUCHSTORE_OPEN_WITH_LEGACY_CRC) {
407            errcode = COUCHSTORE_ERROR_INVALID_ARGUMENTS;
408            goto cleanup;
409        }
410    } else {
411        error_pass(static_cast<couchstore_error_t>(db->file.pos));
412    }
413
414    *pDb = db;
415    db->dropped = 0;
416
417cleanup:
418    if(errcode != COUCHSTORE_SUCCESS) {
419        couchstore_close_file(db);
420        couchstore_free_db(db);
421    }
422
423    return errcode;
424}
425
426LIBCOUCHSTORE_API
427couchstore_error_t couchstore_close_file(Db* db)
428{
429    COLLECT_LATENCY();
430
431    if(db->dropped) {
432        return COUCHSTORE_SUCCESS;
433    }
434    couchstore_error_t error = tree_file_close(&db->file);
435    db->dropped = 1;
436    return error;
437}
438
439LIBCOUCHSTORE_API
440couchstore_error_t couchstore_rewind_db_header(Db *db)
441{
442    COLLECT_LATENCY();
443
444    couchstore_error_t errcode;
445    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
446    // free current header guts
447    cb_free(db->header.by_id_root);
448    cb_free(db->header.by_seq_root);
449    cb_free(db->header.local_docs_root);
450    db->header.by_id_root = NULL;
451    db->header.by_seq_root = NULL;
452    db->header.local_docs_root = NULL;
453
454    error_unless(db->header.position != 0, COUCHSTORE_ERROR_DB_NO_LONGER_VALID);
455    // find older header
456    error_pass(find_header(db, db->header.position - 2));
457
458cleanup:
459    // if we failed, free the handle and return an error
460    if(errcode != COUCHSTORE_SUCCESS) {
461        couchstore_close_file(db);
462        couchstore_free_db(db);
463        errcode = COUCHSTORE_ERROR_DB_NO_LONGER_VALID;
464    }
465    return errcode;
466}
467
468LIBCOUCHSTORE_API
469couchstore_error_t couchstore_free_db(Db* db)
470{
471    COLLECT_LATENCY();
472
473    if(!db) {
474        return COUCHSTORE_SUCCESS;
475    }
476
477    if(!db->dropped) {
478        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
479    }
480
481    cb_free(db->header.by_id_root);
482    cb_free(db->header.by_seq_root);
483    cb_free(db->header.local_docs_root);
484    db->header.by_id_root = NULL;
485    db->header.by_seq_root = NULL;
486    db->header.local_docs_root = NULL;
487
488    memset(db, 0xa5, sizeof(*db));
489    cb_free(db);
490
491    return COUCHSTORE_SUCCESS;
492}
493
494LIBCOUCHSTORE_API
495const char* couchstore_get_db_filename(Db *db) {
496    return db->file.path;
497}
498
499LIBCOUCHSTORE_API
500FileOpsInterface::FHStats* couchstore_get_db_filestats(Db* db) {
501    return db->file.ops->get_stats(db->file.handle);
502}
503
504DocInfo* couchstore_alloc_docinfo(const sized_buf *id, const sized_buf *rev_meta) {
505    size_t size = sizeof(DocInfo);
506    if (id) {
507        size += id->size;
508    }
509    if (rev_meta) {
510        size += rev_meta->size;
511    }
512    DocInfo* docInfo = static_cast<DocInfo*>(cb_malloc(size));
513    if (!docInfo) {
514        return NULL;
515    }
516    memset(docInfo, 0, sizeof(DocInfo));
517    char *extra = (char *)docInfo + sizeof(DocInfo);
518    if (id) {
519        memcpy(extra, id->buf, id->size);
520        docInfo->id.buf = extra;
521        docInfo->id.size = id->size;
522        extra += id->size;
523    }
524    if (rev_meta) {
525        memcpy(extra, rev_meta->buf, rev_meta->size);
526        docInfo->rev_meta.buf = extra;
527        docInfo->rev_meta.size = rev_meta->size;
528    }
529    return docInfo;
530}
531
532LIBCOUCHSTORE_API
533void couchstore_free_docinfo(DocInfo *docinfo)
534{
535    cb_free(docinfo);
536}
537
538LIBCOUCHSTORE_API
539void couchstore_free_document(Doc *doc)
540{
541    if (doc) {
542        char *offset = (char *) (&((fatbuf *) NULL)->buf);
543        fatbuf_free((fatbuf *) ((char *)doc - (char *)offset));
544    }
545}
546
547couchstore_error_t by_seq_read_docinfo(DocInfo **pInfo,
548                                       const sized_buf *k,
549                                       const sized_buf *v)
550{
551    const raw_seq_index_value *raw = (const raw_seq_index_value*)v->buf;
552    ssize_t extraSize = v->size - sizeof(*raw);
553    if (extraSize < 0) {
554        return COUCHSTORE_ERROR_CORRUPT;
555    }
556
557    uint32_t idsize, datasize;
558    decode_kv_length(&raw->sizes, &idsize, &datasize);
559    uint64_t bp = decode_raw48(raw->bp);
560    int deleted = (bp & BP_DELETED_FLAG) != 0;
561    bp &= ~BP_DELETED_FLAG;
562    uint8_t content_meta = decode_raw08(raw->content_meta);
563    uint64_t rev_seq = decode_raw48(raw->rev_seq);
564    uint64_t db_seq = decode_sequence_key(k);
565
566    sized_buf id = {v->buf + sizeof(*raw), idsize};
567    sized_buf rev_meta = {id.buf + idsize, extraSize - id.size};
568    DocInfo* docInfo = couchstore_alloc_docinfo(&id, &rev_meta);
569    if (!docInfo) {
570        return COUCHSTORE_ERROR_ALLOC_FAIL;
571    }
572
573    docInfo->db_seq = db_seq;
574    docInfo->rev_seq = rev_seq;
575    docInfo->deleted = deleted;
576    docInfo->bp = bp;
577    docInfo->size = datasize;
578    docInfo->content_meta = content_meta;
579    *pInfo = docInfo;
580    return COUCHSTORE_SUCCESS;
581}
582
583static couchstore_error_t by_id_read_docinfo(DocInfo **pInfo,
584                                             const sized_buf *k,
585                                             const sized_buf *v)
586{
587    const raw_id_index_value *raw = (const raw_id_index_value*)v->buf;
588    ssize_t revMetaSize = v->size - sizeof(*raw);
589    if (revMetaSize < 0) {
590        return COUCHSTORE_ERROR_CORRUPT;
591    }
592
593    uint32_t datasize, deleted;
594    uint8_t content_meta;
595    uint64_t bp, seq, revnum;
596
597    seq = decode_raw48(raw->db_seq);
598    datasize = decode_raw32(raw->size);
599    bp = decode_raw48(raw->bp);
600    deleted = (bp & BP_DELETED_FLAG) != 0;
601    bp &= ~BP_DELETED_FLAG;
602    content_meta = decode_raw08(raw->content_meta);
603    revnum = decode_raw48(raw->rev_seq);
604
605    sized_buf rev_meta = {v->buf + sizeof(*raw), static_cast<size_t>(revMetaSize)};
606    DocInfo* docInfo = couchstore_alloc_docinfo(k, &rev_meta);
607    if (!docInfo) {
608        return COUCHSTORE_ERROR_ALLOC_FAIL;
609    }
610
611    docInfo->db_seq = seq;
612    docInfo->rev_seq = revnum;
613    docInfo->deleted = deleted;
614    docInfo->bp = bp;
615    docInfo->size = datasize;
616    docInfo->content_meta = content_meta;
617    *pInfo = docInfo;
618    return COUCHSTORE_SUCCESS;
619}
620
621//Fill in doc from reading file.
622static couchstore_error_t bp_to_doc(Doc **pDoc, Db *db, cs_off_t bp, couchstore_open_options options)
623{
624    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
625    int bodylen = 0;
626    char *docbody = NULL;
627    fatbuf *docbuf = NULL;
628    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
629
630    if (options & DECOMPRESS_DOC_BODIES) {
631        bodylen = pread_compressed(&db->file, bp, &docbody);
632    } else {
633        bodylen = pread_bin(&db->file, bp, &docbody);
634    }
635
636    error_unless(bodylen >= 0, static_cast<couchstore_error_t>(bodylen));    // if bodylen is negative it's an error code
637    error_unless(docbody || bodylen == 0, COUCHSTORE_ERROR_READ);
638
639    error_unless(docbuf = fatbuf_alloc(sizeof(Doc) + bodylen), COUCHSTORE_ERROR_ALLOC_FAIL);
640    *pDoc = (Doc *) fatbuf_get(docbuf, sizeof(Doc));
641
642    if (bodylen == 0) { //Empty doc
643        (*pDoc)->data.buf = NULL;
644        (*pDoc)->data.size = 0;
645        cb_free(docbody);
646        return COUCHSTORE_SUCCESS;
647    }
648
649    (*pDoc)->data.buf = (char *) fatbuf_get(docbuf, bodylen);
650    (*pDoc)->data.size = bodylen;
651    memcpy((*pDoc)->data.buf, docbody, bodylen);
652
653cleanup:
654    cb_free(docbody);
655    if (errcode < 0) {
656        fatbuf_free(docbuf);
657    }
658    return errcode;
659}
660
661static couchstore_error_t docinfo_fetch_by_id(couchfile_lookup_request *rq,
662                                              const sized_buf *k,
663                                              const sized_buf *v)
664{
665    DocInfo **pInfo = (DocInfo **) rq->callback_ctx;
666    if (v == NULL) {
667        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
668    }
669    return by_id_read_docinfo(pInfo, k, v);
670}
671
672static couchstore_error_t docinfo_fetch_by_seq(couchfile_lookup_request *rq,
673                                               const sized_buf *k,
674                                               const sized_buf *v)
675{
676    DocInfo **pInfo = (DocInfo **) rq->callback_ctx;
677    if (v == NULL) {
678        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
679    }
680    return by_seq_read_docinfo(pInfo, k, v);
681}
682
683LIBCOUCHSTORE_API
684couchstore_error_t couchstore_docinfo_by_id(Db *db,
685                                            const void *id,
686                                            size_t idlen,
687                                            DocInfo **pInfo)
688{
689    COLLECT_LATENCY();
690
691    sized_buf key;
692    sized_buf *keylist = &key;
693    couchfile_lookup_request rq;
694    couchstore_error_t errcode;
695    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
696
697    if (db->header.by_id_root == NULL) {
698        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
699    }
700
701    key.buf = (char *) id;
702    key.size = idlen;
703
704    rq.cmp.compare = ebin_cmp;
705    rq.file = &db->file;
706    rq.num_keys = 1;
707    rq.keys = &keylist;
708    rq.callback_ctx = pInfo;
709    rq.fetch_callback = docinfo_fetch_by_id;
710    rq.node_callback = NULL;
711    rq.fold = 0;
712
713    errcode = btree_lookup(&rq, db->header.by_id_root->pointer);
714    if (errcode == COUCHSTORE_SUCCESS) {
715        if (*pInfo == NULL) {
716            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
717        }
718    }
719cleanup:
720    return errcode;
721}
722
723LIBCOUCHSTORE_API
724couchstore_error_t couchstore_docinfo_by_sequence(Db *db,
725                                                  uint64_t sequence,
726                                                  DocInfo **pInfo)
727{
728    COLLECT_LATENCY();
729
730    sized_buf key;
731    sized_buf *keylist = &key;
732    couchfile_lookup_request rq;
733    couchstore_error_t errcode;
734    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
735
736    if (db->header.by_id_root == NULL) {
737        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
738    }
739
740    sequence = htonll(sequence);
741    key.buf = (char *)&sequence + 2;
742    key.size = 6;
743
744    rq.cmp.compare = seq_cmp;
745    rq.file = &db->file;
746    rq.num_keys = 1;
747    rq.keys = &keylist;
748    rq.callback_ctx = pInfo;
749    rq.fetch_callback = docinfo_fetch_by_seq;
750    rq.node_callback = NULL;
751    rq.fold = 0;
752
753    errcode = btree_lookup(&rq, db->header.by_seq_root->pointer);
754    if (errcode == COUCHSTORE_SUCCESS) {
755        if (*pInfo == NULL) {
756            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
757        }
758    }
759cleanup:
760    return errcode;
761}
762
763LIBCOUCHSTORE_API
764couchstore_error_t couchstore_open_doc_with_docinfo(Db *db,
765                                                    const DocInfo *docinfo,
766                                                    Doc **pDoc,
767                                                    couchstore_open_options options)
768{
769    COLLECT_LATENCY();
770
771    couchstore_error_t errcode;
772
773    *pDoc = NULL;
774    if (docinfo->bp == 0) {
775        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
776    }
777
778    if (!(docinfo->content_meta & COUCH_DOC_IS_COMPRESSED)) {
779        options &= ~DECOMPRESS_DOC_BODIES;
780    }
781
782    errcode = bp_to_doc(pDoc, db, docinfo->bp, options);
783    if (errcode == COUCHSTORE_SUCCESS) {
784        (*pDoc)->id.buf = docinfo->id.buf;
785        (*pDoc)->id.size = docinfo->id.size;
786    }
787
788    return errcode;
789}
790
791LIBCOUCHSTORE_API
792couchstore_error_t couchstore_open_document(Db *db,
793                                            const void *id,
794                                            size_t idlen,
795                                            Doc **pDoc,
796                                            couchstore_open_options options)
797{
798    COLLECT_LATENCY();
799
800    couchstore_error_t errcode;
801    DocInfo *info;
802    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
803    *pDoc = NULL;
804    errcode = couchstore_docinfo_by_id(db, id, idlen, &info);
805    if (errcode == COUCHSTORE_SUCCESS) {
806        errcode = couchstore_open_doc_with_docinfo(db, info, pDoc, options);
807        if (errcode == COUCHSTORE_SUCCESS) {
808            (*pDoc)->id.buf = (char *) id;
809            (*pDoc)->id.size = idlen;
810        }
811
812        couchstore_free_docinfo(info);
813    }
814cleanup:
815    return errcode;
816}
817
818// context info passed to lookup_callback via btree_lookup
819typedef struct {
820    Db *db;
821    couchstore_docinfos_options options;
822    couchstore_changes_callback_fn callback;
823    void* callback_context;
824    int by_id;
825    int depth;
826    couchstore_walk_tree_callback_fn walk_callback;
827} lookup_context;
828
829// btree_lookup callback, called while iterating keys
830static couchstore_error_t lookup_callback(couchfile_lookup_request *rq,
831                                          const sized_buf *k,
832                                          const sized_buf *v)
833{
834    if (v == NULL) {
835        return COUCHSTORE_SUCCESS;
836    }
837
838    const lookup_context *context = static_cast<const lookup_context *>(rq->callback_ctx);
839    DocInfo *docinfo = NULL;
840    couchstore_error_t errcode;
841    if (context->by_id) {
842        errcode = by_id_read_docinfo(&docinfo, k, v);
843    } else {
844        errcode = by_seq_read_docinfo(&docinfo, k, v);
845    }
846    if (errcode == COUCHSTORE_ERROR_CORRUPT &&
847        (context->options & COUCHSTORE_TOLERATE_CORRUPTION)) {
848        // Invoke callback even if doc info is corrupted/unreadable, if magic flag is set
849        docinfo = static_cast<DocInfo*>(cb_calloc(sizeof(DocInfo), 1));
850        docinfo->id = *k;
851        docinfo->rev_meta = *v;
852    } else if (errcode) {
853        return errcode;
854    }
855
856    if ((context->options & COUCHSTORE_DELETES_ONLY) && docinfo->deleted == 0) {
857        couchstore_free_docinfo(docinfo);
858        return COUCHSTORE_SUCCESS;
859    }
860
861    if ((context->options & COUCHSTORE_NO_DELETES) && docinfo->deleted == 1) {
862        couchstore_free_docinfo(docinfo);
863        return COUCHSTORE_SUCCESS;
864    }
865
866    if (context->walk_callback) {
867        errcode = static_cast<couchstore_error_t>(context->walk_callback(context->db,
868                                                                         context->depth,
869                                                                         docinfo,
870                                                                         0,
871                                                                         NULL,
872                                                                         context->callback_context));
873    } else {
874        errcode = static_cast<couchstore_error_t>(context->callback(context->db,
875                                                                    docinfo,
876                                                                    context->callback_context));
877    }
878    if (errcode <= 0) {
879        couchstore_free_docinfo(docinfo);
880    } else {
881        // User requested docinfo not be freed, don't free it, return success
882        return COUCHSTORE_SUCCESS;
883    }
884    return errcode;
885}
886
887LIBCOUCHSTORE_API
888couchstore_error_t couchstore_changes_since(Db *db,
889                                            uint64_t since,
890                                            couchstore_docinfos_options options,
891                                            couchstore_changes_callback_fn callback,
892                                            void *ctx)
893{
894    COLLECT_LATENCY();
895
896    char since_termbuf[6];
897    sized_buf since_term;
898    sized_buf *keylist = &since_term;
899    lookup_context cbctx = {db, options, callback, ctx, 0, 0, NULL};
900    couchfile_lookup_request rq;
901    couchstore_error_t errcode;
902
903    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
904    if (db->header.by_seq_root == NULL) {
905        return COUCHSTORE_SUCCESS;
906    }
907
908    since_term.buf = since_termbuf;
909    since_term.size = 6;
910    encode_raw48(since, (raw_48*)since_term.buf);
911
912    rq.cmp.compare = seq_cmp;
913    rq.file = &db->file;
914    rq.num_keys = 1;
915    rq.keys = &keylist;
916    rq.callback_ctx = &cbctx;
917    rq.fetch_callback = lookup_callback;
918    rq.node_callback = NULL;
919    rq.fold = 1;
920    rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
921
922    errcode = btree_lookup(&rq, db->header.by_seq_root->pointer);
923cleanup:
924    return errcode;
925}
926
927LIBCOUCHSTORE_API
928couchstore_error_t couchstore_all_docs(Db *db,
929                                       const sized_buf* startKeyPtr,
930                                       couchstore_docinfos_options options,
931                                       couchstore_changes_callback_fn callback,
932                                       void *ctx)
933{
934    COLLECT_LATENCY();
935
936    sized_buf startKey = {NULL, 0};
937    sized_buf *keylist = &startKey;
938    lookup_context cbctx = {db, options, callback, ctx, 1, 0, NULL};
939    couchfile_lookup_request rq;
940    couchstore_error_t errcode;
941
942    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
943    if (db->header.by_id_root == NULL) {
944        return COUCHSTORE_SUCCESS;
945    }
946
947    if (startKeyPtr) {
948        startKey = *startKeyPtr;
949    }
950
951    rq.cmp.compare = ebin_cmp;
952    rq.file = &db->file;
953    rq.num_keys = 1;
954    rq.keys = &keylist;
955    rq.callback_ctx = &cbctx;
956    rq.fetch_callback = lookup_callback;
957    rq.node_callback = NULL;
958    rq.fold = 1;
959    rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
960
961    errcode = btree_lookup(&rq, db->header.by_id_root->pointer);
962cleanup:
963    return errcode;
964}
965
966static couchstore_error_t walk_node_callback(struct couchfile_lookup_request *rq,
967                                                 uint64_t subtreeSize,
968                                                 const sized_buf *reduceValue)
969{
970    lookup_context* context = static_cast<lookup_context*>(rq->callback_ctx);
971    if (reduceValue) {
972        int result = context->walk_callback(context->db,
973                                            context->depth,
974                                            NULL,
975                                            subtreeSize,
976                                            reduceValue,
977                                            context->callback_context);
978        context->depth++;
979        if (result < 0)
980            return static_cast<couchstore_error_t>(result);
981    } else {
982        context->depth--;
983    }
984    return COUCHSTORE_SUCCESS;
985}
986
987static
988couchstore_error_t couchstore_walk_tree(Db *db,
989                                        int by_id,
990                                        const node_pointer* root,
991                                        const sized_buf* startKeyPtr,
992                                        couchstore_docinfos_options options,
993                                        int (*compare)(const sized_buf *k1, const sized_buf *k2),
994                                        couchstore_walk_tree_callback_fn callback,
995                                        void *ctx)
996{
997    couchstore_error_t errcode;
998    sized_buf startKey = {NULL, 0};
999    sized_buf *keylist;
1000    couchfile_lookup_request rq;
1001
1002    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1003    if (root == NULL) {
1004        return COUCHSTORE_SUCCESS;
1005    }
1006
1007    // Invoke the callback on the root node:
1008    errcode = static_cast<couchstore_error_t>(callback(db, 0, NULL,
1009                                                       root->subtreesize,
1010                                                       &root->reduce_value,
1011                                                       ctx));
1012    if (errcode < 0) {
1013        return errcode;
1014    }
1015
1016    if (startKeyPtr) {
1017        startKey = *startKeyPtr;
1018    }
1019    keylist = &startKey;
1020
1021    {
1022        // Create a new scope here just to mute the warning from the
1023        // compiler that the goto in the macro error_unless
1024        // skips the initialization of lookup_ctx..
1025        lookup_context lookup_ctx = {db, options, NULL, ctx, by_id, 1, callback};
1026
1027        rq.cmp.compare = compare;
1028        rq.file = &db->file;
1029        rq.num_keys = 1;
1030        rq.keys = &keylist;
1031        rq.callback_ctx = &lookup_ctx;
1032        rq.fetch_callback = lookup_callback;
1033        rq.node_callback = walk_node_callback;
1034        rq.fold = 1;
1035        rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
1036
1037        error_pass(btree_lookup(&rq, root->pointer));
1038    }
1039cleanup:
1040    return errcode;
1041}
1042
1043LIBCOUCHSTORE_API
1044couchstore_error_t couchstore_walk_id_tree(Db *db,
1045                                           const sized_buf* startDocID,
1046                                           couchstore_docinfos_options options,
1047                                           couchstore_walk_tree_callback_fn callback,
1048                                           void *ctx)
1049{
1050    COLLECT_LATENCY();
1051
1052    return couchstore_walk_tree(db, 1, db->header.by_id_root, startDocID,
1053                                options, ebin_cmp, callback, ctx);
1054}
1055
1056LIBCOUCHSTORE_API
1057couchstore_error_t couchstore_walk_seq_tree(Db *db,
1058                                           uint64_t startSequence,
1059                                           couchstore_docinfos_options options,
1060                                           couchstore_walk_tree_callback_fn callback,
1061                                           void *ctx)
1062{
1063    COLLECT_LATENCY();
1064
1065    raw_48 start_termbuf;
1066    encode_raw48(startSequence, &start_termbuf);
1067    sized_buf start_term = {(char*)&start_termbuf, 6};
1068
1069    return couchstore_walk_tree(db, 0, db->header.by_seq_root, &start_term,
1070                                options, seq_cmp, callback, ctx);
1071}
1072
1073static int id_ptr_cmp(const void *a, const void *b)
1074{
1075    sized_buf **buf1 = (sized_buf**) a;
1076    sized_buf **buf2 = (sized_buf**) b;
1077    return ebin_cmp(*buf1, *buf2);
1078}
1079
1080static int seq_ptr_cmp(const void *a, const void *b)
1081{
1082    sized_buf **buf1 = (sized_buf**) a;
1083    sized_buf **buf2 = (sized_buf**) b;
1084    return seq_cmp(*buf1, *buf2);
1085}
1086
1087// Common subroutine of couchstore_docinfos_by_{ids, sequence}
1088static couchstore_error_t iterate_docinfos(Db *db,
1089                                           const sized_buf keys[],
1090                                           unsigned numDocs,
1091                                           node_pointer *tree,
1092                                           int (*key_ptr_compare)(const void *, const void *),
1093                                           int (*key_compare)(const sized_buf *k1, const sized_buf *k2),
1094                                           couchstore_changes_callback_fn callback,
1095                                           int fold,
1096                                           int tolerate_corruption,
1097                                           void *ctx)
1098{
1099    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1100    const sized_buf **keyptrs = NULL;
1101    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1102    // Nothing to do if the tree is empty
1103    if (tree == NULL) {
1104        return COUCHSTORE_SUCCESS;
1105    }
1106
1107    if(numDocs <= 0) {
1108        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1109    }
1110
1111    // Create an array of *pointers to* sized_bufs, which is what btree_lookup wants:
1112    keyptrs = static_cast<const sized_buf**>(cb_malloc(numDocs * sizeof(sized_buf*)));
1113    error_unless(keyptrs, COUCHSTORE_ERROR_ALLOC_FAIL);
1114
1115    {
1116        unsigned i;
1117        for (i = 0; i< numDocs; ++i) {
1118            keyptrs[i] = &keys[i];
1119        }
1120        if (!fold) {
1121            // Sort the key pointers:
1122            qsort(keyptrs, numDocs, sizeof(keyptrs[0]), key_ptr_compare);
1123        }
1124
1125        // Construct the lookup request:
1126        lookup_context cbctx = {db, 0, callback, ctx, (tree == db->header.by_id_root), 0, NULL};
1127        couchfile_lookup_request rq;
1128        rq.cmp.compare = key_compare;
1129        rq.file = &db->file;
1130        rq.num_keys = numDocs;
1131        rq.keys = (sized_buf**) keyptrs;
1132        rq.callback_ctx = &cbctx;
1133        rq.fetch_callback = lookup_callback;
1134        rq.node_callback = NULL;
1135        rq.fold = fold;
1136        rq.tolerate_corruption = tolerate_corruption;
1137
1138        // Go!
1139        error_pass(btree_lookup(&rq, tree->pointer));
1140    }
1141cleanup:
1142    cb_free(keyptrs);
1143    return errcode;
1144}
1145
1146LIBCOUCHSTORE_API
1147couchstore_error_t couchstore_docinfos_by_id(Db *db,
1148                                             const sized_buf ids[],
1149                                             unsigned numDocs,
1150                                             couchstore_docinfos_options options,
1151                                             couchstore_changes_callback_fn callback,
1152                                             void *ctx)
1153{
1154    COLLECT_LATENCY();
1155
1156    return iterate_docinfos(db, ids, numDocs,
1157                            db->header.by_id_root, id_ptr_cmp, ebin_cmp,
1158                            callback,
1159                            (options & RANGES) != 0,
1160                            (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0,
1161                            ctx);
1162}
1163
1164LIBCOUCHSTORE_API
1165couchstore_error_t couchstore_docinfos_by_sequence(Db *db,
1166                                                   const uint64_t sequence[],
1167                                                   unsigned numDocs,
1168                                                   couchstore_docinfos_options options,
1169                                                   couchstore_changes_callback_fn callback,
1170                                                   void *ctx)
1171{
1172    COLLECT_LATENCY();
1173
1174    // Create the array of keys:
1175    sized_buf *keylist = static_cast<sized_buf*>(cb_malloc(numDocs * sizeof(sized_buf)));
1176    raw_by_seq_key *keyvalues = static_cast<raw_by_seq_key*>(cb_malloc(numDocs * sizeof(raw_by_seq_key)));
1177    couchstore_error_t errcode;
1178    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1179    error_unless(keylist && keyvalues, COUCHSTORE_ERROR_ALLOC_FAIL);
1180    unsigned i;
1181    for (i = 0; i< numDocs; ++i) {
1182        encode_raw48(sequence[i], &keyvalues[i].sequence);
1183        keylist[i].buf = static_cast<char*>((void*) &keyvalues[i]);
1184        keylist[i].size = sizeof(keyvalues[i]);
1185    }
1186
1187    error_pass(iterate_docinfos(db, keylist, numDocs,
1188                                db->header.by_seq_root, seq_ptr_cmp, seq_cmp,
1189                                callback,
1190                                (options & RANGES) != 0,
1191                                (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0,
1192                                ctx));
1193cleanup:
1194    cb_free(keylist);
1195    cb_free(keyvalues);
1196    return errcode;
1197}
1198
1199LIBCOUCHSTORE_API
1200couchstore_error_t couchstore_db_info(Db *db, DbInfo* dbinfo) {
1201    if (db == NULL || dbinfo == NULL) {
1202        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1203    }
1204    const node_pointer *id_root = db->header.by_id_root;
1205    const node_pointer *seq_root = db->header.by_seq_root;
1206    const node_pointer *local_root = db->header.local_docs_root;
1207    dbinfo->filename = db->file.path;
1208    dbinfo->header_position = db->header.position;
1209    dbinfo->last_sequence = db->header.update_seq;
1210    dbinfo->purge_seq = db->header.purge_seq;
1211    dbinfo->deleted_count = dbinfo->doc_count = dbinfo->space_used = 0;
1212    dbinfo->file_size = db->file.pos;
1213    if (id_root) {
1214        raw_by_id_reduce* id_reduce = (raw_by_id_reduce*) id_root->reduce_value.buf;
1215        dbinfo->doc_count = decode_raw40(id_reduce->notdeleted);
1216        dbinfo->deleted_count = decode_raw40(id_reduce->deleted);
1217        dbinfo->space_used = decode_raw48(id_reduce->size);
1218        dbinfo->space_used += id_root->subtreesize;
1219    }
1220    if(seq_root) {
1221        dbinfo->space_used += seq_root->subtreesize;
1222    }
1223    if(local_root) {
1224        dbinfo->space_used += local_root->subtreesize;
1225    }
1226    return COUCHSTORE_SUCCESS;
1227}
1228
1229static couchstore_error_t local_doc_fetch(couchfile_lookup_request *rq,
1230                                          const sized_buf *k,
1231                                          const sized_buf *v)
1232{
1233    LocalDoc **lDoc = (LocalDoc **) rq->callback_ctx;
1234    LocalDoc *dp;
1235
1236    if (!v) {
1237        *lDoc = NULL;
1238        return COUCHSTORE_SUCCESS;
1239    }
1240    fatbuf *ldbuf = fatbuf_alloc(sizeof(LocalDoc) + k->size + v->size);
1241    if (ldbuf == NULL) {
1242        return COUCHSTORE_ERROR_ALLOC_FAIL;
1243    }
1244
1245    dp = *lDoc = (LocalDoc *) fatbuf_get(ldbuf, sizeof(LocalDoc));
1246    dp->id.buf = (char *) fatbuf_get(ldbuf, k->size);
1247    dp->id.size = k->size;
1248
1249    dp->json.buf = (char *) fatbuf_get(ldbuf, v->size);
1250    dp->json.size = v->size;
1251
1252    dp->deleted = 0;
1253
1254    memcpy(dp->id.buf, k->buf, k->size);
1255    memcpy(dp->json.buf, v->buf, v->size);
1256
1257    return COUCHSTORE_SUCCESS;
1258}
1259
1260LIBCOUCHSTORE_API
1261couchstore_error_t couchstore_open_local_document(Db *db,
1262                                                  const void *id,
1263                                                  size_t idlen,
1264                                                  LocalDoc **pDoc)
1265{
1266    sized_buf key;
1267    sized_buf *keylist = &key;
1268    couchfile_lookup_request rq;
1269    couchstore_error_t errcode;
1270    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1271    if (db->header.local_docs_root == NULL) {
1272        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
1273    }
1274
1275    key.buf = (char *) id;
1276    key.size = idlen;
1277
1278    rq.cmp.compare = ebin_cmp;
1279    rq.file = &db->file;
1280    rq.num_keys = 1;
1281    rq.keys = &keylist;
1282    rq.callback_ctx = pDoc;
1283    rq.fetch_callback = local_doc_fetch;
1284    rq.node_callback = NULL;
1285    rq.fold = 0;
1286
1287    errcode = btree_lookup(&rq, db->header.local_docs_root->pointer);
1288    if (errcode == COUCHSTORE_SUCCESS) {
1289        if (*pDoc == NULL) {
1290            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
1291        }
1292    }
1293cleanup:
1294    return errcode;
1295}
1296
1297LIBCOUCHSTORE_API
1298couchstore_error_t couchstore_save_local_document(Db *db, LocalDoc *lDoc)
1299{
1300    couchstore_error_t errcode;
1301    couchfile_modify_action ldupdate;
1302    node_pointer *nroot = NULL;
1303    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1304
1305    if (lDoc->deleted) {
1306        ldupdate.type = ACTION_REMOVE;
1307    } else {
1308        ldupdate.type = ACTION_INSERT;
1309    }
1310
1311    ldupdate.key = &lDoc->id;
1312    ldupdate.value.data = &lDoc->json;
1313
1314    couchfile_modify_request rq;
1315    rq.cmp.compare = ebin_cmp;
1316    rq.num_actions = 1;
1317    rq.actions = &ldupdate;
1318    rq.fetch_callback = NULL;
1319    rq.reduce = NULL;
1320    rq.rereduce = NULL;
1321    rq.file = &db->file;
1322    rq.enable_purging = false;
1323    rq.purge_kp = NULL;
1324    rq.purge_kv = NULL;
1325    rq.compacting = 0;
1326    rq.kv_chunk_threshold = db->file.options.kv_nodesize;
1327    rq.kp_chunk_threshold = db->file.options.kp_nodesize;
1328
1329    nroot = modify_btree(&rq, db->header.local_docs_root, &errcode);
1330    if (errcode == COUCHSTORE_SUCCESS && nroot != db->header.local_docs_root) {
1331        cb_free(db->header.local_docs_root);
1332        db->header.local_docs_root = nroot;
1333    }
1334
1335cleanup:
1336    return errcode;
1337}
1338
1339LIBCOUCHSTORE_API
1340void couchstore_free_local_document(LocalDoc *lDoc)
1341{
1342    if (lDoc) {
1343        char *offset = (char *) (&((fatbuf *) NULL)->buf);
1344        fatbuf_free((fatbuf *) ((char *)lDoc - (char *)offset));
1345    }
1346}
1347
1348LIBCOUCHSTORE_API
1349couchstore_error_t couchstore_last_os_error(const Db *db,
1350                                            char* buf,
1351                                            size_t size) {
1352    if (db == NULL || buf == nullptr || size == 0) {
1353        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1354    }
1355    const couchstore_error_info_t *err = &db->file.lastError;
1356
1357    int nw;
1358
1359#ifdef WIN32
1360    char* win_msg = NULL;
1361    FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
1362                   FORMAT_MESSAGE_FROM_SYSTEM |
1363                   FORMAT_MESSAGE_IGNORE_INSERTS,
1364                   NULL, err->error,
1365                   MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
1366                   (LPTSTR) &win_msg,
1367                   0, NULL);
1368    nw = _snprintf(buf, size, "WINAPI error = %d: '%s'", err->error, win_msg);
1369    LocalFree(win_msg);
1370#else
1371    nw = snprintf(buf, size, "errno = %d: '%s'",
1372                      err->error, strerror(err->error));
1373#endif
1374
1375    if (nw < 0) {
1376        return COUCHSTORE_ERROR_ALLOC_FAIL;
1377    } if (size_t(nw) >= size) {
1378        /* Truncate the error message */
1379        buf[size - 1] = '\0';
1380    }
1381
1382    return COUCHSTORE_SUCCESS;
1383}
1384
1385static couchstore_error_t btree_eval_seq_reduce(Db *db,
1386                                                uint64_t *accum,
1387                                                sized_buf *left,
1388                                                sized_buf *right,
1389                                                bool past_left_edge,
1390                                                uint64_t diskpos) {
1391    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1392    int bufpos = 1, nodebuflen = 0;
1393    int node_type;
1394    char *nodebuf = NULL;
1395    nodebuflen = pread_compressed(&db->file, diskpos, &nodebuf);
1396    error_unless(nodebuflen >= 0, (static_cast<couchstore_error_t>(nodebuflen)));  // if negative, it's an error code
1397
1398    node_type = nodebuf[0];
1399    while(bufpos < nodebuflen) {
1400        sized_buf k, v;
1401        bufpos += read_kv(nodebuf + bufpos, &k, &v);
1402        int left_cmp = seq_cmp(&k, left);
1403        int right_cmp = seq_cmp(&k, right);
1404        if(left_cmp < 0) {
1405            continue;
1406        }
1407        if(node_type == KP_NODE) {
1408            // In-range Item in a KP Node
1409            const raw_node_pointer *raw = (const raw_node_pointer*)v.buf;
1410            const raw_by_seq_reduce *rawreduce = (const raw_by_seq_reduce*) (v.buf + sizeof(raw_node_pointer));
1411            uint64_t subcount = decode_raw40(rawreduce->count);
1412            uint64_t pointer = decode_raw48(raw->pointer);
1413            if((left_cmp >= 0 && !past_left_edge) || right_cmp >= 0) {
1414                error_pass(btree_eval_seq_reduce(db, accum, left, right, past_left_edge, pointer));
1415                if(right_cmp >= 0) {
1416                    break;
1417                } else {
1418                    past_left_edge = true;
1419                }
1420            } else {
1421                *accum += subcount;
1422            }
1423        } else {
1424            if(right_cmp > 0) {
1425                break;
1426            }
1427            // In-range Item in a KV Node
1428            *accum += 1;
1429        }
1430    }
1431cleanup:
1432    if (nodebuf) {
1433        cb_free(nodebuf);
1434    }
1435    return errcode;
1436}
1437
1438LIBCOUCHSTORE_API
1439couchstore_error_t couchstore_changes_count(Db* db,
1440                                            uint64_t min_seq,
1441                                            uint64_t max_seq,
1442                                            uint64_t *count) {
1443    COLLECT_LATENCY();
1444
1445    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1446    raw_48 leftkr, rightkr;
1447    sized_buf leftk, rightk;
1448    leftk.buf = (char*) &leftkr;
1449    rightk.buf = (char*) &rightkr;
1450    leftk.size = 6;
1451    rightk.size = 6;
1452    encode_raw48(min_seq, &leftkr);
1453    encode_raw48(max_seq, &rightkr);
1454
1455    *count = 0;
1456    if(db->header.by_seq_root) {
1457        error_pass(btree_eval_seq_reduce(db, count, &leftk, &rightk, false,
1458                                         db->header.by_seq_root->pointer));
1459    }
1460cleanup:
1461    return errcode;
1462}
1463