1// Copyright (c) 2016 Couchbase, Inc. 2// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file 3// except in compliance with the License. You may obtain a copy of the License at 4// http://www.apache.org/licenses/LICENSE-2.0 5// Unless required by applicable law or agreed to in writing, software distributed under the 6// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 7// either express or implied. See the License for the specific language governing permissions 8// and limitations under the License. 9 10package rocksdb 11 12/* 13#include <stdio.h> 14#include <stdlib.h> 15#include "rocksdb/c.h" 16 17char *blevex_rocksdb_execute_direct_batch( 18 rocksdb_t* db, 19 const unsigned char writeoptions_sync, 20 const unsigned char writeoptions_disable_WAL, 21 const int num_sets, 22 const char* const* set_keys, 23 const size_t* set_keys_sizes, 24 const char* const* set_vals, 25 const size_t* set_vals_sizes, 26 int num_deletes, 27 const char* const* delete_keys, 28 const size_t* delete_keys_sizes, 29 int num_merges, 30 const char* const* merge_keys, 31 const size_t* merge_keys_sizes, 32 const char* const* merge_vals, 33 const size_t* merge_vals_sizes) { 34 rocksdb_writebatch_t* b = rocksdb_writebatch_create(); 35 36 if (num_sets > 0) { 37 rocksdb_writebatch_putv(b, 38 num_sets, set_keys, set_keys_sizes, 39 num_sets, set_vals, set_vals_sizes); 40 } 41 if (num_deletes > 0) { 42 rocksdb_writebatch_deletev(b, 43 num_deletes, delete_keys, delete_keys_sizes); 44 } 45 if (num_merges > 0) { 46 rocksdb_writebatch_mergev(b, 47 num_merges, merge_keys, merge_keys_sizes, 48 num_merges, merge_vals, merge_vals_sizes); 49 } 50 51 char *errMsg = NULL; 52 53 rocksdb_writeoptions_t *options = rocksdb_writeoptions_create(); 54 55 rocksdb_writeoptions_set_sync(options, writeoptions_sync); 56 rocksdb_writeoptions_disable_WAL(options, writeoptions_disable_WAL); 57 58 rocksdb_write(db, options, b, &errMsg); 59 60 rocksdb_writeoptions_destroy(options); 61 62 rocksdb_writebatch_destroy(b); 63 64 return errMsg; 65} 66 67void blevex_rocksdb_alloc_direct_batch(size_t totalBytes, size_t n, void **out) { 68 out[0] = malloc(totalBytes); 69 out[1] = malloc(n * sizeof(char *)); 70 out[2] = malloc(n * sizeof(size_t)); 71} 72 73void blevex_rocksdb_free_direct_batch(void **bufs) { 74 free(bufs[0]); 75 free(bufs[1]); 76 free(bufs[2]); 77} 78*/ 79import "C" 80 81import ( 82 "errors" 83 "reflect" 84 "unsafe" 85 86 "github.com/blevesearch/bleve/index/store" 87) 88 89type BatchEx struct { 90 cbufs []unsafe.Pointer 91 buf []byte 92 93 num_sets int 94 set_keys []*C.char 95 set_keys_sizes []C.size_t 96 set_vals []*C.char 97 set_vals_sizes []C.size_t 98 99 num_deletes int 100 delete_keys []*C.char 101 delete_keys_sizes []C.size_t 102 103 num_merges int 104 merge_keys []*C.char 105 merge_keys_sizes []C.size_t 106 merge_vals []*C.char 107 merge_vals_sizes []C.size_t 108} 109 110func newBatchEx(o store.KVBatchOptions) *BatchEx { 111 s := o.NumSets 112 ss := s + o.NumSets 113 ssd := ss + o.NumDeletes 114 ssdm := ssd + o.NumMerges 115 ssdmm := ssdm + o.NumMerges 116 117 cbufs := make([]unsafe.Pointer, 3) 118 119 C.blevex_rocksdb_alloc_direct_batch(C.size_t(o.TotalBytes), 120 C.size_t(ssdmm), (*unsafe.Pointer)(&cbufs[0])) 121 122 buf := unsafeToByteSlice(cbufs[0], o.TotalBytes) 123 arr_ptr_char := unsafeToCPtrCharSlice(cbufs[1], ssdmm) 124 arr_size_t := unsafeToCSizeTSlice(cbufs[2], ssdmm) 125 126 return &BatchEx{ 127 cbufs: cbufs, 128 buf: buf, 129 set_keys: arr_ptr_char[0:s], 130 set_keys_sizes: arr_size_t[0:s], 131 set_vals: arr_ptr_char[s:ss], 132 set_vals_sizes: arr_size_t[s:ss], 133 delete_keys: arr_ptr_char[ss:ssd], 134 delete_keys_sizes: arr_size_t[ss:ssd], 135 merge_keys: arr_ptr_char[ssd:ssdm], 136 merge_keys_sizes: arr_size_t[ssd:ssdm], 137 merge_vals: arr_ptr_char[ssdm:ssdmm], 138 merge_vals_sizes: arr_size_t[ssdm:ssdmm], 139 } 140} 141 142func (b *BatchEx) Set(key, val []byte) { 143 b.set_keys[b.num_sets] = (*C.char)(unsafe.Pointer(&key[0])) 144 b.set_keys_sizes[b.num_sets] = (C.size_t)(len(key)) 145 b.set_vals[b.num_sets] = (*C.char)(unsafe.Pointer(&val[0])) 146 b.set_vals_sizes[b.num_sets] = (C.size_t)(len(val)) 147 b.num_sets += 1 148} 149 150func (b *BatchEx) Delete(key []byte) { 151 b.delete_keys[b.num_deletes] = (*C.char)(unsafe.Pointer(&key[0])) 152 b.delete_keys_sizes[b.num_deletes] = (C.size_t)(len(key)) 153 b.num_deletes += 1 154} 155 156func (b *BatchEx) Merge(key, val []byte) { 157 b.merge_keys[b.num_merges] = (*C.char)(unsafe.Pointer(&key[0])) 158 b.merge_keys_sizes[b.num_merges] = (C.size_t)(len(key)) 159 b.merge_vals[b.num_merges] = (*C.char)(unsafe.Pointer(&val[0])) 160 b.merge_vals_sizes[b.num_merges] = (C.size_t)(len(val)) 161 b.num_merges += 1 162} 163 164func (b *BatchEx) Reset() { 165 b.num_sets = 0 166 b.num_deletes = 0 167 b.num_merges = 0 168} 169 170func (b *BatchEx) Close() error { 171 b.Reset() 172 173 C.blevex_rocksdb_free_direct_batch((*unsafe.Pointer)(&b.cbufs[0])) 174 175 b.cbufs = nil 176 b.buf = nil 177 b.set_keys = nil 178 b.set_keys_sizes = nil 179 b.set_vals = nil 180 b.set_vals_sizes = nil 181 b.delete_keys = nil 182 b.delete_keys_sizes = nil 183 b.merge_keys = nil 184 b.merge_keys_sizes = nil 185 b.merge_vals = nil 186 b.merge_vals_sizes = nil 187 188 return nil 189} 190 191func (b *BatchEx) execute(w *Writer) error { 192 var num_sets C.int 193 var set_keys **C.char 194 var set_keys_sizes *C.size_t 195 var set_vals **C.char 196 var set_vals_sizes *C.size_t 197 198 var num_deletes C.int 199 var delete_keys **C.char 200 var delete_keys_sizes *C.size_t 201 202 var num_merges C.int 203 var merge_keys **C.char 204 var merge_keys_sizes *C.size_t 205 var merge_vals **C.char 206 var merge_vals_sizes *C.size_t 207 208 if b.num_sets > 0 { 209 num_sets = (C.int)(b.num_sets) 210 set_keys = (**C.char)(unsafe.Pointer(&b.set_keys[0])) 211 set_keys_sizes = (*C.size_t)(unsafe.Pointer(&b.set_keys_sizes[0])) 212 set_vals = (**C.char)(unsafe.Pointer(&b.set_vals[0])) 213 set_vals_sizes = (*C.size_t)(unsafe.Pointer(&b.set_vals_sizes[0])) 214 } 215 216 if b.num_deletes > 0 { 217 num_deletes = (C.int)(b.num_deletes) 218 delete_keys = (**C.char)(unsafe.Pointer(&b.delete_keys[0])) 219 delete_keys_sizes = (*C.size_t)(unsafe.Pointer(&b.delete_keys_sizes[0])) 220 } 221 222 if b.num_merges > 0 { 223 num_merges = (C.int)(b.num_merges) 224 merge_keys = (**C.char)(unsafe.Pointer(&b.merge_keys[0])) 225 merge_keys_sizes = (*C.size_t)(unsafe.Pointer(&b.merge_keys_sizes[0])) 226 merge_vals = (**C.char)(unsafe.Pointer(&b.merge_vals[0])) 227 merge_vals_sizes = (*C.size_t)(unsafe.Pointer(&b.merge_vals_sizes[0])) 228 } 229 230 // request fsync on write for safety by default (bleve's convention), 231 // although rocksdb writeoptions normal default is false for sync. 232 woptSync := C.uchar(1) 233 if w.store.woptSyncUse { 234 woptSync = boolToChar(w.store.woptSync) 235 } 236 237 woptDisableWAL := C.uchar(0) 238 if w.store.woptDisableWALUse { 239 woptDisableWAL = boolToChar(w.store.woptDisableWAL) 240 } 241 242 cErr := C.blevex_rocksdb_execute_direct_batch( 243 (*C.rocksdb_t)(w.store.db.UnsafeGetDB()), 244 woptSync, 245 woptDisableWAL, 246 num_sets, 247 set_keys, 248 set_keys_sizes, 249 set_vals, 250 set_vals_sizes, 251 num_deletes, 252 delete_keys, 253 delete_keys_sizes, 254 num_merges, 255 merge_keys, 256 merge_keys_sizes, 257 merge_vals, 258 merge_vals_sizes) 259 if cErr != nil { 260 err := errors.New(C.GoString(cErr)) 261 C.free(unsafe.Pointer(cErr)) 262 return err 263 } 264 265 return nil 266} 267 268// Originally from github.com/tecbot/gorocksdb/util.go. 269func unsafeToByteSlice(data unsafe.Pointer, len int) []byte { 270 var value []byte 271 272 sH := (*reflect.SliceHeader)(unsafe.Pointer(&value)) 273 sH.Cap, sH.Len, sH.Data = len, len, uintptr(data) 274 275 return value 276} 277 278func unsafeToCPtrCharSlice(data unsafe.Pointer, len int) []*C.char { 279 var value []*C.char 280 281 sH := (*reflect.SliceHeader)(unsafe.Pointer(&value)) 282 sH.Cap, sH.Len, sH.Data = len, len, uintptr(data) 283 284 return value 285} 286 287func unsafeToCSizeTSlice(data unsafe.Pointer, len int) []C.size_t { 288 var value []C.size_t 289 290 sH := (*reflect.SliceHeader)(unsafe.Pointer(&value)) 291 sH.Cap, sH.Len, sH.Data = len, len, uintptr(data) 292 293 return value 294} 295 296func boolToChar(b bool) C.uchar { 297 if b { 298 return 1 299 } 300 return 0 301} 302