1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2012-2013 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #include <snappy-c.h>
18 #include <time.h>
19 #include <sys/time.h>
20 #include <stdio.h>
21 #include <vector>
22 #include <libcouchbase/couchbase.h>
23 #include <libcouchbase/durability.h>
24 #include <stdlib.h>
25 #include <string>
26 #include <cstring>
27 #include <stdint.h>
28 #ifdef _WIN32
29 #define PRIu64 "I64u"
30 #else
31 #include <inttypes.h>
32 #endif
33 
34 char* getvaluebuf;
35 char* statkeybuf;
36 char* statvaluebuf;
37 lcb_size_t* statkeysize;
38 lcb_size_t* statvaluesize;
39 long* getvaluesize;
40 lcb_datatype_t* getvaluedtype;
41 std::vector<std::string> threadstate;
42 bool warmupdone = false;
43 bool gotwm_val_cn = false;
44 bool gotreplica_curr_items = false;
45 bool goteject_num_items = false;
46 bool got_active_resident = false;
47 char* wm_val_cn;
48 char* rep_val_cn;
49 char* ejected_val_cn;
50 char* resident_val_cn;
51 int* ep_warmup_value_count;
52 //extern double t1;
53 extern struct timeval tim;
54 struct timeval reptim;
55 extern double t1;
56 double t2;
57 std::vector<std::pair<std::string, double> > replatencies;
58 typedef std::vector<std::pair<std::string, std::string> > StatsVector;
59 struct stats_generic
60 {
61 	int refcount;
62 	StatsVector statsvec;
63 };
64 struct stats_generic genericstats =
65 { 0 };
66 extern int numReplicaget;
67 
error_callback(lcb_t instance, lcb_error_t error, const char *errinfo)68 static void error_callback(lcb_t instance, lcb_error_t error, const char *errinfo)
69 {
70 	fprintf(stderr, "ERROR: %s (0x%x), %s\n", lcb_strerror(instance, error), error, errinfo);
71 	//exit(EXIT_FAILURE);
72 }
73 
durability_callback(lcb_t instance, const void *cookie, lcb_error_t error, const lcb_durability_resp_t *resp)74 static void durability_callback(lcb_t instance, const void *cookie, lcb_error_t error, const lcb_durability_resp_t *resp)
75 {
76 	if (error == LCB_SUCCESS)
77 	{
78 		/*gettimeofday(&reptim, NULL);
79 		 t2 = (double) reptim.tv_usec - t1;
80 		 std::string key1((char *)resp->v.v0.key,(size_t)resp->v.v0.nkey);
81 		 std::pair <std::string,double> keylatency(key1,(double) (reptim.tv_sec*1000000+reptim.tv_usec));
82 		 replatencies.push_back(keylatency);
83 		 fprintf(stderr, "key: ");
84 		 fwrite(resp->v.v0.key, sizeof(char), resp->v.v0.nkey, stderr);
85 		 fprintf(stderr, " replicated to number of nodes:");
86 		 fprintf(stderr, "%x",resp->v.v0.nreplicated);
87 		 fprintf(stderr, " reptim is:");
88 		 fprintf(stderr, "%.6lf",((double) reptim.tv_usec)/1000);
89 		 fprintf(stderr, " replication latency is:");
90 		 fprintf(stderr, "%.6lf",t2/1000);
91 		 fprintf(stderr, "\n");*/
92 
93 	}
94 	else
95 	{
96 		fprintf(stderr, "ERROR: \n", lcb_strerror(instance, error), error);
97 		//exit(EXIT_FAILURE);
98 	}
99 }
100 
store_callback(lcb_t instance, const void *cookie, lcb_storage_t operation, lcb_error_t error, const lcb_store_resp_t *item)101 static void store_callback(lcb_t instance, const void *cookie, lcb_storage_t operation, lcb_error_t error, const lcb_store_resp_t *item)
102 {
103 	if (error == LCB_SUCCESS)
104 	{
105 		//fprintf(stderr, "STORED \"");
106 		//fwrite(item->v.v0.key, sizeof(char), item->v.v0.nkey, stderr);
107 	}
108 	else
109 	{
110 		fprintf(stderr, "STORE ERROR: %s (0x%x)\n", lcb_strerror(instance, error), error);
111 		//exit(EXIT_FAILURE);
112 	}
113 	(void) cookie;
114 	(void) operation;
115 }
116 
get_callback(lcb_t instance, const void *cookie, lcb_error_t error, const lcb_get_resp_t *item)117 static void get_callback(lcb_t instance, const void *cookie, lcb_error_t error, const lcb_get_resp_t *item)
118 {
119 	if (error == LCB_SUCCESS)
120 	{
121 
122 		std::string key;
123 		key.assign((const char *) item->v.v0.key, item->v.v0.nkey);
124 		if (key.compare(0, 7, "replica"))
125 			numReplicaget++;
126 		/*getvaluesize = (long *) calloc(1,sizeof(long));
127 		 getvaluedtype = (lcb_datatype_t*) calloc(1,sizeof(lcb_datatype_t));
128 		 getvaluebuf = (char *)item->v.v0.bytes;
129 		 *getvaluesize = (long)item->v.v0.nbytes;
130 		 *getvaluedtype = item->v.v0.datatype;
131 
132 		 fprintf(stderr, "GOT size of doc: %ld \n",(long)item->v.v0.nbytes);
133 		 fwrite(item->v.v0.key, sizeof(char), item->v.v0.nkey, stderr);
134 		 fwrite(item->v.v0.bytes, sizeof(char), item->v.v0.nbytes, stderr);
135 		 fprintf(stderr, "\n");
136 		 fprintf(stderr, "GOT CAS value: %ld \n",(long)item->v.v0.cas);
137 		 if(item->v.v0.datatype==LCB_BINARY_DATATYPE_COMPRESSED_JSON||item->v.v0.datatype==LCB_BINARY_DATATYPE_COMPRESSED){
138 		 char uncompressed[2560];
139 		 size_t uncompressed_len = 2560;
140 		 snappy_status status;
141 		 status=snappy_uncompress((const char *)item->v.v0.bytes, item->v.v0.nbytes, uncompressed, &uncompressed_len);
142 		 fprintf(stderr, "uncompressed length %d\n",(int)uncompressed_len);
143 		 fwrite(uncompressed, sizeof(char), uncompressed_len, stderr);
144 		 fprintf(stderr,"\n");
145 		 }*/
146 	}
147 	else
148 	{
149 		fprintf(stderr, "GET ERROR: %s (0x%x)\n", lcb_strerror(instance, error), error);
150 	}
151 	(void) cookie;
152 }
153 
isWarmupdone(const lcb_server_stat_resp_t *resp)154 bool isWarmupdone(const lcb_server_stat_resp_t *resp)
155 {
156 
157 	char* keystr = (char *) calloc(resp->v.v0.nkey, sizeof(char));
158 	memcpy(keystr, resp->v.v0.key, resp->v.v0.nkey);
159 	std::string key = std::string(keystr);
160 	std::string keyref("ep_warmup_state");
161 
162 	char* valstr = (char *) calloc(resp->v.v0.nbytes, sizeof(char));
163 	memcpy(valstr, resp->v.v0.bytes, resp->v.v0.nbytes);
164 	std::string val = std::string(valstr);
165 	std::string valref("done");
166 
167 	if (keyref.compare(key) == 0 && valref.compare(val) == 0)
168 	{
169 		char* thstate = (char *) calloc(resp->v.v0.nbytes + 1, sizeof(char));
170 		memcpy(thstate, resp->v.v0.bytes, resp->v.v0.nbytes);
171 		thstate[resp->v.v0.nbytes] = '\0';
172 		std::string str = std::string(thstate);
173 		threadstate.push_back(str);
174 		return true;
175 	}
176 
177 	else if (keyref.compare(key) == 0 && valref.compare(val) != 0)
178 	{
179 		char* thstate = (char *) calloc(resp->v.v0.nbytes + 1, sizeof(char));
180 		memcpy(thstate, resp->v.v0.bytes, resp->v.v0.nbytes);
181 		thstate[resp->v.v0.nbytes] = '\0';
182 		std::string str = std::string(thstate);
183 		threadstate.push_back(str);
184 		return false;
185 	}
186 
187 	else
188 		return false;
189 }
190 
isWarmupvalcount(const lcb_server_stat_resp_t *resp)191 bool isWarmupvalcount(const lcb_server_stat_resp_t *resp)
192 {
193 
194 	char* keystr = (char *) calloc(resp->v.v0.nkey, sizeof(char));
195 	memcpy(keystr, resp->v.v0.key, resp->v.v0.nkey);
196 	std::string key = std::string(keystr);
197 	std::string keyref("ep_warmup_value_count");
198 
199 	if (key.compare(keyref) == 0)
200 	{
201 		return true;
202 	}
203 	else
204 		return false;
205 }
206 
isreplica_curr_items(const lcb_server_stat_resp_t *resp)207 bool isreplica_curr_items(const lcb_server_stat_resp_t *resp)
208 {
209 
210 	char* keystr = (char *) calloc(resp->v.v0.nkey, sizeof(char));
211 	memcpy(keystr, resp->v.v0.key, resp->v.v0.nkey);
212 	std::string key = std::string(keystr);
213 	std::string keyref("vb_replica_curr_items");
214 
215 	if (key.compare(keyref) == 0)
216 	{
217 		//char* valstr= (char *)calloc(resp->v.v0.nbytes,sizeof(char));
218 		//memcpy(valstr,resp->v.v0.bytes,resp->v.v0.nbytes);
219 		//std::string val = std::string(valstr);
220 		//fprintf(stderr, "\n%s: %s",key.c_str(),val.c_str());
221 		return true;
222 	}
223 	else
224 		return false;
225 }
226 
isactive_resident(const lcb_server_stat_resp_t *resp)227 bool isactive_resident(const lcb_server_stat_resp_t *resp)
228 {
229 
230 	char* keystr = (char *) calloc(resp->v.v0.nkey, sizeof(char));
231 	memcpy(keystr, resp->v.v0.key, resp->v.v0.nkey);
232 	std::string key = std::string(keystr);
233 	std::string keyref("vb_active_perc_mem_resident");
234 
235 	if (key.compare(keyref) == 0)
236 	{
237 		return true;
238 	}
239 	else
240 		return false;
241 }
242 
isevicted_items(const lcb_server_stat_resp_t *resp)243 bool isevicted_items(const lcb_server_stat_resp_t *resp)
244 {
245 
246 	char* keystr = (char *) calloc(resp->v.v0.nkey, sizeof(char));
247 	memcpy(keystr, resp->v.v0.key, resp->v.v0.nkey);
248 	std::string key = std::string(keystr);
249 	std::string keyref("ep_num_value_ejects");
250 
251 	if (key.compare(keyref) == 0)
252 	{
253 		return true;
254 	}
255 	else
256 		return false;
257 }
258 
stats_generic_callback(lcb_t instance, const void *cookie, lcb_error_t error, const lcb_server_stat_resp_t *resp)259 static void stats_generic_callback(lcb_t instance, const void *cookie, lcb_error_t error, const lcb_server_stat_resp_t *resp)
260 {
261 	genericstats.refcount++;
262 	if (error == LCB_SUCCESS)
263 	{
264 		if (resp->v.v0.nkey > 0)
265 		{
266 			std::string key;
267 			key.assign((const char *) resp->v.v0.key, resp->v.v0.nkey);
268 			std::string value;
269 			value.assign((const char *) resp->v.v0.bytes, resp->v.v0.nbytes);
270 			genericstats.statsvec.push_back(std::pair<std::string, std::string>(key, value));
271 		}
272 	}
273 
274 	else
275 	{
276 		fprintf(stderr, "GET ERROR: %s (0x%x)\n", lcb_strerror(instance, error), error);
277 	}
278 	(void) cookie;
279 }
280 
stats_callback(lcb_t instance, const void *cookie, lcb_error_t error, const lcb_server_stat_resp_t *resp)281 static void stats_callback(lcb_t instance, const void *cookie, lcb_error_t error, const lcb_server_stat_resp_t *resp)
282 {
283 	if (error == LCB_SUCCESS)
284 	{
285 
286 		//check if ep_warmup_state is done and mark bool variable warmupdone
287 		if (isWarmupdone(resp))
288 			warmupdone = true;
289 
290 		//read ep_warmup_value_count
291 		if (isWarmupvalcount(resp))
292 		{
293 
294 			if (warmupdone && !gotwm_val_cn)
295 			{
296 				wm_val_cn = (char *) calloc(resp->v.v0.nbytes, sizeof(char));
297 				memcpy(wm_val_cn, resp->v.v0.bytes, resp->v.v0.nbytes);
298 				gotwm_val_cn = true;
299 				//*ep_warmup_value_count = atoi(num);
300 				//fprintf(stderr, "\n%d\n",*ep_warmup_value_count);
301 			}
302 		}
303 		if (isreplica_curr_items(resp))
304 		{
305 			if (!gotreplica_curr_items)
306 			{
307 				rep_val_cn = (char *) calloc(resp->v.v0.nbytes, sizeof(char));
308 				memcpy(rep_val_cn, resp->v.v0.bytes, resp->v.v0.nbytes);
309 				gotreplica_curr_items = true;
310 			}
311 		}
312 		if (isactive_resident(resp))
313 		{
314 			if (!got_active_resident)
315 			{
316 				resident_val_cn = (char *) calloc(resp->v.v0.nbytes, sizeof(char));
317 				memcpy(resident_val_cn, resp->v.v0.bytes, resp->v.v0.nbytes);
318 				got_active_resident = true;
319 			}
320 		}
321 		if (isevicted_items(resp))
322 		{
323 			if (!goteject_num_items)
324 			{
325 				ejected_val_cn = (char *) calloc(resp->v.v0.nbytes, sizeof(char));
326 				memcpy(ejected_val_cn, resp->v.v0.bytes, resp->v.v0.nbytes);
327 				goteject_num_items = true;
328 			}
329 		}
330 	}
331 	else
332 	{
333 		fprintf(stderr, "GET ERROR: %s (0x%x)\n", lcb_strerror(instance, error), error);
334 	}
335 	(void) cookie;
336 }
337 
callget(lcb_t* instance, const void* gkey, size_t gnkey)338 int callget(lcb_t* instance, const void* gkey, size_t gnkey)
339 {
340 	lcb_wait(*instance);
341 	{
342 		lcb_error_t err;
343 		lcb_get_cmd_t cmd;
344 		const lcb_get_cmd_t *commands[1];
345 		commands[0] = &cmd;
346 		memset(&cmd, 0, sizeof(cmd));
347 		cmd.v.v0.key = gkey;
348 		cmd.v.v0.nkey = gnkey;
349 		err = lcb_get(*instance, NULL, 1, commands);
350 		if (err != LCB_SUCCESS)
351 		{
352 			fprintf(stderr, "Failed to get: %s\n", lcb_strerror(NULL, err));
353 			return 1;
354 		}
355 	}
356 	lcb_wait(*instance);
357 	//lcb_destroy(*instance);
358 	return 0;
359 }
360