1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013-2020 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 /**
19  * New-Style v2 plugin for Windows, Using IOCP
20  *
21  * This file contains the core routines which actually make up the
22  * various "loops" of the event loop.
23  *
24  * @author Mark Nunberg
25  */
26 
27 #include "iocp_iops.h"
28 #include <stdio.h>
29 
/**
 * Pointer to GetQueuedCompletionStatusEx. It is looked up at runtime from
 * kernel32.dll by iocp_initialize_loop_globals() and stays NULL when the
 * symbol could not be resolved.
 */
static sGetQueuedCompletionStatusEx pGetQueuedCompletionStatusEx = NULL;
/**
 * Non-zero when the run loop should dequeue through
 * pGetQueuedCompletionStatusEx; set to 0 when the symbol lookup fails, in
 * which case the loop uses the GetQueuedCompletionStatus based routine.
 */
static int Have_GQCS_Ex = 1;
32 
iocp_initialize_loop_globals(void)33 void iocp_initialize_loop_globals(void)
34 {
35     HMODULE hKernel32;
36     static IOCP_SYNCTYPE initialized = 0;
37 
38     if (!IOCP_INITONCE(initialized)) {
39         return;
40     }
41 
42     hKernel32 = GetModuleHandleA("kernel32.dll");
43     if (!hKernel32) {
44         fprintf(stderr, "Couldn't load Kernel32.dll: [%u]\n", GetLastError());
45         return;
46     }
47 
48     pGetQueuedCompletionStatusEx =
49         (sGetQueuedCompletionStatusEx)GetProcAddress(hKernel32, "GetQueuedCompletionStatusEx");
50     if (pGetQueuedCompletionStatusEx == NULL) {
51         Have_GQCS_Ex = 0;
52         fprintf(stderr, "Couldn't load GetQueuedCompletionStatusEx. Using fallback [%u]\n", GetLastError());
53     }
54 }
55 
/**
 * Loop-control helper macros, kept prominent on purpose.
 *
 * It is important that the breakout state is re-checked after each of our
 * own calls into lcb, since the breakout flag may have been raised in the
 * meantime (see iocp_stop()).
 *
 * LOOP_CAN_CONTINUE(io)  - true while the breakout flag is FALSE.
 * DO_IF_BREAKOUT(io, e)  - executes the statement `e` when the loop can no
 *                          longer continue.
 *                          NOTE(review): this expands to a bare `if { }`
 *                          block rather than `do { } while (0)`, so it must
 *                          not be used as the unbraced body of an if/else.
 * HAS_QUEUED_IO(io)      - the loop's n_iopending counter; non-zero while
 *                          I/O operations are still outstanding.
 */
#define LOOP_CAN_CONTINUE(io) ((io)->breakout == FALSE)
#define DO_IF_BREAKOUT(io, e)                                                                                          \
    if (!LOOP_CAN_CONTINUE(io)) {                                                                                      \
        e;                                                                                                             \
    }
#define HAS_QUEUED_IO(io) (io)->n_iopending
67 
/**
 * Finishes a write request: the write structure is either released or marked
 * as reusable, and then the request's completion callback is invoked.
 *
 * @param io the loop instance (not used by this routine)
 * @param w the write request which has completed
 * @param status status value handed through to the callback
 */
void iocp_write_done(iocp_t *io, iocp_write_t *w, int status)
{
    /* Copy out everything the callback needs; `w` may be freed below */
    iocp_sockdata_t *sock = w->ol_write.sd;
    lcb_ioC_write2_callback on_complete = w->cb;
    void *cb_arg = w->uarg;

    if (w->state != IOCP_WRITEBUF_ALLOCATED) {
        /* Not a standalone allocation: just flag it as available again */
        w->state = IOCP_WRITEBUF_AVAILABLE;
    } else {
        free(w);
    }

    on_complete(&sock->sd_base, status, cb_arg);
}
81 
82 /**
83  * Handles a single OVERLAPPED entry, and invokes
84  * the appropriate event
85  */
handle_single_overlapped(iocp_t *io, OVERLAPPED *lpOverlapped, ULONG_PTR lpCompletionKey, DWORD dwNumberOfBytesTransferred)86 static void handle_single_overlapped(iocp_t *io, OVERLAPPED *lpOverlapped, ULONG_PTR lpCompletionKey,
87                                      DWORD dwNumberOfBytesTransferred)
88 {
89     union {
90         iocp_write_t *w;
91         iocp_connect_t *conn;
92     } u_ol;
93     void *pointer_to_free = NULL;
94     int opstatus = 0;
95     int ws_status;
96     int action;
97     iocp_overlapped_t *ol = (iocp_overlapped_t *)lpOverlapped;
98     iocp_sockdata_t *sd = (iocp_sockdata_t *)lpCompletionKey;
99 
100     IOCP_LOG(IOCP_TRACE, "OL=%p, NB=%lu", ol, dwNumberOfBytesTransferred);
101 
102     ws_status = iocp_overlapped_status(lpOverlapped);
103 
104     if (ws_status) {
105         IOCP_LOG(IOCP_WARN, "Got negative status for %p: %d", ol, ws_status);
106         io->base.v.v2.error = iocp_w32err_2errno(ws_status);
107         opstatus = -1;
108     }
109 
110     action = ol->action;
111 
112     switch (action) {
113         case LCBIOCP_ACTION_READ:
114             /** Nothing special in the OVERLAPPED. */
115             if (sd->rdcb) {
116                 sd->rdcb(&sd->sd_base, dwNumberOfBytesTransferred, sd->rdarg);
117             }
118             break;
119 
120         case LCBIOCP_ACTION_WRITE:
121             u_ol.w = IOCP_WRITEOBJ_FROM_OVERLAPPED(lpOverlapped);
122             iocp_write_done(io, u_ol.w, opstatus);
123             break;
124 
125         case LCBIOCP_ACTION_CONNECT:
126             u_ol.conn = (iocp_connect_t *)ol;
127 
128             if (opstatus == 0) {
129                 /* This "Syncs" the connected state on the socket.. */
130                 int rv = setsockopt(ol->sd->sSocket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
131 
132                 if (rv == SOCKET_ERROR) {
133                     iocp_set_last_error(&io->base, ol->sd->sSocket);
134                     opstatus = -1;
135                 }
136             }
137             u_ol.conn->cb(&sd->sd_base, opstatus);
138             pointer_to_free = u_ol.conn;
139             break;
140 
141         default:
142             fprintf(stderr, "COUCHBASE-IOCP: Unrecognized OVERLAPPED action %d\n", (int)action);
143             lcb_assert(0);
144             return;
145     }
146 
147     iocp_on_dequeued(io, sd, action);
148     free(pointer_to_free);
149 }
150 
/**
 * Dequeues a batch of up to 64 completion packets through
 * pGetQueuedCompletionStatusEx and handles each of them.
 *
 * @param io the loop instance
 * @param msTimeout timeout handed to the dequeue call
 * @return 0 if the call failed or removed no entries; otherwise the value of
 *         LOOP_CAN_CONTINUE(io) after all removed entries were handled
 */
static int dequeue_io_impl_ex(iocp_t *io, DWORD msTimeout)
{
    OVERLAPPED_ENTRY entries[64];
    const unsigned capacity = sizeof(entries) / sizeof(entries[0]);
    OVERLAPPED_ENTRY *cur;
    OVERLAPPED_ENTRY *end;
    ULONG n_removed;
    BOOL ok;

    ok = pGetQueuedCompletionStatusEx(io->hCompletionPort, entries, capacity, &n_removed, msTimeout, FALSE);
    if (!ok || n_removed == 0) {
        return 0;
    }

    /* Every removed entry is handled, even if a breakout occurs midway */
    end = entries + n_removed;
    for (cur = entries; cur != end; ++cur) {
        io->n_iopending--;
        handle_single_overlapped(io, cur->lpOverlapped, cur->lpCompletionKey, cur->dwNumberOfBytesTransferred);
    }

    return LOOP_CAN_CONTINUE(io);
}
174 
/**
 * Fallback dequeue routine: removes at most one completion packet using
 * GetQueuedCompletionStatus and handles it.
 *
 * @param io the loop instance
 * @param msTimeout timeout handed to the dequeue call
 * @return 0 if no packet was dequeued; otherwise the value of
 *         LOOP_CAN_CONTINUE(io) after the packet was handled
 */
static int dequeue_io_impl_compat(iocp_t *io, DWORD msTimeout)
{
    /*
     * The outputs are initialized so that the checks below are well defined
     * even if the call fails without writing to them.
     */
    DWORD dwNbytes = 0;
    ULONG_PTR ulPtr = 0;
    OVERLAPPED *lpOverlapped = NULL;

    /*
     * The BOOL result is intentionally ignored: whether a packet was dequeued
     * is decided by lpOverlapped alone, and the status of the operation is
     * derived from the OVERLAPPED inside handle_single_overlapped().
     */
    (void)GetQueuedCompletionStatus(io->hCompletionPort, &dwNbytes, &ulPtr, &lpOverlapped, msTimeout);

    if (lpOverlapped == NULL) {
        IOCP_LOG(IOCP_TRACE, "No events left");
        /** Nothing to do here */
        return 0;
    }

    io->n_iopending--;
    handle_single_overlapped(io, lpOverlapped, ulPtr, dwNbytes);
    return LOOP_CAN_CONTINUE(io);
}
194 
/**
 * Pops timers from the loop's timer queue via iocp_tmq_pop() for the given
 * time and invokes their callbacks, until the queue yields no more timers or
 * the loop can no longer continue.
 *
 * @param io the loop instance
 * @param now the current time, as handed to iocp_tmq_pop()
 */
static void deque_expired_timers(iocp_t *io, lcb_U64 now)
{
    iocp_timer_t *expired;

    /* The breakout check comes first, so no timer is popped after a breakout */
    while (LOOP_CAN_CONTINUE(io) && (expired = iocp_tmq_pop(&io->timer_queue.list, now)) != NULL) {
        /* Mark inactive before the callback runs */
        expired->is_active = 0;
        expired->cb(-1, 0, expired->arg);
    }
}
208 
209 /** Maximum amount of time the I/O can hog the loop */
210 #define IOCP_IOLOOP_MAXTIME 1000
211 
should_yield(lcb_U32 start)212 static int should_yield(lcb_U32 start)
213 {
214     lcb_U32 now = iocp_micros();
215     return now - start > IOCP_IOLOOP_MAXTIME;
216 }
217 
/**
 * Core loop: alternates between firing due timers and dequeueing I/O
 * completions until a breakout is requested or nothing is left to wait for.
 *
 * I'd like to make several behavioral guidelines here:
 *
 * 1) LCB shall call breakout if it wishes to terminate the loop.
 * 2) We shall not handle the case where the user accidentally calls lcb_wait()
 *    while not having anything pending. That's just too bad.
 *
 * @param iobase the I/O object; treated as an iocp_t
 * @param is_tick if non-zero, only a single pass of the outer loop is made
 */
static void iocp_run_loop(lcb_io_opt_t iobase, int is_tick)
{
    iocp_t *io = (iocp_t *)iobase;
    lcb_U64 now = 0;
    DWORD tmo;
    int remaining;

    /*
     * The breakout flag is TRUE whenever this function is not executing (it
     * is restored to TRUE on exit). A FALSE value here presumably means the
     * loop is already running, so nothing is done.
     */
    if (!io->breakout) {
        return;
    }

    io->breakout = FALSE;
    IOCP_LOG(IOCP_INFO, "do-loop BEGIN");

    do {
        /** To ensure we don't starve pending timers, use an iteration */
        lcb_U32 usStartTime;

        /* Only the first pass needs this; later passes refresh `now` below */
        if (!now) {
            now = iocp_millis();
        }

        /*
         * Timer phase: a timeout of 0 means timers are due right now, so fire
         * them and ask again until the next timeout is non-zero or a breakout
         * has been requested.
         */
        do {
            tmo = (DWORD)iocp_tmq_next_timeout(&io->timer_queue.list, now);
            IOCP_LOG(IOCP_TRACE, "Timeout=%lu msec", tmo);

            if (tmo) {
                break;
            }

            deque_expired_timers(io, now);
        } while (tmo == 0 && LOOP_CAN_CONTINUE(io));

        if (!LOOP_CAN_CONTINUE(io)) {
            break;
        }

        /** TODO: Use reference counting */
        /*
         * INFINITE presumably means that no timer is scheduled. The loop is
         * left in that case; pending I/O without any timer is treated as a
         * programming error.
         */
        if (tmo == INFINITE) {
            if (HAS_QUEUED_IO(io)) {
                lcb_assert(0 && "Found I/O without any timers");
            }
            break;
        }

        /*
         * I/O phase: the first dequeue call waits for up to `tmo`, subsequent
         * calls use a timeout of 0. The phase ends when a dequeue call returns
         * 0, a breakout is requested, or should_yield() reports that the
         * phase has run for too long.
         */
        usStartTime = iocp_micros();
        do {
            remaining = Have_GQCS_Ex ? dequeue_io_impl_ex(io, tmo) : dequeue_io_impl_compat(io, tmo);
            tmo = 0;
        } while (LOOP_CAN_CONTINUE(io) && remaining && should_yield(usStartTime) == 0);

        IOCP_LOG(IOCP_TRACE, "Stopped IO loop");

        /*
         * Refresh the clock, fire timers which became due during the I/O
         * phase, and recompute the timeout used by the loop condition below.
         */
        if (LOOP_CAN_CONTINUE(io)) {
            now = iocp_millis();
            deque_expired_timers(io, now);
            tmo = (DWORD)iocp_tmq_next_timeout(&io->timer_queue.list, now);
        }
    } while (!is_tick && LOOP_CAN_CONTINUE(io) && (HAS_QUEUED_IO(io) || tmo != INFINITE));

    IOCP_LOG(IOCP_INFO, "do-loop END");
    /* Mark the loop as no longer running */
    io->breakout = TRUE;
}
288 
/**
 * Runs the event loop via iocp_run_loop() with is_tick set to 0, i.e. the
 * loop keeps iterating until its own exit conditions are met.
 *
 * @param iobase the I/O object
 */
void iocp_run(lcb_io_opt_t iobase)
{
    /* No `return <expr>;` here: ISO C forbids it in a void function */
    iocp_run_loop(iobase, 0);
}
293 
/**
 * Runs the event loop via iocp_run_loop() with is_tick set to 1, which
 * limits it to a single pass of its outer loop.
 *
 * @param iobase the I/O object
 */
void iocp_tick(lcb_io_opt_t iobase)
{
    /* No `return <expr>;` here: ISO C forbids it in a void function */
    iocp_run_loop(iobase, 1);
}
298 
/**
 * Requests that the loop stop by raising the breakout flag, which the loop
 * routines check through LOOP_CAN_CONTINUE().
 *
 * @param iobase the I/O object; treated as an iocp_t
 */
void iocp_stop(lcb_io_opt_t iobase)
{
    iocp_t *io;

    io = (iocp_t *)iobase;
    IOCP_LOG(IOCP_INFO, "Breakout requested");
    io->breakout = TRUE;
}
305