xref: /6.0.3/platform/cbsocket/socket.cc (revision 11d8eb5f)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2017 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <platform/socket.h>
19
20#include <platform/dirutils.h>
21#include <platform/memorymap.h>
22#include <platform/platform.h>
23
24#include <event2/util.h>
25#include <sys/stat.h>
26
27#include <string.h>
28#include <atomic>
29#include <cerrno>
30#include <chrono>
31#include <memory>
32#include <mutex>
33#include <string>
34#include <system_error>
35#include <unordered_map>
36
37#ifndef WIN32
38#include <netdb.h>
39#include <unistd.h>
40#endif
41
42/**
43 * We cache each directory name in an unordered map where we remove
44 * the entry every time the socket is successfully closed. By
45 * doing this we can create a new file every time the same socket
46 * is reopened.
47 */
48std::mutex mutex;
49std::unordered_map<SOCKET, std::string> name_map;
50
/**
 * Deleter which lets a std::unique_ptr own a FILE stream: the stream is
 * closed with fclose() when the owning pointer releases it.
 */
struct FileDeleter {
    void operator()(FILE* stream) {
        fclose(stream);
    }
};
54
55std::string get_log_filename(SOCKET socket) {
56    std::lock_guard<std::mutex> guard(mutex);
57
58    auto iter = name_map.find(socket);
59    if (iter == name_map.end()) {
60        std::string prefix{"/tmp/"};
61        prefix.append(std::to_string(cb_getpid()));
62        prefix.append("-");
63        prefix.append(std::to_string(socket));
64        prefix.append("-");
65
66        std::string name;
67        uint32_t seqno = 0;
68        do {
69            name = prefix+ std::to_string(seqno++);
70        } while (cb::io::isFile(name));
71
72        // Create the file to make sure we don't race ;)
73        std::unique_ptr<FILE, FileDeleter> fp(fopen(name.c_str(), "ab"));
74
75        name_map[socket] = name;
76        return name;
77    }
78
79    return iter->second;
80}
81
/**
 * Read the steady clock.
 *
 * @return the time since the epoch of std::chrono::steady_clock, in
 *         microseconds
 */
static inline uint64_t get_timestamp() {
    const auto elapsed = std::chrono::steady_clock::now().time_since_epoch();
    const auto usec =
            std::chrono::duration_cast<std::chrono::microseconds>(elapsed);
    return uint64_t(usec.count());
}
88
89static void log_it(SOCKET sock, const void* data, size_t nb, const char* dir) {
90    std::string fname = get_log_filename(sock);
91    std::unique_ptr<FILE, FileDeleter> fp(fopen(fname.c_str(), "ab"));
92
93    if (!fp) {
94        throw std::system_error(errno,
95                                std::system_category(),
96                                "Failed to open socket log file " + fname);
97    }
98
99    if (nb > std::numeric_limits<uint32_t>::max()) {
100        throw std::runtime_error(
101                "cb::net::log_it: can't deal with buffers > uin32_t::max()");
102    }
103
104    // Write header
105    const auto now = uint64_t(get_timestamp());
106    const auto nbytes = uint32_t(nb);
107
108    if (fwrite(&now, 1, sizeof(now), fp.get()) != sizeof(now) ||
109        fwrite(&nbytes, 1, sizeof(nbytes), fp.get()) != sizeof(nbytes) ||
110        fwrite(dir, 1, 1, fp.get()) != 1) {
111        throw std::system_error(
112                errno,
113                std::system_category(),
114                "Incorrect number of bytes written to socket log file");
115    }
116
117    auto nw = fwrite(data, 1, nb, fp.get());
118    if (nw != nb) {
119        throw std::system_error(
120            errno,
121            std::system_category(),
122            "Incorrect number of bytes written to socket log file");
123    }
124}
125
126static bool defaultFilterHandler(SOCKET sock) {
127    (void)sock;
128    return true;
129}
130
131static void defaultCloseHandler(SOCKET sock, const std::string& dir) {
132    try {
133        cb::io::rmrf(dir);
134    } catch (...) {
135        // Empty
136    }
137}
138
139static std::atomic<bool(*)(SOCKET)> filter_handler {defaultFilterHandler};
140static std::atomic<void(*)(SOCKET, const std::string&)> close_handler {defaultCloseHandler};
141static std::atomic_bool logging;
142
143static inline bool should_log(SOCKET sock) {
144    if (!logging.load(std::memory_order_relaxed)) {
145        return false;
146    }
147
148    // Logging is enabled.. check the filter for this socket
149    return filter_handler.load()(sock);
150}
151
152namespace cb {
153namespace net {
154
155CBSOCKET_PUBLIC_API
156void set_log_filter_handler(bool (* handler)(SOCKET)) {
157    if (handler == nullptr) {
158        filter_handler.store(defaultFilterHandler);
159    } else {
160        filter_handler.store(handler);
161    }
162}
163
164CBSOCKET_PUBLIC_API
165void set_on_close_handler(void (* handler)(SOCKET, const std::string& dir)) {
166    if (handler == nullptr) {
167        close_handler.store(defaultCloseHandler);
168    } else {
169        close_handler.store(handler);
170    }
171}
172
173CBSOCKET_PUBLIC_API
174void set_socket_logging(bool enable) {
175    logging.store(enable);
176}
177
178CBSOCKET_PUBLIC_API
179void iterate_logfile(const std::string& file, IteratorFunc callback) {
180    struct stat st;
181    if (stat(file.c_str(), &st) == -1) {
182        throw std::system_error(
183            errno, std::system_category(),
184            "cb::net::iterate_logfile: stat() failed");
185    }
186
187    if (st.st_size == 0) {
188        // We can't memorymap a file of size 0
189        return;
190    }
191
192    cb::MemoryMappedFile map(file.c_str(), cb::MemoryMappedFile::Mode::RDONLY);
193    map.open();
194    cb::const_byte_buffer buffer{static_cast<const uint8_t*>(map.getRoot()),
195                                 map.getSize()};
196    iterate_logfile(buffer, callback);
197}
198
199CBSOCKET_PUBLIC_API
200void iterate_logfile(cb::const_byte_buffer buffer, IteratorFunc callback) {
201    const auto HeaderSize = sizeof(uint64_t) + sizeof(uint32_t) + sizeof(char);
202    size_t offset = 0;
203
204    while ((offset + HeaderSize) < buffer.size()) {
205        uint64_t timestamp;
206        uint32_t size;
207
208        memcpy(&timestamp, buffer.data() + offset, sizeof(timestamp));
209        memcpy(&size, buffer.data() + offset + sizeof(timestamp), sizeof(size));
210
211        cb::const_byte_buffer packet{buffer.data() + offset + HeaderSize, size};
212        offset += HeaderSize;
213        Direction direction{buffer[offset - 1] == 'r' ? Direction::Receive
214                                                      : Direction::Send};
215        offset += packet.size();
216
217        if (offset > buffer.size()) {
218            // we don't have the entire packet
219            throw std::underflow_error(
220                    "cb::net::iterate_logfile: Truncated file");
221        }
222
223        if (!callback(timestamp, direction, packet)) {
224            return;
225        }
226    }
227
228    if (offset != buffer.size()) {
229        // there is a format error...
230        throw std::underflow_error("cb::net::iterate_logfile: Truncated file");
231    }
232}
233
234CBSOCKET_PUBLIC_API
235int closesocket(SOCKET s) {
236    if (logging) {
237        std::lock_guard<std::mutex> guard(mutex);
238        int ret;
239#ifdef WIN32
240        ret = ::closesocket(s);
241#else
242        ret = ::close(s);
243#endif
244
245        if (ret == 0 && logging) {
246            std::string name;
247            auto iter = name_map.find(s);
248            if (iter != name_map.end()) {
249                name = iter->second;
250                name_map.erase(iter);
251                close_handler.load()(s, name);
252            }
253        }
254
255        return ret;
256    }
257
258#ifdef WIN32
259    return ::closesocket(s);
260#else
261    return ::close(s);
262#endif
263
264}
265
266CBSOCKET_PUBLIC_API
267int get_socket_error() {
268#ifdef WIN32
269    return WSAGetLastError();
270#else
271    return errno;
272#endif
273}
274
275CBSOCKET_PUBLIC_API
276int bind(SOCKET sock, const struct sockaddr* name, socklen_t namelen) {
277    return ::bind(sock, name, namelen);
278}
279
280CBSOCKET_PUBLIC_API
281int listen(SOCKET sock, int backlog) {
282    return ::listen(sock, backlog);
283}
284
285
286CBSOCKET_PUBLIC_API
287SOCKET accept(SOCKET sock, struct sockaddr* addr, socklen_t* addrlen) {
288    auto ret = ::accept(sock, addr, addrlen);
289    if (ret != INVALID_SOCKET && should_log(ret)) {
290        get_log_filename(ret);
291    }
292
293    return ret;
294}
295
296CBSOCKET_PUBLIC_API
297int connect(SOCKET sock, const struct sockaddr* name, int namelen) {
298    return ::connect(sock, name, namelen);
299}
300
301CBSOCKET_PUBLIC_API
302SOCKET socket(int domain, int type, int protocol) {
303    auto ret = ::socket(domain, type, protocol);
304    if (ret != INVALID_SOCKET && should_log(ret)) {
305        get_log_filename(ret);
306    }
307    return ret;
308}
309
310CBSOCKET_PUBLIC_API
311int shutdown(SOCKET sock, int how) {
312    return ::shutdown(sock, how);
313}
314
315CBSOCKET_PUBLIC_API
316ssize_t send(SOCKET sock, const void* buffer, size_t length, int flags) {
317#ifdef WIN32
318    auto ret =
319            ::send(sock, static_cast<const char*>(buffer), int(length), flags);
320#else
321    auto ret = ::send(sock, buffer, length, flags);
322#endif
323    if (ret > 0 && should_log(sock)) {
324        log_it(sock, buffer, size_t(ret), "w");
325    }
326
327    return ret;
328}
329
330CBSOCKET_PUBLIC_API
331ssize_t sendmsg(SOCKET sock, const struct msghdr* message, int flags) {
332    if (logging) {
333        int res = 0;
334        for (size_t ii = 0; ii < size_t(message->msg_iovlen); ii++) {
335            auto nw = cb::net::send(sock,
336                                    message->msg_iov[ii].iov_base,
337                                    message->msg_iov[ii].iov_len,
338                                    0);
339            if (nw == -1) {
340                return (res == 0) ? -1 : res;
341            }
342
343            res += nw;
344        }
345
346        return res;
347    }
348
349    return ::sendmsg(sock, message, flags);
350}
351
352CBSOCKET_PUBLIC_API
353ssize_t sendto(SOCKET sock,
354               const void* buffer,
355               size_t length,
356               int flags,
357               const struct sockaddr* dest_addr,
358               socklen_t dest_len) {
359    if (should_log(sock)) {
360        throw std::runtime_error("cb::net::sendto: not implemented");
361    }
362
363#ifdef WIN32
364    return ::sendto(sock,
365                    static_cast<const char*>(buffer),
366                    int(length),
367                    flags,
368                    dest_addr,
369                    dest_len);
370#else
371    return ::sendto(sock, buffer, length, flags, dest_addr, dest_len);
372#endif
373}
374
375CBSOCKET_PUBLIC_API
376ssize_t recv(SOCKET sock, void* buffer, size_t length, int flags) {
377#ifdef WIN32
378    auto ret = ::recv(sock, static_cast<char*>(buffer), length, flags);
379#else
380    auto ret = ::recv(sock, buffer, length, flags);
381#endif
382
383    if (ret > 0 && should_log(sock)) {
384        log_it(sock, buffer, size_t(ret), "r");
385    }
386
387    return ret;
388}
389
390CBSOCKET_PUBLIC_API
391ssize_t recvfrom(SOCKET sock,
392                 void* buffer,
393                 size_t length,
394                 int flags,
395                 struct sockaddr* address,
396                 socklen_t* address_len) {
397    if (should_log(sock)) {
398        throw std::runtime_error("cb::net::recvfrom: not implemented");
399    }
400
401#ifdef WIN32
402    return ::recvfrom(sock,
403                      static_cast<char*>(buffer),
404                      int(length),
405                      flags,
406                      address,
407                      address_len);
408#else
409    return ::recvfrom(sock, buffer, length, flags, address, address_len);
410#endif
411}
412
413CBSOCKET_PUBLIC_API
414ssize_t recvmsg(SOCKET sock, struct msghdr* message, int flags) {
415    if (logging) {
416        int res = 0;
417        for (size_t ii = 0; ii < size_t(message->msg_iovlen); ii++) {
418            auto nr = cb::net::recv(sock,
419                                    message->msg_iov[ii].iov_base,
420                                    message->msg_iov[ii].iov_len,
421                                    0);
422            if (nr == -1) {
423                return (res == 0) ? -1 : res;
424            }
425
426            res += nr;
427        }
428
429        return res;
430    }
431
432#ifdef WIN32
433    throw std::runtime_error("cb::net::recvmsg: Not implemented for win32");
434#else
435    return ::recvmsg(sock, message, flags);
436#endif
437}
438
439CBSOCKET_PUBLIC_API
440int socketpair(int domain, int type, int protocol, SOCKET socket_vector[2]) {
441    auto ret = evutil_socketpair(domain,
442                                 type,
443                                 protocol,
444                                 reinterpret_cast<evutil_socket_t*>(socket_vector));
445    if (ret == 0 && logging) {
446        get_log_filename(socket_vector[0]);
447        get_log_filename(socket_vector[1]);
448    }
449    return ret;
450}
451
452CBSOCKET_PUBLIC_API
453int getsockopt(SOCKET sock,
454               int level,
455               int option_name,
456               void* option_value,
457               socklen_t* option_len) {
458#ifdef WIN32
459    return ::getsockopt(sock,
460                        level,
461                        option_name,
462                        reinterpret_cast<char*>(option_value),
463                        option_len);
464#else
465    return ::getsockopt(sock, level, option_name, option_value, option_len);
466#endif
467}
468
469CBSOCKET_PUBLIC_API
470int setsockopt(SOCKET sock,
471               int level,
472               int option_name,
473               const void* option_value,
474               socklen_t option_len) {
475#ifdef WIN32
476    return ::setsockopt(sock,
477                        level,
478                        option_name,
479                        reinterpret_cast<const char*>(option_value),
480                        option_len);
481#else
482    return ::setsockopt(sock, level, option_name, option_value, option_len);
483#endif
484}
485
486CBSOCKET_PUBLIC_API
487int set_socket_noblocking(SOCKET sock) {
488    return evutil_make_socket_nonblocking(sock);
489}
490
491} // namespace net
492} // namespace cb
493
494// C API which wraps into the C++ API
495
496CBSOCKET_PUBLIC_API
497int cb_closesocket(SOCKET sock) {
498    return cb::net::closesocket(sock);
499}
500
501CBSOCKET_PUBLIC_API
502int cb_get_socket_error() {
503    return cb::net::get_socket_error();
504}
505
506CBSOCKET_PUBLIC_API
507int cb_bind(SOCKET sock, const struct sockaddr* name, socklen_t namelen) {
508    return cb::net::bind(sock, name, namelen);
509}
510
511CBSOCKET_PUBLIC_API
512SOCKET cb_accept(SOCKET sock, struct sockaddr* addr, socklen_t* addrlen) {
513    return cb::net::accept(sock, addr, addrlen);
514}
515
516CBSOCKET_PUBLIC_API
517int cb_connect(SOCKET sock, const struct sockaddr* name, int namelen) {
518    return cb::net::connect(sock, name, namelen);
519}
520
521CBSOCKET_PUBLIC_API
522SOCKET cb_socket(int domain, int type, int protocol) {
523    return cb::net::socket(domain, type, protocol);
524}
525
526CBSOCKET_PUBLIC_API
527int cb_shutdown(SOCKET sock, int how) {
528    return cb::net::shutdown(sock, how);
529}
530
531CBSOCKET_PUBLIC_API
532ssize_t cb_send(SOCKET sock, const void* buffer, size_t length, int flags) {
533    return cb::net::send(sock, buffer, length, flags);
534}
535
536CBSOCKET_PUBLIC_API
537ssize_t cb_sendmsg(SOCKET sock, const struct msghdr* message, int flags) {
538    return cb::net::sendmsg(sock, message, flags);
539}
540
541CBSOCKET_PUBLIC_API
542ssize_t cb_sendto(SOCKET sock,
543                  const void* buffer,
544                  size_t length,
545                  int flags,
546                  const struct sockaddr* dest_addr,
547                  socklen_t dest_len) {
548    return cb::net::sendto(sock, buffer, length, flags, dest_addr, dest_len);
549}
550
551CBSOCKET_PUBLIC_API
552ssize_t cb_recv(SOCKET sock, void* buffer, size_t length, int flags) {
553    return cb::net::recv(sock, buffer, length, flags);
554}
555
556CBSOCKET_PUBLIC_API
557ssize_t cb_recvfrom(SOCKET sock,
558                    void* buffer,
559                    size_t length,
560                    int flags,
561                    struct sockaddr* address,
562                    socklen_t* address_len) {
563    return cb::net::recvfrom(sock, buffer, length, flags, address, address_len);
564}
565
566CBSOCKET_PUBLIC_API
567ssize_t cb_recvmsg(SOCKET sock, struct msghdr* message, int flags) {
568    return cb::net::recvmsg(sock, message, flags);
569}
570
571CBSOCKET_PUBLIC_API
572void cb_set_socket_logging(bool enable) {
573    cb::net::set_socket_logging(enable);
574}
575
576CBSOCKET_PUBLIC_API
577int cb_socketpair(int domain, int type, int protocol, SOCKET socket_vector[2]) {
578    return cb::net::socketpair(domain, type, protocol, socket_vector);
579}
580
581CBSOCKET_PUBLIC_API
582int cb_set_socket_noblocking(SOCKET sock) {
583    return cb::net::set_socket_noblocking(sock);
584}
585
586CBSOCKET_PUBLIC_API
587int cb_listen(SOCKET sock, int backlog) {
588    return cb::net::listen(sock, backlog);
589}
590