xref: /5.5.2/couchstore/src/couch_db.cc (revision ea89a543)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3#include <fcntl.h>
4#include <platform/cb_malloc.h>
5#include <string.h>
6#include <stdlib.h>
7#include <assert.h>
8#include <stdio.h>
9
10#include "internal.h"
11#include "node_types.h"
12#include "couch_btree.h"
13#include "bitfield.h"
14#include "reduces.h"
15#include "util.h"
16
17#include "couch_latency_internal.h"
18
// Minimum size in bytes of a serialized root pointer; a root's size in the
// header is this base plus the length of its reduce value.
19#define ROOT_BASE_SIZE 12
// Size in bytes of the fixed part of the file header; a valid header's length
// is this base plus the three root sizes it declares.
20#define HEADER_BASE_SIZE 25
21
22// Initializes one of the db's root node pointers from data in the file header
23static couchstore_error_t read_db_root(Db *db, node_pointer **root,
24                                       void *root_data, int root_size)
25{
26    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
27    if (root_size > 0) {
28        error_unless(root_size >= ROOT_BASE_SIZE, COUCHSTORE_ERROR_CORRUPT);
29        *root = read_root(root_data, root_size);
30        error_unless(*root, COUCHSTORE_ERROR_ALLOC_FAIL);
31        error_unless((*root)->pointer < db->header.position, COUCHSTORE_ERROR_CORRUPT);
32    } else {
33        *root = NULL;
34    }
35cleanup:
36    return errcode;
37}
38
39// Attempts to initialize the database from a header at the given file position
40static couchstore_error_t find_header_at_pos(Db *db, cs_off_t pos)
41{
42    int seqrootsize;
43    int idrootsize;
44    int localrootsize;
45    char *root_data;
46    int header_len;
47    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
48    union {
49        raw_file_header *raw;
50        char *buf;
51    } header_buf = { NULL };
52    uint8_t buf[2];
53    ssize_t readsize;
54    {
55        // Speculative read looking for header, mark as Empty.
56        ScopedFileTag tag(db->file.ops, db->file.handle, FileTag::Empty);
57        readsize = db->file.ops->pread(
58                &db->file.lastError, db->file.handle, buf, 2, pos);
59    }
60    error_unless(readsize == 2, COUCHSTORE_ERROR_READ);
61    if (buf[0] == 0) {
62        return COUCHSTORE_ERROR_NO_HEADER;
63    } else if (buf[0] != 1) {
64        return COUCHSTORE_ERROR_CORRUPT;
65    }
66
67    header_len = pread_header(&db->file, pos, &header_buf.buf, MAX_DB_HEADER_SIZE);
68    if (header_len < 0) {
69        error_pass(static_cast<couchstore_error_t>(header_len));
70    }
71
72    db->header.position = pos;
73    db->header.disk_version = decode_raw08(header_buf.raw->version);
74
75    // Only 12 and 11 are valid
76    error_unless(db->header.disk_version == COUCH_DISK_VERSION ||
77                 db->header.disk_version == COUCH_DISK_VERSION_11,
78                 COUCHSTORE_ERROR_HEADER_VERSION);
79    db->header.update_seq = decode_raw48(header_buf.raw->update_seq);
80    db->header.purge_seq = decode_raw48(header_buf.raw->purge_seq);
81    db->header.purge_ptr = decode_raw48(header_buf.raw->purge_ptr);
82    error_unless(db->header.purge_ptr <= db->header.position, COUCHSTORE_ERROR_CORRUPT);
83    seqrootsize = decode_raw16(header_buf.raw->seqrootsize);
84    idrootsize = decode_raw16(header_buf.raw->idrootsize);
85    localrootsize = decode_raw16(header_buf.raw->localrootsize);
86    error_unless(header_len == HEADER_BASE_SIZE + seqrootsize + idrootsize + localrootsize,
87                 COUCHSTORE_ERROR_CORRUPT);
88
89    root_data = (char*) (header_buf.raw + 1);  // i.e. just past *header_buf
90    error_pass(read_db_root(db, &db->header.by_seq_root, root_data, seqrootsize));
91    root_data += seqrootsize;
92    error_pass(read_db_root(db, &db->header.by_id_root, root_data, idrootsize));
93    root_data += idrootsize;
94    error_pass(read_db_root(db, &db->header.local_docs_root, root_data, localrootsize));
95
96cleanup:
97    cb_free(header_buf.raw);
98    return errcode;
99}
100
101// Finds the database header by scanning back from the end of the file at 4k boundaries
102static couchstore_error_t find_header(Db *db, int64_t start_pos)
103{
104    couchstore_error_t last_header_errcode = COUCHSTORE_ERROR_NO_HEADER;
105    int64_t pos = start_pos;
106    pos -= pos % COUCH_BLOCK_SIZE;
107    for (; pos >= 0; pos -= COUCH_BLOCK_SIZE) {
108        couchstore_error_t errcode = find_header_at_pos(db, pos);
109        switch(errcode) {
110            case COUCHSTORE_SUCCESS:
111                // Found it!
112                return COUCHSTORE_SUCCESS;
113            case COUCHSTORE_ERROR_NO_HEADER:
114                // No header here, so keep going
115                break;
116            case COUCHSTORE_ERROR_ALLOC_FAIL:
117                // Fatal error
118                return errcode;
119            default:
120                // Invalid header; continue, but remember the last error
121                last_header_errcode = errcode;
122                break;
123        }
124    }
125    return last_header_errcode;
126}
127
128/**
129 * Calculates how large in bytes the current header will be
130 * when written to disk.
131 *
132 * The seqrootsize, idrootsize and localrootsize params are
133 * used to return the respective sizes in this header if
134 * needed.
135 */
136size_t calculate_header_size(Db *db, size_t& seqrootsize,
137                             size_t& idrootsize, size_t& localrootsize)
138{
139    seqrootsize = idrootsize = localrootsize = 0;
140
141    if (db->header.by_seq_root) {
142        seqrootsize = ROOT_BASE_SIZE + db->header.by_seq_root->reduce_value.size;
143    }
144    if (db->header.by_id_root) {
145        idrootsize = ROOT_BASE_SIZE + db->header.by_id_root->reduce_value.size;
146    }
147    if (db->header.local_docs_root) {
148        localrootsize = ROOT_BASE_SIZE + db->header.local_docs_root->reduce_value.size;
149    }
150    return sizeof(raw_file_header) + seqrootsize + idrootsize + localrootsize;
151}
152
153couchstore_error_t db_write_header(Db *db)
154{
155    sized_buf writebuf;
156    size_t seqrootsize, idrootsize, localrootsize;
157    writebuf.size = calculate_header_size(db, seqrootsize,
158                                          idrootsize, localrootsize);
159    writebuf.buf = (char *) cb_calloc(1, writebuf.size);
160    raw_file_header* header = (raw_file_header*)writebuf.buf;
161    header->version = encode_raw08(db->header.disk_version);
162    encode_raw48(db->header.update_seq, &header->update_seq);
163    encode_raw48(db->header.purge_seq, &header->purge_seq);
164    encode_raw48(db->header.purge_ptr, &header->purge_ptr);
165    header->seqrootsize = encode_raw16((uint16_t)seqrootsize);
166    header->idrootsize = encode_raw16((uint16_t)idrootsize);
167    header->localrootsize = encode_raw16((uint16_t)localrootsize);
168    uint8_t *root = (uint8_t*)(header + 1);
169    encode_root(root, db->header.by_seq_root);
170    root += seqrootsize;
171    encode_root(root, db->header.by_id_root);
172    root += idrootsize;
173    encode_root(root, db->header.local_docs_root);
174    cs_off_t pos;
175    couchstore_error_t errcode = write_header(&db->file, &writebuf, &pos);
176    if (errcode == COUCHSTORE_SUCCESS) {
177        db->header.position = pos;
178    }
179    cb_free(writebuf.buf);
180    return errcode;
181}
182
183static couchstore_error_t create_header(Db *db)
184{
185    // Select the version based upon selected CRC
186    if (db->file.crc_mode == CRC32) {
187        // user is creating down-level files
188        db->header.disk_version = COUCH_DISK_VERSION_11;
189    } else {
190        // user is using latest
191        db->header.disk_version = COUCH_DISK_VERSION;
192    }
193    db->header.update_seq = 0;
194    db->header.by_id_root = NULL;
195    db->header.by_seq_root = NULL;
196    db->header.local_docs_root = NULL;
197    db->header.purge_seq = 0;
198    db->header.purge_ptr = 0;
199    db->header.position = 0;
200    return db_write_header(db);
201}
202
203LIBCOUCHSTORE_API
204uint64_t couchstore_get_header_position(Db *db)
205{
206    return db->header.position;
207}
208
209/**
210 * Precommit should occur before writing a header, it has two
211 * purposes. Firstly it ensures data is written before we attempt
212 * to write the header. This means it's impossible for the header
213 * to be written before the data. This is accomplished through
214 * a sync.
215 *
216 * The second purpose is to extend the file to be large enough
217 * to include the subsequently written header. This is done so
218 * the fdatasync performed by writing a header doesn't have to
219 * do an additional (expensive) modified metadata flush on top
220 * of the one we're already doing.
221 */
222couchstore_error_t precommit(Db *db)
223{
224    cs_off_t curpos = db->file.pos;
225
226    db->file.pos = align_to_next_block(db->file.pos);
227    sized_buf zerobyte = { const_cast<char*>("\0"), 1};
228
229    size_t seqrootsize, idrootsize, localrootsize;
230    db->file.pos += calculate_header_size(db, seqrootsize,
231                                          idrootsize, localrootsize);
232
233    //Extend file size to where end of header will land before we do first sync
234    couchstore_error_t errcode = static_cast<couchstore_error_t>(
235        db_write_buf(&db->file, &zerobyte, NULL, NULL));
236
237    if (errcode == COUCHSTORE_SUCCESS) {
238        errcode = db->file.ops->sync(&db->file.lastError, db->file.handle);
239    }
240    // Move cursor back to where it was
241    db->file.pos = curpos;
242    return errcode;
243}
244
245LIBCOUCHSTORE_API
246couchstore_error_t couchstore_commit(Db *db)
247{
248    COLLECT_LATENCY();
249
250    couchstore_error_t errcode = precommit(db);
251
252    if (errcode == COUCHSTORE_SUCCESS) {
253        errcode = db_write_header(db);
254    }
255
256    if (errcode == COUCHSTORE_SUCCESS) {
257        errcode = db->file.ops->sync(&db->file.lastError, db->file.handle);
258    }
259
260    return errcode;
261}
262
263static tree_file_options get_tree_file_options_from_flags(couchstore_open_flags flags)
264{
265    tree_file_options options;
266
267    if (flags & COUCHSTORE_OPEN_FLAG_UNBUFFERED) {
268        options.buf_io_enabled = false;
269    } else if (flags & COUCHSTORE_OPEN_WITH_CUSTOM_BUFFER) {
270        // Buffered IO with custom buffer settings.
271        //  * First 4 bits [15:12]: read buffer capacity
272        //  * Next  4 bits [11:08]: max read buffer count
273
274        uint32_t unit_index = (flags >> 12) & 0xf;
275        if (unit_index) {
276            // unit_index    1     2     3     4     ...   15
277            // unit size     1KB   2KB   4KB   8KB   ...   16MB
278            options.buf_io_read_unit_size = 1024 * (1 << (unit_index -1));
279        }
280        uint32_t count_index = (flags >> 8) & 0xf;
281        if (count_index) {
282            // count_index   1     2     3     4     ...   15
283            // # buffers     8     16    32    64    ...   128K
284            options.buf_io_read_buffers = 8 * (1 << (count_index-1));
285        }
286    }
287
288    // Set default value first.
289    options.kp_nodesize = DB_KP_CHUNK_THRESHOLD;
290    options.kv_nodesize = DB_KV_CHUNK_THRESHOLD;
291    if (flags & COUCHSTORE_OPEN_WITH_CUSTOM_NODESIZE) {
292        // B+tree custom node size settings.
293        //  * First 4 bits [23:20]: KP node size
294        //  * Next  4 bits [19:16]: KV node size
295        uint32_t kp_flag = (flags >> 20) & 0xf;
296        if (kp_flag) {
297            options.kp_nodesize = kp_flag * 1024;
298        }
299        uint32_t kv_flag = (flags >> 16) & 0xf;
300        if (kv_flag) {
301            options.kv_nodesize = kv_flag * 1024;
302        }
303    }
304
305    if (flags & COUCHSTORE_OPEN_WITH_PERIODIC_SYNC) {
306        // Automatic sync() every N bytes written.
307        //  * 5 bits [28-24]: power-of-2 * 1kB
308        uint64_t sync_flag = (flags >> 24) & 0x1f;
309        options.periodic_sync_bytes = uint64_t(1024) << (sync_flag - 1);
310    }
311
312    return options;
313}
314
315LIBCOUCHSTORE_API
316couchstore_error_t couchstore_open_db(const char *filename,
317                                      couchstore_open_flags flags,
318                                      Db **pDb)
319{
320    return couchstore_open_db_ex(filename, flags,
321                                 couchstore_get_default_file_ops(), pDb);
322}
323
324LIBCOUCHSTORE_API
325couchstore_error_t couchstore_open_db_ex(const char *filename,
326                                         couchstore_open_flags flags,
327                                         FileOpsInterface* ops,
328                                         Db **pDb)
329{
330    COLLECT_LATENCY();
331
332    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
333    Db *db;
334    int openflags;
335    cs_off_t pos;
336
337    /* Sanity check input parameters */
338    if ((flags & COUCHSTORE_OPEN_FLAG_RDONLY) &&
339        (flags & COUCHSTORE_OPEN_FLAG_CREATE)) {
340        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
341    }
342
343    if ((db = static_cast<Db*>(cb_calloc(1, sizeof(Db)))) == NULL) {
344        return COUCHSTORE_ERROR_ALLOC_FAIL;
345    }
346
347    if (flags & COUCHSTORE_OPEN_FLAG_RDONLY) {
348        openflags = O_RDONLY;
349    } else {
350        openflags = O_RDWR;
351    }
352
353    if (flags & COUCHSTORE_OPEN_FLAG_CREATE) {
354        openflags |= O_CREAT;
355    }
356
357    // open with CRC unknown, CRC will be selected when header is read/or not found.
358    error_pass(tree_file_open(&db->file, filename, openflags, CRC_UNKNOWN, ops,
359                              get_tree_file_options_from_flags(flags)));
360
361    pos = db->file.ops->goto_eof(&db->file.lastError, db->file.handle);
362    db->file.pos = pos;
363    if (pos == 0) {
364        /* This is an empty file. Create a new fileheader unless the
365         * user wanted a read-only version of the file
366         */
367
368        if (flags & COUCHSTORE_OPEN_FLAG_RDONLY) {
369            error_pass(COUCHSTORE_ERROR_NO_HEADER);
370        } else {
371
372            // Select the CRC to use on this new file
373            if (flags & COUCHSTORE_OPEN_WITH_LEGACY_CRC) {
374                db->file.crc_mode = CRC32;
375            } else {
376                db->file.crc_mode = CRC32C;
377            }
378
379            error_pass(create_header(db));
380        }
381    } else if (pos > 0) {
382        error_pass(find_header(db, db->file.pos - 2));
383
384        if (db->header.disk_version <= COUCH_DISK_VERSION_11) {
385            db->file.crc_mode = CRC32;
386        } else {
387            cb_assert(db->header.disk_version >= COUCH_DISK_VERSION_12);
388            db->file.crc_mode = CRC32C;
389        }
390
391        // Not allowed. Can't request legacy_crc but be opening non legacy CRC files.
392        if (db->file.crc_mode == CRC32C && flags & COUCHSTORE_OPEN_WITH_LEGACY_CRC) {
393            errcode = COUCHSTORE_ERROR_INVALID_ARGUMENTS;
394            goto cleanup;
395        }
396    } else {
397        error_pass(static_cast<couchstore_error_t>(db->file.pos));
398    }
399
400    *pDb = db;
401    db->dropped = 0;
402
403cleanup:
404    if(errcode != COUCHSTORE_SUCCESS) {
405        couchstore_close_file(db);
406        couchstore_free_db(db);
407    }
408
409    return errcode;
410}
411
412LIBCOUCHSTORE_API
413couchstore_error_t couchstore_close_file(Db* db)
414{
415    COLLECT_LATENCY();
416
417    if(db->dropped) {
418        return COUCHSTORE_SUCCESS;
419    }
420    couchstore_error_t error = tree_file_close(&db->file);
421    db->dropped = 1;
422    return error;
423}
424
425LIBCOUCHSTORE_API
426couchstore_error_t couchstore_rewind_db_header(Db *db)
427{
428    COLLECT_LATENCY();
429
430    couchstore_error_t errcode;
431    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
432    // free current header guts
433    cb_free(db->header.by_id_root);
434    cb_free(db->header.by_seq_root);
435    cb_free(db->header.local_docs_root);
436    db->header.by_id_root = NULL;
437    db->header.by_seq_root = NULL;
438    db->header.local_docs_root = NULL;
439
440    error_unless(db->header.position != 0, COUCHSTORE_ERROR_DB_NO_LONGER_VALID);
441    // find older header
442    error_pass(find_header(db, db->header.position - 2));
443
444cleanup:
445    // if we failed, free the handle and return an error
446    if(errcode != COUCHSTORE_SUCCESS) {
447        couchstore_close_file(db);
448        couchstore_free_db(db);
449        errcode = COUCHSTORE_ERROR_DB_NO_LONGER_VALID;
450    }
451    return errcode;
452}
453
454LIBCOUCHSTORE_API
455couchstore_error_t couchstore_free_db(Db* db)
456{
457    COLLECT_LATENCY();
458
459    if(!db) {
460        return COUCHSTORE_SUCCESS;
461    }
462
463    if(!db->dropped) {
464        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
465    }
466
467    cb_free(db->header.by_id_root);
468    cb_free(db->header.by_seq_root);
469    cb_free(db->header.local_docs_root);
470    db->header.by_id_root = NULL;
471    db->header.by_seq_root = NULL;
472    db->header.local_docs_root = NULL;
473
474    memset(db, 0xa5, sizeof(*db));
475    cb_free(db);
476
477    return COUCHSTORE_SUCCESS;
478}
479
480LIBCOUCHSTORE_API
481const char* couchstore_get_db_filename(Db *db) {
482    return db->file.path;
483}
484
485DocInfo* couchstore_alloc_docinfo(const sized_buf *id, const sized_buf *rev_meta) {
486    size_t size = sizeof(DocInfo);
487    if (id) {
488        size += id->size;
489    }
490    if (rev_meta) {
491        size += rev_meta->size;
492    }
493    DocInfo* docInfo = static_cast<DocInfo*>(cb_malloc(size));
494    if (!docInfo) {
495        return NULL;
496    }
497    memset(docInfo, 0, sizeof(DocInfo));
498    char *extra = (char *)docInfo + sizeof(DocInfo);
499    if (id) {
500        memcpy(extra, id->buf, id->size);
501        docInfo->id.buf = extra;
502        docInfo->id.size = id->size;
503        extra += id->size;
504    }
505    if (rev_meta) {
506        memcpy(extra, rev_meta->buf, rev_meta->size);
507        docInfo->rev_meta.buf = extra;
508        docInfo->rev_meta.size = rev_meta->size;
509    }
510    return docInfo;
511}
512
513LIBCOUCHSTORE_API
514void couchstore_free_docinfo(DocInfo *docinfo)
515{
516    cb_free(docinfo);
517}
518
519LIBCOUCHSTORE_API
520void couchstore_free_document(Doc *doc)
521{
522    if (doc) {
523        char *offset = (char *) (&((fatbuf *) NULL)->buf);
524        fatbuf_free((fatbuf *) ((char *)doc - (char *)offset));
525    }
526}
527
528couchstore_error_t by_seq_read_docinfo(DocInfo **pInfo,
529                                       const sized_buf *k,
530                                       const sized_buf *v)
531{
532    const raw_seq_index_value *raw = (const raw_seq_index_value*)v->buf;
533    ssize_t extraSize = v->size - sizeof(*raw);
534    if (extraSize < 0) {
535        return COUCHSTORE_ERROR_CORRUPT;
536    }
537
538    uint32_t idsize, datasize;
539    decode_kv_length(&raw->sizes, &idsize, &datasize);
540    uint64_t bp = decode_raw48(raw->bp);
541    int deleted = (bp & BP_DELETED_FLAG) != 0;
542    bp &= ~BP_DELETED_FLAG;
543    uint8_t content_meta = decode_raw08(raw->content_meta);
544    uint64_t rev_seq = decode_raw48(raw->rev_seq);
545    uint64_t db_seq = decode_sequence_key(k);
546
547    sized_buf id = {v->buf + sizeof(*raw), idsize};
548    sized_buf rev_meta = {id.buf + idsize, extraSize - id.size};
549    DocInfo* docInfo = couchstore_alloc_docinfo(&id, &rev_meta);
550    if (!docInfo) {
551        return COUCHSTORE_ERROR_ALLOC_FAIL;
552    }
553
554    docInfo->db_seq = db_seq;
555    docInfo->rev_seq = rev_seq;
556    docInfo->deleted = deleted;
557    docInfo->bp = bp;
558    docInfo->size = datasize;
559    docInfo->content_meta = content_meta;
560    *pInfo = docInfo;
561    return COUCHSTORE_SUCCESS;
562}
563
564static couchstore_error_t by_id_read_docinfo(DocInfo **pInfo,
565                                             const sized_buf *k,
566                                             const sized_buf *v)
567{
568    const raw_id_index_value *raw = (const raw_id_index_value*)v->buf;
569    ssize_t revMetaSize = v->size - sizeof(*raw);
570    if (revMetaSize < 0) {
571        return COUCHSTORE_ERROR_CORRUPT;
572    }
573
574    uint32_t datasize, deleted;
575    uint8_t content_meta;
576    uint64_t bp, seq, revnum;
577
578    seq = decode_raw48(raw->db_seq);
579    datasize = decode_raw32(raw->size);
580    bp = decode_raw48(raw->bp);
581    deleted = (bp & BP_DELETED_FLAG) != 0;
582    bp &= ~BP_DELETED_FLAG;
583    content_meta = decode_raw08(raw->content_meta);
584    revnum = decode_raw48(raw->rev_seq);
585
586    sized_buf rev_meta = {v->buf + sizeof(*raw), static_cast<size_t>(revMetaSize)};
587    DocInfo* docInfo = couchstore_alloc_docinfo(k, &rev_meta);
588    if (!docInfo) {
589        return COUCHSTORE_ERROR_ALLOC_FAIL;
590    }
591
592    docInfo->db_seq = seq;
593    docInfo->rev_seq = revnum;
594    docInfo->deleted = deleted;
595    docInfo->bp = bp;
596    docInfo->size = datasize;
597    docInfo->content_meta = content_meta;
598    *pInfo = docInfo;
599    return COUCHSTORE_SUCCESS;
600}
601
602//Fill in doc from reading file.
603static couchstore_error_t bp_to_doc(Doc **pDoc, Db *db, cs_off_t bp, couchstore_open_options options)
604{
605    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
606    int bodylen = 0;
607    char *docbody = NULL;
608    fatbuf *docbuf = NULL;
609    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
610
611    if (options & DECOMPRESS_DOC_BODIES) {
612        bodylen = pread_compressed(&db->file, bp, &docbody);
613    } else {
614        bodylen = pread_bin(&db->file, bp, &docbody);
615    }
616
617    error_unless(bodylen >= 0, static_cast<couchstore_error_t>(bodylen));    // if bodylen is negative it's an error code
618    error_unless(docbody || bodylen == 0, COUCHSTORE_ERROR_READ);
619
620    error_unless(docbuf = fatbuf_alloc(sizeof(Doc) + bodylen), COUCHSTORE_ERROR_ALLOC_FAIL);
621    *pDoc = (Doc *) fatbuf_get(docbuf, sizeof(Doc));
622
623    if (bodylen == 0) { //Empty doc
624        (*pDoc)->data.buf = NULL;
625        (*pDoc)->data.size = 0;
626        cb_free(docbody);
627        return COUCHSTORE_SUCCESS;
628    }
629
630    (*pDoc)->data.buf = (char *) fatbuf_get(docbuf, bodylen);
631    (*pDoc)->data.size = bodylen;
632    memcpy((*pDoc)->data.buf, docbody, bodylen);
633
634cleanup:
635    cb_free(docbody);
636    if (errcode < 0) {
637        fatbuf_free(docbuf);
638    }
639    return errcode;
640}
641
642static couchstore_error_t docinfo_fetch_by_id(couchfile_lookup_request *rq,
643                                              const sized_buf *k,
644                                              const sized_buf *v)
645{
646    DocInfo **pInfo = (DocInfo **) rq->callback_ctx;
647    if (v == NULL) {
648        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
649    }
650    return by_id_read_docinfo(pInfo, k, v);
651}
652
653static couchstore_error_t docinfo_fetch_by_seq(couchfile_lookup_request *rq,
654                                               const sized_buf *k,
655                                               const sized_buf *v)
656{
657    DocInfo **pInfo = (DocInfo **) rq->callback_ctx;
658    if (v == NULL) {
659        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
660    }
661    return by_seq_read_docinfo(pInfo, k, v);
662}
663
664LIBCOUCHSTORE_API
665couchstore_error_t couchstore_docinfo_by_id(Db *db,
666                                            const void *id,
667                                            size_t idlen,
668                                            DocInfo **pInfo)
669{
670    COLLECT_LATENCY();
671
672    sized_buf key;
673    sized_buf *keylist = &key;
674    couchfile_lookup_request rq;
675    couchstore_error_t errcode;
676    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
677
678    if (db->header.by_id_root == NULL) {
679        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
680    }
681
682    key.buf = (char *) id;
683    key.size = idlen;
684
685    rq.cmp.compare = ebin_cmp;
686    rq.file = &db->file;
687    rq.num_keys = 1;
688    rq.keys = &keylist;
689    rq.callback_ctx = pInfo;
690    rq.fetch_callback = docinfo_fetch_by_id;
691    rq.node_callback = NULL;
692    rq.fold = 0;
693
694    errcode = btree_lookup(&rq, db->header.by_id_root->pointer);
695    if (errcode == COUCHSTORE_SUCCESS) {
696        if (*pInfo == NULL) {
697            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
698        }
699    }
700cleanup:
701    return errcode;
702}
703
704LIBCOUCHSTORE_API
705couchstore_error_t couchstore_docinfo_by_sequence(Db *db,
706                                                  uint64_t sequence,
707                                                  DocInfo **pInfo)
708{
709    COLLECT_LATENCY();
710
711    sized_buf key;
712    sized_buf *keylist = &key;
713    couchfile_lookup_request rq;
714    couchstore_error_t errcode;
715    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
716
717    if (db->header.by_id_root == NULL) {
718        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
719    }
720
721    sequence = htonll(sequence);
722    key.buf = (char *)&sequence + 2;
723    key.size = 6;
724
725    rq.cmp.compare = seq_cmp;
726    rq.file = &db->file;
727    rq.num_keys = 1;
728    rq.keys = &keylist;
729    rq.callback_ctx = pInfo;
730    rq.fetch_callback = docinfo_fetch_by_seq;
731    rq.node_callback = NULL;
732    rq.fold = 0;
733
734    errcode = btree_lookup(&rq, db->header.by_seq_root->pointer);
735    if (errcode == COUCHSTORE_SUCCESS) {
736        if (*pInfo == NULL) {
737            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
738        }
739    }
740cleanup:
741    return errcode;
742}
743
744LIBCOUCHSTORE_API
745couchstore_error_t couchstore_open_doc_with_docinfo(Db *db,
746                                                    const DocInfo *docinfo,
747                                                    Doc **pDoc,
748                                                    couchstore_open_options options)
749{
750    COLLECT_LATENCY();
751
752    couchstore_error_t errcode;
753
754    *pDoc = NULL;
755    if (docinfo->bp == 0) {
756        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
757    }
758
759    if (!(docinfo->content_meta & COUCH_DOC_IS_COMPRESSED)) {
760        options &= ~DECOMPRESS_DOC_BODIES;
761    }
762
763    errcode = bp_to_doc(pDoc, db, docinfo->bp, options);
764    if (errcode == COUCHSTORE_SUCCESS) {
765        (*pDoc)->id.buf = docinfo->id.buf;
766        (*pDoc)->id.size = docinfo->id.size;
767    }
768
769    return errcode;
770}
771
772LIBCOUCHSTORE_API
773couchstore_error_t couchstore_open_document(Db *db,
774                                            const void *id,
775                                            size_t idlen,
776                                            Doc **pDoc,
777                                            couchstore_open_options options)
778{
779    COLLECT_LATENCY();
780
781    couchstore_error_t errcode;
782    DocInfo *info;
783    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
784    *pDoc = NULL;
785    errcode = couchstore_docinfo_by_id(db, id, idlen, &info);
786    if (errcode == COUCHSTORE_SUCCESS) {
787        errcode = couchstore_open_doc_with_docinfo(db, info, pDoc, options);
788        if (errcode == COUCHSTORE_SUCCESS) {
789            (*pDoc)->id.buf = (char *) id;
790            (*pDoc)->id.size = idlen;
791        }
792
793        couchstore_free_docinfo(info);
794    }
795cleanup:
796    return errcode;
797}
798
799// context info passed to lookup_callback via btree_lookup
800typedef struct {
801    Db *db;
802    couchstore_docinfos_options options;
803    couchstore_changes_callback_fn callback;
804    void* callback_context;
805    int by_id;
806    int depth;
807    couchstore_walk_tree_callback_fn walk_callback;
808} lookup_context;
809
810// btree_lookup callback, called while iterating keys
811static couchstore_error_t lookup_callback(couchfile_lookup_request *rq,
812                                          const sized_buf *k,
813                                          const sized_buf *v)
814{
815    if (v == NULL) {
816        return COUCHSTORE_SUCCESS;
817    }
818
819    const lookup_context *context = static_cast<const lookup_context *>(rq->callback_ctx);
820    DocInfo *docinfo = NULL;
821    couchstore_error_t errcode;
822    if (context->by_id) {
823        errcode = by_id_read_docinfo(&docinfo, k, v);
824    } else {
825        errcode = by_seq_read_docinfo(&docinfo, k, v);
826    }
827    if (errcode == COUCHSTORE_ERROR_CORRUPT &&
828        (context->options & COUCHSTORE_TOLERATE_CORRUPTION)) {
829        // Invoke callback even if doc info is corrupted/unreadable, if magic flag is set
830        docinfo = static_cast<DocInfo*>(cb_calloc(sizeof(DocInfo), 1));
831        docinfo->id = *k;
832        docinfo->rev_meta = *v;
833    } else if (errcode) {
834        return errcode;
835    }
836
837    if ((context->options & COUCHSTORE_DELETES_ONLY) && docinfo->deleted == 0) {
838        couchstore_free_docinfo(docinfo);
839        return COUCHSTORE_SUCCESS;
840    }
841
842    if ((context->options & COUCHSTORE_NO_DELETES) && docinfo->deleted == 1) {
843        couchstore_free_docinfo(docinfo);
844        return COUCHSTORE_SUCCESS;
845    }
846
847    if (context->walk_callback) {
848        errcode = static_cast<couchstore_error_t>(context->walk_callback(context->db,
849                                                                         context->depth,
850                                                                         docinfo,
851                                                                         0,
852                                                                         NULL,
853                                                                         context->callback_context));
854    } else {
855        errcode = static_cast<couchstore_error_t>(context->callback(context->db,
856                                                                    docinfo,
857                                                                    context->callback_context));
858    }
859    if (errcode <= 0) {
860        couchstore_free_docinfo(docinfo);
861    } else {
862        // User requested docinfo not be freed, don't free it, return success
863        return COUCHSTORE_SUCCESS;
864    }
865    return errcode;
866}
867
868LIBCOUCHSTORE_API
869couchstore_error_t couchstore_changes_since(Db *db,
870                                            uint64_t since,
871                                            couchstore_docinfos_options options,
872                                            couchstore_changes_callback_fn callback,
873                                            void *ctx)
874{
875    COLLECT_LATENCY();
876
877    char since_termbuf[6];
878    sized_buf since_term;
879    sized_buf *keylist = &since_term;
880    lookup_context cbctx = {db, options, callback, ctx, 0, 0, NULL};
881    couchfile_lookup_request rq;
882    couchstore_error_t errcode;
883
884    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
885    if (db->header.by_seq_root == NULL) {
886        return COUCHSTORE_SUCCESS;
887    }
888
889    since_term.buf = since_termbuf;
890    since_term.size = 6;
891    encode_raw48(since, (raw_48*)since_term.buf);
892
893    rq.cmp.compare = seq_cmp;
894    rq.file = &db->file;
895    rq.num_keys = 1;
896    rq.keys = &keylist;
897    rq.callback_ctx = &cbctx;
898    rq.fetch_callback = lookup_callback;
899    rq.node_callback = NULL;
900    rq.fold = 1;
901    rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
902
903    errcode = btree_lookup(&rq, db->header.by_seq_root->pointer);
904cleanup:
905    return errcode;
906}
907
908LIBCOUCHSTORE_API
909couchstore_error_t couchstore_all_docs(Db *db,
910                                       const sized_buf* startKeyPtr,
911                                       couchstore_docinfos_options options,
912                                       couchstore_changes_callback_fn callback,
913                                       void *ctx)
914{
915    COLLECT_LATENCY();
916
917    sized_buf startKey = {NULL, 0};
918    sized_buf *keylist = &startKey;
919    lookup_context cbctx = {db, options, callback, ctx, 1, 0, NULL};
920    couchfile_lookup_request rq;
921    couchstore_error_t errcode;
922
923    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
924    if (db->header.by_id_root == NULL) {
925        return COUCHSTORE_SUCCESS;
926    }
927
928    if (startKeyPtr) {
929        startKey = *startKeyPtr;
930    }
931
932    rq.cmp.compare = ebin_cmp;
933    rq.file = &db->file;
934    rq.num_keys = 1;
935    rq.keys = &keylist;
936    rq.callback_ctx = &cbctx;
937    rq.fetch_callback = lookup_callback;
938    rq.node_callback = NULL;
939    rq.fold = 1;
940    rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
941
942    errcode = btree_lookup(&rq, db->header.by_id_root->pointer);
943cleanup:
944    return errcode;
945}
946
947static couchstore_error_t walk_node_callback(struct couchfile_lookup_request *rq,
948                                                 uint64_t subtreeSize,
949                                                 const sized_buf *reduceValue)
950{
951    lookup_context* context = static_cast<lookup_context*>(rq->callback_ctx);
952    if (reduceValue) {
953        int result = context->walk_callback(context->db,
954                                            context->depth,
955                                            NULL,
956                                            subtreeSize,
957                                            reduceValue,
958                                            context->callback_context);
959        context->depth++;
960        if (result < 0)
961            return static_cast<couchstore_error_t>(result);
962    } else {
963        context->depth--;
964    }
965    return COUCHSTORE_SUCCESS;
966}
967
968static
969couchstore_error_t couchstore_walk_tree(Db *db,
970                                        int by_id,
971                                        const node_pointer* root,
972                                        const sized_buf* startKeyPtr,
973                                        couchstore_docinfos_options options,
974                                        int (*compare)(const sized_buf *k1, const sized_buf *k2),
975                                        couchstore_walk_tree_callback_fn callback,
976                                        void *ctx)
977{
978    couchstore_error_t errcode;
979    sized_buf startKey = {NULL, 0};
980    sized_buf *keylist;
981    couchfile_lookup_request rq;
982
983    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
984    if (root == NULL) {
985        return COUCHSTORE_SUCCESS;
986    }
987
988    // Invoke the callback on the root node:
989    errcode = static_cast<couchstore_error_t>(callback(db, 0, NULL,
990                                                       root->subtreesize,
991                                                       &root->reduce_value,
992                                                       ctx));
993    if (errcode < 0) {
994        return errcode;
995    }
996
997    if (startKeyPtr) {
998        startKey = *startKeyPtr;
999    }
1000    keylist = &startKey;
1001
1002    {
1003        // Create a new scope here just to mute the warning from the
1004        // compiler that the goto in the macro error_unless
1005        // skips the initialization of lookup_ctx..
1006        lookup_context lookup_ctx = {db, options, NULL, ctx, by_id, 1, callback};
1007
1008        rq.cmp.compare = compare;
1009        rq.file = &db->file;
1010        rq.num_keys = 1;
1011        rq.keys = &keylist;
1012        rq.callback_ctx = &lookup_ctx;
1013        rq.fetch_callback = lookup_callback;
1014        rq.node_callback = walk_node_callback;
1015        rq.fold = 1;
1016        rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
1017
1018        error_pass(btree_lookup(&rq, root->pointer));
1019    }
1020cleanup:
1021    return errcode;
1022}
1023
1024LIBCOUCHSTORE_API
1025couchstore_error_t couchstore_walk_id_tree(Db *db,
1026                                           const sized_buf* startDocID,
1027                                           couchstore_docinfos_options options,
1028                                           couchstore_walk_tree_callback_fn callback,
1029                                           void *ctx)
1030{
1031    COLLECT_LATENCY();
1032
1033    return couchstore_walk_tree(db, 1, db->header.by_id_root, startDocID,
1034                                options, ebin_cmp, callback, ctx);
1035}
1036
1037LIBCOUCHSTORE_API
1038couchstore_error_t couchstore_walk_seq_tree(Db *db,
1039                                           uint64_t startSequence,
1040                                           couchstore_docinfos_options options,
1041                                           couchstore_walk_tree_callback_fn callback,
1042                                           void *ctx)
1043{
1044    COLLECT_LATENCY();
1045
1046    raw_48 start_termbuf;
1047    encode_raw48(startSequence, &start_termbuf);
1048    sized_buf start_term = {(char*)&start_termbuf, 6};
1049
1050    return couchstore_walk_tree(db, 0, db->header.by_seq_root, &start_term,
1051                                options, seq_cmp, callback, ctx);
1052}
1053
1054static int id_ptr_cmp(const void *a, const void *b)
1055{
1056    sized_buf **buf1 = (sized_buf**) a;
1057    sized_buf **buf2 = (sized_buf**) b;
1058    return ebin_cmp(*buf1, *buf2);
1059}
1060
1061static int seq_ptr_cmp(const void *a, const void *b)
1062{
1063    sized_buf **buf1 = (sized_buf**) a;
1064    sized_buf **buf2 = (sized_buf**) b;
1065    return seq_cmp(*buf1, *buf2);
1066}
1067
1068// Common subroutine of couchstore_docinfos_by_{ids, sequence}
1069static couchstore_error_t iterate_docinfos(Db *db,
1070                                           const sized_buf keys[],
1071                                           unsigned numDocs,
1072                                           node_pointer *tree,
1073                                           int (*key_ptr_compare)(const void *, const void *),
1074                                           int (*key_compare)(const sized_buf *k1, const sized_buf *k2),
1075                                           couchstore_changes_callback_fn callback,
1076                                           int fold,
1077                                           int tolerate_corruption,
1078                                           void *ctx)
1079{
1080    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1081    const sized_buf **keyptrs = NULL;
1082    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1083    // Nothing to do if the tree is empty
1084    if (tree == NULL) {
1085        return COUCHSTORE_SUCCESS;
1086    }
1087
1088    if(numDocs <= 0) {
1089        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1090    }
1091
1092    // Create an array of *pointers to* sized_bufs, which is what btree_lookup wants:
1093    keyptrs = static_cast<const sized_buf**>(cb_malloc(numDocs * sizeof(sized_buf*)));
1094    error_unless(keyptrs, COUCHSTORE_ERROR_ALLOC_FAIL);
1095
1096    {
1097        unsigned i;
1098        for (i = 0; i< numDocs; ++i) {
1099            keyptrs[i] = &keys[i];
1100        }
1101        if (!fold) {
1102            // Sort the key pointers:
1103            qsort(keyptrs, numDocs, sizeof(keyptrs[0]), key_ptr_compare);
1104        }
1105
1106        // Construct the lookup request:
1107        lookup_context cbctx = {db, 0, callback, ctx, (tree == db->header.by_id_root), 0, NULL};
1108        couchfile_lookup_request rq;
1109        rq.cmp.compare = key_compare;
1110        rq.file = &db->file;
1111        rq.num_keys = numDocs;
1112        rq.keys = (sized_buf**) keyptrs;
1113        rq.callback_ctx = &cbctx;
1114        rq.fetch_callback = lookup_callback;
1115        rq.node_callback = NULL;
1116        rq.fold = fold;
1117        rq.tolerate_corruption = tolerate_corruption;
1118
1119        // Go!
1120        error_pass(btree_lookup(&rq, tree->pointer));
1121    }
1122cleanup:
1123    cb_free(keyptrs);
1124    return errcode;
1125}
1126
1127LIBCOUCHSTORE_API
1128couchstore_error_t couchstore_docinfos_by_id(Db *db,
1129                                             const sized_buf ids[],
1130                                             unsigned numDocs,
1131                                             couchstore_docinfos_options options,
1132                                             couchstore_changes_callback_fn callback,
1133                                             void *ctx)
1134{
1135    COLLECT_LATENCY();
1136
1137    return iterate_docinfos(db, ids, numDocs,
1138                            db->header.by_id_root, id_ptr_cmp, ebin_cmp,
1139                            callback,
1140                            (options & RANGES) != 0,
1141                            (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0,
1142                            ctx);
1143}
1144
1145LIBCOUCHSTORE_API
1146couchstore_error_t couchstore_docinfos_by_sequence(Db *db,
1147                                                   const uint64_t sequence[],
1148                                                   unsigned numDocs,
1149                                                   couchstore_docinfos_options options,
1150                                                   couchstore_changes_callback_fn callback,
1151                                                   void *ctx)
1152{
1153    COLLECT_LATENCY();
1154
1155    // Create the array of keys:
1156    sized_buf *keylist = static_cast<sized_buf*>(cb_malloc(numDocs * sizeof(sized_buf)));
1157    raw_by_seq_key *keyvalues = static_cast<raw_by_seq_key*>(cb_malloc(numDocs * sizeof(raw_by_seq_key)));
1158    couchstore_error_t errcode;
1159    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1160    error_unless(keylist && keyvalues, COUCHSTORE_ERROR_ALLOC_FAIL);
1161    unsigned i;
1162    for (i = 0; i< numDocs; ++i) {
1163        encode_raw48(sequence[i], &keyvalues[i].sequence);
1164        keylist[i].buf = static_cast<char*>((void*) &keyvalues[i]);
1165        keylist[i].size = sizeof(keyvalues[i]);
1166    }
1167
1168    error_pass(iterate_docinfos(db, keylist, numDocs,
1169                                db->header.by_seq_root, seq_ptr_cmp, seq_cmp,
1170                                callback,
1171                                (options & RANGES) != 0,
1172                                (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0,
1173                                ctx));
1174cleanup:
1175    cb_free(keylist);
1176    cb_free(keyvalues);
1177    return errcode;
1178}
1179
1180LIBCOUCHSTORE_API
1181couchstore_error_t couchstore_db_info(Db *db, DbInfo* dbinfo) {
1182    if (db == NULL || dbinfo == NULL) {
1183        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1184    }
1185    const node_pointer *id_root = db->header.by_id_root;
1186    const node_pointer *seq_root = db->header.by_seq_root;
1187    const node_pointer *local_root = db->header.local_docs_root;
1188    dbinfo->filename = db->file.path;
1189    dbinfo->header_position = db->header.position;
1190    dbinfo->last_sequence = db->header.update_seq;
1191    dbinfo->purge_seq = db->header.purge_seq;
1192    dbinfo->deleted_count = dbinfo->doc_count = dbinfo->space_used = 0;
1193    dbinfo->file_size = db->file.pos;
1194    if (id_root) {
1195        raw_by_id_reduce* id_reduce = (raw_by_id_reduce*) id_root->reduce_value.buf;
1196        dbinfo->doc_count = decode_raw40(id_reduce->notdeleted);
1197        dbinfo->deleted_count = decode_raw40(id_reduce->deleted);
1198        dbinfo->space_used = decode_raw48(id_reduce->size);
1199        dbinfo->space_used += id_root->subtreesize;
1200    }
1201    if(seq_root) {
1202        dbinfo->space_used += seq_root->subtreesize;
1203    }
1204    if(local_root) {
1205        dbinfo->space_used += local_root->subtreesize;
1206    }
1207    return COUCHSTORE_SUCCESS;
1208}
1209
1210static couchstore_error_t local_doc_fetch(couchfile_lookup_request *rq,
1211                                          const sized_buf *k,
1212                                          const sized_buf *v)
1213{
1214    LocalDoc **lDoc = (LocalDoc **) rq->callback_ctx;
1215    LocalDoc *dp;
1216
1217    if (!v) {
1218        *lDoc = NULL;
1219        return COUCHSTORE_SUCCESS;
1220    }
1221    fatbuf *ldbuf = fatbuf_alloc(sizeof(LocalDoc) + k->size + v->size);
1222    if (ldbuf == NULL) {
1223        return COUCHSTORE_ERROR_ALLOC_FAIL;
1224    }
1225
1226    dp = *lDoc = (LocalDoc *) fatbuf_get(ldbuf, sizeof(LocalDoc));
1227    dp->id.buf = (char *) fatbuf_get(ldbuf, k->size);
1228    dp->id.size = k->size;
1229
1230    dp->json.buf = (char *) fatbuf_get(ldbuf, v->size);
1231    dp->json.size = v->size;
1232
1233    dp->deleted = 0;
1234
1235    memcpy(dp->id.buf, k->buf, k->size);
1236    memcpy(dp->json.buf, v->buf, v->size);
1237
1238    return COUCHSTORE_SUCCESS;
1239}
1240
1241LIBCOUCHSTORE_API
1242couchstore_error_t couchstore_open_local_document(Db *db,
1243                                                  const void *id,
1244                                                  size_t idlen,
1245                                                  LocalDoc **pDoc)
1246{
1247    sized_buf key;
1248    sized_buf *keylist = &key;
1249    couchfile_lookup_request rq;
1250    couchstore_error_t errcode;
1251    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1252    if (db->header.local_docs_root == NULL) {
1253        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
1254    }
1255
1256    key.buf = (char *) id;
1257    key.size = idlen;
1258
1259    rq.cmp.compare = ebin_cmp;
1260    rq.file = &db->file;
1261    rq.num_keys = 1;
1262    rq.keys = &keylist;
1263    rq.callback_ctx = pDoc;
1264    rq.fetch_callback = local_doc_fetch;
1265    rq.node_callback = NULL;
1266    rq.fold = 0;
1267
1268    errcode = btree_lookup(&rq, db->header.local_docs_root->pointer);
1269    if (errcode == COUCHSTORE_SUCCESS) {
1270        if (*pDoc == NULL) {
1271            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
1272        }
1273    }
1274cleanup:
1275    return errcode;
1276}
1277
1278LIBCOUCHSTORE_API
1279couchstore_error_t couchstore_save_local_document(Db *db, LocalDoc *lDoc)
1280{
1281    couchstore_error_t errcode;
1282    couchfile_modify_action ldupdate;
1283    node_pointer *nroot = NULL;
1284    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1285
1286    if (lDoc->deleted) {
1287        ldupdate.type = ACTION_REMOVE;
1288    } else {
1289        ldupdate.type = ACTION_INSERT;
1290    }
1291
1292    ldupdate.key = &lDoc->id;
1293    ldupdate.value.data = &lDoc->json;
1294
1295    couchfile_modify_request rq;
1296    rq.cmp.compare = ebin_cmp;
1297    rq.num_actions = 1;
1298    rq.actions = &ldupdate;
1299    rq.fetch_callback = NULL;
1300    rq.reduce = NULL;
1301    rq.rereduce = NULL;
1302    rq.file = &db->file;
1303    rq.enable_purging = false;
1304    rq.purge_kp = NULL;
1305    rq.purge_kv = NULL;
1306    rq.compacting = 0;
1307    rq.kv_chunk_threshold = db->file.options.kv_nodesize;
1308    rq.kp_chunk_threshold = db->file.options.kp_nodesize;
1309
1310    nroot = modify_btree(&rq, db->header.local_docs_root, &errcode);
1311    if (errcode == COUCHSTORE_SUCCESS && nroot != db->header.local_docs_root) {
1312        cb_free(db->header.local_docs_root);
1313        db->header.local_docs_root = nroot;
1314    }
1315
1316cleanup:
1317    return errcode;
1318}
1319
1320LIBCOUCHSTORE_API
1321void couchstore_free_local_document(LocalDoc *lDoc)
1322{
1323    if (lDoc) {
1324        char *offset = (char *) (&((fatbuf *) NULL)->buf);
1325        fatbuf_free((fatbuf *) ((char *)lDoc - (char *)offset));
1326    }
1327}
1328
1329LIBCOUCHSTORE_API
1330couchstore_error_t couchstore_last_os_error(const Db *db,
1331                                            char* buf,
1332                                            size_t size) {
1333    if (db == NULL || buf == nullptr || size == 0) {
1334        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1335    }
1336    const couchstore_error_info_t *err = &db->file.lastError;
1337
1338    int nw;
1339
1340#ifdef WIN32
1341    char* win_msg = NULL;
1342    FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
1343                   FORMAT_MESSAGE_FROM_SYSTEM |
1344                   FORMAT_MESSAGE_IGNORE_INSERTS,
1345                   NULL, err->error,
1346                   MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
1347                   (LPTSTR) &win_msg,
1348                   0, NULL);
1349    nw = _snprintf(buf, size, "WINAPI error = %d: '%s'", err->error, win_msg);
1350    LocalFree(win_msg);
1351#else
1352    nw = snprintf(buf, size, "errno = %d: '%s'",
1353                      err->error, strerror(err->error));
1354#endif
1355
1356    if (nw < 0) {
1357        return COUCHSTORE_ERROR_ALLOC_FAIL;
1358    } if (size_t(nw) >= size) {
1359        /* Truncate the error message */
1360        buf[size - 1] = '\0';
1361    }
1362
1363    return COUCHSTORE_SUCCESS;
1364}
1365
1366static couchstore_error_t btree_eval_seq_reduce(Db *db,
1367                                                uint64_t *accum,
1368                                                sized_buf *left,
1369                                                sized_buf *right,
1370                                                bool past_left_edge,
1371                                                uint64_t diskpos) {
1372    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1373    int bufpos = 1, nodebuflen = 0;
1374    int node_type;
1375    char *nodebuf = NULL;
1376    nodebuflen = pread_compressed(&db->file, diskpos, &nodebuf);
1377    error_unless(nodebuflen >= 0, (static_cast<couchstore_error_t>(nodebuflen)));  // if negative, it's an error code
1378
1379    node_type = nodebuf[0];
1380    while(bufpos < nodebuflen) {
1381        sized_buf k, v;
1382        bufpos += read_kv(nodebuf + bufpos, &k, &v);
1383        int left_cmp = seq_cmp(&k, left);
1384        int right_cmp = seq_cmp(&k, right);
1385        if(left_cmp < 0) {
1386            continue;
1387        }
1388        if(node_type == KP_NODE) {
1389            // In-range Item in a KP Node
1390            const raw_node_pointer *raw = (const raw_node_pointer*)v.buf;
1391            const raw_by_seq_reduce *rawreduce = (const raw_by_seq_reduce*) (v.buf + sizeof(raw_node_pointer));
1392            uint64_t subcount = decode_raw40(rawreduce->count);
1393            uint64_t pointer = decode_raw48(raw->pointer);
1394            if((left_cmp >= 0 && !past_left_edge) || right_cmp >= 0) {
1395                error_pass(btree_eval_seq_reduce(db, accum, left, right, past_left_edge, pointer));
1396                if(right_cmp >= 0) {
1397                    break;
1398                } else {
1399                    past_left_edge = true;
1400                }
1401            } else {
1402                *accum += subcount;
1403            }
1404        } else {
1405            if(right_cmp > 0) {
1406                break;
1407            }
1408            // In-range Item in a KV Node
1409            *accum += 1;
1410        }
1411    }
1412cleanup:
1413    if (nodebuf) {
1414        cb_free(nodebuf);
1415    }
1416    return errcode;
1417}
1418
1419LIBCOUCHSTORE_API
1420couchstore_error_t couchstore_changes_count(Db* db,
1421                                            uint64_t min_seq,
1422                                            uint64_t max_seq,
1423                                            uint64_t *count) {
1424    COLLECT_LATENCY();
1425
1426    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1427    raw_48 leftkr, rightkr;
1428    sized_buf leftk, rightk;
1429    leftk.buf = (char*) &leftkr;
1430    rightk.buf = (char*) &rightkr;
1431    leftk.size = 6;
1432    rightk.size = 6;
1433    encode_raw48(min_seq, &leftkr);
1434    encode_raw48(max_seq, &rightkr);
1435
1436    *count = 0;
1437    if(db->header.by_seq_root) {
1438        error_pass(btree_eval_seq_reduce(db, count, &leftk, &rightk, false,
1439                                         db->header.by_seq_root->pointer));
1440    }
1441cleanup:
1442    return errcode;
1443}
1444