xref: /6.0.3/couchstore/src/couch_db.cc (revision c2c458ff)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3
4#include "bitfield.h"
5#include "couch_btree.h"
6#include "couch_latency_internal.h"
7#include "internal.h"
8#include "node_types.h"
9#include "reduces.h"
10#include "util.h"
11
12#include <cstddef>
13#include <assert.h>
14#include <fcntl.h>
15#include <phosphor/phosphor.h>
16#include <platform/cb_malloc.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21
// Smallest valid encoded size of a tree root inside the file header; each
// root occupies this many bytes plus the size of its reduce value.
#define ROOT_BASE_SIZE 12
// Size of the fixed part of the on-disk header; the encoded by-seq, by-id
// and local-docs roots follow it.
#define HEADER_BASE_SIZE 25
24
#if __APPLE__
/*
 * Apple's clang disables thread_local keyword support
 */
// Per-thread character buffer of MAX_ERR_STR_LEN bytes, declared with the
// __thread extension on Apple toolchains.
// NOTE(review): presumably holds the text of the last internal error for the
// calling thread - confirm against the code that writes to it.
__thread char internal_error_string[MAX_ERR_STR_LEN];
#else
// Same per-thread buffer, declared with the standard thread_local keyword.
thread_local char internal_error_string[MAX_ERR_STR_LEN];
#endif
33
34// Initializes one of the db's root node pointers from data in the file header
35static couchstore_error_t read_db_root(Db *db, node_pointer **root,
36                                       void *root_data, int root_size)
37{
38    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
39    if (root_size > 0) {
40        error_unless(root_size >= ROOT_BASE_SIZE, COUCHSTORE_ERROR_CORRUPT);
41        *root = read_root(root_data, root_size);
42        error_unless(*root, COUCHSTORE_ERROR_ALLOC_FAIL);
43        error_unless((*root)->pointer < db->header.position, COUCHSTORE_ERROR_CORRUPT);
44    } else {
45        *root = NULL;
46    }
47cleanup:
48    return errcode;
49}
50
51// Attempts to initialize the database from a header at the given file position
52static couchstore_error_t find_header_at_pos(Db *db, cs_off_t pos)
53{
54    int seqrootsize;
55    int idrootsize;
56    int localrootsize;
57    char *root_data;
58    int header_len;
59    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
60    union {
61        raw_file_header *raw;
62        char *buf;
63    } header_buf = { NULL };
64    uint8_t buf[2];
65    ssize_t readsize;
66    {
67        // Speculative read looking for header, mark as Empty.
68        ScopedFileTag tag(db->file.ops, db->file.handle, FileTag::Empty);
69        readsize = db->file.ops->pread(
70                &db->file.lastError, db->file.handle, buf, 2, pos);
71    }
72    error_unless(readsize == 2, COUCHSTORE_ERROR_READ);
73    if (buf[0] == 0) {
74        return COUCHSTORE_ERROR_NO_HEADER;
75    } else if (buf[0] != 1) {
76        return COUCHSTORE_ERROR_CORRUPT;
77    }
78
79    header_len = pread_header(&db->file, pos, &header_buf.buf, MAX_DB_HEADER_SIZE);
80    if (header_len < 0) {
81        error_pass(static_cast<couchstore_error_t>(header_len));
82    }
83
84    db->header.position = pos;
85    db->header.disk_version = decode_raw08(header_buf.raw->version);
86
87    // Only 12 and 11 are valid
88    error_unless(db->header.disk_version == COUCH_DISK_VERSION ||
89                 db->header.disk_version == COUCH_DISK_VERSION_11,
90                 COUCHSTORE_ERROR_HEADER_VERSION);
91    db->header.update_seq = decode_raw48(header_buf.raw->update_seq);
92    db->header.purge_seq = decode_raw48(header_buf.raw->purge_seq);
93    db->header.purge_ptr = decode_raw48(header_buf.raw->purge_ptr);
94    error_unless(db->header.purge_ptr <= db->header.position, COUCHSTORE_ERROR_CORRUPT);
95    seqrootsize = decode_raw16(header_buf.raw->seqrootsize);
96    idrootsize = decode_raw16(header_buf.raw->idrootsize);
97    localrootsize = decode_raw16(header_buf.raw->localrootsize);
98    error_unless(header_len == HEADER_BASE_SIZE + seqrootsize + idrootsize + localrootsize,
99                 COUCHSTORE_ERROR_CORRUPT);
100
101    root_data = (char*) (header_buf.raw + 1);  // i.e. just past *header_buf
102    error_pass(read_db_root(db, &db->header.by_seq_root, root_data, seqrootsize));
103    root_data += seqrootsize;
104    error_pass(read_db_root(db, &db->header.by_id_root, root_data, idrootsize));
105    root_data += idrootsize;
106    error_pass(read_db_root(db, &db->header.local_docs_root, root_data, localrootsize));
107
108cleanup:
109    cb_free(header_buf.raw);
110    return errcode;
111}
112
113// Finds the database header by scanning back from the end of the file at 4k boundaries
114static couchstore_error_t find_header(Db *db, int64_t start_pos)
115{
116    couchstore_error_t last_header_errcode = COUCHSTORE_ERROR_NO_HEADER;
117    int64_t pos = start_pos;
118    pos -= pos % COUCH_BLOCK_SIZE;
119    for (; pos >= 0; pos -= COUCH_BLOCK_SIZE) {
120        couchstore_error_t errcode = find_header_at_pos(db, pos);
121        switch(errcode) {
122            case COUCHSTORE_SUCCESS:
123                // Found it!
124                return COUCHSTORE_SUCCESS;
125            case COUCHSTORE_ERROR_NO_HEADER:
126                // No header here, so keep going
127                break;
128            case COUCHSTORE_ERROR_ALLOC_FAIL:
129                // Fatal error
130                return errcode;
131            default:
132                // Invalid header; continue, but remember the last error
133                last_header_errcode = errcode;
134                break;
135        }
136    }
137    return last_header_errcode;
138}
139
140/**
141 * Calculates how large in bytes the current header will be
142 * when written to disk.
143 *
144 * The seqrootsize, idrootsize and localrootsize params are
145 * used to return the respective sizes in this header if
146 * needed.
147 */
148size_t calculate_header_size(Db *db, size_t& seqrootsize,
149                             size_t& idrootsize, size_t& localrootsize)
150{
151    seqrootsize = idrootsize = localrootsize = 0;
152
153    if (db->header.by_seq_root) {
154        seqrootsize = ROOT_BASE_SIZE + db->header.by_seq_root->reduce_value.size;
155    }
156    if (db->header.by_id_root) {
157        idrootsize = ROOT_BASE_SIZE + db->header.by_id_root->reduce_value.size;
158    }
159    if (db->header.local_docs_root) {
160        localrootsize = ROOT_BASE_SIZE + db->header.local_docs_root->reduce_value.size;
161    }
162    return sizeof(raw_file_header) + seqrootsize + idrootsize + localrootsize;
163}
164
165couchstore_error_t db_write_header(Db *db)
166{
167    sized_buf writebuf;
168    size_t seqrootsize, idrootsize, localrootsize;
169    writebuf.size = calculate_header_size(db, seqrootsize,
170                                          idrootsize, localrootsize);
171    writebuf.buf = (char *) cb_malloc(writebuf.size);
172    raw_file_header* header = (raw_file_header*)writebuf.buf;
173    header->version = encode_raw08(db->header.disk_version);
174    encode_raw48(db->header.update_seq, &header->update_seq);
175    encode_raw48(db->header.purge_seq, &header->purge_seq);
176    encode_raw48(db->header.purge_ptr, &header->purge_ptr);
177    header->seqrootsize = encode_raw16((uint16_t)seqrootsize);
178    header->idrootsize = encode_raw16((uint16_t)idrootsize);
179    header->localrootsize = encode_raw16((uint16_t)localrootsize);
180    uint8_t *root = (uint8_t*)(header + 1);
181    encode_root(root, db->header.by_seq_root);
182    root += seqrootsize;
183    encode_root(root, db->header.by_id_root);
184    root += idrootsize;
185    encode_root(root, db->header.local_docs_root);
186    cs_off_t pos;
187    couchstore_error_t errcode = write_header(&db->file, &writebuf, &pos);
188    if (errcode == COUCHSTORE_SUCCESS) {
189        db->header.position = pos;
190    }
191    cb_free(writebuf.buf);
192    return errcode;
193}
194
195static couchstore_error_t create_header(Db *db)
196{
197    // Select the version based upon selected CRC
198    if (db->file.crc_mode == CRC32) {
199        // user is creating down-level files
200        db->header.disk_version = COUCH_DISK_VERSION_11;
201    } else {
202        // user is using latest
203        db->header.disk_version = COUCH_DISK_VERSION;
204    }
205    db->header.update_seq = 0;
206    db->header.by_id_root = NULL;
207    db->header.by_seq_root = NULL;
208    db->header.local_docs_root = NULL;
209    db->header.purge_seq = 0;
210    db->header.purge_ptr = 0;
211    db->header.position = 0;
212    return db_write_header(db);
213}
214
215LIBCOUCHSTORE_API
216uint64_t couchstore_get_header_position(Db *db)
217{
218    return db->header.position;
219}
220
221/**
222 * Precommit should occur before writing a header, it has two
223 * purposes. Firstly it ensures data is written before we attempt
224 * to write the header. This means it's impossible for the header
225 * to be written before the data. This is accomplished through
226 * a sync.
227 *
228 * The second purpose is to extend the file to be large enough
229 * to include the subsequently written header. This is done so
230 * the fdatasync performed by writing a header doesn't have to
231 * do an additional (expensive) modified metadata flush on top
232 * of the one we're already doing.
233 */
234couchstore_error_t precommit(Db *db)
235{
236    cs_off_t curpos = db->file.pos;
237
238    db->file.pos = align_to_next_block(db->file.pos);
239    sized_buf zerobyte = { const_cast<char*>("\0"), 1};
240
241    size_t seqrootsize, idrootsize, localrootsize;
242    db->file.pos += calculate_header_size(db, seqrootsize,
243                                          idrootsize, localrootsize);
244
245    //Extend file size to where end of header will land before we do first sync
246    couchstore_error_t errcode = static_cast<couchstore_error_t>(
247        db_write_buf(&db->file, &zerobyte, NULL, NULL));
248
249    if (errcode == COUCHSTORE_SUCCESS) {
250        errcode = db->file.ops->sync(&db->file.lastError, db->file.handle);
251    }
252    // Move cursor back to where it was
253    db->file.pos = curpos;
254    return errcode;
255}
256
257LIBCOUCHSTORE_API
258couchstore_error_t couchstore_commit(Db *db)
259{
260    COLLECT_LATENCY();
261
262    couchstore_error_t errcode = precommit(db);
263
264    if (errcode == COUCHSTORE_SUCCESS) {
265        errcode = db_write_header(db);
266    }
267
268    if (errcode == COUCHSTORE_SUCCESS) {
269        errcode = db->file.ops->sync(&db->file.lastError, db->file.handle);
270    }
271
272    return errcode;
273}
274
275static tree_file_options get_tree_file_options_from_flags(couchstore_open_flags flags)
276{
277    tree_file_options options;
278
279    if (flags & COUCHSTORE_OPEN_FLAG_UNBUFFERED) {
280        options.buf_io_enabled = false;
281    } else if (flags & COUCHSTORE_OPEN_WITH_CUSTOM_BUFFER) {
282        // Buffered IO with custom buffer settings.
283        //  * First 4 bits [15:12]: read buffer capacity
284        //  * Next  4 bits [11:08]: max read buffer count
285
286        uint32_t unit_index = (flags >> 12) & 0xf;
287        if (unit_index) {
288            // unit_index    1     2     3     4     ...   15
289            // unit size     1KB   2KB   4KB   8KB   ...   16MB
290            options.buf_io_read_unit_size = 1024 * (1 << (unit_index -1));
291        }
292        uint32_t count_index = (flags >> 8) & 0xf;
293        if (count_index) {
294            // count_index   1     2     3     4     ...   15
295            // # buffers     8     16    32    64    ...   128K
296            options.buf_io_read_buffers = 8 * (1 << (count_index-1));
297        }
298    }
299
300    // Set default value first.
301    options.kp_nodesize = DB_KP_CHUNK_THRESHOLD;
302    options.kv_nodesize = DB_KV_CHUNK_THRESHOLD;
303    if (flags & COUCHSTORE_OPEN_WITH_CUSTOM_NODESIZE) {
304        // B+tree custom node size settings.
305        //  * First 4 bits [23:20]: KP node size
306        //  * Next  4 bits [19:16]: KV node size
307        uint32_t kp_flag = (flags >> 20) & 0xf;
308        if (kp_flag) {
309            options.kp_nodesize = kp_flag * 1024;
310        }
311        uint32_t kv_flag = (flags >> 16) & 0xf;
312        if (kv_flag) {
313            options.kv_nodesize = kv_flag * 1024;
314        }
315    }
316
317    if (flags & COUCHSTORE_OPEN_WITH_PERIODIC_SYNC) {
318        // Automatic sync() every N bytes written.
319        //  * 5 bits [28-24]: power-of-2 * 1kB
320        uint64_t sync_flag = (flags >> 24) & 0x1f;
321        options.periodic_sync_bytes = uint64_t(1024) << (sync_flag - 1);
322    }
323
324    /* set the tracing and validation options */
325    options.tracing_enabled = false;
326    options.write_validation_enabled = false;
327    options.mprotect_enabled = false;
328    if (flags & COUCHSTORE_OPEN_WITH_TRACING) {
329        options.tracing_enabled = true;
330    }
331    if (flags & COUCHSTORE_OPEN_WITH_WRITE_VALIDATION) {
332        options.write_validation_enabled = true;
333    }
334    if (flags & COUCHSTORE_OPEN_WITH_MPROTECT) {
335        options.mprotect_enabled = true;
336    }
337
338    return options;
339}
340
341LIBCOUCHSTORE_API
342couchstore_open_flags couchstore_encode_periodic_sync_flags(uint64_t bytes) {
343    // Convert to encoding supported by couchstore_open_flags - KB power-of-2
344    // value.
345    // Round up to whole kilobyte units.
346    const uint64_t kilobytes = (bytes + 1023) / 1024;
347    // Calculate the shift amount (what is the log2 power)
348    uint64_t shiftAmount = std::log2(kilobytes);
349    // Saturate if the user specified more than the encodable amount.
350    shiftAmount = std::min(shiftAmount, uint64_t(30));
351    // Finally, encode in couchstore_open flags
352    return ((shiftAmount + 1)) << 24;
353}
354
355LIBCOUCHSTORE_API
356couchstore_error_t couchstore_open_db(const char *filename,
357                                      couchstore_open_flags flags,
358                                      Db **pDb)
359{
360    return couchstore_open_db_ex(filename, flags,
361                                 couchstore_get_default_file_ops(), pDb);
362}
363
364LIBCOUCHSTORE_API
365couchstore_error_t couchstore_open_db_ex(const char *filename,
366                                         couchstore_open_flags flags,
367                                         FileOpsInterface* ops,
368                                         Db **pDb)
369{
370    COLLECT_LATENCY();
371
372    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
373    Db *db;
374    int openflags;
375    cs_off_t pos;
376
377    /* Sanity check input parameters */
378    if ((flags & COUCHSTORE_OPEN_FLAG_RDONLY) &&
379        (flags & COUCHSTORE_OPEN_FLAG_CREATE)) {
380        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
381    }
382
383    if ((db = static_cast<Db*>(cb_calloc(1, sizeof(Db)))) == NULL) {
384        return COUCHSTORE_ERROR_ALLOC_FAIL;
385    }
386
387    if (flags & COUCHSTORE_OPEN_FLAG_RDONLY) {
388        openflags = O_RDONLY;
389    } else {
390        openflags = O_RDWR;
391    }
392
393    if (flags & COUCHSTORE_OPEN_FLAG_CREATE) {
394        openflags |= O_CREAT;
395    }
396
397    // open with CRC unknown, CRC will be selected when header is read/or not found.
398    error_pass(tree_file_open(&db->file, filename, openflags, CRC_UNKNOWN, ops,
399                              get_tree_file_options_from_flags(flags)));
400
401    pos = db->file.ops->goto_eof(&db->file.lastError, db->file.handle);
402    db->file.pos = pos;
403    if (pos == 0) {
404        /* This is an empty file. Create a new fileheader unless the
405         * user wanted a read-only version of the file
406         */
407
408        if (flags & COUCHSTORE_OPEN_FLAG_RDONLY) {
409            error_pass(COUCHSTORE_ERROR_NO_HEADER);
410        } else {
411
412            // Select the CRC to use on this new file
413            if (flags & COUCHSTORE_OPEN_WITH_LEGACY_CRC) {
414                db->file.crc_mode = CRC32;
415            } else {
416                db->file.crc_mode = CRC32C;
417            }
418
419            error_pass(create_header(db));
420        }
421    } else if (pos > 0) {
422        error_pass(find_header(db, db->file.pos - 2));
423
424        if (db->header.disk_version <= COUCH_DISK_VERSION_11) {
425            db->file.crc_mode = CRC32;
426        } else {
427            cb_assert(db->header.disk_version >= COUCH_DISK_VERSION_12);
428            db->file.crc_mode = CRC32C;
429        }
430
431        // Not allowed. Can't request legacy_crc but be opening non legacy CRC files.
432        if (db->file.crc_mode == CRC32C && flags & COUCHSTORE_OPEN_WITH_LEGACY_CRC) {
433            errcode = COUCHSTORE_ERROR_INVALID_ARGUMENTS;
434            goto cleanup;
435        }
436    } else {
437        error_pass(static_cast<couchstore_error_t>(db->file.pos));
438    }
439
440    *pDb = db;
441    db->dropped = 0;
442
443cleanup:
444    if (errcode != COUCHSTORE_SUCCESS) {
445        couchstore_close_file(db);
446        couchstore_free_db(db);
447    }
448
449    return errcode;
450}
451
452LIBCOUCHSTORE_API
453couchstore_error_t couchstore_close_file(Db* db)
454{
455    COLLECT_LATENCY();
456
457    if(db->dropped) {
458        return COUCHSTORE_SUCCESS;
459    }
460    couchstore_error_t error = tree_file_close(&db->file);
461    db->dropped = 1;
462    return error;
463}
464
465LIBCOUCHSTORE_API
466couchstore_error_t couchstore_rewind_db_header(Db *db)
467{
468    COLLECT_LATENCY();
469
470    couchstore_error_t errcode;
471    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
472    // free current header guts
473    cb_free(db->header.by_id_root);
474    cb_free(db->header.by_seq_root);
475    cb_free(db->header.local_docs_root);
476    db->header.by_id_root = NULL;
477    db->header.by_seq_root = NULL;
478    db->header.local_docs_root = NULL;
479
480    error_unless(db->header.position != 0, COUCHSTORE_ERROR_DB_NO_LONGER_VALID);
481    // find older header
482    error_pass(find_header(db, db->header.position - 2));
483
484cleanup:
485    // if we failed, free the handle and return an error
486    if(errcode != COUCHSTORE_SUCCESS) {
487        couchstore_close_file(db);
488        couchstore_free_db(db);
489        errcode = COUCHSTORE_ERROR_DB_NO_LONGER_VALID;
490    }
491    return errcode;
492}
493
494LIBCOUCHSTORE_API
495couchstore_error_t couchstore_free_db(Db* db)
496{
497    COLLECT_LATENCY();
498
499    if(!db) {
500        return COUCHSTORE_SUCCESS;
501    }
502
503    if(!db->dropped) {
504        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
505    }
506
507    cb_free(db->header.by_id_root);
508    cb_free(db->header.by_seq_root);
509    cb_free(db->header.local_docs_root);
510    db->header.by_id_root = NULL;
511    db->header.by_seq_root = NULL;
512    db->header.local_docs_root = NULL;
513
514    memset(db, 0xa5, sizeof(*db));
515    cb_free(db);
516
517    return COUCHSTORE_SUCCESS;
518}
519
520LIBCOUCHSTORE_API
521const char* couchstore_get_db_filename(Db *db) {
522    return db->file.path;
523}
524
525LIBCOUCHSTORE_API
526FileOpsInterface::FHStats* couchstore_get_db_filestats(Db* db) {
527    return db->file.ops->get_stats(db->file.handle);
528}
529
530DocInfo* couchstore_alloc_docinfo(const sized_buf *id, const sized_buf *rev_meta) {
531    size_t size = sizeof(DocInfo);
532    if (id) {
533        size += id->size;
534    }
535    if (rev_meta) {
536        size += rev_meta->size;
537    }
538    DocInfo* docInfo = static_cast<DocInfo*>(cb_malloc(size));
539    if (!docInfo) {
540        return NULL;
541    }
542    memset(docInfo, 0, sizeof(DocInfo));
543    char *extra = (char *)docInfo + sizeof(DocInfo);
544    if (id) {
545        memcpy(extra, id->buf, id->size);
546        docInfo->id.buf = extra;
547        docInfo->id.size = id->size;
548        extra += id->size;
549    }
550    if (rev_meta) {
551        memcpy(extra, rev_meta->buf, rev_meta->size);
552        docInfo->rev_meta.buf = extra;
553        docInfo->rev_meta.size = rev_meta->size;
554    }
555    return docInfo;
556}
557
558LIBCOUCHSTORE_API
559void couchstore_free_docinfo(DocInfo *docinfo)
560{
561    cb_free(docinfo);
562}
563
564LIBCOUCHSTORE_API
565void couchstore_free_document(Doc *doc)
566{
567    if (doc) {
568        size_t offset = offsetof(fatbuf, buf);
569        fatbuf_free((fatbuf *) ((char *)doc - (char *)offset));
570    }
571}
572
573couchstore_error_t by_seq_read_docinfo(DocInfo **pInfo,
574                                       const sized_buf *k,
575                                       const sized_buf *v)
576{
577    const raw_seq_index_value *raw = (const raw_seq_index_value*)v->buf;
578    ssize_t extraSize = v->size - sizeof(*raw);
579    if (extraSize < 0) {
580        return COUCHSTORE_ERROR_CORRUPT;
581    }
582
583    uint32_t idsize, datasize;
584    decode_kv_length(&raw->sizes, &idsize, &datasize);
585    uint64_t bp = decode_raw48(raw->bp);
586    int deleted = (bp & BP_DELETED_FLAG) != 0;
587    bp &= ~BP_DELETED_FLAG;
588    uint8_t content_meta = decode_raw08(raw->content_meta);
589    uint64_t rev_seq = decode_raw48(raw->rev_seq);
590    uint64_t db_seq = decode_sequence_key(k);
591
592    sized_buf id = {v->buf + sizeof(*raw), idsize};
593    sized_buf rev_meta = {id.buf + idsize, extraSize - id.size};
594    DocInfo* docInfo = couchstore_alloc_docinfo(&id, &rev_meta);
595    if (!docInfo) {
596        return COUCHSTORE_ERROR_ALLOC_FAIL;
597    }
598
599    docInfo->db_seq = db_seq;
600    docInfo->rev_seq = rev_seq;
601    docInfo->deleted = deleted;
602    docInfo->bp = bp;
603    docInfo->size = datasize;
604    docInfo->content_meta = content_meta;
605    *pInfo = docInfo;
606    return COUCHSTORE_SUCCESS;
607}
608
609static couchstore_error_t by_id_read_docinfo(DocInfo **pInfo,
610                                             const sized_buf *k,
611                                             const sized_buf *v)
612{
613    const raw_id_index_value *raw = (const raw_id_index_value*)v->buf;
614    ssize_t revMetaSize = v->size - sizeof(*raw);
615    if (revMetaSize < 0) {
616        return COUCHSTORE_ERROR_CORRUPT;
617    }
618
619    uint32_t datasize, deleted;
620    uint8_t content_meta;
621    uint64_t bp, seq, revnum;
622
623    seq = decode_raw48(raw->db_seq);
624    datasize = decode_raw32(raw->size);
625    bp = decode_raw48(raw->bp);
626    deleted = (bp & BP_DELETED_FLAG) != 0;
627    bp &= ~BP_DELETED_FLAG;
628    content_meta = decode_raw08(raw->content_meta);
629    revnum = decode_raw48(raw->rev_seq);
630
631    sized_buf rev_meta = {v->buf + sizeof(*raw), static_cast<size_t>(revMetaSize)};
632    DocInfo* docInfo = couchstore_alloc_docinfo(k, &rev_meta);
633    if (!docInfo) {
634        return COUCHSTORE_ERROR_ALLOC_FAIL;
635    }
636
637    docInfo->db_seq = seq;
638    docInfo->rev_seq = revnum;
639    docInfo->deleted = deleted;
640    docInfo->bp = bp;
641    docInfo->size = datasize;
642    docInfo->content_meta = content_meta;
643    *pInfo = docInfo;
644    return COUCHSTORE_SUCCESS;
645}
646
647//Fill in doc from reading file.
648static couchstore_error_t bp_to_doc(Doc **pDoc, Db *db, cs_off_t bp, couchstore_open_options options)
649{
650    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
651    int bodylen = 0;
652    char *docbody = NULL;
653    fatbuf *docbuf = NULL;
654    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
655
656    if (options & DECOMPRESS_DOC_BODIES) {
657        bodylen = pread_compressed(&db->file, bp, &docbody);
658    } else {
659        bodylen = pread_bin(&db->file, bp, &docbody);
660    }
661
662    error_unless(bodylen >= 0, static_cast<couchstore_error_t>(bodylen));    // if bodylen is negative it's an error code
663    error_unless(docbody || bodylen == 0, COUCHSTORE_ERROR_READ);
664
665    error_unless(docbuf = fatbuf_alloc(sizeof(Doc) + bodylen), COUCHSTORE_ERROR_ALLOC_FAIL);
666    *pDoc = (Doc *) fatbuf_get(docbuf, sizeof(Doc));
667
668    if (bodylen == 0) { //Empty doc
669        (*pDoc)->data.buf = NULL;
670        (*pDoc)->data.size = 0;
671        cb_free(docbody);
672        return COUCHSTORE_SUCCESS;
673    }
674
675    (*pDoc)->data.buf = (char *) fatbuf_get(docbuf, bodylen);
676    (*pDoc)->data.size = bodylen;
677    memcpy((*pDoc)->data.buf, docbody, bodylen);
678
679cleanup:
680    cb_free(docbody);
681    if (errcode < 0) {
682        fatbuf_free(docbuf);
683    }
684    return errcode;
685}
686
687static couchstore_error_t docinfo_fetch_by_id(couchfile_lookup_request *rq,
688                                              const sized_buf *k,
689                                              const sized_buf *v)
690{
691    DocInfo **pInfo = (DocInfo **) rq->callback_ctx;
692    if (v == NULL) {
693        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
694    }
695    return by_id_read_docinfo(pInfo, k, v);
696}
697
698static couchstore_error_t docinfo_fetch_by_seq(couchfile_lookup_request *rq,
699                                               const sized_buf *k,
700                                               const sized_buf *v)
701{
702    DocInfo **pInfo = (DocInfo **) rq->callback_ctx;
703    if (v == NULL) {
704        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
705    }
706    return by_seq_read_docinfo(pInfo, k, v);
707}
708
709LIBCOUCHSTORE_API
710couchstore_error_t couchstore_docinfo_by_id(Db *db,
711                                            const void *id,
712                                            size_t idlen,
713                                            DocInfo **pInfo)
714{
715    COLLECT_LATENCY();
716
717    sized_buf key;
718    sized_buf *keylist = &key;
719    couchfile_lookup_request rq;
720    couchstore_error_t errcode;
721    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
722
723    if (db->header.by_id_root == NULL) {
724        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
725    }
726
727    key.buf = (char *) id;
728    key.size = idlen;
729
730    rq.cmp.compare = ebin_cmp;
731    rq.file = &db->file;
732    rq.num_keys = 1;
733    rq.keys = &keylist;
734    rq.callback_ctx = pInfo;
735    rq.fetch_callback = docinfo_fetch_by_id;
736    rq.node_callback = NULL;
737    rq.fold = 0;
738
739    errcode = btree_lookup(&rq, db->header.by_id_root->pointer);
740    if (errcode == COUCHSTORE_SUCCESS) {
741        if (*pInfo == NULL) {
742            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
743        }
744    }
745cleanup:
746    return errcode;
747}
748
749LIBCOUCHSTORE_API
750couchstore_error_t couchstore_docinfo_by_sequence(Db *db,
751                                                  uint64_t sequence,
752                                                  DocInfo **pInfo)
753{
754    COLLECT_LATENCY();
755
756    sized_buf key;
757    sized_buf *keylist = &key;
758    couchfile_lookup_request rq;
759    couchstore_error_t errcode;
760    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
761
762    if (db->header.by_id_root == NULL) {
763        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
764    }
765
766    sequence = htonll(sequence);
767    key.buf = (char *)&sequence + 2;
768    key.size = 6;
769
770    rq.cmp.compare = seq_cmp;
771    rq.file = &db->file;
772    rq.num_keys = 1;
773    rq.keys = &keylist;
774    rq.callback_ctx = pInfo;
775    rq.fetch_callback = docinfo_fetch_by_seq;
776    rq.node_callback = NULL;
777    rq.fold = 0;
778
779    errcode = btree_lookup(&rq, db->header.by_seq_root->pointer);
780    if (errcode == COUCHSTORE_SUCCESS) {
781        if (*pInfo == NULL) {
782            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
783        }
784    }
785cleanup:
786    return errcode;
787}
788
789LIBCOUCHSTORE_API
790couchstore_error_t couchstore_open_doc_with_docinfo(Db *db,
791                                                    const DocInfo *docinfo,
792                                                    Doc **pDoc,
793                                                    couchstore_open_options options)
794{
795    COLLECT_LATENCY();
796
797    couchstore_error_t errcode;
798
799    *pDoc = NULL;
800    if (docinfo->bp == 0) {
801        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
802    }
803
804    if (!(docinfo->content_meta & COUCH_DOC_IS_COMPRESSED)) {
805        options &= ~DECOMPRESS_DOC_BODIES;
806    }
807
808    errcode = bp_to_doc(pDoc, db, docinfo->bp, options);
809    if (errcode == COUCHSTORE_SUCCESS) {
810        (*pDoc)->id.buf = docinfo->id.buf;
811        (*pDoc)->id.size = docinfo->id.size;
812    }
813
814    return errcode;
815}
816
817LIBCOUCHSTORE_API
818couchstore_error_t couchstore_open_document(Db *db,
819                                            const void *id,
820                                            size_t idlen,
821                                            Doc **pDoc,
822                                            couchstore_open_options options)
823{
824    COLLECT_LATENCY();
825
826    couchstore_error_t errcode;
827    DocInfo *info;
828    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
829    *pDoc = NULL;
830    errcode = couchstore_docinfo_by_id(db, id, idlen, &info);
831    if (errcode == COUCHSTORE_SUCCESS) {
832        errcode = couchstore_open_doc_with_docinfo(db, info, pDoc, options);
833        if (errcode == COUCHSTORE_SUCCESS) {
834            (*pDoc)->id.buf = (char *) id;
835            (*pDoc)->id.size = idlen;
836        }
837
838        couchstore_free_docinfo(info);
839    }
840cleanup:
841    return errcode;
842}
843
// context info passed to lookup_callback via btree_lookup
typedef struct {
    // Database handle handed back to the user callback.
    Db *db;
    // Option bits examined by lookup_callback (COUCHSTORE_DELETES_ONLY,
    // COUCHSTORE_NO_DELETES, COUCHSTORE_TOLERATE_CORRUPTION).
    couchstore_docinfos_options options;
    // Per-document callback, used when walk_callback is NULL.
    couchstore_changes_callback_fn callback;
    // Opaque pointer passed through to whichever callback is invoked.
    void* callback_context;
    // Nonzero: values are by-id index entries; zero: by-sequence entries.
    int by_id;
    // Depth value passed to walk_callback.
    int depth;
    // If non-NULL, invoked instead of callback.
    couchstore_walk_tree_callback_fn walk_callback;
} lookup_context;
854
855// btree_lookup callback, called while iterating keys
856static couchstore_error_t lookup_callback(couchfile_lookup_request *rq,
857                                          const sized_buf *k,
858                                          const sized_buf *v)
859{
860    if (v == NULL) {
861        return COUCHSTORE_SUCCESS;
862    }
863
864    const lookup_context *context = static_cast<const lookup_context *>(rq->callback_ctx);
865    DocInfo *docinfo = NULL;
866    couchstore_error_t errcode;
867    if (context->by_id) {
868        errcode = by_id_read_docinfo(&docinfo, k, v);
869    } else {
870        errcode = by_seq_read_docinfo(&docinfo, k, v);
871    }
872    if (errcode == COUCHSTORE_ERROR_CORRUPT &&
873        (context->options & COUCHSTORE_TOLERATE_CORRUPTION)) {
874        // Invoke callback even if doc info is corrupted/unreadable, if magic flag is set
875        docinfo = static_cast<DocInfo*>(cb_calloc(sizeof(DocInfo), 1));
876        docinfo->id = *k;
877        docinfo->rev_meta = *v;
878    } else if (errcode) {
879        return errcode;
880    }
881
882    if ((context->options & COUCHSTORE_DELETES_ONLY) && docinfo->deleted == 0) {
883        couchstore_free_docinfo(docinfo);
884        return COUCHSTORE_SUCCESS;
885    }
886
887    if ((context->options & COUCHSTORE_NO_DELETES) && docinfo->deleted == 1) {
888        couchstore_free_docinfo(docinfo);
889        return COUCHSTORE_SUCCESS;
890    }
891
892    if (context->walk_callback) {
893        errcode = static_cast<couchstore_error_t>(context->walk_callback(context->db,
894                                                                         context->depth,
895                                                                         docinfo,
896                                                                         0,
897                                                                         NULL,
898                                                                         context->callback_context));
899    } else {
900        errcode = static_cast<couchstore_error_t>(context->callback(context->db,
901                                                                    docinfo,
902                                                                    context->callback_context));
903    }
904    if (errcode <= 0) {
905        couchstore_free_docinfo(docinfo);
906    } else {
907        // User requested docinfo not be freed, don't free it, return success
908        return COUCHSTORE_SUCCESS;
909    }
910    return errcode;
911}
912
913LIBCOUCHSTORE_API
914couchstore_error_t couchstore_changes_since(Db *db,
915                                            uint64_t since,
916                                            couchstore_docinfos_options options,
917                                            couchstore_changes_callback_fn callback,
918                                            void *ctx)
919{
920    COLLECT_LATENCY();
921
922    char since_termbuf[6];
923    sized_buf since_term;
924    sized_buf *keylist = &since_term;
925    lookup_context cbctx = {db, options, callback, ctx, 0, 0, NULL};
926    couchfile_lookup_request rq;
927    couchstore_error_t errcode;
928
929    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
930    if (db->header.by_seq_root == NULL) {
931        return COUCHSTORE_SUCCESS;
932    }
933
934    since_term.buf = since_termbuf;
935    since_term.size = 6;
936    encode_raw48(since, (raw_48*)since_term.buf);
937
938    rq.cmp.compare = seq_cmp;
939    rq.file = &db->file;
940    rq.num_keys = 1;
941    rq.keys = &keylist;
942    rq.callback_ctx = &cbctx;
943    rq.fetch_callback = lookup_callback;
944    rq.node_callback = NULL;
945    rq.fold = 1;
946    rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
947
948    errcode = btree_lookup(&rq, db->header.by_seq_root->pointer);
949cleanup:
950    return errcode;
951}
952
953LIBCOUCHSTORE_API
954couchstore_error_t couchstore_all_docs(Db *db,
955                                       const sized_buf* startKeyPtr,
956                                       couchstore_docinfos_options options,
957                                       couchstore_changes_callback_fn callback,
958                                       void *ctx)
959{
960    COLLECT_LATENCY();
961
962    sized_buf startKey = {NULL, 0};
963    sized_buf *keylist = &startKey;
964    lookup_context cbctx = {db, options, callback, ctx, 1, 0, NULL};
965    couchfile_lookup_request rq;
966    couchstore_error_t errcode;
967
968    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
969    if (db->header.by_id_root == NULL) {
970        return COUCHSTORE_SUCCESS;
971    }
972
973    if (startKeyPtr) {
974        startKey = *startKeyPtr;
975    }
976
977    rq.cmp.compare = ebin_cmp;
978    rq.file = &db->file;
979    rq.num_keys = 1;
980    rq.keys = &keylist;
981    rq.callback_ctx = &cbctx;
982    rq.fetch_callback = lookup_callback;
983    rq.node_callback = NULL;
984    rq.fold = 1;
985    rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
986
987    errcode = btree_lookup(&rq, db->header.by_id_root->pointer);
988cleanup:
989    return errcode;
990}
991
992static couchstore_error_t walk_node_callback(struct couchfile_lookup_request *rq,
993                                                 uint64_t subtreeSize,
994                                                 const sized_buf *reduceValue)
995{
996    lookup_context* context = static_cast<lookup_context*>(rq->callback_ctx);
997    if (reduceValue) {
998        int result = context->walk_callback(context->db,
999                                            context->depth,
1000                                            NULL,
1001                                            subtreeSize,
1002                                            reduceValue,
1003                                            context->callback_context);
1004        context->depth++;
1005        if (result < 0)
1006            return static_cast<couchstore_error_t>(result);
1007    } else {
1008        context->depth--;
1009    }
1010    return COUCHSTORE_SUCCESS;
1011}
1012
1013static
1014couchstore_error_t couchstore_walk_tree(Db *db,
1015                                        int by_id,
1016                                        const node_pointer* root,
1017                                        const sized_buf* startKeyPtr,
1018                                        couchstore_docinfos_options options,
1019                                        int (*compare)(const sized_buf *k1, const sized_buf *k2),
1020                                        couchstore_walk_tree_callback_fn callback,
1021                                        void *ctx)
1022{
1023    couchstore_error_t errcode;
1024    sized_buf startKey = {NULL, 0};
1025    sized_buf *keylist;
1026    couchfile_lookup_request rq;
1027
1028    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1029    if (root == NULL) {
1030        return COUCHSTORE_SUCCESS;
1031    }
1032
1033    // Invoke the callback on the root node:
1034    errcode = static_cast<couchstore_error_t>(callback(db, 0, NULL,
1035                                                       root->subtreesize,
1036                                                       &root->reduce_value,
1037                                                       ctx));
1038    if (errcode < 0) {
1039        return errcode;
1040    }
1041
1042    if (startKeyPtr) {
1043        startKey = *startKeyPtr;
1044    }
1045    keylist = &startKey;
1046
1047    {
1048        // Create a new scope here just to mute the warning from the
1049        // compiler that the goto in the macro error_unless
1050        // skips the initialization of lookup_ctx..
1051        lookup_context lookup_ctx = {db, options, NULL, ctx, by_id, 1, callback};
1052
1053        rq.cmp.compare = compare;
1054        rq.file = &db->file;
1055        rq.num_keys = 1;
1056        rq.keys = &keylist;
1057        rq.callback_ctx = &lookup_ctx;
1058        rq.fetch_callback = lookup_callback;
1059        rq.node_callback = walk_node_callback;
1060        rq.fold = 1;
1061        rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
1062
1063        error_pass(btree_lookup(&rq, root->pointer));
1064    }
1065cleanup:
1066    return errcode;
1067}
1068
1069LIBCOUCHSTORE_API
1070couchstore_error_t couchstore_walk_id_tree(Db *db,
1071                                           const sized_buf* startDocID,
1072                                           couchstore_docinfos_options options,
1073                                           couchstore_walk_tree_callback_fn callback,
1074                                           void *ctx)
1075{
1076    COLLECT_LATENCY();
1077
1078    return couchstore_walk_tree(db, 1, db->header.by_id_root, startDocID,
1079                                options, ebin_cmp, callback, ctx);
1080}
1081
1082LIBCOUCHSTORE_API
1083couchstore_error_t couchstore_walk_seq_tree(Db *db,
1084                                           uint64_t startSequence,
1085                                           couchstore_docinfos_options options,
1086                                           couchstore_walk_tree_callback_fn callback,
1087                                           void *ctx)
1088{
1089    COLLECT_LATENCY();
1090
1091    raw_48 start_termbuf;
1092    encode_raw48(startSequence, &start_termbuf);
1093    sized_buf start_term = {(char*)&start_termbuf, 6};
1094
1095    return couchstore_walk_tree(db, 0, db->header.by_seq_root, &start_term,
1096                                options, seq_cmp, callback, ctx);
1097}
1098
1099static int id_ptr_cmp(const void *a, const void *b)
1100{
1101    sized_buf **buf1 = (sized_buf**) a;
1102    sized_buf **buf2 = (sized_buf**) b;
1103    return ebin_cmp(*buf1, *buf2);
1104}
1105
1106static int seq_ptr_cmp(const void *a, const void *b)
1107{
1108    sized_buf **buf1 = (sized_buf**) a;
1109    sized_buf **buf2 = (sized_buf**) b;
1110    return seq_cmp(*buf1, *buf2);
1111}
1112
1113// Common subroutine of couchstore_docinfos_by_{ids, sequence}
1114static couchstore_error_t iterate_docinfos(Db *db,
1115                                           const sized_buf keys[],
1116                                           unsigned numDocs,
1117                                           node_pointer *tree,
1118                                           int (*key_ptr_compare)(const void *, const void *),
1119                                           int (*key_compare)(const sized_buf *k1, const sized_buf *k2),
1120                                           couchstore_changes_callback_fn callback,
1121                                           int fold,
1122                                           int tolerate_corruption,
1123                                           void *ctx)
1124{
1125    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1126    const sized_buf **keyptrs = NULL;
1127    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1128    // Nothing to do if the tree is empty
1129    if (tree == NULL) {
1130        return COUCHSTORE_SUCCESS;
1131    }
1132
1133    if(numDocs <= 0) {
1134        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1135    }
1136
1137    // Create an array of *pointers to* sized_bufs, which is what btree_lookup wants:
1138    keyptrs = static_cast<const sized_buf**>(cb_malloc(numDocs * sizeof(sized_buf*)));
1139    error_unless(keyptrs, COUCHSTORE_ERROR_ALLOC_FAIL);
1140
1141    {
1142        unsigned i;
1143        for (i = 0; i< numDocs; ++i) {
1144            keyptrs[i] = &keys[i];
1145        }
1146        if (!fold) {
1147            // Sort the key pointers:
1148            qsort(keyptrs, numDocs, sizeof(keyptrs[0]), key_ptr_compare);
1149        }
1150
1151        // Construct the lookup request:
1152        lookup_context cbctx = {db, 0, callback, ctx, (tree == db->header.by_id_root), 0, NULL};
1153        couchfile_lookup_request rq;
1154        rq.cmp.compare = key_compare;
1155        rq.file = &db->file;
1156        rq.num_keys = numDocs;
1157        rq.keys = (sized_buf**) keyptrs;
1158        rq.callback_ctx = &cbctx;
1159        rq.fetch_callback = lookup_callback;
1160        rq.node_callback = NULL;
1161        rq.fold = fold;
1162        rq.tolerate_corruption = tolerate_corruption;
1163
1164        // Go!
1165        error_pass(btree_lookup(&rq, tree->pointer));
1166    }
1167cleanup:
1168    cb_free(keyptrs);
1169    return errcode;
1170}
1171
1172LIBCOUCHSTORE_API
1173couchstore_error_t couchstore_docinfos_by_id(Db *db,
1174                                             const sized_buf ids[],
1175                                             unsigned numDocs,
1176                                             couchstore_docinfos_options options,
1177                                             couchstore_changes_callback_fn callback,
1178                                             void *ctx)
1179{
1180    COLLECT_LATENCY();
1181
1182    return iterate_docinfos(db, ids, numDocs,
1183                            db->header.by_id_root, id_ptr_cmp, ebin_cmp,
1184                            callback,
1185                            (options & RANGES) != 0,
1186                            (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0,
1187                            ctx);
1188}
1189
1190LIBCOUCHSTORE_API
1191couchstore_error_t couchstore_docinfos_by_sequence(Db *db,
1192                                                   const uint64_t sequence[],
1193                                                   unsigned numDocs,
1194                                                   couchstore_docinfos_options options,
1195                                                   couchstore_changes_callback_fn callback,
1196                                                   void *ctx)
1197{
1198    COLLECT_LATENCY();
1199
1200    // Create the array of keys:
1201    sized_buf *keylist = static_cast<sized_buf*>(cb_malloc(numDocs * sizeof(sized_buf)));
1202    raw_by_seq_key *keyvalues = static_cast<raw_by_seq_key*>(cb_malloc(numDocs * sizeof(raw_by_seq_key)));
1203    couchstore_error_t errcode;
1204    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1205    error_unless(keylist && keyvalues, COUCHSTORE_ERROR_ALLOC_FAIL);
1206    unsigned i;
1207    for (i = 0; i< numDocs; ++i) {
1208        encode_raw48(sequence[i], &keyvalues[i].sequence);
1209        keylist[i].buf = static_cast<char*>((void*) &keyvalues[i]);
1210        keylist[i].size = sizeof(keyvalues[i]);
1211    }
1212
1213    error_pass(iterate_docinfos(db, keylist, numDocs,
1214                                db->header.by_seq_root, seq_ptr_cmp, seq_cmp,
1215                                callback,
1216                                (options & RANGES) != 0,
1217                                (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0,
1218                                ctx));
1219cleanup:
1220    cb_free(keylist);
1221    cb_free(keyvalues);
1222    return errcode;
1223}
1224
1225LIBCOUCHSTORE_API
1226couchstore_error_t couchstore_db_info(Db *db, DbInfo* dbinfo) {
1227    if (db == NULL || dbinfo == NULL) {
1228        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1229    }
1230    const node_pointer *id_root = db->header.by_id_root;
1231    const node_pointer *seq_root = db->header.by_seq_root;
1232    const node_pointer *local_root = db->header.local_docs_root;
1233    dbinfo->filename = db->file.path;
1234    dbinfo->header_position = db->header.position;
1235    dbinfo->last_sequence = db->header.update_seq;
1236    dbinfo->purge_seq = db->header.purge_seq;
1237    dbinfo->deleted_count = dbinfo->doc_count = dbinfo->space_used = 0;
1238    dbinfo->file_size = db->file.pos;
1239    if (id_root) {
1240        raw_by_id_reduce* id_reduce = (raw_by_id_reduce*) id_root->reduce_value.buf;
1241        dbinfo->doc_count = decode_raw40(id_reduce->notdeleted);
1242        dbinfo->deleted_count = decode_raw40(id_reduce->deleted);
1243        dbinfo->space_used = decode_raw48(id_reduce->size);
1244        dbinfo->space_used += id_root->subtreesize;
1245    }
1246    if(seq_root) {
1247        dbinfo->space_used += seq_root->subtreesize;
1248    }
1249    if(local_root) {
1250        dbinfo->space_used += local_root->subtreesize;
1251    }
1252    return COUCHSTORE_SUCCESS;
1253}
1254
1255static couchstore_error_t local_doc_fetch(couchfile_lookup_request *rq,
1256                                          const sized_buf *k,
1257                                          const sized_buf *v)
1258{
1259    LocalDoc **lDoc = (LocalDoc **) rq->callback_ctx;
1260    LocalDoc *dp;
1261
1262    if (!v) {
1263        *lDoc = NULL;
1264        return COUCHSTORE_SUCCESS;
1265    }
1266    fatbuf *ldbuf = fatbuf_alloc(sizeof(LocalDoc) + k->size + v->size);
1267    if (ldbuf == NULL) {
1268        return COUCHSTORE_ERROR_ALLOC_FAIL;
1269    }
1270
1271    dp = *lDoc = (LocalDoc *) fatbuf_get(ldbuf, sizeof(LocalDoc));
1272    dp->id.buf = (char *) fatbuf_get(ldbuf, k->size);
1273    dp->id.size = k->size;
1274
1275    dp->json.buf = (char *) fatbuf_get(ldbuf, v->size);
1276    dp->json.size = v->size;
1277
1278    dp->deleted = 0;
1279
1280    memcpy(dp->id.buf, k->buf, k->size);
1281    memcpy(dp->json.buf, v->buf, v->size);
1282
1283    return COUCHSTORE_SUCCESS;
1284}
1285
1286LIBCOUCHSTORE_API
1287couchstore_error_t couchstore_open_local_document(Db *db,
1288                                                  const void *id,
1289                                                  size_t idlen,
1290                                                  LocalDoc **pDoc)
1291{
1292    sized_buf key;
1293    sized_buf *keylist = &key;
1294    couchfile_lookup_request rq;
1295    couchstore_error_t errcode;
1296    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1297    if (db->header.local_docs_root == NULL) {
1298        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
1299    }
1300
1301    key.buf = (char *) id;
1302    key.size = idlen;
1303
1304    rq.cmp.compare = ebin_cmp;
1305    rq.file = &db->file;
1306    rq.num_keys = 1;
1307    rq.keys = &keylist;
1308    rq.callback_ctx = pDoc;
1309    rq.fetch_callback = local_doc_fetch;
1310    rq.node_callback = NULL;
1311    rq.fold = 0;
1312
1313    errcode = btree_lookup(&rq, db->header.local_docs_root->pointer);
1314    if (errcode == COUCHSTORE_SUCCESS) {
1315        if (*pDoc == NULL) {
1316            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
1317        }
1318    }
1319cleanup:
1320    return errcode;
1321}
1322
1323LIBCOUCHSTORE_API
1324couchstore_error_t couchstore_save_local_document(Db *db, LocalDoc *lDoc)
1325{
1326    couchstore_error_t errcode;
1327    couchfile_modify_action ldupdate;
1328    node_pointer *nroot = NULL;
1329    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1330
1331    if (lDoc->deleted) {
1332        ldupdate.type = ACTION_REMOVE;
1333    } else {
1334        ldupdate.type = ACTION_INSERT;
1335    }
1336
1337    ldupdate.key = &lDoc->id;
1338    ldupdate.value.data = &lDoc->json;
1339
1340    couchfile_modify_request rq;
1341    rq.cmp.compare = ebin_cmp;
1342    rq.num_actions = 1;
1343    rq.actions = &ldupdate;
1344    rq.fetch_callback = NULL;
1345    rq.reduce = NULL;
1346    rq.rereduce = NULL;
1347    rq.file = &db->file;
1348    rq.enable_purging = false;
1349    rq.purge_kp = NULL;
1350    rq.purge_kv = NULL;
1351    rq.compacting = 0;
1352    rq.kv_chunk_threshold = db->file.options.kv_nodesize;
1353    rq.kp_chunk_threshold = db->file.options.kp_nodesize;
1354
1355    nroot = modify_btree(&rq, db->header.local_docs_root, &errcode);
1356    if (errcode == COUCHSTORE_SUCCESS && nroot != db->header.local_docs_root) {
1357        cb_free(db->header.local_docs_root);
1358        db->header.local_docs_root = nroot;
1359    }
1360
1361cleanup:
1362    return errcode;
1363}
1364
1365LIBCOUCHSTORE_API
1366void couchstore_free_local_document(LocalDoc *lDoc)
1367{
1368    if (lDoc) {
1369        size_t offset = offsetof(fatbuf, buf);
1370        fatbuf_free((fatbuf *) ((char *)lDoc - (char *)offset));
1371    }
1372}
1373
1374LIBCOUCHSTORE_API
1375couchstore_error_t couchstore_last_os_error(const Db *db,
1376                                            char* buf,
1377                                            size_t size) {
1378    if (db == NULL || buf == nullptr || size == 0) {
1379        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1380    }
1381    const couchstore_error_info_t *err = &db->file.lastError;
1382
1383    int nw;
1384
1385#ifdef WIN32
1386    char* win_msg = NULL;
1387    FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
1388                   FORMAT_MESSAGE_FROM_SYSTEM |
1389                   FORMAT_MESSAGE_IGNORE_INSERTS,
1390                   NULL, err->error,
1391                   MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
1392                   (LPTSTR) &win_msg,
1393                   0, NULL);
1394    nw = _snprintf(buf, size, "WINAPI error = %d: '%s'", err->error, win_msg);
1395    LocalFree(win_msg);
1396#else
1397    nw = snprintf(buf, size, "errno = %d: '%s'",
1398                      err->error, strerror(err->error));
1399#endif
1400
1401    if (nw < 0) {
1402        return COUCHSTORE_ERROR_ALLOC_FAIL;
1403    } if (size_t(nw) >= size) {
1404        /* Truncate the error message */
1405        buf[size - 1] = '\0';
1406    }
1407
1408    return COUCHSTORE_SUCCESS;
1409}
1410
1411LIBCOUCHSTORE_API
1412couchstore_error_t couchstore_last_internal_error(const Db *db,
1413                                                  char* buf,
1414                                                  size_t size) {
1415    if (db == NULL || buf == nullptr || size == 0) {
1416        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1417    }
1418
1419    int nw;
1420
1421    nw = snprintf(buf, size, "'%s'", internal_error_string);
1422    if (nw < 0) {
1423        return COUCHSTORE_ERROR_ALLOC_FAIL;
1424    }
1425    return COUCHSTORE_SUCCESS;
1426}
1427
1428static couchstore_error_t btree_eval_seq_reduce(Db *db,
1429                                                uint64_t *accum,
1430                                                sized_buf *left,
1431                                                sized_buf *right,
1432                                                bool past_left_edge,
1433                                                uint64_t diskpos) {
1434    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1435    int bufpos = 1, nodebuflen = 0;
1436    int node_type;
1437    char *nodebuf = NULL;
1438    nodebuflen = pread_compressed(&db->file, diskpos, &nodebuf);
1439    error_unless(nodebuflen >= 0, (static_cast<couchstore_error_t>(nodebuflen)));  // if negative, it's an error code
1440
1441    node_type = nodebuf[0];
1442    while(bufpos < nodebuflen) {
1443        sized_buf k, v;
1444        bufpos += read_kv(nodebuf + bufpos, &k, &v);
1445        int left_cmp = seq_cmp(&k, left);
1446        int right_cmp = seq_cmp(&k, right);
1447        if(left_cmp < 0) {
1448            continue;
1449        }
1450        if(node_type == KP_NODE) {
1451            // In-range Item in a KP Node
1452            const raw_node_pointer *raw = (const raw_node_pointer*)v.buf;
1453            const raw_by_seq_reduce *rawreduce = (const raw_by_seq_reduce*) (v.buf + sizeof(raw_node_pointer));
1454            uint64_t subcount = decode_raw40(rawreduce->count);
1455            uint64_t pointer = decode_raw48(raw->pointer);
1456            if((left_cmp >= 0 && !past_left_edge) || right_cmp >= 0) {
1457                error_pass(btree_eval_seq_reduce(db, accum, left, right, past_left_edge, pointer));
1458                if(right_cmp >= 0) {
1459                    break;
1460                } else {
1461                    past_left_edge = true;
1462                }
1463            } else {
1464                *accum += subcount;
1465            }
1466        } else {
1467            if(right_cmp > 0) {
1468                break;
1469            }
1470            // In-range Item in a KV Node
1471            *accum += 1;
1472        }
1473    }
1474cleanup:
1475    if (nodebuf) {
1476        cb_free(nodebuf);
1477    }
1478    return errcode;
1479}
1480
1481LIBCOUCHSTORE_API
1482couchstore_error_t couchstore_changes_count(Db* db,
1483                                            uint64_t min_seq,
1484                                            uint64_t max_seq,
1485                                            uint64_t *count) {
1486    COLLECT_LATENCY();
1487
1488    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1489    raw_48 leftkr, rightkr;
1490    sized_buf leftk, rightk;
1491    leftk.buf = (char*) &leftkr;
1492    rightk.buf = (char*) &rightkr;
1493    leftk.size = 6;
1494    rightk.size = 6;
1495    encode_raw48(min_seq, &leftkr);
1496    encode_raw48(max_seq, &rightkr);
1497
1498    *count = 0;
1499    if(db->header.by_seq_root) {
1500        error_pass(btree_eval_seq_reduce(db, count, &leftk, &rightk, false,
1501                                         db->header.by_seq_root->pointer));
1502    }
1503cleanup:
1504    return errcode;
1505}
1506