1 #include <platform/cbassert.h>
2 #include <stdlib.h>
3 #include <stdio.h>
4 #include <sys/types.h>
5 
6 #ifdef WIN32
7 #include <winsock2.h>
8 #include <ws2tcpip.h>
9 #define strdup _strdup
10 #define sleep(a) Sleep(a * 1000)
11 #else
12 #include <unistd.h>
13 #include <sys/socket.h>
14 #endif
15 
16 #include <string.h>
17 #include <curl/curl.h>
18 
19 #include <libconflate/conflate.h>
20 #include "rest.h"
21 #include "conflate_internal.h"
22 
23 long curl_init_flags = CURL_GLOBAL_ALL;
24 
25 static int g_tot_process_new_configs = 0;
26 
27 struct response_buffer {
28     char *data;
29     size_t bytes_used;
30     size_t buffer_size;
31     struct response_buffer *next;
32 };
33 
34 struct response_buffer *response_buffer_head = NULL;
35 struct response_buffer *cur_response_buffer = NULL;
36 
mk_response_buffer(size_t size)37 static struct response_buffer *mk_response_buffer(size_t size) {
38     struct response_buffer *r =
39       (struct response_buffer *) calloc(1, sizeof(struct response_buffer));
40     cb_assert(r);
41     r->data = malloc(size);
42     cb_assert(r->data);
43     r->bytes_used = 0;
44     r->buffer_size = size;
45     r->next = NULL;
46     return r;
47 }
48 
free_response(struct response_buffer *response)49 static void free_response(struct response_buffer *response) {
50     if (!response) {
51         return;
52     }
53     if (response->next) {
54         free_response(response->next);
55     }
56     if (response->data) {
57         free(response->data);
58     }
59     free(response);
60 }
61 
write_data_to_buffer(struct response_buffer *buffer, const char *data, size_t len)62 static struct response_buffer *write_data_to_buffer(struct response_buffer *buffer,
63                                                     const char *data, size_t len) {
64     size_t bytes_written = 0;
65     while (bytes_written < len) {
66         size_t bytes_to_write = (len - bytes_written);
67         size_t space = buffer->buffer_size - buffer->bytes_used;
68         if (space == 0) {
69             struct response_buffer *new_buffer = mk_response_buffer(buffer->buffer_size);
70             buffer->next = new_buffer;
71             buffer = new_buffer;
72         } else {
73             char *d;
74             if (bytes_to_write > space) {
75                 bytes_to_write = space;
76             }
77             d = buffer->data;
78             d = &d[buffer->bytes_used];
79             memcpy(d,&data[bytes_written], bytes_to_write);
80             bytes_written += bytes_to_write;
81             buffer->bytes_used += bytes_to_write;
82         }
83     }
84     return buffer;
85 }
86 
assemble_complete_response(struct response_buffer *response_head)87 static char *assemble_complete_response(struct response_buffer *response_head) {
88     struct response_buffer *cur_buffer = response_head;
89     size_t response_size = 0;
90     char *response = NULL;
91     char *ptr;
92 
93     if (response_head == NULL) {
94         return NULL;
95     }
96 
97     /* figure out how big the message is */
98     while (cur_buffer) {
99         response_size += cur_buffer->bytes_used;
100         cur_buffer = cur_buffer->next;
101     }
102 
103     /* create buffer */
104     response = malloc(response_size + 1);
105     cb_assert(response);
106 
107     /* populate buffer */
108     cur_buffer = response_head;
109     ptr = response;
110     while (cur_buffer) {
111         memcpy(ptr, cur_buffer->data, cur_buffer->bytes_used);
112         ptr += cur_buffer->bytes_used;
113         cur_buffer = cur_buffer->next;
114     }
115 
116     response[response_size] = '\0';
117 
118     return response;
119 }
120 
pattern_ends_with(const char *pattern, const char *target, size_t target_size)121 static bool pattern_ends_with(const char *pattern, const char *target, size_t target_size) {
122     size_t pattern_size;
123     cb_assert(target);
124     cb_assert(pattern);
125 
126     pattern_size = strlen(pattern);
127     if (target_size < pattern_size) {
128         return false;
129     }
130     return memcmp(&target[target_size - pattern_size], pattern, pattern_size) == 0;
131 }
132 
process_new_config(long http_code, conflate_handle_t *conf_handle)133 static conflate_result process_new_config(long http_code, conflate_handle_t *conf_handle) {
134     char *values[2];
135     kvpair_t *kv;
136     conflate_result (*call_back)(void *, kvpair_t *);
137     conflate_result r;
138     char buf[4];
139 
140     g_tot_process_new_configs++;
141 
142     snprintf(buf, 4, "%ld", http_code),
143     values[0] = buf;
144     values[1] = NULL;
145 
146     kv = mk_kvpair(HTTP_CODE_KEY, values);
147 
148     /* construct the new config from its components */
149     values[0] = assemble_complete_response(response_buffer_head);
150 
151     free_response(response_buffer_head);
152     response_buffer_head = NULL;
153     cur_response_buffer = NULL;
154 
155     if (values[0] == NULL) {
156         fprintf(stderr, "ERROR: invalid response from REST server\n");
157         return CONFLATE_ERROR;
158     }
159 
160     kv->next = mk_kvpair(CONFIG_KEY, values);
161 
162     if (conf_handle->url != NULL) {
163         char *url[2];
164         url[0] = conf_handle->url;
165         url[1] = NULL;
166         kv->next->next = mk_kvpair("url", url);
167     }
168 
169     /* execute the provided call back */
170     call_back = conf_handle->conf->new_config;
171     r = call_back(conf_handle->conf->userdata, kv);
172 
173     /* clean up */
174     free_kvpair(kv);
175     free(values[0]);
176 
177     response_buffer_head = mk_response_buffer(RESPONSE_BUFFER_SIZE);
178     cur_response_buffer = response_buffer_head;
179 
180     return r;
181 }
182 
handle_response(void *data, size_t s, size_t num, void *cb)183 static size_t handle_response(void *data, size_t s, size_t num, void *cb) {
184     conflate_handle_t *c_handle = (conflate_handle_t *) cb;
185     size_t size = s * num;
186     bool end_of_message = pattern_ends_with(END_OF_CONFIG, data, size);
187     cur_response_buffer = write_data_to_buffer(cur_response_buffer, data, size);
188     if (end_of_message) {
189         process_new_config(200, c_handle);
190     }
191     return size;
192 }
193 
setup_curl_sock(void *clientp, curl_socket_t curlfd, curlsocktype purpose)194 static int setup_curl_sock(void *clientp,
195                            curl_socket_t curlfd,
196                            curlsocktype purpose) {
197   int       optval = 1;
198   socklen_t optlen = sizeof(optval);
199   setsockopt(curlfd, SOL_SOCKET, SO_KEEPALIVE, (void *) &optval, optlen);
200   (void) clientp;
201   (void) purpose;
202   return 0;
203 }
204 
setup_handle(CURL *handle, char *url, char *userpass, conflate_handle_t *chandle, size_t (response_handler)(void *, size_t, size_t, void *))205 static void setup_handle(CURL *handle, char *url, char *userpass,
206                          conflate_handle_t *chandle,
207                          size_t (response_handler)(void *, size_t, size_t, void *)) {
208     if (url != NULL) {
209 
210         CURLcode c;
211 
212         c = curl_easy_setopt(handle, CURLOPT_SOCKOPTFUNCTION, setup_curl_sock);
213         cb_assert(c == CURLE_OK);
214         c = curl_easy_setopt(handle, CURLOPT_WRITEDATA, chandle);
215         cb_assert(c == CURLE_OK);
216         c = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, response_handler);
217         cb_assert(c == CURLE_OK);
218         c = curl_easy_setopt(handle, CURLOPT_URL, url);
219         cb_assert(c == CURLE_OK);
220 
221         if (userpass != NULL) {
222             c = curl_easy_setopt(handle, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
223             cb_assert(c == CURLE_OK);
224             c = curl_easy_setopt(handle, CURLOPT_USERPWD, userpass);
225             cb_assert(c == CURLE_OK);
226         }
227 
228         c = curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
229         cb_assert(c == CURLE_OK);
230     }
231 }
232 
233 #ifdef WIN32
234 /*
235  * NOTE!!! we are only using "|" as the pattern, so this code will _NOT_
236  * work if you change that to include more than a single character!
237  */
strsep(char **stringp, char *pattern)238 static char *strsep(char **stringp, char *pattern) {
239    char *ptr = *stringp;
240    *stringp = strchr(*stringp, pattern[0]);
241    if (*stringp != NULL) {
242       **stringp = '\0';
243       *stringp = (*stringp) + 1;
244    }
245 
246    return ptr;
247 }
248 #endif
249 
run_rest_conflate(void *arg)250 void run_rest_conflate(void *arg) {
251     conflate_handle_t *handle = (conflate_handle_t *) arg;
252     char curl_error_string[CURL_ERROR_SIZE];
253     kvpair_t *conf;
254     CURLcode c;
255     CURL *curl_handle;
256     bool always_retry = true;
257 
258 
259 
260     /* prep the buffers used to hold the config */
261     response_buffer_head = mk_response_buffer(RESPONSE_BUFFER_SIZE);
262     cur_response_buffer = response_buffer_head;
263 
264     /* Before connecting and all that, load the stored config */
265     conf = load_kvpairs(handle, handle->conf->save_path);
266     if (conf) {
267         handle->conf->new_config(handle->conf->userdata, conf);
268         free_kvpair(conf);
269     }
270 
271     /* init curl */
272     c = curl_global_init(curl_init_flags);
273     cb_assert(c == CURLE_OK);
274 
275     curl_handle = curl_easy_init();
276     cb_assert(curl_handle);
277 
278     curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, &curl_error_string);
279 
280     while (true) {
281         int start_tot_process_new_configs = g_tot_process_new_configs;
282         bool succeeding = true;
283 
284         while (succeeding) {
285             char *urls = strdup(handle->conf->host);  /* Might be a '|' delimited list of url's. */
286             char *next = urls;
287             char *userpass = NULL;
288             succeeding = false;
289 
290             if (handle->conf->jid && strlen(handle->conf->jid)) {
291                 size_t buff_size = strlen(handle->conf->jid) + strlen(handle->conf->pass) + 2;
292                 userpass = (char *) malloc(buff_size);
293                 cb_assert(userpass);
294                 snprintf(userpass, buff_size, "%s:%s", handle->conf->jid, handle->conf->pass);
295                 userpass[buff_size - 1] = '\0';
296             }
297 
298             while (next != NULL) {
299                 char *url = strsep(&next, "|");
300 
301                 handle->url = url;
302 
303                 setup_handle(curl_handle,
304                              url,  /* The full URL. */
305                              userpass, /* The auth user and password. */
306                              handle, handle_response);
307 
308                 if (curl_easy_perform(curl_handle) == 0) {
309                     long http_code = 0;
310                     if (curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_code) != CURLE_OK) {
311                         http_code = 0;
312                     }
313                     /* We reach here if the REST server didn't provide a
314                        streaming JSON response and so we need to process
315                        the just-one-JSON response */
316                     conflate_result r = process_new_config(http_code, handle);
317                     if (r == CONFLATE_SUCCESS ||
318                         r == CONFLATE_ERROR) {
319                       /* Restart at the beginning of the urls list */
320                       /* on either a success or a 'local' error. */
321                       /* In contrast, if the callback returned a */
322                       /* value of CONFLATE_ERROR_BAD_SOURCE, then */
323                       /* we should try the next url on the list. */
324                       succeeding = true;
325                       next = NULL;
326                     }
327                 } else {
328                     fprintf(stderr, "WARNING: curl error: %s from: %s\n",
329                             curl_error_string, url);
330                 }
331             }
332 
333             sleep(1);  /* Don't overload the REST servers with tons of retries. */
334 
335             free(urls);
336             free(userpass);
337         }
338 
339         if (start_tot_process_new_configs == g_tot_process_new_configs) {
340             if (start_tot_process_new_configs == 0) {
341                 fprintf(stderr, "WARNING: could not contact REST server(s): %s;"
342                         " perhaps they are unavailable or still initializing\n",
343                         handle->conf->host);
344             } else {
345                 fprintf(stderr, "ERROR: could not contact REST server(s): %s\n",
346                         handle->conf->host);
347             }
348 
349             if (always_retry == false) {
350               /* If we went through all our URL's and didn't see any new */
351               /* configs, then stop trying. */
352 
353               break;
354             }
355         }
356     }
357 
358     free_response(response_buffer_head);
359     response_buffer_head = NULL;
360     cur_response_buffer = NULL;
361 
362     curl_easy_cleanup(curl_handle);
363 
364     exit(1);
365 }
366