1a018a7b0SSergey Avseyev/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2a018a7b0SSergey Avseyev/*
3cfedb1f4SSergey Avseyev *     Copyright 2018-2020 Couchbase, Inc.
4a018a7b0SSergey Avseyev *
5a018a7b0SSergey Avseyev *   Licensed under the Apache License, Version 2.0 (the "License");
6a018a7b0SSergey Avseyev *   you may not use this file except in compliance with the License.
7a018a7b0SSergey Avseyev *   You may obtain a copy of the License at
8a018a7b0SSergey Avseyev *
9a018a7b0SSergey Avseyev *       http://www.apache.org/licenses/LICENSE-2.0
10a018a7b0SSergey Avseyev *
11a018a7b0SSergey Avseyev *   Unless required by applicable law or agreed to in writing, software
12a018a7b0SSergey Avseyev *   distributed under the License is distributed on an "AS IS" BASIS,
13a018a7b0SSergey Avseyev *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14a018a7b0SSergey Avseyev *   See the License for the specific language governing permissions and
15a018a7b0SSergey Avseyev *   limitations under the License.
16a018a7b0SSergey Avseyev */
17a018a7b0SSergey Avseyev
18a018a7b0SSergey Avseyev#include "internal.h"
19a018a7b0SSergey Avseyev
20a018a7b0SSergey Avseyev#include <iostream>
21a018a7b0SSergey Avseyev#include <vector>
22a018a7b0SSergey Avseyev
23a018a7b0SSergey Avseyev#define LOGARGS(tracer, lvl) tracer->m_settings, "tracer", LCB_LOG_##lvl, __FILE__, __LINE__
24a018a7b0SSergey Avseyev
25a018a7b0SSergey Avseyevusing namespace lcb::trace;
26a018a7b0SSergey Avseyev
27a018a7b0SSergey Avseyevextern "C" {
28a018a7b0SSergey Avseyevstatic void tlt_destructor(lcbtrace_TRACER *wrapper)
29a018a7b0SSergey Avseyev{
30ec0aafceSSergey Avseyev    if (wrapper == nullptr) {
31a018a7b0SSergey Avseyev        return;
32a018a7b0SSergey Avseyev    }
33a018a7b0SSergey Avseyev    if (wrapper->cookie) {
34ec0aafceSSergey Avseyev        auto *tracer = reinterpret_cast<ThresholdLoggingTracer *>(wrapper->cookie);
3525525da6SSergey Avseyev        tracer->do_flush_orphans();
3625525da6SSergey Avseyev        tracer->do_flush_threshold();
37a018a7b0SSergey Avseyev        delete tracer;
38ec0aafceSSergey Avseyev        wrapper->cookie = nullptr;
39a018a7b0SSergey Avseyev    }
40a018a7b0SSergey Avseyev    delete wrapper;
41a018a7b0SSergey Avseyev}
42a018a7b0SSergey Avseyev
43a018a7b0SSergey Avseyevstatic void tlt_report(lcbtrace_TRACER *wrapper, lcbtrace_SPAN *span)
44a018a7b0SSergey Avseyev{
45ec0aafceSSergey Avseyev    if (wrapper == nullptr || wrapper->cookie == nullptr) {
46a018a7b0SSergey Avseyev        return;
47a018a7b0SSergey Avseyev    }
48a018a7b0SSergey Avseyev
49ec0aafceSSergey Avseyev    auto *tracer = reinterpret_cast<ThresholdLoggingTracer *>(wrapper->cookie);
506b7f4186SDavid Kelly    if (span->is_dispatch()) {
516b7f4186SDavid Kelly        span->find_outer_or_this()->increment_dispatch(span->duration());
526b7f4186SDavid Kelly    }
536b7f4186SDavid Kelly    if (span->is_encode()) {
546b7f4186SDavid Kelly        span->find_outer_or_this()->m_encode = span->duration();
556b7f4186SDavid Kelly    }
566b7f4186SDavid Kelly    if (span->is_outer()) {
576b7f4186SDavid Kelly        if (span->m_orphaned) {
586b7f4186SDavid Kelly            tracer->add_orphan(span);
596b7f4186SDavid Kelly        } else {
606b7f4186SDavid Kelly            tracer->check_threshold(span);
61a018a7b0SSergey Avseyev        }
62a018a7b0SSergey Avseyev    }
63a018a7b0SSergey Avseyev}
64a018a7b0SSergey Avseyev}
65a018a7b0SSergey Avseyev
66a018a7b0SSergey Avseyevlcbtrace_TRACER *ThresholdLoggingTracer::wrap()
67a018a7b0SSergey Avseyev{
68a018a7b0SSergey Avseyev    if (m_wrapper) {
69a018a7b0SSergey Avseyev        return m_wrapper;
70a018a7b0SSergey Avseyev    }
71a018a7b0SSergey Avseyev    m_wrapper = new lcbtrace_TRACER();
72a018a7b0SSergey Avseyev    m_wrapper->version = 0;
736b7f4186SDavid Kelly    m_wrapper->flags = LCBTRACE_F_THRESHOLD;
74a018a7b0SSergey Avseyev    m_wrapper->cookie = this;
75a018a7b0SSergey Avseyev    m_wrapper->destructor = tlt_destructor;
76a018a7b0SSergey Avseyev    m_wrapper->v.v0.report = tlt_report;
77a018a7b0SSergey Avseyev    return m_wrapper;
78a018a7b0SSergey Avseyev}
79a018a7b0SSergey Avseyev
80cfcbd22eSEllis BreenQueueEntry ThresholdLoggingTracer::convert(lcbtrace_SPAN *span)
81a018a7b0SSergey Avseyev{
82cfcbd22eSEllis Breen    QueueEntry orphan;
83a018a7b0SSergey Avseyev    orphan.duration = span->duration();
84eac059bcSSergey Avseyev    Json::Value entry;
856b7f4186SDavid Kelly    char *value, *value2;
866b7f4186SDavid Kelly    size_t nvalue, nvalue2;
87bca7f437SSergey Avseyev
88dac96030SBrett Lawson    entry["operation_name"] = std::string(span->m_opname);
890b0310e1SSergey Avseyev    if (lcbtrace_span_get_tag_str(span, LCBTRACE_TAG_OPERATION_ID, &value, &nvalue) == LCB_SUCCESS) {
90dac96030SBrett Lawson        entry["last_operation_id"] = std::string(value, value + nvalue);
91bca7f437SSergey Avseyev    }
920b0310e1SSergey Avseyev    if (lcbtrace_span_get_tag_str(span, LCBTRACE_TAG_LOCAL_ID, &value, &nvalue) == LCB_SUCCESS) {
93638c5782SEllis Breen        entry["last_local_id"] = std::string(value, value + nvalue);
94bca7f437SSergey Avseyev    }
950b0310e1SSergey Avseyev    if (lcbtrace_span_get_tag_str(span, LCBTRACE_TAG_LOCAL_ADDRESS, &value, &nvalue) == LCB_SUCCESS) {
966b7f4186SDavid Kelly        if (lcbtrace_span_get_tag_str(span, LCBTRACE_TAG_LOCAL_PORT, &value2, &nvalue2) == LCB_SUCCESS) {
976b7f4186SDavid Kelly            std::string address(value, value + nvalue);
986b7f4186SDavid Kelly            address.append(":");
996b7f4186SDavid Kelly            address.append(value2, nvalue2);
1006b7f4186SDavid Kelly            entry["last_local_socket"] = address;
1016b7f4186SDavid Kelly        }
102bca7f437SSergey Avseyev    }
1030b0310e1SSergey Avseyev    if (lcbtrace_span_get_tag_str(span, LCBTRACE_TAG_PEER_ADDRESS, &value, &nvalue) == LCB_SUCCESS) {
1046b7f4186SDavid Kelly        if (lcbtrace_span_get_tag_str(span, LCBTRACE_TAG_PEER_PORT, &value2, &nvalue2) == LCB_SUCCESS) {
1056b7f4186SDavid Kelly            std::string address(value, value + nvalue);
1066b7f4186SDavid Kelly            address.append(":");
1076b7f4186SDavid Kelly            address.append(value2, nvalue2);
1086b7f4186SDavid Kelly            entry["last_remote_socket"] = address;
1096b7f4186SDavid Kelly        }
110bca7f437SSergey Avseyev    }
1116b7f4186SDavid Kelly    if (span->service() == LCBTRACE_THRESHOLD_KV) {
1126b7f4186SDavid Kelly        entry["last_server_duration_us"] = (Json::UInt64)span->m_last_server;
1136b7f4186SDavid Kelly        entry["total_server_duration_us"] = (Json::UInt64)span->m_total_server;
114bca7f437SSergey Avseyev    }
1156b7f4186SDavid Kelly    if (span->m_encode > 0) {
1166b7f4186SDavid Kelly        entry["encode_duration_us"] = (Json::UInt64)span->m_encode;
1176b7f4186SDavid Kelly    }
1186b7f4186SDavid Kelly    entry["total_duration_us"] = (Json::UInt64)orphan.duration;
1196b7f4186SDavid Kelly    entry["last_dispatch_duration_us"] = (Json::UInt64)span->m_last_dispatch;
1206b7f4186SDavid Kelly    entry["total_dispatch_duration_us"] = (Json::UInt64)span->m_total_dispatch;
121a018a7b0SSergey Avseyev    orphan.payload = Json::FastWriter().write(entry);
122a018a7b0SSergey Avseyev    return orphan;
123a018a7b0SSergey Avseyev}
124a018a7b0SSergey Avseyev
125a018a7b0SSergey Avseyevvoid ThresholdLoggingTracer::add_orphan(lcbtrace_SPAN *span)
126a018a7b0SSergey Avseyev{
127a018a7b0SSergey Avseyev    m_orphans.push(convert(span));
128a018a7b0SSergey Avseyev}
129a018a7b0SSergey Avseyev
130a018a7b0SSergey Avseyevvoid ThresholdLoggingTracer::check_threshold(lcbtrace_SPAN *span)
131a018a7b0SSergey Avseyev{
1326b7f4186SDavid Kelly    if (span->is_outer()) {
1336b7f4186SDavid Kelly        if (span->service() == LCBTRACE_THRESHOLD__MAX) {
1346b7f4186SDavid Kelly            return;
1356b7f4186SDavid Kelly        }
1366b7f4186SDavid Kelly        if (span->duration() > m_settings->tracer_threshold[span->service()]) {
1376b7f4186SDavid Kelly
1386b7f4186SDavid Kelly            auto it = m_queues.find(span->service_str());
1396b7f4186SDavid Kelly            if (it != m_queues.end()) {
1406b7f4186SDavid Kelly                // found the queue, so push this in.
1416b7f4186SDavid Kelly                it->second.push(convert(span));
1426b7f4186SDavid Kelly            } else {
1436b7f4186SDavid Kelly                // add a new queue, then push this in.
1446b7f4186SDavid Kelly                auto pr = m_queues.emplace(span->service_str(), FixedSpanQueue{m_threshold_queue_size});
1456b7f4186SDavid Kelly                pr.first->second.push(convert(span));
1466b7f4186SDavid Kelly            }
1476b7f4186SDavid Kelly        }
148a018a7b0SSergey Avseyev    }
149a018a7b0SSergey Avseyev}
150a018a7b0SSergey Avseyev
1516b7f4186SDavid Kellyvoid ThresholdLoggingTracer::flush_queue(FixedSpanQueue &queue, const char *message, const char *service,
1526b7f4186SDavid Kelly                                         bool warn = false)
153a018a7b0SSergey Avseyev{
154eac059bcSSergey Avseyev    Json::Value entries;
1556b7f4186SDavid Kelly    if (nullptr != service) {
1566b7f4186SDavid Kelly        entries["service"] = service;
1576b7f4186SDavid Kelly    }
158cfcbd22eSEllis Breen    entries["count"] = (Json::UInt)queue.size();
159eac059bcSSergey Avseyev    Json::Value top;
1600b0310e1SSergey Avseyev    while (!queue.empty()) {
161a018a7b0SSergey Avseyev        Json::Value entry;
162cfcbd22eSEllis Breen        if (Json::Reader().parse(queue.top().payload, entry)) {
163a018a7b0SSergey Avseyev            top.append(entry);
164a018a7b0SSergey Avseyev        }
165cfcbd22eSEllis Breen        queue.pop();
166a018a7b0SSergey Avseyev    }
167a018a7b0SSergey Avseyev    entries["top"] = top;
168a018a7b0SSergey Avseyev    std::string doc = Json::FastWriter().write(entries);
169ec0aafceSSergey Avseyev    if (!doc.empty() && doc[doc.size() - 1] == '\n') {
170a018a7b0SSergey Avseyev        doc[doc.size() - 1] = '\0';
171a018a7b0SSergey Avseyev    }
172d88c9131SSergey Avseyev    if (warn) {
173d88c9131SSergey Avseyev        lcb_log(LOGARGS(this, WARN), "%s: %s", message, doc.c_str());
174d88c9131SSergey Avseyev    } else {
175d88c9131SSergey Avseyev        lcb_log(LOGARGS(this, INFO), "%s: %s", message, doc.c_str());
176d88c9131SSergey Avseyev    }
177a018a7b0SSergey Avseyev}
178a018a7b0SSergey Avseyev
17925525da6SSergey Avseyevvoid ThresholdLoggingTracer::do_flush_orphans()
18025525da6SSergey Avseyev{
18125525da6SSergey Avseyev    if (m_orphans.empty()) {
18225525da6SSergey Avseyev        return;
18325525da6SSergey Avseyev    }
1846b7f4186SDavid Kelly    flush_queue(m_orphans, "Orphan responses observed", nullptr, true);
18525525da6SSergey Avseyev}
18625525da6SSergey Avseyev
18725525da6SSergey Avseyevvoid ThresholdLoggingTracer::do_flush_threshold()
18825525da6SSergey Avseyev{
1896b7f4186SDavid Kelly    for (auto &element : m_queues) {
1906b7f4186SDavid Kelly        if (!element.second.empty()) {
1916b7f4186SDavid Kelly            flush_queue(element.second, "Operations over threshold", element.first.c_str());
1926b7f4186SDavid Kelly        }
19325525da6SSergey Avseyev    }
19425525da6SSergey Avseyev}
19525525da6SSergey Avseyev
196a018a7b0SSergey Avseyevvoid ThresholdLoggingTracer::flush_orphans()
197a018a7b0SSergey Avseyev{
198a018a7b0SSergey Avseyev    lcb_U32 tv = m_settings->tracer_orphaned_queue_flush_interval;
199a018a7b0SSergey Avseyev    if (tv == 0) {
200a018a7b0SSergey Avseyev        m_oflush.cancel();
201a018a7b0SSergey Avseyev    } else {
202a018a7b0SSergey Avseyev        m_oflush.rearm(tv);
203a018a7b0SSergey Avseyev    }
20425525da6SSergey Avseyev    do_flush_orphans();
205a018a7b0SSergey Avseyev}
206a018a7b0SSergey Avseyev
207a018a7b0SSergey Avseyevvoid ThresholdLoggingTracer::flush_threshold()
208a018a7b0SSergey Avseyev{
209a018a7b0SSergey Avseyev    lcb_U32 tv = m_settings->tracer_threshold_queue_flush_interval;
210a018a7b0SSergey Avseyev    if (tv == 0) {
211a018a7b0SSergey Avseyev        m_tflush.cancel();
212a018a7b0SSergey Avseyev    } else {
213a018a7b0SSergey Avseyev        m_tflush.rearm(tv);
214a018a7b0SSergey Avseyev    }
21525525da6SSergey Avseyev    do_flush_threshold();
216a018a7b0SSergey Avseyev}
217a018a7b0SSergey Avseyev
21807521d60SSergey AvseyevThresholdLoggingTracer::ThresholdLoggingTracer(lcb_INSTANCE *instance)
2196b7f4186SDavid Kelly    : m_wrapper(nullptr), m_settings(instance->settings),
2206b7f4186SDavid Kelly      m_threshold_queue_size(LCBT_SETTING(instance, tracer_threshold_queue_size)),
2216b7f4186SDavid Kelly      m_orphans(LCBT_SETTING(instance, tracer_orphaned_queue_size)), m_oflush(instance->iotable, this),
222a018a7b0SSergey Avseyev      m_tflush(instance->iotable, this)
223a018a7b0SSergey Avseyev{
224a018a7b0SSergey Avseyev    lcb_U32 tv = m_settings->tracer_orphaned_queue_flush_interval;
225a018a7b0SSergey Avseyev    if (tv > 0) {
226a018a7b0SSergey Avseyev        m_oflush.rearm(tv);
227a018a7b0SSergey Avseyev    }
228a018a7b0SSergey Avseyev    tv = m_settings->tracer_threshold_queue_flush_interval;
229a018a7b0SSergey Avseyev    if (tv > 0) {
230a018a7b0SSergey Avseyev        m_tflush.rearm(tv);
231a018a7b0SSergey Avseyev    }
232a018a7b0SSergey Avseyev}
233