1d0366df5STrond Norbye#include <platform/cbassert.h>
2a85aa543STrond Norbye#include <stdlib.h>
3a85aa543STrond Norbye#include <stdio.h>
4a85aa543STrond Norbye#include <sys/types.h>
5a85aa543STrond Norbye
6a85aa543STrond Norbye#ifdef WIN32
7a85aa543STrond Norbye#include <winsock2.h>
8a85aa543STrond Norbye#include <ws2tcpip.h>
9a85aa543STrond Norbye#define strdup _strdup
10a85aa543STrond Norbye#define sleep(a) Sleep(a * 1000)
11a85aa543STrond Norbye#else
12a85aa543STrond Norbye#include <unistd.h>
13a85aa543STrond Norbye#include <sys/socket.h>
14a85aa543STrond Norbye#endif
15a85aa543STrond Norbye
16a85aa543STrond Norbye#include <string.h>
17a85aa543STrond Norbye#include <curl/curl.h>
18a85aa543STrond Norbye
19a85aa543STrond Norbye#include <libconflate/conflate.h>
20a85aa543STrond Norbye#include "rest.h"
21a85aa543STrond Norbye#include "conflate_internal.h"
22a85aa543STrond Norbye
23a85aa543STrond Norbyelong curl_init_flags = CURL_GLOBAL_ALL;
24a85aa543STrond Norbye
25a85aa543STrond Norbyestatic int g_tot_process_new_configs = 0;
26a85aa543STrond Norbye
27a85aa543STrond Norbyestruct response_buffer {
28a85aa543STrond Norbye    char *data;
29a85aa543STrond Norbye    size_t bytes_used;
30a85aa543STrond Norbye    size_t buffer_size;
31a85aa543STrond Norbye    struct response_buffer *next;
32a85aa543STrond Norbye};
33a85aa543STrond Norbye
34a85aa543STrond Norbyestruct response_buffer *response_buffer_head = NULL;
35a85aa543STrond Norbyestruct response_buffer *cur_response_buffer = NULL;
36a85aa543STrond Norbye
37a85aa543STrond Norbyestatic struct response_buffer *mk_response_buffer(size_t size) {
38a85aa543STrond Norbye    struct response_buffer *r =
39a85aa543STrond Norbye      (struct response_buffer *) calloc(1, sizeof(struct response_buffer));
40d0366df5STrond Norbye    cb_assert(r);
41a85aa543STrond Norbye    r->data = malloc(size);
42d0366df5STrond Norbye    cb_assert(r->data);
43a85aa543STrond Norbye    r->bytes_used = 0;
44a85aa543STrond Norbye    r->buffer_size = size;
45a85aa543STrond Norbye    r->next = NULL;
46a85aa543STrond Norbye    return r;
47a85aa543STrond Norbye}
48a85aa543STrond Norbye
49a85aa543STrond Norbyestatic void free_response(struct response_buffer *response) {
50a85aa543STrond Norbye    if (!response) {
51a85aa543STrond Norbye        return;
52a85aa543STrond Norbye    }
53a85aa543STrond Norbye    if (response->next) {
54a85aa543STrond Norbye        free_response(response->next);
55a85aa543STrond Norbye    }
56a85aa543STrond Norbye    if (response->data) {
57a85aa543STrond Norbye        free(response->data);
58a85aa543STrond Norbye    }
59a85aa543STrond Norbye    free(response);
60a85aa543STrond Norbye}
61a85aa543STrond Norbye
62a85aa543STrond Norbyestatic struct response_buffer *write_data_to_buffer(struct response_buffer *buffer,
63a85aa543STrond Norbye                                                    const char *data, size_t len) {
64a85aa543STrond Norbye    size_t bytes_written = 0;
65a85aa543STrond Norbye    while (bytes_written < len) {
66a85aa543STrond Norbye        size_t bytes_to_write = (len - bytes_written);
67a85aa543STrond Norbye        size_t space = buffer->buffer_size - buffer->bytes_used;
68a85aa543STrond Norbye        if (space == 0) {
69a85aa543STrond Norbye            struct response_buffer *new_buffer = mk_response_buffer(buffer->buffer_size);
70a85aa543STrond Norbye            buffer->next = new_buffer;
71a85aa543STrond Norbye            buffer = new_buffer;
72a85aa543STrond Norbye        } else {
73a85aa543STrond Norbye            char *d;
74a85aa543STrond Norbye            if (bytes_to_write > space) {
75a85aa543STrond Norbye                bytes_to_write = space;
76a85aa543STrond Norbye            }
77a85aa543STrond Norbye            d = buffer->data;
78a85aa543STrond Norbye            d = &d[buffer->bytes_used];
79a85aa543STrond Norbye            memcpy(d,&data[bytes_written], bytes_to_write);
80a85aa543STrond Norbye            bytes_written += bytes_to_write;
81a85aa543STrond Norbye            buffer->bytes_used += bytes_to_write;
82a85aa543STrond Norbye        }
83a85aa543STrond Norbye    }
84a85aa543STrond Norbye    return buffer;
85a85aa543STrond Norbye}
86a85aa543STrond Norbye
87a85aa543STrond Norbyestatic char *assemble_complete_response(struct response_buffer *response_head) {
88a85aa543STrond Norbye    struct response_buffer *cur_buffer = response_head;
89a85aa543STrond Norbye    size_t response_size = 0;
90a85aa543STrond Norbye    char *response = NULL;
91a85aa543STrond Norbye    char *ptr;
92a85aa543STrond Norbye
93a85aa543STrond Norbye    if (response_head == NULL) {
94a85aa543STrond Norbye        return NULL;
95a85aa543STrond Norbye    }
96a85aa543STrond Norbye
97a85aa543STrond Norbye    /* figure out how big the message is */
98a85aa543STrond Norbye    while (cur_buffer) {
99a85aa543STrond Norbye        response_size += cur_buffer->bytes_used;
100a85aa543STrond Norbye        cur_buffer = cur_buffer->next;
101a85aa543STrond Norbye    }
102a85aa543STrond Norbye
103a85aa543STrond Norbye    /* create buffer */
104a85aa543STrond Norbye    response = malloc(response_size + 1);
105d0366df5STrond Norbye    cb_assert(response);
106a85aa543STrond Norbye
107a85aa543STrond Norbye    /* populate buffer */
108a85aa543STrond Norbye    cur_buffer = response_head;
109a85aa543STrond Norbye    ptr = response;
110a85aa543STrond Norbye    while (cur_buffer) {
111a85aa543STrond Norbye        memcpy(ptr, cur_buffer->data, cur_buffer->bytes_used);
112a85aa543STrond Norbye        ptr += cur_buffer->bytes_used;
113a85aa543STrond Norbye        cur_buffer = cur_buffer->next;
114a85aa543STrond Norbye    }
115a85aa543STrond Norbye
116a85aa543STrond Norbye    response[response_size] = '\0';
117a85aa543STrond Norbye
118a85aa543STrond Norbye    return response;
119a85aa543STrond Norbye}
120a85aa543STrond Norbye
121a85aa543STrond Norbyestatic bool pattern_ends_with(const char *pattern, const char *target, size_t target_size) {
122a85aa543STrond Norbye    size_t pattern_size;
123d0366df5STrond Norbye    cb_assert(target);
124d0366df5STrond Norbye    cb_assert(pattern);
125a85aa543STrond Norbye
126a85aa543STrond Norbye    pattern_size = strlen(pattern);
127a85aa543STrond Norbye    if (target_size < pattern_size) {
128a85aa543STrond Norbye        return false;
129a85aa543STrond Norbye    }
130a85aa543STrond Norbye    return memcmp(&target[target_size - pattern_size], pattern, pattern_size) == 0;
131a85aa543STrond Norbye}
132a85aa543STrond Norbye
1331bfc03f7SArtem Stemkovskistatic conflate_result process_new_config(long http_code, conflate_handle_t *conf_handle) {
134a85aa543STrond Norbye    char *values[2];
135a85aa543STrond Norbye    kvpair_t *kv;
136a85aa543STrond Norbye    conflate_result (*call_back)(void *, kvpair_t *);
137a85aa543STrond Norbye    conflate_result r;
1381bfc03f7SArtem Stemkovski    char buf[4];
139a85aa543STrond Norbye
140a85aa543STrond Norbye    g_tot_process_new_configs++;
141a85aa543STrond Norbye
1421bfc03f7SArtem Stemkovski    snprintf(buf, 4, "%ld", http_code),
1431bfc03f7SArtem Stemkovski    values[0] = buf;
1441bfc03f7SArtem Stemkovski    values[1] = NULL;
1451bfc03f7SArtem Stemkovski
1461bfc03f7SArtem Stemkovski    kv = mk_kvpair(HTTP_CODE_KEY, values);
1471bfc03f7SArtem Stemkovski
148a85aa543STrond Norbye    /* construct the new config from its components */
149a85aa543STrond Norbye    values[0] = assemble_complete_response(response_buffer_head);
150a85aa543STrond Norbye
151a85aa543STrond Norbye    free_response(response_buffer_head);
152a85aa543STrond Norbye    response_buffer_head = NULL;
153a85aa543STrond Norbye    cur_response_buffer = NULL;
154a85aa543STrond Norbye
155a85aa543STrond Norbye    if (values[0] == NULL) {
156a85aa543STrond Norbye        fprintf(stderr, "ERROR: invalid response from REST server\n");
157a85aa543STrond Norbye        return CONFLATE_ERROR;
158a85aa543STrond Norbye    }
159a85aa543STrond Norbye
1601bfc03f7SArtem Stemkovski    kv->next = mk_kvpair(CONFIG_KEY, values);
161a85aa543STrond Norbye
162a85aa543STrond Norbye    if (conf_handle->url != NULL) {
163a85aa543STrond Norbye        char *url[2];
164a85aa543STrond Norbye        url[0] = conf_handle->url;
165a85aa543STrond Norbye        url[1] = NULL;
1661bfc03f7SArtem Stemkovski        kv->next->next = mk_kvpair("url", url);
167a85aa543STrond Norbye    }
168a85aa543STrond Norbye
169a85aa543STrond Norbye    /* execute the provided call back */
170a85aa543STrond Norbye    call_back = conf_handle->conf->new_config;
171a85aa543STrond Norbye    r = call_back(conf_handle->conf->userdata, kv);
172a85aa543STrond Norbye
173a85aa543STrond Norbye    /* clean up */
174a85aa543STrond Norbye    free_kvpair(kv);
175a85aa543STrond Norbye    free(values[0]);
176a85aa543STrond Norbye
177a85aa543STrond Norbye    response_buffer_head = mk_response_buffer(RESPONSE_BUFFER_SIZE);
178a85aa543STrond Norbye    cur_response_buffer = response_buffer_head;
179a85aa543STrond Norbye
180a85aa543STrond Norbye    return r;
181a85aa543STrond Norbye}
182a85aa543STrond Norbye
183a85aa543STrond Norbyestatic size_t handle_response(void *data, size_t s, size_t num, void *cb) {
184a85aa543STrond Norbye    conflate_handle_t *c_handle = (conflate_handle_t *) cb;
185a85aa543STrond Norbye    size_t size = s * num;
186a85aa543STrond Norbye    bool end_of_message = pattern_ends_with(END_OF_CONFIG, data, size);
187a85aa543STrond Norbye    cur_response_buffer = write_data_to_buffer(cur_response_buffer, data, size);
188a85aa543STrond Norbye    if (end_of_message) {
1891bfc03f7SArtem Stemkovski        process_new_config(200, c_handle);
190a85aa543STrond Norbye    }
191a85aa543STrond Norbye    return size;
192a85aa543STrond Norbye}
193a85aa543STrond Norbye
194a85aa543STrond Norbyestatic int setup_curl_sock(void *clientp,
195a85aa543STrond Norbye                           curl_socket_t curlfd,
196a85aa543STrond Norbye                           curlsocktype purpose) {
197a85aa543STrond Norbye  int       optval = 1;
198a85aa543STrond Norbye  socklen_t optlen = sizeof(optval);
199a85aa543STrond Norbye  setsockopt(curlfd, SOL_SOCKET, SO_KEEPALIVE, (void *) &optval, optlen);
200a85aa543STrond Norbye  (void) clientp;
201a85aa543STrond Norbye  (void) purpose;
202a85aa543STrond Norbye  return 0;
203a85aa543STrond Norbye}
204a85aa543STrond Norbye
205a85aa543STrond Norbyestatic void setup_handle(CURL *handle, char *url, char *userpass,
206a85aa543STrond Norbye                         conflate_handle_t *chandle,
207a85aa543STrond Norbye                         size_t (response_handler)(void *, size_t, size_t, void *)) {
208a85aa543STrond Norbye    if (url != NULL) {
209a85aa543STrond Norbye
210a85aa543STrond Norbye        CURLcode c;
211a85aa543STrond Norbye
212a85aa543STrond Norbye        c = curl_easy_setopt(handle, CURLOPT_SOCKOPTFUNCTION, setup_curl_sock);
213d0366df5STrond Norbye        cb_assert(c == CURLE_OK);
214a85aa543STrond Norbye        c = curl_easy_setopt(handle, CURLOPT_WRITEDATA, chandle);
215d0366df5STrond Norbye        cb_assert(c == CURLE_OK);
216a85aa543STrond Norbye        c = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, response_handler);
217d0366df5STrond Norbye        cb_assert(c == CURLE_OK);
218a85aa543STrond Norbye        c = curl_easy_setopt(handle, CURLOPT_URL, url);
219d0366df5STrond Norbye        cb_assert(c == CURLE_OK);
220a85aa543STrond Norbye
221a85aa543STrond Norbye        if (userpass != NULL) {
222a85aa543STrond Norbye            c = curl_easy_setopt(handle, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
223d0366df5STrond Norbye            cb_assert(c == CURLE_OK);
224a85aa543STrond Norbye            c = curl_easy_setopt(handle, CURLOPT_USERPWD, userpass);
225d0366df5STrond Norbye            cb_assert(c == CURLE_OK);
226a85aa543STrond Norbye        }
227a85aa543STrond Norbye
228a85aa543STrond Norbye        c = curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
229d0366df5STrond Norbye        cb_assert(c == CURLE_OK);
230a85aa543STrond Norbye    }
231a85aa543STrond Norbye}
232a85aa543STrond Norbye
233a85aa543STrond Norbye#ifdef WIN32
234a85aa543STrond Norbye/*
235a85aa543STrond Norbye * NOTE!!! we are only using "|" as the pattern, so this code will _NOT_
236a85aa543STrond Norbye * work if you change that to include more than a single character!
237a85aa543STrond Norbye */
238a85aa543STrond Norbyestatic char *strsep(char **stringp, char *pattern) {
239a85aa543STrond Norbye   char *ptr = *stringp;
240a85aa543STrond Norbye   *stringp = strchr(*stringp, pattern[0]);
241a85aa543STrond Norbye   if (*stringp != NULL) {
242a85aa543STrond Norbye      **stringp = '\0';
243a85aa543STrond Norbye      *stringp = (*stringp) + 1;
244a85aa543STrond Norbye   }
245a85aa543STrond Norbye
246a85aa543STrond Norbye   return ptr;
247a85aa543STrond Norbye}
248a85aa543STrond Norbye#endif
249a85aa543STrond Norbye
250a85aa543STrond Norbyevoid run_rest_conflate(void *arg) {
251a85aa543STrond Norbye    conflate_handle_t *handle = (conflate_handle_t *) arg;
252a85aa543STrond Norbye    char curl_error_string[CURL_ERROR_SIZE];
253a85aa543STrond Norbye    kvpair_t *conf;
254a85aa543STrond Norbye    CURLcode c;
255a85aa543STrond Norbye    CURL *curl_handle;
256a85aa543STrond Norbye    bool always_retry = true;
257a85aa543STrond Norbye
258a85aa543STrond Norbye
259a85aa543STrond Norbye
260a85aa543STrond Norbye    /* prep the buffers used to hold the config */
261a85aa543STrond Norbye    response_buffer_head = mk_response_buffer(RESPONSE_BUFFER_SIZE);
262a85aa543STrond Norbye    cur_response_buffer = response_buffer_head;
263a85aa543STrond Norbye
264a85aa543STrond Norbye    /* Before connecting and all that, load the stored config */
265a85aa543STrond Norbye    conf = load_kvpairs(handle, handle->conf->save_path);
266a85aa543STrond Norbye    if (conf) {
267a85aa543STrond Norbye        handle->conf->new_config(handle->conf->userdata, conf);
268a85aa543STrond Norbye        free_kvpair(conf);
269a85aa543STrond Norbye    }
270a85aa543STrond Norbye
271a85aa543STrond Norbye    /* init curl */
272a85aa543STrond Norbye    c = curl_global_init(curl_init_flags);
273d0366df5STrond Norbye    cb_assert(c == CURLE_OK);
274a85aa543STrond Norbye
275a85aa543STrond Norbye    curl_handle = curl_easy_init();
276d0366df5STrond Norbye    cb_assert(curl_handle);
277a85aa543STrond Norbye
278a85aa543STrond Norbye    curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, &curl_error_string);
279a85aa543STrond Norbye
280a85aa543STrond Norbye    while (true) {
281a85aa543STrond Norbye        int start_tot_process_new_configs = g_tot_process_new_configs;
282a85aa543STrond Norbye        bool succeeding = true;
283a85aa543STrond Norbye
284a85aa543STrond Norbye        while (succeeding) {
285a85aa543STrond Norbye            char *urls = strdup(handle->conf->host);  /* Might be a '|' delimited list of url's. */
286a85aa543STrond Norbye            char *next = urls;
287a85aa543STrond Norbye            char *userpass = NULL;
288a85aa543STrond Norbye            succeeding = false;
289a85aa543STrond Norbye
290a85aa543STrond Norbye            if (handle->conf->jid && strlen(handle->conf->jid)) {
291a85aa543STrond Norbye                size_t buff_size = strlen(handle->conf->jid) + strlen(handle->conf->pass) + 2;
292a85aa543STrond Norbye                userpass = (char *) malloc(buff_size);
293d0366df5STrond Norbye                cb_assert(userpass);
294a85aa543STrond Norbye                snprintf(userpass, buff_size, "%s:%s", handle->conf->jid, handle->conf->pass);
295a85aa543STrond Norbye                userpass[buff_size - 1] = '\0';
296a85aa543STrond Norbye            }
297a85aa543STrond Norbye
298a85aa543STrond Norbye            while (next != NULL) {
299a85aa543STrond Norbye                char *url = strsep(&next, "|");
300a85aa543STrond Norbye
301a85aa543STrond Norbye                handle->url = url;
302a85aa543STrond Norbye
303a85aa543STrond Norbye                setup_handle(curl_handle,
304a85aa543STrond Norbye                             url,  /* The full URL. */
305a85aa543STrond Norbye                             userpass, /* The auth user and password. */
306a85aa543STrond Norbye                             handle, handle_response);
307a85aa543STrond Norbye
308a85aa543STrond Norbye                if (curl_easy_perform(curl_handle) == 0) {
3091bfc03f7SArtem Stemkovski                    long http_code = 0;
3101bfc03f7SArtem Stemkovski                    if (curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_code) != CURLE_OK) {
3111bfc03f7SArtem Stemkovski                        http_code = 0;
3121bfc03f7SArtem Stemkovski                    }
313a85aa543STrond Norbye                    /* We reach here if the REST server didn't provide a
314a85aa543STrond Norbye                       streaming JSON response and so we need to process
315a85aa543STrond Norbye                       the just-one-JSON response */
3161bfc03f7SArtem Stemkovski                    conflate_result r = process_new_config(http_code, handle);
317a85aa543STrond Norbye                    if (r == CONFLATE_SUCCESS ||
318a85aa543STrond Norbye                        r == CONFLATE_ERROR) {
319a85aa543STrond Norbye                      /* Restart at the beginning of the urls list */
320a85aa543STrond Norbye                      /* on either a success or a 'local' error. */
321a85aa543STrond Norbye                      /* In contrast, if the callback returned a */
322a85aa543STrond Norbye                      /* value of CONFLATE_ERROR_BAD_SOURCE, then */
323a85aa543STrond Norbye                      /* we should try the next url on the list. */
324a85aa543STrond Norbye                      succeeding = true;
325a85aa543STrond Norbye                      next = NULL;
326a85aa543STrond Norbye                    }
327a85aa543STrond Norbye                } else {
328a85aa543STrond Norbye                    fprintf(stderr, "WARNING: curl error: %s from: %s\n",
329a85aa543STrond Norbye                            curl_error_string, url);
330a85aa543STrond Norbye                }
331a85aa543STrond Norbye            }
332a85aa543STrond Norbye
333a85aa543STrond Norbye            sleep(1);  /* Don't overload the REST servers with tons of retries. */
334a85aa543STrond Norbye
335a85aa543STrond Norbye            free(urls);
336a85aa543STrond Norbye            free(userpass);
337a85aa543STrond Norbye        }
338a85aa543STrond Norbye
339a85aa543STrond Norbye        if (start_tot_process_new_configs == g_tot_process_new_configs) {
3404915b93eSSteve Yen            if (start_tot_process_new_configs == 0) {
3414915b93eSSteve Yen                fprintf(stderr, "WARNING: could not contact REST server(s): %s;"
3424915b93eSSteve Yen                        " perhaps they are unavailable or still initializing\n",
3434915b93eSSteve Yen                        handle->conf->host);
3444915b93eSSteve Yen            } else {
3454915b93eSSteve Yen                fprintf(stderr, "ERROR: could not contact REST server(s): %s\n",
3464915b93eSSteve Yen                        handle->conf->host);
3474915b93eSSteve Yen            }
348a85aa543STrond Norbye
349a85aa543STrond Norbye            if (always_retry == false) {
350a85aa543STrond Norbye              /* If we went through all our URL's and didn't see any new */
351a85aa543STrond Norbye              /* configs, then stop trying. */
352a85aa543STrond Norbye
353a85aa543STrond Norbye              break;
354a85aa543STrond Norbye            }
355a85aa543STrond Norbye        }
356a85aa543STrond Norbye    }
357a85aa543STrond Norbye
358a85aa543STrond Norbye    free_response(response_buffer_head);
359a85aa543STrond Norbye    response_buffer_head = NULL;
360a85aa543STrond Norbye    cur_response_buffer = NULL;
361a85aa543STrond Norbye
362a85aa543STrond Norbye    curl_easy_cleanup(curl_handle);
363a85aa543STrond Norbye
364a85aa543STrond Norbye    exit(1);
365a85aa543STrond Norbye}