xref: /6.6.0/kv_engine/daemon/opentracing.cc (revision ee5212ee)
1/*
2 *     Copyright 2019 Couchbase, Inc
3 *
4 *   Licensed under the Apache License, Version 2.0 (the "License");
5 *   you may not use this file except in compliance with the License.
6 *   You may obtain a copy of the License at
7 *
8 *       http://www.apache.org/licenses/LICENSE-2.0
9 *
10 *   Unless required by applicable law or agreed to in writing, software
11 *   distributed under the License is distributed on an "AS IS" BASIS,
12 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 *   See the License for the specific language governing permissions and
14 *   limitations under the License.
15 */
16
17#include "opentracing.h"
18
19#include "connection.h"
20#include "cookie.h"
21#include "cookie_trace_context.h"
22#include "log_macros.h"
23#include "opentracing_config.h"
24
25#include <mcbp/protocol/request.h>
26#include <nlohmann/json.hpp>
27#include <sstream>
28
29#include <platform/thread.h>
30
31namespace cb {
32using Thread = Couchbase::Thread;
33using ThreadState = Couchbase::ThreadState;
34} // namespace cb
35
36#ifdef ENABLE_OPENTRACING
37std::atomic_bool OpenTracing::enabled;
38
39// Create a shorter version so that we don't have to wrap lines all the
40// time
41using ParentSpan =
42        opentracing::expected<std::unique_ptr<opentracing::SpanContext>>;
43#endif
44
45std::unique_ptr<OpenTracing> OpenTracing::instance;
46
47class OpenTracingThread : public OpenTracing, public cb::Thread {
48public:
49    OpenTracingThread(const OpenTracingConfig& config)
50        : OpenTracing(config), cb::Thread("mcd:trace") {
51    }
52
53    void stop() {
54        {
55            std::lock_guard<std::mutex> guard(mutex);
56            running = false;
57        }
58        condition_variable.notify_all();
59    }
60
61protected:
62    /// The main loop of the thread
63    void run() override;
64
65    void pushOne(std::chrono::system_clock::time_point system_now,
66                 std::chrono::steady_clock::time_point steady_now,
67                 const CookieTraceContext& entry);
68
69    /// move all of the contents to the internal list. We live under
70    /// the assumption that only a few commands contains trace requests
71    /// so there won't be too many elemnets.
72    void push(CookieTraceContext& context) override {
73        {
74            std::lock_guard<std::mutex> guard(mutex);
75            contexts.push_back(std::move(context));
76        }
77        condition_variable.notify_all();
78    }
79
80#ifdef ENABLE_OPENTRACING
81    /// Try to decode the context as if it was encoded as a text map
82    /// (key1=value1&key2=value2)
83    ParentSpan decodeAsTextMap(const std::string& context);
84
85    /// Try to decode teh context as if it was encoded as a binary blob
86    ParentSpan decodeAsBinary(const std::string& context);
87#endif
88
89    bool running = true;
90
91    /// The mutex variable used to protect access to _all_ the internal
92    /// members
93    std::mutex mutex;
94    /// The daemon thread will block and wait on this variable when there
95    /// isn't any work to do
96    std::condition_variable condition_variable;
97
98    /// The list of entries to submit
99    std::vector<CookieTraceContext> contexts;
100};
101
102void OpenTracingThread::run() {
103    setRunning();
104    std::unique_lock<std::mutex> lock(mutex);
105    while (running) {
106        if (contexts.empty()) {
107            // There isn't any work to do... wait until we get some
108            condition_variable.wait(
109                    lock, [this] { return !running || !contexts.empty(); });
110        }
111
112        // move the entries over to another vector so the clients don't have
113        // to wait while I'm working
114        auto entries = std::move(contexts);
115
116        // Release the lock to the internal variables while I process the
117        // batch of trace elements
118        lock.unlock();
119
120        if (isEnabled()) {
121            // Unfortunately OpenTracing want system clock, and we operate
122            // with steady clocks internally.. snapshot the two and try to
123            // convert between them. (I don't want to cache this "forever"
124            // as the system clock could have been changed)
125            const auto system_now = std::chrono::system_clock::now();
126            const auto steady_now = std::chrono::steady_clock::now();
127
128            for (const auto& e : entries) {
129                pushOne(system_now, steady_now, e);
130            }
131        }
132
133        // make sure we run all destructors before we're grabbing the lock
134        entries.clear();
135
136        // acquire the lock
137        lock.lock();
138    }
139}
140
141#ifdef ENABLE_OPENTRACING
142// This code is copied from the example at:
143// https://github.com/opentracing/opentracing-cpp/blob/master/README.md)
144//
145// Some clients don't implement the binary encoding scheme (at least
146// jaeger don't have an implementation) so we need to work around that
147// by having the client use the space inefficient text-map implementation.
148struct CustomCarrierReader : opentracing::TextMapReader {
149    explicit CustomCarrierReader(
150            const std::unordered_map<std::string, std::string>& data_)
151        : data{data_} {
152    }
153
154    using F = std::function<opentracing::expected<void>(
155            opentracing::string_view, opentracing::string_view)>;
156
157    opentracing::expected<void> ForeachKey(F f) const override {
158        // Iterate through all key-value pairs, the tracer will use the
159        // relevant keys to extract a span context.
160        for (auto& key_value : data) {
161            auto was_successful = f(key_value.first, key_value.second);
162            if (!was_successful) {
163                // If the callback returns and unexpected value, bail out
164                // of the loop.
165                return was_successful;
166            }
167        }
168
169        // Indicate successful iteration.
170        return {};
171    }
172
173    // Optional, define TextMapReader::LookupKey to allow for faster extraction.
174    opentracing::expected<opentracing::string_view> LookupKey(
175            opentracing::string_view key) const override {
176        auto iter = data.find(key);
177        if (iter != data.end()) {
178            return opentracing::make_unexpected(
179                    opentracing::key_not_found_error);
180        }
181        return opentracing::string_view{iter->second};
182    }
183
184    const std::unordered_map<std::string, std::string>& data;
185};
186
187ParentSpan OpenTracingThread::decodeAsTextMap(const std::string& context) {
188    std::unordered_map<std::string, std::string> data;
189    std::istringstream istr(context);
190    std::string token;
191    while (std::getline(istr, token, '&')) {
192        size_t pos = token.find('=');
193        data[token.substr(0, pos)] = token.substr(pos + 1);
194    }
195
196    CustomCarrierReader carrier{data};
197    return tracer->Extract(carrier);
198}
199
200ParentSpan OpenTracingThread::decodeAsBinary(const std::string& context) {
201    std::istringstream istr(context);
202    return tracer->Extract(istr);
203}
204
205#endif
206
207void OpenTracingThread::pushOne(
208        std::chrono::system_clock::time_point system_now,
209        std::chrono::steady_clock::time_point steady_now,
210        const CookieTraceContext& entry) {
211#ifdef ENABLE_OPENTRACING
212    try {
213        ParentSpan parent;
214
215        if (entry.context.find('=') == std::string::npos) {
216            // this is most likely a binary encoded blob
217            parent = decodeAsBinary(entry.context);
218            if (!parent) {
219                // fall back to text:
220                parent = decodeAsTextMap(entry.context);
221            }
222        } else {
223            // this is most likely a text-map
224            parent = decodeAsTextMap(entry.context);
225            if (!parent) {
226                // fall back to binary blob
227                parent = decodeAsBinary(entry.context);
228            }
229        }
230
231        if (!parent) {
232            LOG_WARNING("Failed to parse OpenTracing context");
233            return;
234        }
235
236        for (const auto& d : entry.traceSpans) {
237            // Convert the start time to our system clock
238            const auto start =
239                    system_now - std::chrono::duration_cast<
240                                         std::chrono::system_clock::duration>(
241                                         steady_now - d.start);
242
243            std::string text;
244            if (d.code == cb::tracing::Code::Request) {
245                if (cb::mcbp::is_client_magic(entry.magic)) {
246                    text = to_string(cb::mcbp::ClientOpcode(entry.opcode));
247                } else {
248                    text = to_string(cb::mcbp::ServerOpcode(entry.opcode));
249                }
250            } else {
251                text = to_string(d.code);
252            }
253
254            auto span = tracer->StartSpan(text,
255                                          {opentracing::ChildOf(parent->get()),
256                                           opentracing::StartTimestamp(start)});
257            if (span) {
258                if (d.code == cb::tracing::Code::Request) {
259                    if (!entry.rawKey.empty()) {
260                        span->SetTag("key", entry.rawKey);
261                    }
262                    span->SetTag("opaque", entry.opaque);
263                    span->SetTag("span.kind", "server");
264                }
265
266                span->Finish(
267                        {opentracing::FinishTimestamp(d.start + d.duration)});
268            }
269        }
270    } catch (const std::exception& e) {
271        LOG_WARNING("Failed to generate OpenTracing entry: {}", e.what());
272    }
273#endif
274}
275
276OpenTracing::OpenTracing(const OpenTracingConfig& config) {
277#ifdef ENABLE_OPENTRACING
278    if (config.module.empty()) {
279        throw std::runtime_error(
280                "Configuration error: module must be specified");
281    }
282
283    LOG_INFO("Loading OpenTracing module: {}", config.module);
284    std::string error;
285    handle = opentracing::DynamicallyLoadTracingLibrary(config.module.c_str(),
286                                                        error);
287    if (!handle) {
288        std::string msg = "Failed to load OpenTracing library: \"" +
289                          config.module + "\": " + error;
290        throw std::runtime_error(msg);
291    }
292
293    auto& tracer_factory = handle->tracer_factory();
294    auto tracer_maybe = tracer_factory.MakeTracer(config.config.c_str(), error);
295    if (!tracer_maybe) {
296        std::string msg = "Failed to create OpenTracing tracer from \"" +
297                          config.module + "\": " + error;
298        throw std::runtime_error(msg);
299    }
300    tracer = *tracer_maybe;
301#endif
302}
303
304void OpenTracing::pushTraceLog(CookieTraceContext&& context) {
305    if (isEnabled()) {
306        instance->push(context);
307    }
308}
309
310void OpenTracing::updateConfig(const OpenTracingConfig& config) {
311#ifdef ENABLE_OPENTRACING
312    static std::mutex lock;
313    std::lock_guard<std::mutex> guard(lock);
314
315    if (!config.enabled) {
316        enabled.store(false, std::memory_order_release);
317        return;
318    }
319
320    // We want to enable tracing.. Only create one if we don't have one
321    if (!instance) {
322        try {
323            instance = std::make_unique<OpenTracingThread>(config);
324            dynamic_cast<OpenTracingThread*>(instance.get())->start();
325        } catch (const std::exception& e) {
326            LOG_ERROR("Failed to create OpenTracing: {}", e.what());
327            return;
328        }
329    }
330
331    enabled.store(true, std::memory_order_release);
332#endif
333}
334
335void OpenTracing::shutdown() {
336    if (!instance) {
337        return;
338    }
339
340    LOG_INFO("Shutting down OpenTracing");
341    auto thread = dynamic_cast<OpenTracingThread*>(instance.get());
342    if (thread == nullptr) {
343        throw std::runtime_error(
344                "OpenTracing::shutdown: expected instance to be "
345                "OpenTracingThread");
346    }
347
348    thread->stop();
349    thread->waitForState(cb::ThreadState::Zombie);
350    instance.reset();
351}
352