1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2012-2020 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #define LCB_IOPS_V12_NO_DEPRECATE
19 
20 #include "internal.h"
21 #include "select_io_opts.h"
22 #include <libcouchbase/plugins/io/bsdio-inl.c>
23 
24 #if defined(_WIN32) && !defined(usleep)
25 #define usleep(n) Sleep((n) / 1000)
26 #endif
27 
28 typedef struct sel_EVENT sel_EVENT;
29 struct sel_EVENT {
30     lcb_list_t list;
31     lcb_socket_t sock;
32     short flags;
33     short eflags; /* effective flags */
34     void *cb_data;
35     lcb_ioE_callback handler;
36     sel_EVENT *next; /* for chaining active events */
37 };
38 
39 typedef struct sel_TIMER sel_TIMER;
40 struct sel_TIMER {
41     lcb_list_t list;
42     int active;
43     hrtime_t exptime;
44     void *cb_data;
45     lcb_ioE_callback handler;
46 };
47 
48 typedef struct {
49     sel_EVENT events;
50     lcb_list_t timers;
51     int event_loop;
52 } sel_LOOP;
53 
timer_cmp_asc(lcb_list_t *a, lcb_list_t *b)54 static int timer_cmp_asc(lcb_list_t *a, lcb_list_t *b)
55 {
56     sel_TIMER *ta = LCB_LIST_ITEM(a, sel_TIMER, list);
57     sel_TIMER *tb = LCB_LIST_ITEM(b, sel_TIMER, list);
58     if (ta->exptime > tb->exptime) {
59         return 1;
60     } else if (ta->exptime < tb->exptime) {
61         return -1;
62     } else {
63         return 0;
64     }
65 }
66 
sel_event_new(lcb_io_opt_t iops)67 static void *sel_event_new(lcb_io_opt_t iops)
68 {
69     sel_LOOP *io = iops->v.v2.cookie;
70     sel_EVENT *ret = calloc(1, sizeof(sel_EVENT));
71     if (ret != NULL) {
72         lcb_list_append(&io->events.list, &ret->list);
73     }
74     return ret;
75 }
76 
sel_event_update(lcb_io_opt_t iops, lcb_socket_t sock, void *event, short flags, void *cb_data, lcb_ioE_callback handler)77 static int sel_event_update(lcb_io_opt_t iops, lcb_socket_t sock, void *event, short flags, void *cb_data,
78                             lcb_ioE_callback handler)
79 {
80     sel_EVENT *ev = event;
81     ev->sock = sock;
82     ev->handler = handler;
83     ev->cb_data = cb_data;
84     ev->flags = flags;
85     (void)iops;
86     return 0;
87 }
88 
sel_event_free(lcb_io_opt_t iops, void *event)89 static void sel_event_free(lcb_io_opt_t iops, void *event)
90 {
91     sel_EVENT *ev = event;
92     lcb_list_delete(&ev->list);
93     free(ev);
94     (void)iops;
95 }
96 
sel_event_cancel(lcb_io_opt_t iops, lcb_socket_t sock, void *event)97 static void sel_event_cancel(lcb_io_opt_t iops, lcb_socket_t sock, void *event)
98 {
99     sel_EVENT *ev = event;
100     ev->flags = 0;
101     ev->cb_data = NULL;
102     ev->handler = NULL;
103     (void)iops;
104     (void)sock;
105 }
106 
sel_timer_new(lcb_io_opt_t iops)107 static void *sel_timer_new(lcb_io_opt_t iops)
108 {
109     sel_TIMER *ret = calloc(1, sizeof(sel_TIMER));
110     (void)iops;
111     return ret;
112 }
113 
sel_timer_cancel(lcb_io_opt_t iops, void *timer)114 static void sel_timer_cancel(lcb_io_opt_t iops, void *timer)
115 {
116     sel_TIMER *tm = timer;
117     if (tm->active) {
118         tm->active = 0;
119         lcb_list_delete(&tm->list);
120     }
121     (void)iops;
122 }
123 
sel_timer_free(lcb_io_opt_t iops, void *timer)124 static void sel_timer_free(lcb_io_opt_t iops, void *timer)
125 {
126     sel_timer_cancel(iops, timer);
127     free(timer);
128     (void)iops;
129 }
130 
sel_timer_schedule(lcb_io_opt_t iops, void *timer, lcb_U32 usec, void *cb_data, lcb_ioE_callback handler)131 static int sel_timer_schedule(lcb_io_opt_t iops, void *timer, lcb_U32 usec, void *cb_data, lcb_ioE_callback handler)
132 {
133     sel_TIMER *tm = timer;
134     sel_LOOP *cookie = iops->v.v2.cookie;
135     lcb_assert(!tm->active);
136     tm->exptime = gethrtime() + (usec * (hrtime_t)1000);
137     tm->cb_data = cb_data;
138     tm->handler = handler;
139     tm->active = 1;
140     lcb_list_add_sorted(&cookie->timers, &tm->list, timer_cmp_asc);
141 
142     (void)iops;
143     return 0;
144 }
145 
sel_stop_loop(struct lcb_io_opt_st *iops)146 static void sel_stop_loop(struct lcb_io_opt_st *iops)
147 {
148     sel_LOOP *io = iops->v.v2.cookie;
149     io->event_loop = 0;
150 }
151 
pop_next_timer(sel_LOOP *cookie, hrtime_t now)152 static sel_TIMER *pop_next_timer(sel_LOOP *cookie, hrtime_t now)
153 {
154     sel_TIMER *ret;
155 
156     if (LCB_LIST_IS_EMPTY(&cookie->timers)) {
157         return NULL;
158     }
159 
160     ret = LCB_LIST_ITEM(cookie->timers.next, sel_TIMER, list);
161     if (ret->exptime > now) {
162         return NULL;
163     }
164     lcb_list_shift(&cookie->timers);
165     ret->active = 0;
166     return ret;
167 }
168 
get_next_timeout(sel_LOOP *cookie, struct timeval *tmo, hrtime_t now)169 static int get_next_timeout(sel_LOOP *cookie, struct timeval *tmo, hrtime_t now)
170 {
171     sel_TIMER *first;
172     hrtime_t delta;
173 
174     if (LCB_LIST_IS_EMPTY(&cookie->timers)) {
175         tmo->tv_sec = 0;
176         tmo->tv_usec = 0;
177         return 0;
178     }
179 
180     first = LCB_LIST_ITEM(cookie->timers.next, sel_TIMER, list);
181     if (now < first->exptime) {
182         delta = first->exptime - now;
183     } else {
184         delta = 0;
185     }
186 
187     if (delta) {
188         delta /= 1000;
189         tmo->tv_sec = (long)(delta / 1000000);
190         tmo->tv_usec = delta % 1000000;
191     } else {
192         tmo->tv_sec = 0;
193         tmo->tv_usec = 0;
194     }
195     return 1;
196 }
197 
run_loop(sel_LOOP *io, int is_tick)198 static void run_loop(sel_LOOP *io, int is_tick)
199 {
200     sel_EVENT *ev;
201     lcb_list_t *ii;
202 
203     fd_set readfds, writefds, exceptfds;
204 
205     io->event_loop = !is_tick;
206     do {
207         struct timeval tmo, *t;
208         int ret;
209         int nevents = 0;
210         int has_timers;
211         lcb_socket_t fdmax = 0;
212         hrtime_t now;
213 
214         t = NULL;
215         now = gethrtime();
216 
217         FD_ZERO(&readfds);
218         FD_ZERO(&writefds);
219         FD_ZERO(&exceptfds);
220 
221         LCB_LIST_FOR(ii, &io->events.list)
222         {
223             ev = LCB_LIST_ITEM(ii, sel_EVENT, list);
224             if (ev->flags != 0) {
225                 if (ev->flags & LCB_READ_EVENT) {
226                     FD_SET(ev->sock, &readfds);
227                 }
228 
229                 if (ev->flags & LCB_WRITE_EVENT) {
230                     FD_SET(ev->sock, &writefds);
231                 }
232 
233                 FD_SET(ev->sock, &exceptfds);
234                 if (ev->sock > fdmax) {
235                     fdmax = ev->sock;
236                 }
237                 ++nevents;
238             }
239         }
240 
241         has_timers = get_next_timeout(io, &tmo, now);
242         if (has_timers) {
243             t = &tmo;
244         } else if (is_tick) {
245             /* do not wait forever on tick */
246             tmo.tv_sec = 0;
247             tmo.tv_usec = LCB_MS2US(100);
248             t = &tmo;
249         }
250 
251         if (nevents == 0 && has_timers == 0) {
252             io->event_loop = 0;
253             return;
254         }
255 
256         if (nevents) {
257             ret = select(fdmax + 1, &readfds, &writefds, &exceptfds, t);
258             if (ret == SOCKET_ERROR) {
259                 return;
260             }
261         } else {
262             ret = 0;
263             if (!is_tick) {
264                 usleep((t->tv_sec * 1000000) + t->tv_usec);
265             }
266         }
267 
268         /** Always invoke the pending timers */
269         if (has_timers) {
270             sel_TIMER *tm;
271             now = gethrtime();
272 
273             while ((tm = pop_next_timer(io, now))) {
274                 tm->handler(-1, 0, tm->cb_data);
275             }
276         }
277 
278         /* To be completely safe, we need to copy active events
279          * before handing them. Iterating over the list of
280          * registered events isn't safe, because one callback can
281          * cancel all registered events before iteration will end
282          */
283 
284         if (ret && nevents) {
285             sel_EVENT *active = NULL;
286             LCB_LIST_FOR(ii, &io->events.list)
287             {
288                 ev = LCB_LIST_ITEM(ii, sel_EVENT, list);
289                 if (ev->flags != 0) {
290                     ev->eflags = 0;
291                     if (FD_ISSET(ev->sock, &readfds)) {
292                         ev->eflags |= LCB_READ_EVENT;
293                     }
294                     if (FD_ISSET(ev->sock, &writefds)) {
295                         ev->eflags |= LCB_WRITE_EVENT;
296                     }
297                     if (FD_ISSET(ev->sock, &exceptfds)) {
298                         ev->eflags = LCB_ERROR_EVENT | LCB_RW_EVENT; /** It should error */
299                     }
300                     if (ev->eflags != 0) {
301                         ev->next = active;
302                         active = ev;
303                     }
304                 }
305             }
306             ev = active;
307             while (ev) {
308                 sel_EVENT *p = ev->next;
309                 ev->handler(ev->sock, ev->eflags, ev->cb_data);
310                 ev = p;
311             }
312         }
313     } while (io->event_loop);
314 }
315 
sel_run_loop(struct lcb_io_opt_st *iops)316 static void sel_run_loop(struct lcb_io_opt_st *iops)
317 {
318     run_loop(iops->v.v2.cookie, 0);
319 }
sel_tick_loop(struct lcb_io_opt_st *iops)320 static void sel_tick_loop(struct lcb_io_opt_st *iops)
321 {
322     run_loop(iops->v.v2.cookie, 1);
323 }
324 
sel_destroy_iops(struct lcb_io_opt_st *iops)325 static void sel_destroy_iops(struct lcb_io_opt_st *iops)
326 {
327     sel_LOOP *io = iops->v.v2.cookie;
328     lcb_list_t *nn, *ii;
329     sel_EVENT *ev;
330     sel_TIMER *tm;
331 
332     if (io->event_loop != 0) {
333         fprintf(stderr, "WARN: libcouchbase(plugin-select): the event loop might be still active, but it still try to "
334                         "free resources\n");
335     }
336     LCB_LIST_SAFE_FOR(ii, nn, &io->events.list)
337     {
338         ev = LCB_LIST_ITEM(ii, sel_EVENT, list);
339         sel_event_free(iops, ev);
340     }
341     lcb_assert(LCB_LIST_IS_EMPTY(&io->events.list));
342     LCB_LIST_SAFE_FOR(ii, nn, &io->timers)
343     {
344         tm = LCB_LIST_ITEM(ii, sel_TIMER, list);
345         sel_timer_free(iops, tm);
346     }
347     lcb_assert(LCB_LIST_IS_EMPTY(&io->timers));
348     free(io);
349     free(iops);
350 }
351 
sel_socket_wrap(lcb_io_opt_t io, int domain, int type, int protocol)352 static lcb_socket_t sel_socket_wrap(lcb_io_opt_t io, int domain, int type, int protocol)
353 {
354     lcb_socket_t res = socket_impl(io, domain, type, protocol);
355 #ifndef _WIN32
356 
357     /* This only works on non-Windows where FD_SETSIZE is in effect wrt the
358      * actual FD number. On Windows, FD_SETSIZE is the cap on the _total_
359      * number of sockets to be used in select; not necessarily what their
360      * FD values are.
361      *
362      * TODO: Just use poll() on POSIX in the future.
363      */
364     if (res != INVALID_SOCKET && res > FD_SETSIZE) {
365         close_impl(io, res);
366         fprintf(stderr, "COUCHBASE: too many FDs. Cannot have socket > FD_SETSIZE. Use other I/O plugin\n");
367         io->v.v3.error = EINVAL;
368         res = INVALID_SOCKET;
369     }
370 #endif
371     return res;
372 }
373 
procs2_sel_callback(int version, lcb_loop_procs *loop_procs, lcb_timer_procs *timer_procs, lcb_bsd_procs *bsd_procs, lcb_ev_procs *ev_procs, lcb_completion_procs *completion_procs, lcb_iomodel_t *iomodel)374 static void procs2_sel_callback(int version, lcb_loop_procs *loop_procs, lcb_timer_procs *timer_procs,
375                                 lcb_bsd_procs *bsd_procs, lcb_ev_procs *ev_procs,
376                                 lcb_completion_procs *completion_procs, lcb_iomodel_t *iomodel)
377 {
378     ev_procs->create = sel_event_new;
379     ev_procs->destroy = sel_event_free;
380     ev_procs->watch = sel_event_update;
381     ev_procs->cancel = sel_event_cancel;
382 
383     timer_procs->create = sel_timer_new;
384     timer_procs->destroy = sel_timer_free;
385     timer_procs->schedule = sel_timer_schedule;
386     timer_procs->cancel = sel_timer_cancel;
387 
388     loop_procs->start = sel_run_loop;
389     loop_procs->stop = sel_stop_loop;
390     loop_procs->tick = sel_tick_loop;
391 
392     *iomodel = LCB_IOMODEL_EVENT;
393     wire_lcb_bsd_impl2(bsd_procs, version);
394 
395     /* Override */
396     bsd_procs->socket0 = sel_socket_wrap;
397     (void)completion_procs;
398 }
399 
400 LIBCOUCHBASE_API
lcb_create_select_io_opts(int version, lcb_io_opt_t *io, void *arg)401 lcb_STATUS lcb_create_select_io_opts(int version, lcb_io_opt_t *io, void *arg)
402 {
403     lcb_io_opt_t ret;
404     sel_LOOP *cookie;
405 
406     if (version != 0) {
407         return LCB_ERR_PLUGIN_VERSION_MISMATCH;
408     }
409     ret = calloc(1, sizeof(*ret));
410     cookie = calloc(1, sizeof(*cookie));
411     if (ret == NULL || cookie == NULL) {
412         free(ret);
413         free(cookie);
414         return LCB_ERR_NO_MEMORY;
415     }
416     lcb_list_init(&cookie->events.list);
417     lcb_list_init(&cookie->timers);
418 
419     /* setup io iops! */
420     ret->version = 3;
421     ret->dlhandle = NULL;
422     ret->destructor = sel_destroy_iops;
423 
424     /* consider that struct isn't allocated by the library,
425      * `need_cleanup' flag might be set in lcb_create() */
426     ret->v.v3.need_cleanup = 0;
427     ret->v.v3.get_procs = procs2_sel_callback;
428     ret->v.v3.cookie = cookie;
429 
430     /* For backwards compatibility */
431     wire_lcb_bsd_impl(ret);
432 
433     *io = ret;
434     (void)arg;
435     return LCB_SUCCESS;
436 }
437