1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013-2020 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 /**
19  * New-Style v1 plugin for Windows, Using IOCP
20  * @author Mark Nunberg
21  */
22 
23 #include "iocp_iops.h"
24 #include <stdio.h>
25 #include <stdlib.h>
start_write(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase, struct lcb_iovec_st *iov, lcb_size_t niov, void *uarg, lcb_ioC_write2_callback callback)26 static int start_write(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase, struct lcb_iovec_st *iov, lcb_size_t niov,
27                        void *uarg, lcb_ioC_write2_callback callback)
28 {
29     iocp_t *io = (iocp_t *)iobase;
30     iocp_write_t *w;
31     iocp_sockdata_t *sd = (iocp_sockdata_t *)sockbase;
32     int rv;
33     DWORD dwNbytes;
34 
35     /** Figure out which w we should use */
36     if (sd->w_info.state == IOCP_WRITEBUF_AVAILABLE) {
37         w = &sd->w_info;
38         w->state = IOCP_WRITEBUF_INUSE;
39         memset(&w->ol_write.base, 0, sizeof(w->ol_write.base));
40     } else {
41         w = calloc(1, sizeof(*w));
42         lcb_assert(w);
43         if (!w) {
44             iobase->v.v2.error = WSA_NOT_ENOUGH_MEMORY;
45             return -1;
46         }
47 
48         w->state = IOCP_WRITEBUF_ALLOCATED;
49         w->ol_write.action = LCBIOCP_ACTION_WRITE;
50         w->ol_write.sd = sd;
51     }
52 
53     w->cb = callback;
54     w->uarg = uarg;
55 
56     /* nbytes is ignored here, but mandatory for WSASend() */
57     rv = WSASend(sd->sSocket, (WSABUF *)iov, niov, &dwNbytes, 0 /* Flags */, (OVERLAPPED *)&w->ol_write,
58                  NULL /* IOCP Callback */);
59     rv = iocp_just_scheduled(io, &w->ol_write, rv);
60     /* TODO: if write completes immediately, maybe return a special code */
61     return rv;
62 }
63 
start_read(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase, lcb_IOV *iov, lcb_size_t niov, void *uarg, lcb_ioC_read2_callback callback)64 static int start_read(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase, lcb_IOV *iov, lcb_size_t niov, void *uarg,
65                       lcb_ioC_read2_callback callback)
66 {
67     int rv;
68     DWORD flags = 0, dwNbytes;
69     iocp_t *io = (iocp_t *)iobase;
70     iocp_sockdata_t *sd = (iocp_sockdata_t *)sockbase;
71     struct lcb_buf_info *bi = &sockbase->read_buffer;
72 
73     IOCP_LOG(IOCP_DEBUG, "Read Requested..");
74     sd->ol_read.action = LCBIOCP_ACTION_READ;
75     sd->rdcb = callback;
76     sd->rdarg = uarg;
77 
78     /** Remove the leftover bits */
79     ZeroMemory(&sd->ol_read.base, sizeof(OVERLAPPED));
80 
81     /* nbytes and flags, required as an argument, but unused in our code */
82     rv = WSARecv(sd->sSocket, (WSABUF *)iov, niov, &dwNbytes, &flags, (OVERLAPPED *)&sd->ol_read, NULL);
83 
84     return iocp_just_scheduled(io, &sd->ol_read, rv);
85 }
86 
start_connect(lcb_io_opt_t iobase, lcb_sockdata_t *sdbase, const struct sockaddr *name, unsigned int namelen, lcb_io_connect_cb callback)87 static int start_connect(lcb_io_opt_t iobase, lcb_sockdata_t *sdbase, const struct sockaddr *name, unsigned int namelen,
88                          lcb_io_connect_cb callback)
89 {
90     /* In order to use ConnectEx(), the socket must be bound. */
91     union {
92         struct sockaddr_in in4;
93         struct sockaddr_in6 in6;
94     } u_addr;
95     BOOL result;
96     LPFN_CONNECTEX pConnectEx;
97     iocp_t *io = (iocp_t *)iobase;
98     iocp_sockdata_t *sd = (iocp_sockdata_t *)sdbase;
99     iocp_connect_t *conn;
100 
101     conn = calloc(1, sizeof(*conn));
102 
103     lcb_assert(conn);
104     if (conn == NULL) {
105         iobase->v.v2.error = WSA_NOT_ENOUGH_MEMORY;
106         return -1;
107     }
108 
109     conn->cb = callback;
110     conn->ol_conn.sd = sd;
111     conn->ol_conn.action = LCBIOCP_ACTION_CONNECT;
112     IOCP_LOG(IOCP_INFO, "Connnection OL=%p", &conn->ol_conn);
113 
114     memset(&u_addr, 0, sizeof(u_addr));
115     if (namelen == sizeof(u_addr.in4)) {
116         u_addr.in4.sin_family = AF_INET;
117     } else if (namelen == sizeof(u_addr.in6)) {
118         u_addr.in6.sin6_family = AF_INET6;
119     } else {
120         free(conn);
121         iobase->v.v2.error = WSAEINVAL;
122         return -1;
123     }
124 
125     if (bind(sd->sSocket, (const struct sockaddr *)&u_addr, namelen) != 0) {
126         iocp_set_last_error(iobase, sd->sSocket);
127         free(conn);
128         return -1;
129     }
130 
131     pConnectEx = iocp_initialize_connectex(sd->sSocket);
132     if (!pConnectEx) {
133         iocp_set_last_error(iobase, INVALID_SOCKET);
134         free(conn);
135         return -1;
136     }
137 
138     result = pConnectEx(sd->sSocket, name, namelen, NULL, 0,
139                         NULL, /* Optional buffer and length to send when connected. (unused)*/
140                         (OVERLAPPED *)conn);
141 
142     /** Other functions return 0 to indicate success. Here it's the opposite */
143     return iocp_just_scheduled(io, &conn->ol_conn, result == TRUE ? 0 : -1);
144 }
145 
create_socket(lcb_io_opt_t iobase, int domain, int type, int protocol)146 static lcb_sockdata_t *create_socket(lcb_io_opt_t iobase, int domain, int type, int protocol)
147 {
148     iocp_t *io = (iocp_t *)iobase;
149     HANDLE hResult;
150     SOCKET s;
151     iocp_sockdata_t *sd;
152 
153     sd = calloc(1, sizeof(*sd));
154     if (sd == NULL) {
155         return NULL;
156     }
157 
158     /* We need to use WSASocket to set the WSA_FLAG_OVERLAPPED option */
159     s = WSASocket(domain, type, protocol, NULL /* protocol info */, 0 /* "Group" */, WSA_FLAG_OVERLAPPED);
160 
161     if (s == INVALID_SOCKET) {
162         iocp_set_last_error(iobase, s);
163         free(sd);
164         return NULL;
165     }
166 
167     /**
168      * Disable the send buffer. This ensures a callback is invoked.
169      * If this is not set, the send operation may complete immediately
170      * and our operations may never be queued. The KB article says we should
171      * ensure multiple write requests are batched into a single operation, which
172      * is something we already do :)
173      *
174      * See http://support.microsoft.com/kb/181611 for details.
175      * Disabled currently until we can get actual benchmark numbers
176      */
177     if (0) {
178         int optval = 0, rv;
179         rv = setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&optval, sizeof optval);
180     }
181 
182     hResult = CreateIoCompletionPort((HANDLE)s, io->hCompletionPort, (ULONG_PTR)sd, 0 /* nthreads */);
183 
184     if (hResult == NULL) {
185         iocp_set_last_error(iobase, s);
186         closesocket(s);
187         free(sd);
188         return NULL;
189     }
190 
191     sd->ol_read.sd = sd;
192     sd->refcount = 1;
193     sd->sSocket = s;
194     sd->sd_base.socket = s; /* Informational, used in tests */
195 
196     /** Initialize the write structure */
197     sd->w_info.ol_write.sd = sd;
198     sd->w_info.state = IOCP_WRITEBUF_AVAILABLE;
199     sd->w_info.ol_write.action = LCBIOCP_ACTION_WRITE;
200 
201     lcb_list_append(&io->sockets, &sd->list);
202 
203     return &sd->sd_base;
204 }
205 
close_socket(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase)206 static unsigned int close_socket(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase)
207 {
208     iocp_sockdata_t *sd = (iocp_sockdata_t *)sockbase;
209 
210     if (sd->sSocket != INVALID_SOCKET) {
211         closesocket(sd->sSocket);
212         sd->sSocket = INVALID_SOCKET;
213     }
214     iocp_socket_decref((iocp_t *)iobase, sd);
215     return 0;
216 }
217 
sock_nameinfo(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase, struct lcb_nameinfo_st *ni)218 static int sock_nameinfo(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase, struct lcb_nameinfo_st *ni)
219 {
220     iocp_sockdata_t *sd = (iocp_sockdata_t *)sockbase;
221     getsockname(sd->sSocket, ni->local.name, ni->local.len);
222     getpeername(sd->sSocket, ni->remote.name, ni->remote.len);
223     return 0;
224 }
225 
create_timer(lcb_io_opt_t iobase)226 static void *create_timer(lcb_io_opt_t iobase)
227 {
228     (void)iobase;
229     return calloc(1, sizeof(iocp_timer_t));
230 }
231 
delete_timer(lcb_io_opt_t iobase, void *opaque)232 static void delete_timer(lcb_io_opt_t iobase, void *opaque)
233 {
234     iocp_timer_t *tmr = (iocp_timer_t *)opaque;
235     iocp_t *io = (iocp_t *)iobase;
236     if (tmr->is_active) {
237         tmr->is_active = 0;
238         iocp_tmq_del(&io->timer_queue.list, tmr);
239     }
240 }
241 
update_timer(lcb_io_opt_t iobase, void *opaque, lcb_U32 usec, void *arg, lcb_ioE_callback cb)242 static int update_timer(lcb_io_opt_t iobase, void *opaque, lcb_U32 usec, void *arg, lcb_ioE_callback cb)
243 {
244     iocp_t *io = (iocp_t *)iobase;
245     iocp_timer_t *tmr = (iocp_timer_t *)opaque;
246     lcb_U64 now;
247 
248     if (tmr->is_active) {
249         iocp_tmq_del(&io->timer_queue.list, tmr);
250     }
251 
252     tmr->cb = cb;
253     tmr->arg = arg;
254     tmr->is_active = 1;
255     now = iocp_millis();
256     tmr->ms = now + (usec / 1000);
257     iocp_tmq_add(&io->timer_queue.list, tmr);
258     return 0;
259 }
260 
destroy_timer(lcb_io_opt_t iobase, void *opaque)261 static void destroy_timer(lcb_io_opt_t iobase, void *opaque)
262 {
263     free(opaque);
264     (void)iobase;
265 }
266 
set_nbio(SOCKET s, u_long mode)267 static int set_nbio(SOCKET s, u_long mode)
268 {
269     int rv;
270     rv = ioctlsocket(s, FIONBIO, &mode);
271     if (rv == 0) {
272         return 0;
273     } else {
274         fprintf(stderr, "libcouchbase: ioctlsocket => %lu\n", WSAGetLastError());
275         return -1;
276     }
277 }
278 
check_closed(lcb_io_opt_t io, lcb_sockdata_t *sockbase, int flags)279 static int check_closed(lcb_io_opt_t io, lcb_sockdata_t *sockbase, int flags)
280 {
281     int rv, err;
282     char buf;
283     iocp_sockdata_t *sd = (iocp_sockdata_t *)sockbase;
284     WSABUF iov;
285     DWORD dwReceived, dwFlags = MSG_PEEK;
286 
287     /* Currently don't know if IOCP lets use use MSG_PEEK.
288      * On the one hand: "This flag is valid only for nonoverlapped sockets".
289      * On the other hand:
290         > If both lpOverlapped and lpCompletionRoutine are NULL, the socket
291         > in this function will be treated as a nonoverlapped socket.
292      *
293      * Source: http://msdn.microsoft.com/en-us/library/windows/desktop/ms741688(v=vs.85).aspx
294 
295      * As a workaround for now, let's just disable this check if we are
296      * expecting unsolicited data. It is apparently legal to mix overlapped
297      * and non-overlapped calls.
298      */
299 
300     if ((flags & LCB_IO_SOCKCHECK_PEND_IS_ERROR) == 0) {
301         return LCB_IO_SOCKCHECK_STATUS_UNKNOWN;
302     }
303 
304     if (set_nbio(sd->sSocket, 1) != 0) {
305         return LCB_IO_SOCKCHECK_STATUS_UNKNOWN;
306     }
307 
308     iov.buf = &buf;
309     iov.len = 1;
310     rv = WSARecv(sd->sSocket, &iov, 1, &dwReceived, &dwFlags, NULL, NULL);
311     err = WSAGetLastError();
312 
313     if (set_nbio(sd->sSocket, 0) != 0) {
314         return LCB_IO_SOCKCHECK_STATUS_CLOSED;
315     }
316 
317     if (rv == 0) {
318         return LCB_IO_SOCKCHECK_STATUS_CLOSED;
319     } else if (err == WSAEWOULDBLOCK) {
320         return LCB_IO_SOCKCHECK_STATUS_OK;
321     } else {
322         return LCB_IO_SOCKCHECK_STATUS_UNKNOWN;
323     }
324 }
325 
iops_dtor(lcb_io_opt_t iobase)326 static void iops_dtor(lcb_io_opt_t iobase)
327 {
328     iocp_t *io = (iocp_t *)iobase;
329     lcb_list_t *cur;
330 
331     /** Close all sockets first so we can get events for them */
332     LCB_LIST_FOR(cur, &io->sockets)
333     {
334         iocp_sockdata_t *sd;
335         sd = LCB_LIST_ITEM(cur, iocp_sockdata_t, list);
336         if (sd->sSocket != INVALID_SOCKET) {
337             closesocket(sd->sSocket);
338             sd->sSocket = INVALID_SOCKET;
339         }
340     }
341     /** Drain the queue. This should not block */
342     while (1) {
343         DWORD nbytes;
344         ULONG_PTR completionKey;
345         LPOVERLAPPED pOl;
346         iocp_sockdata_t *sd;
347         iocp_overlapped_t *ol;
348 
349         GetQueuedCompletionStatus(io->hCompletionPort, &nbytes, &completionKey, &pOl, 0 /* Timeout */);
350         sd = (iocp_sockdata_t *)completionKey;
351         ol = (iocp_overlapped_t *)pOl;
352 
353         if (!ol) {
354             break;
355         }
356 
357         if (ol->action == LCBIOCP_ACTION_CONNECT) {
358             free(ol);
359         } else if (ol->action == LCBIOCP_ACTION_WRITE) {
360             iocp_write_done(io, IOCP_WRITEOBJ_FROM_OVERLAPPED(ol), -1);
361         } else if (ol->action == LCBIOCP_ACTION_READ) {
362             io->base.v.v2.error = WSAECONNRESET;
363             sd->rdcb(&sd->sd_base, -1, sd->rdarg);
364         }
365         iocp_socket_decref(io, sd);
366     }
367 
368     /* Destroy all remaining sockets */
369     LCB_LIST_FOR(cur, &io->sockets)
370     {
371         iocp_sockdata_t *sd = LCB_LIST_ITEM(cur, iocp_sockdata_t, list);
372 
373         IOCP_LOG(IOCP_WARN, "Leak detected in socket %p (%lu). Refcount=%d", sd, sd->sSocket, sd->refcount);
374         if (sd->sSocket != INVALID_SOCKET) {
375             closesocket(sd->sSocket);
376             sd->sSocket = INVALID_SOCKET;
377         }
378     }
379 
380     if (io->hCompletionPort && CloseHandle(io->hCompletionPort)) {
381         IOCP_LOG(IOCP_ERR, "Couldn't CloseHandle: %d", GetLastError());
382     }
383     free(io);
384 }
385 
get_procs(int version, lcb_loop_procs *loop, lcb_timer_procs *timer, lcb_bsd_procs *bsd, lcb_ev_procs *ev, lcb_completion_procs *iocp, lcb_iomodel_t *model)386 static void get_procs(int version, lcb_loop_procs *loop, lcb_timer_procs *timer, lcb_bsd_procs *bsd, lcb_ev_procs *ev,
387                       lcb_completion_procs *iocp, lcb_iomodel_t *model)
388 {
389     *model = LCB_IOMODEL_COMPLETION;
390 
391     loop->start = iocp_run;
392     loop->stop = iocp_stop;
393     loop->tick = iocp_tick;
394 
395     iocp->connect = start_connect;
396     iocp->read2 = start_read;
397     iocp->write2 = start_write;
398     iocp->socket = create_socket;
399     iocp->close = close_socket;
400     iocp->nameinfo = sock_nameinfo;
401     iocp->is_closed = check_closed;
402 
403     timer->create = create_timer;
404     timer->cancel = delete_timer;
405     timer->schedule = update_timer;
406     timer->destroy = destroy_timer;
407     (void)ev;
408     (void)bsd;
409 }
410 
411 LIBCOUCHBASE_API
lcb_iocp_new_iops(int version, lcb_io_opt_t *ioret, void *arg)412 lcb_STATUS lcb_iocp_new_iops(int version, lcb_io_opt_t *ioret, void *arg)
413 {
414     iocp_t *io;
415     lcb_io_opt_t tbl;
416 
417     io = calloc(1, sizeof(*io));
418     if (!io) {
419         return LCB_ERR_NO_MEMORY;
420     }
421 
422     /** These functions check if they were called more than once using atomic ops */
423     iocp_initialize_loop_globals();
424     lcb_list_init(&io->timer_queue.list);
425     lcb_list_init(&io->sockets);
426 
427     tbl = &io->base;
428     *ioret = tbl;
429 
430     io->breakout = TRUE;
431 
432     /** Create IOCP */
433     io->hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
434 
435     if (!io->hCompletionPort) {
436         return LCB_ERR_SDK_INTERNAL;
437     }
438 
439     tbl->destructor = iops_dtor;
440     tbl->version = 2;
441     tbl->v.v2.get_procs = get_procs;
442 
443     (void)version;
444     (void)arg;
445 
446     return LCB_SUCCESS;
447 }
448 
449 LIBCOUCHBASE_API
lcb_create_iocp_io_opts(void)450 struct lcb_io_opt_st *lcb_create_iocp_io_opts(void)
451 {
452     struct lcb_io_opt_st *ret;
453     lcb_iocp_new_iops(0, &ret, NULL);
454     return ret;
455 }
456