1//  Copyright (c) 2016 Couchbase, Inc.
2//  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
3//  except in compliance with the License. You may obtain a copy of the License at
4//    http://www.apache.org/licenses/LICENSE-2.0
5//  Unless required by applicable law or agreed to in writing, software distributed under the
6//  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7//  either express or implied. See the License for the specific language governing permissions
8//  and limitations under the License.
9
10package rocksdb
11
12/*
13#include <stdio.h>
14#include <stdlib.h>
15#include "rocksdb/c.h"
16
17char *blevex_rocksdb_execute_direct_batch(
18    rocksdb_t* db,
19    const unsigned char writeoptions_sync,
20    const unsigned char writeoptions_disable_WAL,
21    const int num_sets,
22    const char* const* set_keys,
23    const size_t* set_keys_sizes,
24    const char* const* set_vals,
25    const size_t* set_vals_sizes,
26    int num_deletes,
27    const char* const* delete_keys,
28    const size_t* delete_keys_sizes,
29    int num_merges,
30    const char* const* merge_keys,
31    const size_t* merge_keys_sizes,
32    const char* const* merge_vals,
33    const size_t* merge_vals_sizes) {
34    rocksdb_writebatch_t* b = rocksdb_writebatch_create();
35
36    if (num_sets > 0) {
37        rocksdb_writebatch_putv(b,
38            num_sets, set_keys, set_keys_sizes,
39            num_sets, set_vals, set_vals_sizes);
40    }
41    if (num_deletes > 0) {
42        rocksdb_writebatch_deletev(b,
43            num_deletes, delete_keys, delete_keys_sizes);
44    }
45    if (num_merges > 0) {
46        rocksdb_writebatch_mergev(b,
47            num_merges, merge_keys, merge_keys_sizes,
48            num_merges, merge_vals, merge_vals_sizes);
49    }
50
51    char *errMsg = NULL;
52
53    rocksdb_writeoptions_t *options = rocksdb_writeoptions_create();
54
55    rocksdb_writeoptions_set_sync(options, writeoptions_sync);
56    rocksdb_writeoptions_disable_WAL(options, writeoptions_disable_WAL);
57
58    rocksdb_write(db, options, b, &errMsg);
59
60    rocksdb_writeoptions_destroy(options);
61
62    rocksdb_writebatch_destroy(b);
63
64    return errMsg;
65}
66
// Allocates the three buffers backing a direct batch and stores them in out:
//   out[0] - totalBytes bytes of raw storage,
//   out[1] - an array of n char pointers,
//   out[2] - an array of n size_t lengths.
// The malloc results are stored as-is and are not checked for NULL here.
void blevex_rocksdb_alloc_direct_batch(size_t totalBytes, size_t n, void **out) {
    const size_t ptr_array_bytes = n * sizeof(char *);
    const size_t len_array_bytes = n * sizeof(size_t);

    out[0] = malloc(totalBytes);
    out[1] = malloc(ptr_array_bytes);
    out[2] = malloc(len_array_bytes);
}
72
// Frees the three buffers referenced by bufs[0], bufs[1] and bufs[2].
// The entries of bufs themselves are left unchanged.
void blevex_rocksdb_free_direct_batch(void **bufs) {
    int i;
    for (i = 0; i < 3; i++) {
        free(bufs[i]);
    }
}
78*/
79import "C"
80
81import (
82	"errors"
83	"reflect"
84	"unsafe"
85
86	"github.com/blevesearch/bleve/index/store"
87)
88
89type BatchEx struct {
90	cbufs []unsafe.Pointer
91	buf   []byte
92
93	num_sets       int
94	set_keys       []*C.char
95	set_keys_sizes []C.size_t
96	set_vals       []*C.char
97	set_vals_sizes []C.size_t
98
99	num_deletes       int
100	delete_keys       []*C.char
101	delete_keys_sizes []C.size_t
102
103	num_merges       int
104	merge_keys       []*C.char
105	merge_keys_sizes []C.size_t
106	merge_vals       []*C.char
107	merge_vals_sizes []C.size_t
108}
109
110func newBatchEx(o store.KVBatchOptions) *BatchEx {
111	s := o.NumSets
112	ss := s + o.NumSets
113	ssd := ss + o.NumDeletes
114	ssdm := ssd + o.NumMerges
115	ssdmm := ssdm + o.NumMerges
116
117	cbufs := make([]unsafe.Pointer, 3)
118
119	C.blevex_rocksdb_alloc_direct_batch(C.size_t(o.TotalBytes),
120		C.size_t(ssdmm), (*unsafe.Pointer)(&cbufs[0]))
121
122	buf := unsafeToByteSlice(cbufs[0], o.TotalBytes)
123	arr_ptr_char := unsafeToCPtrCharSlice(cbufs[1], ssdmm)
124	arr_size_t := unsafeToCSizeTSlice(cbufs[2], ssdmm)
125
126	return &BatchEx{
127		cbufs:             cbufs,
128		buf:               buf,
129		set_keys:          arr_ptr_char[0:s],
130		set_keys_sizes:    arr_size_t[0:s],
131		set_vals:          arr_ptr_char[s:ss],
132		set_vals_sizes:    arr_size_t[s:ss],
133		delete_keys:       arr_ptr_char[ss:ssd],
134		delete_keys_sizes: arr_size_t[ss:ssd],
135		merge_keys:        arr_ptr_char[ssd:ssdm],
136		merge_keys_sizes:  arr_size_t[ssd:ssdm],
137		merge_vals:        arr_ptr_char[ssdm:ssdmm],
138		merge_vals_sizes:  arr_size_t[ssdm:ssdmm],
139	}
140}
141
142func (b *BatchEx) Set(key, val []byte) {
143	b.set_keys[b.num_sets] = (*C.char)(unsafe.Pointer(&key[0]))
144	b.set_keys_sizes[b.num_sets] = (C.size_t)(len(key))
145	b.set_vals[b.num_sets] = (*C.char)(unsafe.Pointer(&val[0]))
146	b.set_vals_sizes[b.num_sets] = (C.size_t)(len(val))
147	b.num_sets += 1
148}
149
150func (b *BatchEx) Delete(key []byte) {
151	b.delete_keys[b.num_deletes] = (*C.char)(unsafe.Pointer(&key[0]))
152	b.delete_keys_sizes[b.num_deletes] = (C.size_t)(len(key))
153	b.num_deletes += 1
154}
155
156func (b *BatchEx) Merge(key, val []byte) {
157	b.merge_keys[b.num_merges] = (*C.char)(unsafe.Pointer(&key[0]))
158	b.merge_keys_sizes[b.num_merges] = (C.size_t)(len(key))
159	b.merge_vals[b.num_merges] = (*C.char)(unsafe.Pointer(&val[0]))
160	b.merge_vals_sizes[b.num_merges] = (C.size_t)(len(val))
161	b.num_merges += 1
162}
163
164func (b *BatchEx) Reset() {
165	b.num_sets = 0
166	b.num_deletes = 0
167	b.num_merges = 0
168}
169
170func (b *BatchEx) Close() error {
171	b.Reset()
172
173	C.blevex_rocksdb_free_direct_batch((*unsafe.Pointer)(&b.cbufs[0]))
174
175	b.cbufs = nil
176	b.buf = nil
177	b.set_keys = nil
178	b.set_keys_sizes = nil
179	b.set_vals = nil
180	b.set_vals_sizes = nil
181	b.delete_keys = nil
182	b.delete_keys_sizes = nil
183	b.merge_keys = nil
184	b.merge_keys_sizes = nil
185	b.merge_vals = nil
186	b.merge_vals_sizes = nil
187
188	return nil
189}
190
191func (b *BatchEx) execute(w *Writer) error {
192	var num_sets C.int
193	var set_keys **C.char
194	var set_keys_sizes *C.size_t
195	var set_vals **C.char
196	var set_vals_sizes *C.size_t
197
198	var num_deletes C.int
199	var delete_keys **C.char
200	var delete_keys_sizes *C.size_t
201
202	var num_merges C.int
203	var merge_keys **C.char
204	var merge_keys_sizes *C.size_t
205	var merge_vals **C.char
206	var merge_vals_sizes *C.size_t
207
208	if b.num_sets > 0 {
209		num_sets = (C.int)(b.num_sets)
210		set_keys = (**C.char)(unsafe.Pointer(&b.set_keys[0]))
211		set_keys_sizes = (*C.size_t)(unsafe.Pointer(&b.set_keys_sizes[0]))
212		set_vals = (**C.char)(unsafe.Pointer(&b.set_vals[0]))
213		set_vals_sizes = (*C.size_t)(unsafe.Pointer(&b.set_vals_sizes[0]))
214	}
215
216	if b.num_deletes > 0 {
217		num_deletes = (C.int)(b.num_deletes)
218		delete_keys = (**C.char)(unsafe.Pointer(&b.delete_keys[0]))
219		delete_keys_sizes = (*C.size_t)(unsafe.Pointer(&b.delete_keys_sizes[0]))
220	}
221
222	if b.num_merges > 0 {
223		num_merges = (C.int)(b.num_merges)
224		merge_keys = (**C.char)(unsafe.Pointer(&b.merge_keys[0]))
225		merge_keys_sizes = (*C.size_t)(unsafe.Pointer(&b.merge_keys_sizes[0]))
226		merge_vals = (**C.char)(unsafe.Pointer(&b.merge_vals[0]))
227		merge_vals_sizes = (*C.size_t)(unsafe.Pointer(&b.merge_vals_sizes[0]))
228	}
229
230	// request fsync on write for safety by default (bleve's convention),
231	// although rocksdb writeoptions normal default is false for sync.
232	woptSync := C.uchar(1)
233	if w.store.woptSyncUse {
234		woptSync = boolToChar(w.store.woptSync)
235	}
236
237	woptDisableWAL := C.uchar(0)
238	if w.store.woptDisableWALUse {
239		woptDisableWAL = boolToChar(w.store.woptDisableWAL)
240	}
241
242	cErr := C.blevex_rocksdb_execute_direct_batch(
243		(*C.rocksdb_t)(w.store.db.UnsafeGetDB()),
244		woptSync,
245		woptDisableWAL,
246		num_sets,
247		set_keys,
248		set_keys_sizes,
249		set_vals,
250		set_vals_sizes,
251		num_deletes,
252		delete_keys,
253		delete_keys_sizes,
254		num_merges,
255		merge_keys,
256		merge_keys_sizes,
257		merge_vals,
258		merge_vals_sizes)
259	if cErr != nil {
260		err := errors.New(C.GoString(cErr))
261		C.free(unsafe.Pointer(cErr))
262		return err
263	}
264
265	return nil
266}
267
// unsafeToByteSlice returns a []byte whose backing array starts at data and
// whose length and capacity are both len. No memory is copied: the slice
// aliases whatever data points to.
//
// Originally from github.com/tecbot/gorocksdb/util.go.
func unsafeToByteSlice(data unsafe.Pointer, len int) []byte {
	var out []byte

	// Fill in the slice header of out in place.
	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&out))
	hdr.Data = uintptr(data)
	hdr.Len = len
	hdr.Cap = len

	return out
}
277
278func unsafeToCPtrCharSlice(data unsafe.Pointer, len int) []*C.char {
279	var value []*C.char
280
281	sH := (*reflect.SliceHeader)(unsafe.Pointer(&value))
282	sH.Cap, sH.Len, sH.Data = len, len, uintptr(data)
283
284	return value
285}
286
287func unsafeToCSizeTSlice(data unsafe.Pointer, len int) []C.size_t {
288	var value []C.size_t
289
290	sH := (*reflect.SliceHeader)(unsafe.Pointer(&value))
291	sH.Cap, sH.Len, sH.Data = len, len, uintptr(data)
292
293	return value
294}
295
296func boolToChar(b bool) C.uchar {
297	if b {
298		return 1
299	}
300	return 0
301}
302