xref: /6.6.0/couchstore/src/couch_db.cc (revision 96840397)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2019 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "couchstore_config.h"
19
20#include <assert.h>
21#include <fcntl.h>
22#include <platform/cb_malloc.h>
23#include <platform/cbassert.h>
24#include <platform/platform_socket.h>
25#include <stdio.h>
26#include <stdlib.h>
27#include <cstddef>
28#include <string>
29
30#include "internal.h"
31#include "node_types.h"
32#include "couch_btree.h"
33#include "bitfield.h"
34#include "reduces.h"
35#include "util.h"
36
37#include "couch_latency_internal.h"
38
// Smallest encoded size accepted for a non-empty b-tree root in the header;
// calculate_header_size() adds each root's reduce value size on top of it.
#define ROOT_BASE_SIZE 12
// Length of the header without any roots; find_header_at_pos() expects the
// header length to equal this plus the three encoded root sizes.
#define HEADER_BASE_SIZE 25

// Per-thread character buffer of MAX_ERR_STR_LEN bytes. It is not referenced
// in this part of the file; presumably it backs error-string reporting
// (TODO confirm against its users).
thread_local char internal_error_string[MAX_ERR_STR_LEN];
43
44// Initializes one of the db's root node pointers from data in the file header
45static couchstore_error_t read_db_root(Db *db, node_pointer **root,
46                                       void *root_data, int root_size)
47{
48    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
49    if (root_size > 0) {
50        error_unless(root_size >= ROOT_BASE_SIZE, COUCHSTORE_ERROR_CORRUPT);
51        *root = read_root(root_data, root_size);
52        error_unless(*root, COUCHSTORE_ERROR_ALLOC_FAIL);
53        error_unless((*root)->pointer < db->header.position, COUCHSTORE_ERROR_CORRUPT);
54    } else {
55        *root = NULL;
56    }
57cleanup:
58    return errcode;
59}
60
61// Attempts to initialize the database from a header at the given file position
62static couchstore_error_t find_header_at_pos(Db *db, cs_off_t pos)
63{
64    int seqrootsize;
65    int idrootsize;
66    int localrootsize;
67    char *root_data;
68    int header_len;
69    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
70    union {
71        raw_file_header *raw;
72        char *buf;
73    } header_buf = { NULL };
74    uint8_t buf[2];
75    ssize_t readsize;
76    {
77        // Speculative read looking for header, mark as Empty.
78        ScopedFileTag tag(db->file.ops, db->file.handle, FileTag::Empty);
79        readsize = db->file.ops->pread(
80                &db->file.lastError, db->file.handle, buf, 2, pos);
81    }
82    error_unless(readsize == 2, COUCHSTORE_ERROR_READ);
83    if (buf[0] == 0) {
84        return COUCHSTORE_ERROR_NO_HEADER;
85    } else if (buf[0] != 1) {
86        return COUCHSTORE_ERROR_CORRUPT;
87    }
88
89    header_len = pread_header(&db->file, pos, &header_buf.buf, MAX_DB_HEADER_SIZE);
90    if (header_len < 0) {
91        error_pass(static_cast<couchstore_error_t>(header_len));
92    }
93
94    db->header.position = pos;
95    db->header.disk_version = decode_raw08(header_buf.raw->version);
96
97    // Only 12 and 11 are valid
98    error_unless(db->header.disk_version == COUCH_DISK_VERSION ||
99                 db->header.disk_version == COUCH_DISK_VERSION_11,
100                 COUCHSTORE_ERROR_HEADER_VERSION);
101    db->header.update_seq = decode_raw48(header_buf.raw->update_seq);
102    db->header.purge_seq = decode_raw48(header_buf.raw->purge_seq);
103    db->header.purge_ptr = decode_raw48(header_buf.raw->purge_ptr);
104    error_unless(db->header.purge_ptr <= db->header.position, COUCHSTORE_ERROR_CORRUPT);
105    seqrootsize = decode_raw16(header_buf.raw->seqrootsize);
106    idrootsize = decode_raw16(header_buf.raw->idrootsize);
107    localrootsize = decode_raw16(header_buf.raw->localrootsize);
108    error_unless(header_len == HEADER_BASE_SIZE + seqrootsize + idrootsize + localrootsize,
109                 COUCHSTORE_ERROR_CORRUPT);
110
111    root_data = (char*) (header_buf.raw + 1);  // i.e. just past *header_buf
112    error_pass(read_db_root(db, &db->header.by_seq_root, root_data, seqrootsize));
113    root_data += seqrootsize;
114    error_pass(read_db_root(db, &db->header.by_id_root, root_data, idrootsize));
115    root_data += idrootsize;
116    error_pass(read_db_root(db, &db->header.local_docs_root, root_data, localrootsize));
117
118cleanup:
119    cb_free(header_buf.raw);
120    return errcode;
121}
122
123// Finds the database header by scanning back from the end of the file at 4k boundaries
124static couchstore_error_t find_header(Db *db, int64_t start_pos)
125{
126    couchstore_error_t last_header_errcode = COUCHSTORE_ERROR_NO_HEADER;
127    int64_t pos = start_pos;
128    pos -= pos % COUCH_BLOCK_SIZE;
129    for (; pos >= 0; pos -= COUCH_BLOCK_SIZE) {
130        couchstore_error_t errcode = find_header_at_pos(db, pos);
131        switch(errcode) {
132            case COUCHSTORE_SUCCESS:
133                // Found it!
134                return COUCHSTORE_SUCCESS;
135            case COUCHSTORE_ERROR_NO_HEADER:
136                // No header here, so keep going
137                break;
138            case COUCHSTORE_ERROR_ALLOC_FAIL:
139                // Fatal error
140                return errcode;
141            default:
142                // Invalid header; continue, but remember the last error
143                last_header_errcode = errcode;
144                break;
145        }
146    }
147    return last_header_errcode;
148}
149
150/**
151 * Calculates how large in bytes the current header will be
152 * when written to disk.
153 *
154 * The seqrootsize, idrootsize and localrootsize params are
155 * used to return the respective sizes in this header if
156 * needed.
157 */
158size_t calculate_header_size(Db *db, size_t& seqrootsize,
159                             size_t& idrootsize, size_t& localrootsize)
160{
161    seqrootsize = idrootsize = localrootsize = 0;
162
163    if (db->header.by_seq_root) {
164        seqrootsize = ROOT_BASE_SIZE + db->header.by_seq_root->reduce_value.size;
165    }
166    if (db->header.by_id_root) {
167        idrootsize = ROOT_BASE_SIZE + db->header.by_id_root->reduce_value.size;
168    }
169    if (db->header.local_docs_root) {
170        localrootsize = ROOT_BASE_SIZE + db->header.local_docs_root->reduce_value.size;
171    }
172    return sizeof(raw_file_header) + seqrootsize + idrootsize + localrootsize;
173}
174
175couchstore_error_t db_write_header(Db *db)
176{
177    sized_buf writebuf;
178    size_t seqrootsize, idrootsize, localrootsize;
179    writebuf.size = calculate_header_size(db, seqrootsize,
180                                          idrootsize, localrootsize);
181    writebuf.buf = (char *) cb_malloc(writebuf.size);
182    raw_file_header* header = (raw_file_header*)writebuf.buf;
183    header->version = encode_raw08(db->header.disk_version);
184    encode_raw48(db->header.update_seq, &header->update_seq);
185    encode_raw48(db->header.purge_seq, &header->purge_seq);
186    encode_raw48(db->header.purge_ptr, &header->purge_ptr);
187    header->seqrootsize = encode_raw16((uint16_t)seqrootsize);
188    header->idrootsize = encode_raw16((uint16_t)idrootsize);
189    header->localrootsize = encode_raw16((uint16_t)localrootsize);
190    uint8_t *root = (uint8_t*)(header + 1);
191    encode_root(root, db->header.by_seq_root);
192    root += seqrootsize;
193    encode_root(root, db->header.by_id_root);
194    root += idrootsize;
195    encode_root(root, db->header.local_docs_root);
196    cs_off_t pos;
197    couchstore_error_t errcode = write_header(&db->file, &writebuf, &pos);
198    if (errcode == COUCHSTORE_SUCCESS) {
199        db->header.position = pos;
200    }
201    cb_free(writebuf.buf);
202    return errcode;
203}
204
205static couchstore_error_t create_header(Db *db)
206{
207    // Select the version based upon selected CRC
208    if (db->file.crc_mode == CRC32) {
209        // user is creating down-level files
210        db->header.disk_version = COUCH_DISK_VERSION_11;
211    } else {
212        // user is using latest
213        db->header.disk_version = COUCH_DISK_VERSION;
214    }
215    db->header.update_seq = 0;
216    db->header.by_id_root = NULL;
217    db->header.by_seq_root = NULL;
218    db->header.local_docs_root = NULL;
219    db->header.purge_seq = 0;
220    db->header.purge_ptr = 0;
221    db->header.position = 0;
222    return db_write_header(db);
223}
224
225uint64_t couchstore_get_header_position(Db *db)
226{
227    return db->header.position;
228}
229
230/**
231 * Precommit should occur before writing a header, it has two
232 * purposes. Firstly it ensures data is written before we attempt
233 * to write the header. This means it's impossible for the header
234 * to be written before the data. This is accomplished through
235 * a sync.
236 *
237 * The second purpose is to extend the file to be large enough
238 * to include the subsequently written header. This is done so
239 * the fdatasync performed by writing a header doesn't have to
240 * do an additional (expensive) modified metadata flush on top
241 * of the one we're already doing.
242 */
243couchstore_error_t precommit(Db *db)
244{
245    cs_off_t curpos = db->file.pos;
246
247    db->file.pos = align_to_next_block(db->file.pos);
248    sized_buf zerobyte = { const_cast<char*>("\0"), 1};
249
250    size_t seqrootsize, idrootsize, localrootsize;
251    db->file.pos += calculate_header_size(db, seqrootsize,
252                                          idrootsize, localrootsize);
253
254    //Extend file size to where end of header will land before we do first sync
255    couchstore_error_t errcode = static_cast<couchstore_error_t>(
256        db_write_buf(&db->file, &zerobyte, NULL, NULL));
257
258    if (errcode == COUCHSTORE_SUCCESS) {
259        errcode = db->file.ops->sync(&db->file.lastError, db->file.handle);
260    }
261    // Move cursor back to where it was
262    db->file.pos = curpos;
263    return errcode;
264}
265
266couchstore_error_t couchstore_commit(Db *db)
267{
268    COLLECT_LATENCY();
269
270    couchstore_error_t errcode = precommit(db);
271
272    if (errcode == COUCHSTORE_SUCCESS) {
273        errcode = db_write_header(db);
274    }
275
276    if (errcode == COUCHSTORE_SUCCESS) {
277        errcode = db->file.ops->sync(&db->file.lastError, db->file.handle);
278    }
279
280    return errcode;
281}
282
283static tree_file_options get_tree_file_options_from_flags(couchstore_open_flags flags)
284{
285    tree_file_options options;
286
287    if (flags & COUCHSTORE_OPEN_FLAG_UNBUFFERED) {
288        options.buf_io_enabled = false;
289    } else if (flags & COUCHSTORE_OPEN_WITH_CUSTOM_BUFFER) {
290        // Buffered IO with custom buffer settings.
291        //  * First 4 bits [15:12]: read buffer capacity
292        //  * Next  4 bits [11:08]: max read buffer count
293
294        uint32_t unit_index = (flags >> 12) & 0xf;
295        if (unit_index) {
296            // unit_index    1     2     3     4     ...   15
297            // unit size     1KB   2KB   4KB   8KB   ...   16MB
298            options.buf_io_read_unit_size = 1024 * (1 << (unit_index -1));
299        }
300        uint32_t count_index = (flags >> 8) & 0xf;
301        if (count_index) {
302            // count_index   1     2     3     4     ...   15
303            // # buffers     8     16    32    64    ...   128K
304            options.buf_io_read_buffers = 8 * (1 << (count_index-1));
305        }
306    }
307
308    // Set default value first.
309    options.kp_nodesize = DB_KP_CHUNK_THRESHOLD;
310    options.kv_nodesize = DB_KV_CHUNK_THRESHOLD;
311    if (flags & COUCHSTORE_OPEN_WITH_CUSTOM_NODESIZE) {
312        // B+tree custom node size settings.
313        //  * First 4 bits [23:20]: KP node size
314        //  * Next  4 bits [19:16]: KV node size
315        uint32_t kp_flag = (flags >> 20) & 0xf;
316        if (kp_flag) {
317            options.kp_nodesize = kp_flag * 1024;
318        }
319        uint32_t kv_flag = (flags >> 16) & 0xf;
320        if (kv_flag) {
321            options.kv_nodesize = kv_flag * 1024;
322        }
323    }
324
325    if (flags & COUCHSTORE_OPEN_WITH_PERIODIC_SYNC) {
326        // Automatic sync() every N bytes written.
327        //  * 5 bits [28-24]: power-of-2 * 1kB
328        uint64_t sync_flag = (flags >> 24) & 0x1f;
329        options.periodic_sync_bytes = uint64_t(1024) << (sync_flag - 1);
330    }
331
332    /* set the tracing and validation options */
333    options.tracing_enabled = false;
334    options.write_validation_enabled = false;
335    options.mprotect_enabled = false;
336    if (flags & COUCHSTORE_OPEN_WITH_TRACING) {
337        options.tracing_enabled = true;
338    }
339    if (flags & COUCHSTORE_OPEN_WITH_WRITE_VALIDATION) {
340        options.write_validation_enabled = true;
341    }
342    if (flags & COUCHSTORE_OPEN_WITH_MPROTECT) {
343        options.mprotect_enabled = true;
344    }
345
346    return options;
347}
348
349couchstore_open_flags couchstore_encode_periodic_sync_flags(uint64_t bytes) {
350    // Convert to encoding supported by couchstore_open_flags - KB power-of-2
351    // value.
352    // Round up to whole kilobyte units.
353    const uint64_t kilobytes = (bytes + 1023) / 1024;
354    // Calculate the shift amount (what is the log2 power)
355    uint64_t shiftAmount = std::log2(kilobytes);
356    // Saturate if the user specified more than the encodable amount.
357    shiftAmount = std::min(shiftAmount, uint64_t(30));
358    // Finally, encode in couchstore_open flags
359    return ((shiftAmount + 1)) << 24;
360}
361
362couchstore_error_t couchstore_open_db(const char *filename,
363                                      couchstore_open_flags flags,
364                                      Db **pDb)
365{
366    return couchstore_open_db_ex(filename, flags,
367                                 couchstore_get_default_file_ops(), pDb);
368}
369
/**
 * Opens (and optionally creates) a database file with caller-supplied file
 * operations.
 *
 * An empty file gets a freshly written header unless it was opened
 * read-only; a non-empty file is scanned backwards from its end for the most
 * recent valid header, which also determines the CRC mode.
 *
 * @param filename path of the database file
 * @param flags    open flags; RDONLY combined with CREATE is rejected
 * @param ops      file operations to use for all IO on this handle
 * @param pDb      out: the opened handle; only written on success
 * @return COUCHSTORE_SUCCESS or an error code. On error the partially
 *         constructed handle is closed and freed here.
 */
couchstore_error_t couchstore_open_db_ex(const char *filename,
                                         couchstore_open_flags flags,
                                         FileOpsInterface* ops,
                                         Db **pDb)
{
    COLLECT_LATENCY();

    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
    Db *db;
    int openflags;
    cs_off_t pos;

    /* Sanity check input parameters */
    if ((flags & COUCHSTORE_OPEN_FLAG_RDONLY) &&
        (flags & COUCHSTORE_OPEN_FLAG_CREATE)) {
        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
    }

    // Zero-initialised, so all roots start out NULL and dropped starts at 0.
    if ((db = static_cast<Db*>(cb_calloc(1, sizeof(Db)))) == NULL) {
        return COUCHSTORE_ERROR_ALLOC_FAIL;
    }

    if (flags & COUCHSTORE_OPEN_FLAG_RDONLY) {
        openflags = O_RDONLY;
    } else {
        openflags = O_RDWR;
    }

    if (flags & COUCHSTORE_OPEN_FLAG_CREATE) {
        openflags |= O_CREAT;
    }

    // open with CRC unknown, CRC will be selected when header is read/or not found.
    error_pass(tree_file_open(&db->file, filename, openflags, CRC_UNKNOWN, ops,
                              get_tree_file_options_from_flags(flags)));

    // The end-of-file offset doubles as the file size; a negative value is
    // treated as an error code (see the final else branch).
    pos = db->file.ops->goto_eof(&db->file.lastError, db->file.handle);
    db->file.pos = pos;
    if (pos == 0) {
        /* This is an empty file. Create a new fileheader unless the
         * user wanted a read-only version of the file
         */

        if (flags & COUCHSTORE_OPEN_FLAG_RDONLY) {
            error_pass(COUCHSTORE_ERROR_NO_HEADER);
        } else {

            // Select the CRC to use on this new file
            if (flags & COUCHSTORE_OPEN_WITH_LEGACY_CRC) {
                db->file.crc_mode = CRC32;
            } else {
                db->file.crc_mode = CRC32C;
            }

            error_pass(create_header(db));
        }
    } else if (pos > 0) {
        // Existing file: search backwards from just before EOF.
        error_pass(find_header(db, db->file.pos - 2));

        // The header's disk version decides which CRC the file uses.
        if (db->header.disk_version <= COUCH_DISK_VERSION_11) {
            db->file.crc_mode = CRC32;
        } else {
            cb_assert(db->header.disk_version >= COUCH_DISK_VERSION_12);
            db->file.crc_mode = CRC32C;
        }

        // Not allowed. Can't request legacy_crc but be opening non legacy CRC files.
        if (db->file.crc_mode == CRC32C && flags & COUCHSTORE_OPEN_WITH_LEGACY_CRC) {
            errcode = COUCHSTORE_ERROR_INVALID_ARGUMENTS;
            goto cleanup;
        }
    } else {
        // goto_eof reported an error; pass it on.
        error_pass(static_cast<couchstore_error_t>(db->file.pos));
    }

    *pDb = db;
    db->dropped = 0;

cleanup:
    if (errcode != COUCHSTORE_SUCCESS) {
        // Close first: couchstore_free_db() refuses handles not yet dropped.
        couchstore_close_file(db);
        couchstore_free_db(db);
    }

    return errcode;
}
456
457couchstore_error_t couchstore_close_file(Db* db)
458{
459    COLLECT_LATENCY();
460
461    if(db->dropped) {
462        return COUCHSTORE_SUCCESS;
463    }
464    couchstore_error_t error = tree_file_close(&db->file);
465    db->dropped = 1;
466    return error;
467}
468
/**
 * Moves db back to the header preceding the current one.
 *
 * The current roots are released, then the file is searched backwards from
 * just before the current header position.
 *
 * @return COUCHSTORE_SUCCESS when an older header was loaded. On any failure
 *         (handle already closed, current header at position 0, or no valid
 *         older header) the handle is closed and freed here and
 *         COUCHSTORE_ERROR_DB_NO_LONGER_VALID is returned, so the caller
 *         must not use db afterwards.
 */
couchstore_error_t couchstore_rewind_db_header(Db *db)
{
    COLLECT_LATENCY();

    couchstore_error_t errcode;
    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
    // free current header guts
    cb_free(db->header.by_id_root);
    cb_free(db->header.by_seq_root);
    cb_free(db->header.local_docs_root);
    db->header.by_id_root = NULL;
    db->header.by_seq_root = NULL;
    db->header.local_docs_root = NULL;

    // A header at position 0 has nothing before it to rewind to.
    error_unless(db->header.position != 0, COUCHSTORE_ERROR_DB_NO_LONGER_VALID);
    // find older header
    error_pass(find_header(db, db->header.position - 2));

cleanup:
    // if we failed, free the handle and return an error
    if(errcode != COUCHSTORE_SUCCESS) {
        couchstore_close_file(db);
        couchstore_free_db(db);
        // Whatever went wrong, the caller only learns the handle is gone.
        errcode = COUCHSTORE_ERROR_DB_NO_LONGER_VALID;
    }
    return errcode;
}
496
497couchstore_error_t couchstore_free_db(Db* db)
498{
499    COLLECT_LATENCY();
500
501    if(!db) {
502        return COUCHSTORE_SUCCESS;
503    }
504
505    if(!db->dropped) {
506        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
507    }
508
509    cb_free(db->header.by_id_root);
510    cb_free(db->header.by_seq_root);
511    cb_free(db->header.local_docs_root);
512    db->header.by_id_root = NULL;
513    db->header.by_seq_root = NULL;
514    db->header.local_docs_root = NULL;
515
516    memset(db, 0xa5, sizeof(*db));
517    cb_free(db);
518
519    return COUCHSTORE_SUCCESS;
520}
521
522const char* couchstore_get_db_filename(Db *db) {
523    return db->file.path;
524}
525
526FileOpsInterface::FHStats* couchstore_get_db_filestats(Db* db) {
527    return db->file.ops->get_stats(db->file.handle);
528}
529
530DocInfo* couchstore_alloc_docinfo(const sized_buf *id, const sized_buf *rev_meta) {
531    size_t size = sizeof(DocInfo);
532    if (id) {
533        size += id->size;
534    }
535    if (rev_meta) {
536        size += rev_meta->size;
537    }
538    DocInfo* docInfo = static_cast<DocInfo*>(cb_malloc(size));
539    if (!docInfo) {
540        return NULL;
541    }
542    memset(docInfo, 0, sizeof(DocInfo));
543    char *extra = (char *)docInfo + sizeof(DocInfo);
544    if (id) {
545        memcpy(extra, id->buf, id->size);
546        docInfo->id.buf = extra;
547        docInfo->id.size = id->size;
548        extra += id->size;
549    }
550    if (rev_meta) {
551        memcpy(extra, rev_meta->buf, rev_meta->size);
552        docInfo->rev_meta.buf = extra;
553        docInfo->rev_meta.size = rev_meta->size;
554    }
555    return docInfo;
556}
557
558void couchstore_free_docinfo(DocInfo *docinfo)
559{
560    cb_free(docinfo);
561}
562
563void couchstore_free_document(Doc *doc)
564{
565    if (doc) {
566        size_t offset = offsetof(fatbuf, buf);
567        fatbuf_free((fatbuf *) ((char *)doc - (char *)offset));
568    }
569}
570
571couchstore_error_t by_seq_read_docinfo(DocInfo **pInfo,
572                                       const sized_buf *k,
573                                       const sized_buf *v)
574{
575    const raw_seq_index_value *raw = (const raw_seq_index_value*)v->buf;
576    ssize_t extraSize = v->size - sizeof(*raw);
577    if (extraSize < 0) {
578        return COUCHSTORE_ERROR_CORRUPT;
579    }
580
581    uint32_t idsize, datasize;
582    decode_kv_length(&raw->sizes, &idsize, &datasize);
583    uint64_t bp = decode_raw48(raw->bp);
584    int deleted = (bp & BP_DELETED_FLAG) != 0;
585    bp &= ~BP_DELETED_FLAG;
586    uint8_t content_meta = decode_raw08(raw->content_meta);
587    uint64_t rev_seq = decode_raw48(raw->rev_seq);
588    uint64_t db_seq = decode_sequence_key(k);
589
590    sized_buf id = {v->buf + sizeof(*raw), idsize};
591    sized_buf rev_meta = {id.buf + idsize, extraSize - id.size};
592    DocInfo* docInfo = couchstore_alloc_docinfo(&id, &rev_meta);
593    if (!docInfo) {
594        return COUCHSTORE_ERROR_ALLOC_FAIL;
595    }
596
597    docInfo->db_seq = db_seq;
598    docInfo->rev_seq = rev_seq;
599    docInfo->deleted = deleted;
600    docInfo->bp = bp;
601    docInfo->size = datasize;
602    docInfo->content_meta = content_meta;
603    *pInfo = docInfo;
604    return COUCHSTORE_SUCCESS;
605}
606
607couchstore_error_t by_id_read_docinfo(DocInfo** pInfo,
608                                      const sized_buf* k,
609                                      const sized_buf* v) {
610    const raw_id_index_value *raw = (const raw_id_index_value*)v->buf;
611    ssize_t revMetaSize = v->size - sizeof(*raw);
612    if (revMetaSize < 0) {
613        return COUCHSTORE_ERROR_CORRUPT;
614    }
615
616    uint32_t datasize, deleted;
617    uint8_t content_meta;
618    uint64_t bp, seq, revnum;
619
620    seq = decode_raw48(raw->db_seq);
621    datasize = decode_raw32(raw->size);
622    bp = decode_raw48(raw->bp);
623    deleted = (bp & BP_DELETED_FLAG) != 0;
624    bp &= ~BP_DELETED_FLAG;
625    content_meta = decode_raw08(raw->content_meta);
626    revnum = decode_raw48(raw->rev_seq);
627
628    sized_buf rev_meta = {v->buf + sizeof(*raw), static_cast<size_t>(revMetaSize)};
629    DocInfo* docInfo = couchstore_alloc_docinfo(k, &rev_meta);
630    if (!docInfo) {
631        return COUCHSTORE_ERROR_ALLOC_FAIL;
632    }
633
634    docInfo->db_seq = seq;
635    docInfo->rev_seq = revnum;
636    docInfo->deleted = deleted;
637    docInfo->bp = bp;
638    docInfo->size = datasize;
639    docInfo->content_meta = content_meta;
640    *pInfo = docInfo;
641    return COUCHSTORE_SUCCESS;
642}
643
// Reads the document body stored at file offset bp and returns it as a newly
// allocated Doc.
//
// The Doc and its body share one fatbuf allocation, with the Doc placed
// first. Only the data member is filled in here; the id is left to the
// caller.
//
// @param pDoc    out: the new document
// @param db      open database to read from
// @param bp      file offset of the stored body
// @param options DECOMPRESS_DOC_BODIES selects the decompressing read
// @return COUCHSTORE_SUCCESS, COUCHSTORE_ERROR_FILE_CLOSED if db was closed,
//         the (negative) error reported by the read, COUCHSTORE_ERROR_READ
//         when a non-empty body comes back without data, or
//         COUCHSTORE_ERROR_ALLOC_FAIL.
static couchstore_error_t bp_to_doc(Doc **pDoc, Db *db, cs_off_t bp, couchstore_open_options options)
{
    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
    int bodylen = 0;
    char *docbody = NULL;
    fatbuf *docbuf = NULL;
    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);

    if (options & DECOMPRESS_DOC_BODIES) {
        bodylen = pread_compressed(&db->file, bp, &docbody);
    } else {
        bodylen = pread_bin(&db->file, bp, &docbody);
    }

    error_unless(bodylen >= 0, static_cast<couchstore_error_t>(bodylen));    // if bodylen is negative it's an error code
    error_unless(docbody || bodylen == 0, COUCHSTORE_ERROR_READ);

    // One allocation holds both the Doc struct and a copy of the body.
    error_unless(docbuf = fatbuf_alloc(sizeof(Doc) + bodylen), COUCHSTORE_ERROR_ALLOC_FAIL);
    *pDoc = (Doc *) fatbuf_get(docbuf, sizeof(Doc));

    if (bodylen == 0) { //Empty doc
        (*pDoc)->data.buf = NULL;
        (*pDoc)->data.size = 0;
        cb_free(docbody);
        return COUCHSTORE_SUCCESS;
    }

    (*pDoc)->data.buf = (char *) fatbuf_get(docbuf, bodylen);
    (*pDoc)->data.size = bodylen;
    memcpy((*pDoc)->data.buf, docbody, bodylen);

cleanup:
    // The read buffer is always released; the body now lives in docbuf.
    cb_free(docbody);
    if (errcode < 0) {
        fatbuf_free(docbuf);
    }
    return errcode;
}
683
684static couchstore_error_t docinfo_fetch_by_id(couchfile_lookup_request *rq,
685                                              const sized_buf *k,
686                                              const sized_buf *v)
687{
688    DocInfo **pInfo = (DocInfo **) rq->callback_ctx;
689    if (v == NULL) {
690        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
691    }
692    return by_id_read_docinfo(pInfo, k, v);
693}
694
695static couchstore_error_t docinfo_fetch_by_seq(couchfile_lookup_request *rq,
696                                               const sized_buf *k,
697                                               const sized_buf *v)
698{
699    DocInfo **pInfo = (DocInfo **) rq->callback_ctx;
700    if (v == NULL) {
701        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
702    }
703    return by_seq_read_docinfo(pInfo, k, v);
704}
705
706couchstore_error_t couchstore_docinfo_by_id(Db *db,
707                                            const void *id,
708                                            size_t idlen,
709                                            DocInfo **pInfo)
710{
711    COLLECT_LATENCY();
712
713    sized_buf key;
714    sized_buf *keylist = &key;
715    couchfile_lookup_request rq;
716    couchstore_error_t errcode;
717    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
718
719    if (db->header.by_id_root == NULL) {
720        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
721    }
722
723    key.buf = (char *) id;
724    key.size = idlen;
725
726    rq.cmp.compare = ebin_cmp;
727    rq.file = &db->file;
728    rq.num_keys = 1;
729    rq.keys = &keylist;
730    rq.callback_ctx = pInfo;
731    rq.fetch_callback = docinfo_fetch_by_id;
732    rq.node_callback = NULL;
733    rq.fold = 0;
734
735    errcode = btree_lookup(&rq, db->header.by_id_root->pointer);
736    if (errcode == COUCHSTORE_SUCCESS) {
737        if (*pInfo == NULL) {
738            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
739        }
740    }
741cleanup:
742    return errcode;
743}
744
745couchstore_error_t couchstore_docinfo_by_sequence(Db *db,
746                                                  uint64_t sequence,
747                                                  DocInfo **pInfo)
748{
749    COLLECT_LATENCY();
750
751    sized_buf key;
752    sized_buf *keylist = &key;
753    couchfile_lookup_request rq;
754    couchstore_error_t errcode;
755    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
756
757    if (db->header.by_id_root == NULL) {
758        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
759    }
760
761    sequence = htonll(sequence);
762    key.buf = (char *)&sequence + 2;
763    key.size = 6;
764
765    rq.cmp.compare = seq_cmp;
766    rq.file = &db->file;
767    rq.num_keys = 1;
768    rq.keys = &keylist;
769    rq.callback_ctx = pInfo;
770    rq.fetch_callback = docinfo_fetch_by_seq;
771    rq.node_callback = NULL;
772    rq.fold = 0;
773
774    errcode = btree_lookup(&rq, db->header.by_seq_root->pointer);
775    if (errcode == COUCHSTORE_SUCCESS) {
776        if (*pInfo == NULL) {
777            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
778        }
779    }
780cleanup:
781    return errcode;
782}
783
784couchstore_error_t couchstore_open_doc_with_docinfo(Db *db,
785                                                    const DocInfo *docinfo,
786                                                    Doc **pDoc,
787                                                    couchstore_open_options options)
788{
789    COLLECT_LATENCY();
790
791    couchstore_error_t errcode;
792
793    *pDoc = NULL;
794    if (docinfo->bp == 0) {
795        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
796    }
797
798    if (!(docinfo->content_meta & COUCH_DOC_IS_COMPRESSED)) {
799        options &= ~DECOMPRESS_DOC_BODIES;
800    }
801
802    errcode = bp_to_doc(pDoc, db, docinfo->bp, options);
803    if (errcode == COUCHSTORE_SUCCESS) {
804        (*pDoc)->id.buf = docinfo->id.buf;
805        (*pDoc)->id.size = docinfo->id.size;
806    }
807
808    return errcode;
809}
810
811couchstore_error_t couchstore_open_document(Db *db,
812                                            const void *id,
813                                            size_t idlen,
814                                            Doc **pDoc,
815                                            couchstore_open_options options)
816{
817    COLLECT_LATENCY();
818
819    couchstore_error_t errcode;
820    DocInfo *info;
821    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
822    *pDoc = NULL;
823    errcode = couchstore_docinfo_by_id(db, id, idlen, &info);
824    if (errcode == COUCHSTORE_SUCCESS) {
825        errcode = couchstore_open_doc_with_docinfo(db, info, pDoc, options);
826        if (errcode == COUCHSTORE_SUCCESS) {
827            (*pDoc)->id.buf = (char *) id;
828            (*pDoc)->id.size = idlen;
829        }
830
831        couchstore_free_docinfo(info);
832    }
833cleanup:
834    return errcode;
835}
836
// context info passed to lookup_callback via btree_lookup
typedef struct {
    // Database handed through to the user callback.
    Db *db;
    // Filtering/tolerance options (deletes-only, no-deletes, tolerate
    // corruption) consulted by lookup_callback.
    couchstore_docinfos_options options;
    // Per-document callback; used when walk_callback is NULL.
    couchstore_changes_callback_fn callback;
    // Opaque pointer passed as the last argument to whichever callback runs.
    void* callback_context;
    // Non-zero: values are decoded as by-id entries; zero: as by-sequence.
    int by_id;
    // Passed to walk_callback as its depth argument.
    int depth;
    // When non-NULL, invoked instead of callback.
    couchstore_walk_tree_callback_fn walk_callback;
} lookup_context;
847
848// btree_lookup callback, called while iterating keys
849static couchstore_error_t lookup_callback(couchfile_lookup_request *rq,
850                                          const sized_buf *k,
851                                          const sized_buf *v)
852{
853    if (v == NULL) {
854        return COUCHSTORE_SUCCESS;
855    }
856
857    const lookup_context *context = static_cast<const lookup_context *>(rq->callback_ctx);
858    DocInfo *docinfo = NULL;
859    couchstore_error_t errcode;
860    if (context->by_id) {
861        errcode = by_id_read_docinfo(&docinfo, k, v);
862    } else {
863        errcode = by_seq_read_docinfo(&docinfo, k, v);
864    }
865    if (errcode == COUCHSTORE_ERROR_CORRUPT &&
866        (context->options & COUCHSTORE_TOLERATE_CORRUPTION)) {
867        // Invoke callback even if doc info is corrupted/unreadable, if magic flag is set
868        docinfo = static_cast<DocInfo*>(cb_calloc(sizeof(DocInfo), 1));
869        docinfo->id = *k;
870        docinfo->rev_meta = *v;
871    } else if (errcode) {
872        return errcode;
873    }
874
875    if ((context->options & COUCHSTORE_DELETES_ONLY) && docinfo->deleted == 0) {
876        couchstore_free_docinfo(docinfo);
877        return COUCHSTORE_SUCCESS;
878    }
879
880    if ((context->options & COUCHSTORE_NO_DELETES) && docinfo->deleted == 1) {
881        couchstore_free_docinfo(docinfo);
882        return COUCHSTORE_SUCCESS;
883    }
884
885    if (context->walk_callback) {
886        errcode = static_cast<couchstore_error_t>(context->walk_callback(context->db,
887                                                                         context->depth,
888                                                                         docinfo,
889                                                                         0,
890                                                                         NULL,
891                                                                         context->callback_context));
892    } else {
893        errcode = static_cast<couchstore_error_t>(context->callback(context->db,
894                                                                    docinfo,
895                                                                    context->callback_context));
896    }
897    if (errcode <= 0) {
898        couchstore_free_docinfo(docinfo);
899    } else {
900        // User requested docinfo not be freed, don't free it, return success
901        return COUCHSTORE_SUCCESS;
902    }
903    return errcode;
904}
905
906couchstore_error_t couchstore_changes_since(Db *db,
907                                            uint64_t since,
908                                            couchstore_docinfos_options options,
909                                            couchstore_changes_callback_fn callback,
910                                            void *ctx)
911{
912    COLLECT_LATENCY();
913
914    char since_termbuf[6];
915    sized_buf since_term;
916    sized_buf *keylist = &since_term;
917    lookup_context cbctx = {db, options, callback, ctx, 0, 0, NULL};
918    couchfile_lookup_request rq;
919    couchstore_error_t errcode;
920
921    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
922    if (db->header.by_seq_root == NULL) {
923        return COUCHSTORE_SUCCESS;
924    }
925
926    since_term.buf = since_termbuf;
927    since_term.size = 6;
928    encode_raw48(since, (raw_48*)since_term.buf);
929
930    rq.cmp.compare = seq_cmp;
931    rq.file = &db->file;
932    rq.num_keys = 1;
933    rq.keys = &keylist;
934    rq.callback_ctx = &cbctx;
935    rq.fetch_callback = lookup_callback;
936    rq.node_callback = NULL;
937    rq.fold = 1;
938    rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
939
940    errcode = btree_lookup(&rq, db->header.by_seq_root->pointer);
941cleanup:
942    return errcode;
943}
944
945couchstore_error_t couchstore_all_docs(Db *db,
946                                       const sized_buf* startKeyPtr,
947                                       couchstore_docinfos_options options,
948                                       couchstore_changes_callback_fn callback,
949                                       void *ctx)
950{
951    COLLECT_LATENCY();
952
953    sized_buf startKey = {NULL, 0};
954    sized_buf *keylist = &startKey;
955    lookup_context cbctx = {db, options, callback, ctx, 1, 0, NULL};
956    couchfile_lookup_request rq;
957    couchstore_error_t errcode;
958
959    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
960    if (db->header.by_id_root == NULL) {
961        return COUCHSTORE_SUCCESS;
962    }
963
964    if (startKeyPtr) {
965        startKey = *startKeyPtr;
966    }
967
968    rq.cmp.compare = ebin_cmp;
969    rq.file = &db->file;
970    rq.num_keys = 1;
971    rq.keys = &keylist;
972    rq.callback_ctx = &cbctx;
973    rq.fetch_callback = lookup_callback;
974    rq.node_callback = NULL;
975    rq.fold = 1;
976    rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
977
978    errcode = btree_lookup(&rq, db->header.by_id_root->pointer);
979cleanup:
980    return errcode;
981}
982
983static couchstore_error_t walk_node_callback(struct couchfile_lookup_request *rq,
984                                                 uint64_t subtreeSize,
985                                                 const sized_buf *reduceValue)
986{
987    lookup_context* context = static_cast<lookup_context*>(rq->callback_ctx);
988    if (reduceValue) {
989        int result = context->walk_callback(context->db,
990                                            context->depth,
991                                            NULL,
992                                            subtreeSize,
993                                            reduceValue,
994                                            context->callback_context);
995        context->depth++;
996        if (result < 0)
997            return static_cast<couchstore_error_t>(result);
998    } else {
999        context->depth--;
1000    }
1001    return COUCHSTORE_SUCCESS;
1002}
1003
1004static
1005couchstore_error_t couchstore_walk_tree(Db *db,
1006                                        int by_id,
1007                                        const node_pointer* root,
1008                                        const sized_buf* startKeyPtr,
1009                                        couchstore_docinfos_options options,
1010                                        int (*compare)(const sized_buf *k1, const sized_buf *k2),
1011                                        couchstore_walk_tree_callback_fn callback,
1012                                        void *ctx)
1013{
1014    couchstore_error_t errcode;
1015    sized_buf startKey = {NULL, 0};
1016    sized_buf *keylist;
1017    couchfile_lookup_request rq;
1018
1019    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1020    if (root == NULL) {
1021        return COUCHSTORE_SUCCESS;
1022    }
1023
1024    // Invoke the callback on the root node:
1025    errcode = static_cast<couchstore_error_t>(callback(db, 0, NULL,
1026                                                       root->subtreesize,
1027                                                       &root->reduce_value,
1028                                                       ctx));
1029    if (errcode < 0) {
1030        return errcode;
1031    }
1032
1033    if (startKeyPtr) {
1034        startKey = *startKeyPtr;
1035    }
1036    keylist = &startKey;
1037
1038    {
1039        // Create a new scope here just to mute the warning from the
1040        // compiler that the goto in the macro error_unless
1041        // skips the initialization of lookup_ctx..
1042        lookup_context lookup_ctx = {db, options, NULL, ctx, by_id, 1, callback};
1043
1044        rq.cmp.compare = compare;
1045        rq.file = &db->file;
1046        rq.num_keys = 1;
1047        rq.keys = &keylist;
1048        rq.callback_ctx = &lookup_ctx;
1049        rq.fetch_callback = lookup_callback;
1050        rq.node_callback = walk_node_callback;
1051        rq.fold = 1;
1052        rq.tolerate_corruption = (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0;
1053
1054        error_pass(btree_lookup(&rq, root->pointer));
1055    }
1056cleanup:
1057    return errcode;
1058}
1059
1060couchstore_error_t couchstore_walk_id_tree(Db *db,
1061                                           const sized_buf* startDocID,
1062                                           couchstore_docinfos_options options,
1063                                           couchstore_walk_tree_callback_fn callback,
1064                                           void *ctx)
1065{
1066    COLLECT_LATENCY();
1067
1068    return couchstore_walk_tree(db, 1, db->header.by_id_root, startDocID,
1069                                options, ebin_cmp, callback, ctx);
1070}
1071
1072couchstore_error_t couchstore_walk_seq_tree(Db *db,
1073                                           uint64_t startSequence,
1074                                           couchstore_docinfos_options options,
1075                                           couchstore_walk_tree_callback_fn callback,
1076                                           void *ctx)
1077{
1078    COLLECT_LATENCY();
1079
1080    raw_48 start_termbuf;
1081    encode_raw48(startSequence, &start_termbuf);
1082    sized_buf start_term = {(char*)&start_termbuf, 6};
1083
1084    return couchstore_walk_tree(db, 0, db->header.by_seq_root, &start_term,
1085                                options, seq_cmp, callback, ctx);
1086}
1087
1088static int id_ptr_cmp(const void *a, const void *b)
1089{
1090    sized_buf **buf1 = (sized_buf**) a;
1091    sized_buf **buf2 = (sized_buf**) b;
1092    return ebin_cmp(*buf1, *buf2);
1093}
1094
1095static int seq_ptr_cmp(const void *a, const void *b)
1096{
1097    sized_buf **buf1 = (sized_buf**) a;
1098    sized_buf **buf2 = (sized_buf**) b;
1099    return seq_cmp(*buf1, *buf2);
1100}
1101
1102// Common subroutine of couchstore_docinfos_by_{ids, sequence}
1103static couchstore_error_t iterate_docinfos(Db *db,
1104                                           const sized_buf keys[],
1105                                           unsigned numDocs,
1106                                           node_pointer *tree,
1107                                           int (*key_ptr_compare)(const void *, const void *),
1108                                           int (*key_compare)(const sized_buf *k1, const sized_buf *k2),
1109                                           couchstore_changes_callback_fn callback,
1110                                           int fold,
1111                                           int tolerate_corruption,
1112                                           void *ctx)
1113{
1114    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1115    const sized_buf **keyptrs = NULL;
1116    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1117    // Nothing to do if the tree is empty
1118    if (tree == NULL) {
1119        return COUCHSTORE_SUCCESS;
1120    }
1121
1122    if(numDocs <= 0) {
1123        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1124    }
1125
1126    // Create an array of *pointers to* sized_bufs, which is what btree_lookup wants:
1127    keyptrs = static_cast<const sized_buf**>(cb_malloc(numDocs * sizeof(sized_buf*)));
1128    error_unless(keyptrs, COUCHSTORE_ERROR_ALLOC_FAIL);
1129
1130    {
1131        unsigned i;
1132        for (i = 0; i< numDocs; ++i) {
1133            keyptrs[i] = &keys[i];
1134        }
1135        if (!fold) {
1136            // Sort the key pointers:
1137            qsort(keyptrs, numDocs, sizeof(keyptrs[0]), key_ptr_compare);
1138        }
1139
1140        // Construct the lookup request:
1141        lookup_context cbctx = {db, 0, callback, ctx, (tree == db->header.by_id_root), 0, NULL};
1142        couchfile_lookup_request rq;
1143        rq.cmp.compare = key_compare;
1144        rq.file = &db->file;
1145        rq.num_keys = numDocs;
1146        rq.keys = (sized_buf**) keyptrs;
1147        rq.callback_ctx = &cbctx;
1148        rq.fetch_callback = lookup_callback;
1149        rq.node_callback = NULL;
1150        rq.fold = fold;
1151        rq.tolerate_corruption = tolerate_corruption;
1152
1153        // Go!
1154        error_pass(btree_lookup(&rq, tree->pointer));
1155    }
1156cleanup:
1157    cb_free(keyptrs);
1158    return errcode;
1159}
1160
1161couchstore_error_t couchstore_docinfos_by_id(Db *db,
1162                                             const sized_buf ids[],
1163                                             unsigned numDocs,
1164                                             couchstore_docinfos_options options,
1165                                             couchstore_changes_callback_fn callback,
1166                                             void *ctx)
1167{
1168    COLLECT_LATENCY();
1169
1170    return iterate_docinfos(db, ids, numDocs,
1171                            db->header.by_id_root, id_ptr_cmp, ebin_cmp,
1172                            callback,
1173                            (options & RANGES) != 0,
1174                            (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0,
1175                            ctx);
1176}
1177
1178couchstore_error_t couchstore_docinfos_by_sequence(Db *db,
1179                                                   const uint64_t sequence[],
1180                                                   unsigned numDocs,
1181                                                   couchstore_docinfos_options options,
1182                                                   couchstore_changes_callback_fn callback,
1183                                                   void *ctx)
1184{
1185    COLLECT_LATENCY();
1186
1187    // Create the array of keys:
1188    sized_buf *keylist = static_cast<sized_buf*>(cb_malloc(numDocs * sizeof(sized_buf)));
1189    raw_by_seq_key *keyvalues = static_cast<raw_by_seq_key*>(cb_malloc(numDocs * sizeof(raw_by_seq_key)));
1190    couchstore_error_t errcode;
1191    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1192    error_unless(keylist && keyvalues, COUCHSTORE_ERROR_ALLOC_FAIL);
1193    unsigned i;
1194    for (i = 0; i< numDocs; ++i) {
1195        encode_raw48(sequence[i], &keyvalues[i].sequence);
1196        keylist[i].buf = static_cast<char*>((void*) &keyvalues[i]);
1197        keylist[i].size = sizeof(keyvalues[i]);
1198    }
1199
1200    error_pass(iterate_docinfos(db, keylist, numDocs,
1201                                db->header.by_seq_root, seq_ptr_cmp, seq_cmp,
1202                                callback,
1203                                (options & RANGES) != 0,
1204                                (options & COUCHSTORE_TOLERATE_CORRUPTION) != 0,
1205                                ctx));
1206cleanup:
1207    cb_free(keylist);
1208    cb_free(keyvalues);
1209    return errcode;
1210}
1211
1212couchstore_error_t couchstore_db_info(Db *db, DbInfo* dbinfo) {
1213    if (db == NULL || dbinfo == NULL) {
1214        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1215    }
1216    const node_pointer *id_root = db->header.by_id_root;
1217    const node_pointer *seq_root = db->header.by_seq_root;
1218    const node_pointer *local_root = db->header.local_docs_root;
1219    dbinfo->filename = db->file.path;
1220    dbinfo->header_position = db->header.position;
1221    dbinfo->last_sequence = db->header.update_seq;
1222    dbinfo->purge_seq = db->header.purge_seq;
1223    dbinfo->deleted_count = dbinfo->doc_count = dbinfo->space_used = 0;
1224    dbinfo->file_size = db->file.pos;
1225    if (id_root) {
1226        raw_by_id_reduce* id_reduce = (raw_by_id_reduce*) id_root->reduce_value.buf;
1227        dbinfo->doc_count = decode_raw40(id_reduce->notdeleted);
1228        dbinfo->deleted_count = decode_raw40(id_reduce->deleted);
1229        dbinfo->space_used = decode_raw48(id_reduce->size);
1230        dbinfo->space_used += id_root->subtreesize;
1231    }
1232    if(seq_root) {
1233        dbinfo->space_used += seq_root->subtreesize;
1234    }
1235    if(local_root) {
1236        dbinfo->space_used += local_root->subtreesize;
1237    }
1238    return COUCHSTORE_SUCCESS;
1239}
1240
1241static couchstore_error_t local_doc_fetch(couchfile_lookup_request *rq,
1242                                          const sized_buf *k,
1243                                          const sized_buf *v)
1244{
1245    LocalDoc **lDoc = (LocalDoc **) rq->callback_ctx;
1246    LocalDoc *dp;
1247
1248    if (!v) {
1249        *lDoc = NULL;
1250        return COUCHSTORE_SUCCESS;
1251    }
1252    fatbuf *ldbuf = fatbuf_alloc(sizeof(LocalDoc) + k->size + v->size);
1253    if (ldbuf == NULL) {
1254        return COUCHSTORE_ERROR_ALLOC_FAIL;
1255    }
1256
1257    dp = *lDoc = (LocalDoc *) fatbuf_get(ldbuf, sizeof(LocalDoc));
1258    dp->id.buf = (char *) fatbuf_get(ldbuf, k->size);
1259    dp->id.size = k->size;
1260
1261    dp->json.buf = (char *) fatbuf_get(ldbuf, v->size);
1262    dp->json.size = v->size;
1263
1264    dp->deleted = 0;
1265
1266    memcpy(dp->id.buf, k->buf, k->size);
1267    memcpy(dp->json.buf, v->buf, v->size);
1268
1269    return COUCHSTORE_SUCCESS;
1270}
1271
1272couchstore_error_t couchstore_open_local_document(Db *db,
1273                                                  const void *id,
1274                                                  size_t idlen,
1275                                                  LocalDoc **pDoc)
1276{
1277    sized_buf key;
1278    sized_buf *keylist = &key;
1279    couchfile_lookup_request rq;
1280    couchstore_error_t errcode;
1281    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1282    if (db->header.local_docs_root == NULL) {
1283        return COUCHSTORE_ERROR_DOC_NOT_FOUND;
1284    }
1285
1286    key.buf = (char *) id;
1287    key.size = idlen;
1288
1289    rq.cmp.compare = ebin_cmp;
1290    rq.file = &db->file;
1291    rq.num_keys = 1;
1292    rq.keys = &keylist;
1293    rq.callback_ctx = pDoc;
1294    rq.fetch_callback = local_doc_fetch;
1295    rq.node_callback = NULL;
1296    rq.fold = 0;
1297
1298    errcode = btree_lookup(&rq, db->header.local_docs_root->pointer);
1299    if (errcode == COUCHSTORE_SUCCESS) {
1300        if (*pDoc == NULL) {
1301            errcode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
1302        }
1303    }
1304cleanup:
1305    return errcode;
1306}
1307
1308couchstore_error_t couchstore_save_local_document(Db *db, LocalDoc *lDoc)
1309{
1310    couchstore_error_t errcode;
1311    couchfile_modify_action ldupdate;
1312    couchfile_modify_request rq;
1313    node_pointer *nroot = NULL;
1314    error_unless(!db->dropped, COUCHSTORE_ERROR_FILE_CLOSED);
1315
1316    if (lDoc->deleted) {
1317        ldupdate.setType(ACTION_REMOVE);
1318    } else {
1319        ldupdate.setType(ACTION_INSERT);
1320    }
1321
1322    ldupdate.setKey(&lDoc->id);
1323    ldupdate.data = &lDoc->json;
1324
1325    rq.cmp.compare = ebin_cmp;
1326    rq.num_actions = 1;
1327    rq.actions = &ldupdate;
1328    rq.fetch_callback = NULL;
1329    rq.reduce = NULL;
1330    rq.rereduce = NULL;
1331    rq.file = &db->file;
1332    rq.enable_purging = false;
1333    rq.purge_kp = NULL;
1334    rq.purge_kv = NULL;
1335    rq.compacting = 0;
1336    rq.kv_chunk_threshold = db->file.options.kv_nodesize;
1337    rq.kp_chunk_threshold = db->file.options.kp_nodesize;
1338
1339    nroot = modify_btree(&rq, db->header.local_docs_root, &errcode);
1340    if (errcode == COUCHSTORE_SUCCESS && nroot != db->header.local_docs_root) {
1341        cb_free(db->header.local_docs_root);
1342        db->header.local_docs_root = nroot;
1343    }
1344
1345cleanup:
1346    return errcode;
1347}
1348
1349void couchstore_free_local_document(LocalDoc *lDoc)
1350{
1351    if (lDoc) {
1352        size_t offset = offsetof(fatbuf, buf);
1353        fatbuf_free((fatbuf *) ((char *)lDoc - (char *)offset));
1354    }
1355}
1356
1357couchstore_error_t couchstore_last_os_error(const Db *db,
1358                                            char* buf,
1359                                            size_t size) {
1360    if (db == NULL || buf == nullptr || size == 0) {
1361        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1362    }
1363    const couchstore_error_info_t *err = &db->file.lastError;
1364
1365    int nw;
1366
1367#ifdef WIN32
1368    char* win_msg = NULL;
1369    FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
1370                   FORMAT_MESSAGE_FROM_SYSTEM |
1371                   FORMAT_MESSAGE_IGNORE_INSERTS,
1372                   NULL, err->error,
1373                   MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
1374                   (LPTSTR) &win_msg,
1375                   0, NULL);
1376    nw = _snprintf(buf, size, "WINAPI error = %d: '%s'", err->error, win_msg);
1377    LocalFree(win_msg);
1378#else
1379    nw = snprintf(buf, size, "errno = %d: '%s'",
1380                      err->error, strerror(err->error));
1381#endif
1382
1383    if (nw < 0) {
1384        return COUCHSTORE_ERROR_ALLOC_FAIL;
1385    } if (size_t(nw) >= size) {
1386        /* Truncate the error message */
1387        buf[size - 1] = '\0';
1388    }
1389
1390    return COUCHSTORE_SUCCESS;
1391}
1392
1393couchstore_error_t couchstore_last_internal_error(const Db *db,
1394                                                  char* buf,
1395                                                  size_t size) {
1396    if (db == NULL || buf == nullptr || size == 0) {
1397        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
1398    }
1399
1400    int nw;
1401
1402    nw = snprintf(buf, size, "'%s'", internal_error_string);
1403    if (nw < 0) {
1404        return COUCHSTORE_ERROR_ALLOC_FAIL;
1405    }
1406    return COUCHSTORE_SUCCESS;
1407}
1408
1409static couchstore_error_t btree_eval_seq_reduce(Db *db,
1410                                                uint64_t *accum,
1411                                                sized_buf *left,
1412                                                sized_buf *right,
1413                                                bool past_left_edge,
1414                                                uint64_t diskpos) {
1415    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1416    int bufpos = 1, nodebuflen = 0;
1417    int node_type;
1418    char *nodebuf = NULL;
1419    nodebuflen = pread_compressed(&db->file, diskpos, &nodebuf);
1420    error_unless(nodebuflen >= 0, (static_cast<couchstore_error_t>(nodebuflen)));  // if negative, it's an error code
1421
1422    node_type = nodebuf[0];
1423    while(bufpos < nodebuflen) {
1424        sized_buf k, v;
1425        bufpos += read_kv(nodebuf + bufpos, &k, &v);
1426        int left_cmp = seq_cmp(&k, left);
1427        int right_cmp = seq_cmp(&k, right);
1428        if(left_cmp < 0) {
1429            continue;
1430        }
1431        if(node_type == KP_NODE) {
1432            // In-range Item in a KP Node
1433            const raw_node_pointer *raw = (const raw_node_pointer*)v.buf;
1434            const raw_by_seq_reduce *rawreduce = (const raw_by_seq_reduce*) (v.buf + sizeof(raw_node_pointer));
1435            uint64_t subcount = decode_raw40(rawreduce->count);
1436            uint64_t pointer = decode_raw48(raw->pointer);
1437            if((left_cmp >= 0 && !past_left_edge) || right_cmp >= 0) {
1438                error_pass(btree_eval_seq_reduce(db, accum, left, right, past_left_edge, pointer));
1439                if(right_cmp >= 0) {
1440                    break;
1441                } else {
1442                    past_left_edge = true;
1443                }
1444            } else {
1445                *accum += subcount;
1446            }
1447        } else {
1448            if(right_cmp > 0) {
1449                break;
1450            }
1451            // In-range Item in a KV Node
1452            *accum += 1;
1453        }
1454    }
1455cleanup:
1456    if (nodebuf) {
1457        cb_free(nodebuf);
1458    }
1459    return errcode;
1460}
1461
1462couchstore_error_t couchstore_changes_count(Db* db,
1463                                            uint64_t min_seq,
1464                                            uint64_t max_seq,
1465                                            uint64_t *count) {
1466    COLLECT_LATENCY();
1467
1468    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
1469    raw_48 leftkr, rightkr;
1470    sized_buf leftk, rightk;
1471    leftk.buf = (char*) &leftkr;
1472    rightk.buf = (char*) &rightkr;
1473    leftk.size = 6;
1474    rightk.size = 6;
1475    encode_raw48(min_seq, &leftkr);
1476    encode_raw48(max_seq, &rightkr);
1477
1478    *count = 0;
1479    if(db->header.by_seq_root) {
1480        error_pass(btree_eval_seq_reduce(db, count, &leftk, &rightk, false,
1481                                         db->header.by_seq_root->pointer));
1482    }
1483cleanup:
1484    return errcode;
1485}
1486