1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "connections.h"
19 #include "buckets.h"
20 #include "connection.h"
21 #include "runtime.h"
22 #include "settings.h"
23 #include "stats.h"
24 #include "trace.h"
25 #include "utilities/protocol2text.h"
26 
27 #include <cJSON.h>
28 #include <platform/cb_malloc.h>
29 #include <platform/make_unique.h>
30 #include <algorithm>
31 #include <list>
32 
33 /*
34  * Free list management for connections.
35  */
36 struct connections {
37     std::mutex mutex;
38     std::list<Connection*> conns;
39 } connections;
40 
41 
42 /** Types ********************************************************************/
43 
44 /** Result of a buffer loan attempt */
45 enum class BufferLoan {
46     Existing,
47     Loaned,
48     Allocated,
49 };
50 
51 /** Function prototypes ******************************************************/
52 
53 static BufferLoan loan_single_buffer(Connection& c,
54                                      std::unique_ptr<cb::Pipe>& thread_buf,
55                                      std::unique_ptr<cb::Pipe>& conn_buf);
56 static void maybe_return_single_buffer(Connection& c,
57                                        std::unique_ptr<cb::Pipe>& thread_buf,
58                                        std::unique_ptr<cb::Pipe>& conn_buf);
59 static void conn_destructor(Connection* c);
60 static Connection* allocate_connection(SOCKET sfd,
61                                        event_base* base,
62                                        const ListeningPort& interface);
63 
64 static void release_connection(Connection* c);
65 
66 /** External functions *******************************************************/
signal_idle_clients(LIBEVENT_THREAD *me, int bucket_idx, bool logging)67 int signal_idle_clients(LIBEVENT_THREAD *me, int bucket_idx, bool logging)
68 {
69     // We've got a situation right now where we're seeing that
70     // some of the connections is "stuck". Let's dump all
71     // information until we solve the bug.
72     logging = true;
73 
74     int connected = 0;
75     std::lock_guard<std::mutex> lock(connections.mutex);
76     for (auto* c : connections.conns) {
77         if (c->getThread() == me) {
78             ++connected;
79             if (bucket_idx == -1 || c->getBucketIndex() == bucket_idx) {
80                 c->signalIfIdle(logging, me->index);
81             }
82         }
83     }
84 
85     return connected;
86 }
87 
iterate_thread_connections(LIBEVENT_THREAD* thread, std::function<void(Connection&)> callback)88 void iterate_thread_connections(LIBEVENT_THREAD* thread,
89                                 std::function<void(Connection&)> callback) {
90     // Deny modifications to the connection map while we're iterating
91     // over it
92     std::lock_guard<std::mutex> lock(connections.mutex);
93     for (auto* c : connections.conns) {
94         if (c->getThread() == thread) {
95             callback(*c);
96         }
97     }
98 }
99 
destroy_connections()100 void destroy_connections() {
101     std::lock_guard<std::mutex> lock(connections.mutex);
102     /* traverse the list of connections. */
103     for (auto* c : connections.conns) {
104         conn_destructor(c);
105     }
106     connections.conns.clear();
107 }
108 
close_all_connections()109 void close_all_connections() {
110     /* traverse the list of connections. */
111     {
112         std::lock_guard<std::mutex> lock(connections.mutex);
113         for (auto* c : connections.conns) {
114             if (!c->isSocketClosed()) {
115                 safe_close(c->getSocketDescriptor());
116                 c->setSocketDescriptor(INVALID_SOCKET);
117             }
118 
119             if (c->getRefcount() > 1) {
120                 perform_callbacks(ON_DISCONNECT, NULL, c);
121             }
122         }
123     }
124 
125     /*
126      * do a second loop, this time wait for all of them to
127      * be closed.
128      */
129     bool done;
130     do {
131         done = true;
132         {
133             std::lock_guard<std::mutex> lock(connections.mutex);
134             for (auto* c : connections.conns) {
135                 if (c->getRefcount() > 1) {
136                     done = false;
137                     break;
138                 }
139             }
140         }
141 
142         if (!done) {
143             usleep(500);
144         }
145     } while (!done);
146 }
147 
run_event_loop(Connection* c, short which)148 void run_event_loop(Connection* c, short which) {
149     const auto start = ProcessClock::now();
150     c->runEventLoop(which);
151     const auto stop = ProcessClock::now();
152 
153     using namespace std::chrono;
154     const auto ns = duration_cast<nanoseconds>(stop - start);
155     c->addCpuTime(ns);
156 
157     auto* thread = c->getThread();
158     if (thread != nullptr) {
159         scheduler_info[thread->index].add(ns);
160     }
161 
162     if (c->shouldDelete()) {
163         release_connection(c);
164     }
165 }
166 
conn_new(const SOCKET sfd, in_port_t parent_port, struct event_base* base, LIBEVENT_THREAD* thread)167 Connection* conn_new(const SOCKET sfd, in_port_t parent_port,
168                      struct event_base* base,
169                      LIBEVENT_THREAD* thread) {
170 
171     Connection* c;
172     {
173         std::lock_guard<std::mutex> guard(stats_mutex);
174         auto* interface = get_listening_port_instance(parent_port);
175         if (interface == nullptr) {
176             LOG_WARNING("{}: failed to locate server port {}. Disconnecting",
177                         (unsigned int)sfd,
178                         parent_port);
179             return nullptr;
180         }
181 
182         c = allocate_connection(sfd, base, *interface);
183     }
184 
185     if (c == nullptr) {
186         return nullptr;
187     }
188 
189     stats.total_conns++;
190 
191     c->incrementRefcount();
192 
193     associate_initial_bucket(*c);
194 
195     const auto& bucket = c->getBucket();
196     if (bucket.type != BucketType::NoBucket) {
197         LOG_INFO("{}: Accepted new client connected to bucket:[{}]",
198                  c->getId(),
199                  bucket.name);
200     }
201 
202     c->setThread(thread);
203     MEMCACHED_CONN_ALLOCATE(c->getId());
204 
205     if (settings.getVerbose() > 1) {
206         LOG_DEBUG("<{} new client connection", sfd);
207     }
208 
209     return c;
210 }
211 
conn_close(Connection& connection)212 void conn_close(Connection& connection) {
213     if (!connection.isSocketClosed()) {
214         throw std::logic_error("conn_cleanup: socketDescriptor must be closed");
215     }
216     if (connection.getState() != McbpStateMachine::State::immediate_close) {
217         throw std::logic_error("conn_cleanup: Connection:state (which is " +
218                                std::string(connection.getStateName()) +
219                                ") must be conn_immediate_close");
220     }
221 
222     auto thread = connection.getThread();
223     if (thread == nullptr) {
224         throw std::logic_error("conn_close: unable to obtain non-NULL thread from connection");
225     }
226 
227     thread->notification.remove(&connection);
228     // remove from pending-io list
229     {
230         std::lock_guard<std::mutex> lock(thread->pending_io.mutex);
231         thread->pending_io.map.erase(&connection);
232     }
233 
234     connection.read->clear();
235     connection.write->clear();
236     /* Return any buffers back to the thread; before we disassociate the
237      * connection from the thread. Note we clear DCP status first, so
238      * conn_return_buffers() will actually free the buffers.
239      */
240     connection.setDCP(false);
241     conn_return_buffers(&connection);
242 
243     connection.setState(McbpStateMachine::State::destroyed);
244 }
245 
get_listening_port_instance(const in_port_t port)246 ListeningPort *get_listening_port_instance(const in_port_t port) {
247     for (auto &instance : stats.listening_ports) {
248         if (instance.port == port) {
249             return &instance;
250         }
251     }
252 
253     return nullptr;
254 }
255 
256 #ifndef WIN32
257 /**
258  * NOTE: This is <b>not</b> intended to be called during normal situation,
259  * but in the case where we've been exhausting all connections to memcached
260  * we need a way to be able to dump the connection states in order to search
261  * for a bug.
262  */
dump_connection_stat_signal_handler(evutil_socket_t, short, void *)263 void dump_connection_stat_signal_handler(evutil_socket_t, short, void *) {
264     std::lock_guard<std::mutex> lock(connections.mutex);
265     for (auto *c : connections.conns) {
266         try {
267             auto json = c->toJSON();
268             auto info = to_string(json, false);
269             LOG_INFO("Connection: {}", info);
270         } catch (const std::bad_alloc&) {
271             LOG_WARNING("Failed to allocate memory to dump info for {}",
272                         c->getId());
273         }
274     }
275 }
276 #endif
277 
conn_loan_buffers(Connection* c)278 void conn_loan_buffers(Connection* c) {
279     if (c == nullptr) {
280         return;
281     }
282 
283     auto* ts = get_thread_stats(c);
284     switch (loan_single_buffer(*c, c->getThread()->read, c->read)) {
285     case BufferLoan::Existing:
286         ts->rbufs_existing++;
287         break;
288     case BufferLoan::Loaned:
289         ts->rbufs_loaned++;
290         break;
291     case BufferLoan::Allocated:
292         ts->rbufs_allocated++;
293         break;
294     }
295 
296     switch (loan_single_buffer(*c, c->getThread()->write, c->write)) {
297     case BufferLoan::Existing:
298         ts->wbufs_existing++;
299         break;
300     case BufferLoan::Loaned:
301         ts->wbufs_loaned++;
302         break;
303     case BufferLoan::Allocated:
304         ts->wbufs_allocated++;
305         break;
306     }
307 }
308 
conn_return_buffers(Connection* c)309 void conn_return_buffers(Connection* c) {
310     if (c == nullptr) {
311         return;
312     }
313 
314     auto thread = c->getThread();
315 
316     if (thread == nullptr) {
317         // Connection already cleaned up - nothing to do.
318         return;
319     }
320 
321     if (c->isDCP()) {
322         // DCP work differently - let them keep their buffers once allocated.
323         return;
324     }
325 
326     maybe_return_single_buffer(*c, thread->read, c->read);
327     maybe_return_single_buffer(*c, thread->write, c->write);
328 }
329 
330 /** Internal functions *******************************************************/
331 
332 /**
333  * Destructor for all connection objects. Release all allocated resources.
334  */
conn_destructor(Connection* c)335 static void conn_destructor(Connection* c) {
336     delete c;
337     stats.conn_structs--;
338 }
339 
340 /** Allocate a connection, creating memory and adding it to the conections
341  *  list. Returns a pointer to the newly-allocated connection if successful,
342  *  else NULL.
343  */
allocate_connection(SOCKET sfd, event_base* base, const ListeningPort& interface)344 static Connection* allocate_connection(SOCKET sfd,
345                                        event_base* base,
346                                        const ListeningPort& interface) {
347     Connection* ret = nullptr;
348 
349     try {
350         ret = new Connection(sfd, base, interface);
351         std::lock_guard<std::mutex> lock(connections.mutex);
352         connections.conns.push_back(ret);
353         stats.conn_structs++;
354         return ret;
355     } catch (const std::bad_alloc&) {
356         LOG_WARNING("Failed to allocate memory for connection");
357     } catch (const std::exception& error) {
358         LOG_WARNING("Failed to create connection: {}", error.what());
359     } catch (...) {
360         LOG_WARNING("Failed to create connection");
361     }
362 
363     delete ret;
364     return NULL;
365 }
366 
367 /** Release a connection; removing it from the connection list management
368  *  and freeing the Connection object.
369  */
release_connection(Connection* c)370 static void release_connection(Connection* c) {
371     {
372         std::lock_guard<std::mutex> lock(connections.mutex);
373         auto iter = std::find(connections.conns.begin(), connections.conns.end(), c);
374         // I should assert
375         cb_assert(iter != connections.conns.end());
376         connections.conns.erase(iter);
377     }
378 
379     // Finally free it
380     conn_destructor(c);
381 }
382 
383 /**
384  * If the connection doesn't already have a populated conn_buff, ensure that
385  * it does by either loaning out the threads, or allocating a new one if
386  * necessary.
387  */
loan_single_buffer(Connection& c, std::unique_ptr<cb::Pipe>& thread_buf, std::unique_ptr<cb::Pipe>& conn_buf)388 static BufferLoan loan_single_buffer(Connection& c,
389                                      std::unique_ptr<cb::Pipe>& thread_buf,
390                                      std::unique_ptr<cb::Pipe>& conn_buf) {
391     /* Already have a (partial) buffer - nothing to do. */
392     if (conn_buf) {
393         return BufferLoan::Existing;
394     }
395 
396     // If the thread has a buffer, let's loan that to the connection
397     if (thread_buf) {
398         thread_buf.swap(conn_buf);
399         return BufferLoan::Loaned;
400     }
401 
402     // Need to allocate a new buffer
403     try {
404         conn_buf = std::make_unique<cb::Pipe>(DATA_BUFFER_SIZE);
405     } catch (const std::bad_alloc&) {
406         // Unable to alloc a buffer for the thread. Not much we can do here
407         // other than terminate the current connection.
408         LOG_WARNING(
409                 "{}: Failed to allocate new network buffer.. closing "
410                 "connection {}",
411                 c.getId(),
412                 c.getDescription());
413         c.setState(McbpStateMachine::State::closing);
414         return BufferLoan::Existing;
415     }
416 
417     return BufferLoan::Allocated;
418 }
419 
maybe_return_single_buffer(Connection& c, std::unique_ptr<cb::Pipe>& thread_buf, std::unique_ptr<cb::Pipe>& conn_buf)420 static void maybe_return_single_buffer(Connection& c,
421                                        std::unique_ptr<cb::Pipe>& thread_buf,
422                                        std::unique_ptr<cb::Pipe>& conn_buf) {
423     if (conn_buf && conn_buf->empty()) {
424         // Buffer clean, dispose of it
425         if (thread_buf) {
426             // Already got a thread buffer.. release this one
427             conn_buf.reset();
428         } else {
429             conn_buf.swap(thread_buf);
430         }
431     }
432 }
433