/* LibMemcached
 * Copyright (C) 2006-2009 Brian Aker
 * All rights reserved.
 *
 * Use and distribution licensed under the BSD license.  See
 * the COPYING file in the parent directory for full text.
 *
 * Summary: Get functions for libmemcached
 *
 */
11 
12 #include "common.h"
13 
/*
  What happens if no servers exist?
*/
/*
  Fetch the value stored under a single key.

  Convenience form of memcached_get_by_key() with no master key: the call
  is forwarded with master_key NULL and master_key_length 0, and whatever
  memcached_get_by_key() returns is handed back unchanged.
*/
char *memcached_get(memcached_st *ptr, const char *key,
                    size_t key_length,
                    size_t *value_length,
                    uint32_t *flags,
                    memcached_return_t *error)
{
  char *fetched;

  fetched= memcached_get_by_key(ptr,
                                NULL, 0,          /* no master key */
                                key, key_length,
                                value_length, flags, error);

  return fetched;
}
26 
27 static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr,
28                                                      const char *master_key,
29                                                      size_t master_key_length,
30                                                      const char * const *keys,
31                                                      const size_t *key_length,
32                                                      size_t number_of_keys,
33                                                      bool mget_mode);
34 
/*
  Fetch the value stored under a single key, optionally routed by a
  master key.

  The key is requested through memcached_mget_by_key_real() (one key,
  mget_mode false) and the reply is read with memcached_fetch().

  ptr               handle; UDP handles are rejected with
                    MEMCACHED_NOT_SUPPORTED.
  master_key,
  master_key_length passed through to memcached_mget_by_key_real().
  key, key_length   the key to look up.
  value_length,
  flags             filled in by memcached_fetch(), or from ptr->result when
                    the get_key_failure callback supplies the value.
  error             must not be NULL; it is written unconditionally.
                    MEMCACHED_END is reported as MEMCACHED_NOTFOUND.

  Returns the value pointer, or NULL when nothing was fetched and the
  get_key_failure callback (if any) did not provide a replacement.
*/
char *memcached_get_by_key(memcached_st *ptr,
                           const char *master_key,
                           size_t master_key_length,
                           const char *key, size_t key_length,
                           size_t *value_length,
                           uint32_t *flags,
                           memcached_return_t *error)
{
  char *value;
  size_t dummy_length;
  uint32_t dummy_flags;
  memcached_return_t dummy_error;

  unlikely (ptr->flags.use_udp)
  {
    *error= MEMCACHED_NOT_SUPPORTED;
    return NULL;
  }

  /* Request the key */
  *error= memcached_mget_by_key_real(ptr, master_key, master_key_length,
                                     (const char * const *)&key,
                                     &key_length, 1, false);

  value= memcached_fetch(ptr, NULL, NULL,
                         value_length, flags, error);
  /* This is for historical reasons */
  if (*error == MEMCACHED_END)
    *error= MEMCACHED_NOTFOUND;

  if (value == NULL)
  {
    if (ptr->get_key_failure && *error == MEMCACHED_NOTFOUND)
    {
      memcached_return_t rc;

      /* Let the callback produce a value for the missing key in ptr->result */
      memcached_result_reset(&ptr->result);
      rc= ptr->get_key_failure(ptr, key, key_length, &ptr->result);

      /* On all failure drop to returning NULL */
      if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED)
      {
        /*
          When the callback asks for a buffered store and the handle is not
          already buffering, buffering is switched on just for this set.
        */
        bool buffering_forced= false;

        if (rc == MEMCACHED_BUFFERED)
        {
          uint64_t latch; /* We use latch to track the state of the original socket */

          latch= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
          if (latch == 0)
          {
            memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
            buffering_forced= true;
          }
        }

        rc= memcached_set(ptr, key, key_length,
                          (memcached_result_value(&ptr->result)),
                          (memcached_result_length(&ptr->result)),
                          0,
                          (memcached_result_flags(&ptr->result)));

        /*
          Always undo the temporary switch, whatever memcached_set()
          returned; otherwise a failed set would leave the caller's handle
          in buffered mode.
        */
        if (buffering_forced)
          memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);

        if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED)
        {
          *error= rc;
          *value_length= memcached_result_length(&ptr->result);
          *flags= memcached_result_flags(&ptr->result);
          return memcached_string_c_copy(&ptr->result.value);
        }
      }
    }

    return NULL;
  }

  /* A value was read: fetch once more, discarding the output, before returning */
  (void)memcached_fetch(ptr, NULL, NULL,
                        &dummy_length, &dummy_flags,
                        &dummy_error);
  WATCHPOINT_ASSERT(dummy_length == 0);

  return value;
}
122 
memcached_mget(memcached_st *ptr, const char * const *keys, const size_t *key_length, size_t number_of_keys)123 memcached_return_t memcached_mget(memcached_st *ptr,
124                                   const char * const *keys,
125                                   const size_t *key_length,
126                                   size_t number_of_keys)
127 {
128   return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
129 }
130 
131 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
132                                              uint32_t master_server_key,
133                                              bool is_master_key_set,
134                                              const char * const *keys,
135                                              const size_t *key_length,
136                                              size_t number_of_keys,
137                                              bool mget_mode);
138 
memcached_mget_by_key_real(memcached_st *ptr, const char *master_key, size_t master_key_length, const char * const *keys, const size_t *key_length, size_t number_of_keys, bool mget_mode)139 static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr,
140                                                      const char *master_key,
141                                                      size_t master_key_length,
142                                                      const char * const *keys,
143                                                      const size_t *key_length,
144                                                      size_t number_of_keys,
145                                                      bool mget_mode)
146 {
147   bool failures_occured_in_sending= false;
148   const char *get_command= "get ";
149   uint8_t get_command_length= 4;
150   unsigned int master_server_key= (unsigned int)-1; /* 0 is a valid server id! */
151   bool is_master_key_set= false;
152   memcached_return_t rc= MEMCACHED_SUCCESS;
153   size_t hosts_connected= 0;
154   uint32_t x;
155   bool success_happened= false;
156 
157   unlikely (ptr->flags.use_udp)
158     return MEMCACHED_NOT_SUPPORTED;
159 
160   LIBMEMCACHED_MEMCACHED_MGET_START();
161 
162   if (number_of_keys == 0)
163     return MEMCACHED_NOTFOUND;
164 
165   if (memcached_server_count(ptr) == 0)
166     return MEMCACHED_NO_SERVERS;
167 
168   if (ptr->flags.verify_key && (memcached_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED))
169     return MEMCACHED_BAD_KEY_PROVIDED;
170 
171   if (master_key && master_key_length)
172   {
173     if (ptr->flags.verify_key && (memcached_key_test((const char * const *)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
174       return MEMCACHED_BAD_KEY_PROVIDED;
175     master_server_key= memcached_generate_hash_with_redistribution(ptr, master_key, master_key_length);
176     is_master_key_set= true;
177   }
178 
179   /*
180     Here is where we pay for the non-block API. We need to remove any data sitting
181     in the queue before we start our get.
182 
183     It might be optimum to bounce the connection if count > some number.
184   */
185   for (x= 0; x < memcached_server_count(ptr); x++)
186   {
187     memcached_server_write_instance_st instance=
188       memcached_server_instance_fetch(ptr, x);
189 
190     if (memcached_server_response_count(instance))
191     {
192       char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
193 
194       if (ptr->flags.no_block)
195         (void)memcached_io_write(instance, NULL, 0, true);
196 
197       while(memcached_server_response_count(instance))
198         (void)memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
199     }
200   }
201 
202   if (ptr->flags.binary_protocol)
203   {
204     return binary_mget_by_key(ptr, master_server_key, is_master_key_set, keys,
205                               key_length, number_of_keys, mget_mode);
206   }
207 
208   if (ptr->flags.support_cas)
209   {
210     get_command= "gets ";
211     get_command_length= 5;
212   }
213 
214   /*
215     If a server fails we warn about errors and start all over with sending keys
216     to the server.
217   */
218   for (x= 0; x < number_of_keys; x++)
219   {
220     memcached_server_write_instance_st instance;
221     uint32_t server_key;
222     struct libmemcached_io_vector_st vector[4];
223     memset(&vector, 0, sizeof(vector));
224 
225     if (is_master_key_set)
226     {
227       server_key= master_server_key;
228     }
229     else
230     {
231       server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
232     }
233 
234     instance= memcached_server_instance_fetch(ptr, server_key);
235 
236     vector[0].length= get_command_length;
237     vector[0].buffer= get_command;
238     vector[1].length= ptr->prefix_key_length;
239     vector[1].buffer= ptr->prefix_key;
240     vector[2].length= key_length[x];
241     vector[2].buffer= keys[x];
242     vector[3].length= 1;
243     vector[3].buffer= " ";
244 
245     if (memcached_server_response_count(instance) == 0)
246     {
247       rc= memcached_connect(instance);
248 
249       if (rc != MEMCACHED_SUCCESS)
250       {
251         continue;
252       }
253       hosts_connected++;
254 
255       if ((memcached_io_writev(instance, vector, 4, false)) == -1)
256       {
257         failures_occured_in_sending= true;
258         continue;
259       }
260       WATCHPOINT_ASSERT(instance->cursor_active == 0);
261       memcached_server_response_increment(instance);
262       WATCHPOINT_ASSERT(instance->cursor_active == 1);
263     }
264     else
265     {
266       if ((memcached_io_writev(instance, (vector + 1), 3, false)) == -1)
267       {
268         memcached_server_response_reset(instance);
269         failures_occured_in_sending= true;
270         continue;
271       }
272     }
273   }
274 
275   if (hosts_connected == 0)
276   {
277     LIBMEMCACHED_MEMCACHED_MGET_END();
278 
279     if (rc != MEMCACHED_SUCCESS)
280       return rc;
281 
282     return MEMCACHED_NO_SERVERS;
283   }
284 
285 
286   /*
287     Should we muddle on if some servers are dead?
288   */
289   for (x= 0; x < memcached_server_count(ptr); x++)
290   {
291     memcached_server_write_instance_st instance=
292       memcached_server_instance_fetch(ptr, x);
293 
294     if (memcached_server_response_count(instance))
295     {
296       /* We need to do something about non-connnected hosts in the future */
297       if ((memcached_io_write(instance, "\r\n", 2, true)) == -1)
298       {
299         failures_occured_in_sending= true;
300       }
301       else
302       {
303         success_happened= true;
304       }
305     }
306   }
307 
308   LIBMEMCACHED_MEMCACHED_MGET_END();
309 
310   if (failures_occured_in_sending && success_happened)
311     return MEMCACHED_SOME_ERRORS;
312 
313   if (success_happened)
314     return MEMCACHED_SUCCESS;
315 
316   return MEMCACHED_FAILURE;
317 }
318 
memcached_mget_by_key(memcached_st *ptr, const char *master_key, size_t master_key_length, const char * const *keys, const size_t *key_length, const size_t number_of_keys)319 memcached_return_t memcached_mget_by_key(memcached_st *ptr,
320                                          const char *master_key,
321                                          size_t master_key_length,
322                                          const char * const *keys,
323                                          const size_t *key_length,
324                                          const size_t number_of_keys)
325 {
326   return memcached_mget_by_key_real(ptr, master_key, master_key_length, keys,
327                                     key_length, number_of_keys, true);
328 }
329 
memcached_mget_execute(memcached_st *ptr, const char * const *keys, const size_t *key_length, const size_t number_of_keys, memcached_execute_fn *callback, void *context, const unsigned int number_of_callbacks)330 memcached_return_t memcached_mget_execute(memcached_st *ptr,
331                                           const char * const *keys,
332                                           const size_t *key_length,
333                                           const size_t number_of_keys,
334                                           memcached_execute_fn *callback,
335                                           void *context,
336                                           const unsigned int number_of_callbacks)
337 {
338   return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length,
339                                        number_of_keys, callback,
340                                        context, number_of_callbacks);
341 }
342 
memcached_mget_execute_by_key(memcached_st *ptr, const char *master_key, size_t master_key_length, const char * const *keys, const size_t *key_length, size_t number_of_keys, memcached_execute_fn *callback, void *context, const unsigned int number_of_callbacks)343 memcached_return_t memcached_mget_execute_by_key(memcached_st *ptr,
344                                                  const char *master_key,
345                                                  size_t master_key_length,
346                                                  const char * const *keys,
347                                                  const size_t *key_length,
348                                                  size_t number_of_keys,
349                                                  memcached_execute_fn *callback,
350                                                  void *context,
351                                                  const unsigned int number_of_callbacks)
352 {
353   memcached_return_t rc;
354   memcached_callback_st *original_callbacks;
355   memcached_callback_st cb;
356 
357   if ((ptr->flags.binary_protocol) == 0)
358     return MEMCACHED_NOT_SUPPORTED;
359 
360   original_callbacks= ptr->callbacks;
361   memset(&cb, 0, sizeof(cb));
362   cb.callback= callback;
363   cb.context= context;
364   cb.number_of_callback= number_of_callbacks;
365 
366   ptr->callbacks= &cb;
367   rc= memcached_mget_by_key(ptr, master_key, master_key_length, keys,
368                             key_length, number_of_keys);
369   ptr->callbacks= original_callbacks;
370   return rc;
371 }
372 
simple_binary_mget(memcached_st *ptr, uint32_t master_server_key, bool is_master_key_set, const char * const *keys, const size_t *key_length, size_t number_of_keys, bool mget_mode)373 static memcached_return_t simple_binary_mget(memcached_st *ptr,
374                                              uint32_t master_server_key,
375                                              bool is_master_key_set,
376                                              const char * const *keys,
377                                              const size_t *key_length,
378                                              size_t number_of_keys, bool mget_mode)
379 {
380   memcached_return_t rc= MEMCACHED_NOTFOUND;
381 
382   int flush= number_of_keys == 1;
383   uint32_t x;
384   /*
385     If a server fails we warn about errors and start all over with sending keys
386     to the server.
387   */
388   for (x= 0; x < number_of_keys; ++x)
389   {
390     uint32_t server_key;
391     memcached_server_write_instance_st instance;
392     memcached_return_t vk;
393     struct libmemcached_io_vector_st vector[3];
394     protocol_binary_request_getk request;
395     memset(&request, 0, sizeof(request));
396     memset(&vector, 0, sizeof(vector));
397 
398     if (is_master_key_set)
399     {
400       server_key= master_server_key;
401     }
402     else
403     {
404       server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
405     }
406 
407     instance= memcached_server_instance_fetch(ptr, server_key);
408 
409     if (memcached_server_response_count(instance) == 0)
410     {
411       rc= memcached_connect(instance);
412       if (rc != MEMCACHED_SUCCESS)
413         continue;
414     }
415 
416     request.message.header.request.magic= PROTOCOL_BINARY_REQ;
417     if (mget_mode)
418       request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
419     else
420       request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
421 
422     vk= memcached_validate_key_length(key_length[x],
423                                       ptr->flags.binary_protocol);
424     unlikely (vk != MEMCACHED_SUCCESS)
425     {
426       if (x > 0)
427       {
428         memcached_io_reset(instance);
429       }
430 
431       return vk;
432     }
433 
434     request.message.header.request.keylen= htons((uint16_t)(key_length[x] + ptr->prefix_key_length));
435     request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
436     request.message.header.request.bodylen= htonl((uint32_t)( key_length[x] + ptr->prefix_key_length));
437 
438     vector[0].length= sizeof(request.bytes);
439     vector[0].buffer= request.bytes;
440     vector[1].length= ptr->prefix_key_length;
441     vector[1].buffer= ptr->prefix_key;
442     vector[2].length= key_length[x];
443     vector[2].buffer= keys[x];
444 
445     if (memcached_io_writev(instance, vector, 3, flush) == -1)
446     {
447       memcached_server_response_reset(instance);
448       rc= MEMCACHED_SOME_ERRORS;
449       continue;
450     }
451 
452     /* We just want one pending response per server */
453     memcached_server_response_reset(instance);
454     memcached_server_response_increment(instance);
455     if ((x > 0 && x == ptr->io_key_prefetch) && memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
456     {
457       rc= MEMCACHED_SOME_ERRORS;
458     }
459   }
460 
461   if (mget_mode)
462   {
463     /*
464      * Send a noop command to flush the buffers
465      */
466     protocol_binary_request_noop request;
467     memset(&request, 0, sizeof(request));
468     request.message.header.request.magic= PROTOCOL_BINARY_REQ;
469     request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
470     request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
471 
472     for (x= 0; x < memcached_server_count(ptr); ++x)
473     {
474       memcached_server_write_instance_st instance=
475         memcached_server_instance_fetch(ptr, x);
476 
477       if (memcached_server_response_count(instance))
478       {
479         if (memcached_io_write(instance, NULL, 0, true) == -1)
480         {
481           memcached_server_response_reset(instance);
482           memcached_io_reset(instance);
483           rc= MEMCACHED_SOME_ERRORS;
484         }
485 
486         if (memcached_io_write(instance, request.bytes,
487                                sizeof(request.bytes), true) == -1)
488         {
489           memcached_server_response_reset(instance);
490           memcached_io_reset(instance);
491           rc= MEMCACHED_SOME_ERRORS;
492         }
493       }
494     }
495   }
496 
497 
498   return rc;
499 }
500 
replication_binary_mget(memcached_st *ptr, uint32_t* hash, bool* dead_servers, const char *const *keys, const size_t *key_length, size_t number_of_keys)501 static memcached_return_t replication_binary_mget(memcached_st *ptr,
502                                                   uint32_t* hash,
503                                                   bool* dead_servers,
504                                                   const char *const *keys,
505                                                   const size_t *key_length,
506                                                   size_t number_of_keys)
507 {
508   memcached_return_t rc= MEMCACHED_NOTFOUND;
509   uint32_t start= 0;
510   uint64_t randomize_read= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
511   uint32_t x;
512   uint32_t replica;
513 
514   if (randomize_read)
515     start= (uint32_t)random() % (uint32_t)(ptr->number_of_replicas + 1);
516 
517   /* Loop for each replica */
518   for (replica= 0; replica <= ptr->number_of_replicas; ++replica)
519   {
520     bool success= true;
521 
522     for (x= 0; x < number_of_keys; ++x)
523     {
524       memcached_server_write_instance_st instance;
525       uint32_t server;
526       struct libmemcached_io_vector_st vector[3];
527       protocol_binary_request_getk request;
528       memset(&request, 0, sizeof(request));
529       memset(&vector, 0, sizeof(vector));
530 
531       if (hash[x] == memcached_server_count(ptr))
532         continue; /* Already successfully sent */
533 
534       server= hash[x] + replica;
535 
536       /* In case of randomized reads */
537       if (randomize_read && ((server + start) <= (hash[x] + ptr->number_of_replicas)))
538         server += start;
539 
540       while (server >= memcached_server_count(ptr))
541         server -= memcached_server_count(ptr);
542 
543       if (dead_servers[server])
544         continue;
545 
546       instance= memcached_server_instance_fetch(ptr, server);
547 
548       if (memcached_server_response_count(instance) == 0)
549       {
550         rc= memcached_connect(instance);
551         if (rc != MEMCACHED_SUCCESS)
552         {
553           memcached_io_reset(instance);
554           dead_servers[server]= true;
555           success= false;
556           continue;
557         }
558       }
559 
560       request.message.header.request.magic= PROTOCOL_BINARY_REQ;
561       request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
562       request.message.header.request.keylen= htons((uint16_t)(key_length[x] + ptr->prefix_key_length));
563       request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
564       request.message.header.request.bodylen= htonl((uint32_t)(key_length[x] + ptr->prefix_key_length));
565 
566       /*
567        * We need to disable buffering to actually know that the request was
568        * successfully sent to the server (so that we should expect a result
569        * back). It would be nice to do this in buffered mode, but then it
570        * would be complex to handle all error situations if we got to send
571        * some of the messages, and then we failed on writing out some others
572        * and we used the callback interface from memcached_mget_execute so
573        * that we might have processed some of the responses etc. For now,
574        * just make sure we work _correctly_
575        */
576       vector[0].length= sizeof(request.bytes);
577       vector[0].buffer= request.bytes;
578       vector[1].length= ptr->prefix_key_length;
579       vector[1].buffer= ptr->prefix_key;
580       vector[2].length= key_length[x];
581       vector[2].buffer= keys[x];
582 
583       if (memcached_io_writev(instance, vector, 3, true) == -1)
584       {
585         memcached_io_reset(instance);
586         dead_servers[server]= true;
587         success= false;
588         continue;
589       }
590 
591       memcached_server_response_increment(instance);
592       hash[x]= memcached_server_count(ptr);
593     }
594 
595     if (success)
596       break;
597   }
598 
599   return rc;
600 }
601 
binary_mget_by_key(memcached_st *ptr, uint32_t master_server_key, bool is_master_key_set, const char * const *keys, const size_t *key_length, size_t number_of_keys, bool mget_mode)602 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
603                                              uint32_t master_server_key,
604                                              bool is_master_key_set,
605                                              const char * const *keys,
606                                              const size_t *key_length,
607                                              size_t number_of_keys,
608                                              bool mget_mode)
609 {
610   memcached_return_t rc;
611 
612   if (ptr->number_of_replicas == 0)
613   {
614     rc= simple_binary_mget(ptr, master_server_key, is_master_key_set,
615                            keys, key_length, number_of_keys, mget_mode);
616   }
617   else
618   {
619     uint32_t* hash;
620     bool* dead_servers;
621     size_t x;
622 
623     hash= libmemcached_malloc(ptr, sizeof(uint32_t) * number_of_keys);
624     dead_servers= libmemcached_calloc(ptr, memcached_server_count(ptr), sizeof(bool));
625 
626     if (hash == NULL || dead_servers == NULL)
627     {
628       libmemcached_free(ptr, hash);
629       libmemcached_free(ptr, dead_servers);
630       return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
631     }
632 
633     if (is_master_key_set)
634     {
635       for (x= 0; x < number_of_keys; x++)
636       {
637         hash[x]= master_server_key;
638       }
639     }
640     else
641     {
642       for (x= 0; x < number_of_keys; x++)
643       {
644         hash[x]= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
645       }
646     }
647 
648     rc= replication_binary_mget(ptr, hash, dead_servers, keys,
649                                 key_length, number_of_keys);
650 
651     libmemcached_free(ptr, hash);
652     libmemcached_free(ptr, dead_servers);
653 
654     return MEMCACHED_SUCCESS;
655   }
656 
657   return rc;
658 }
659