1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2018-2020 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "internal.h"
19 
20 #include <iostream>
21 #include <vector>
22 
23 #define LOGARGS(tracer, lvl) tracer->m_settings, "tracer", LCB_LOG_##lvl, __FILE__, __LINE__
24 
25 using namespace lcb::trace;
26 
27 extern "C" {
tlt_destructor(lcbtrace_TRACER *wrapper)28 static void tlt_destructor(lcbtrace_TRACER *wrapper)
29 {
30     if (wrapper == nullptr) {
31         return;
32     }
33     if (wrapper->cookie) {
34         auto *tracer = reinterpret_cast<ThresholdLoggingTracer *>(wrapper->cookie);
35         tracer->do_flush_orphans();
36         tracer->do_flush_threshold();
37         delete tracer;
38         wrapper->cookie = nullptr;
39     }
40     delete wrapper;
41 }
42 
tlt_report(lcbtrace_TRACER *wrapper, lcbtrace_SPAN *span)43 static void tlt_report(lcbtrace_TRACER *wrapper, lcbtrace_SPAN *span)
44 {
45     if (wrapper == nullptr || wrapper->cookie == nullptr) {
46         return;
47     }
48 
49     auto *tracer = reinterpret_cast<ThresholdLoggingTracer *>(wrapper->cookie);
50     if (span->is_dispatch()) {
51         span->find_outer_or_this()->increment_dispatch(span->duration());
52     }
53     if (span->is_encode()) {
54         span->find_outer_or_this()->m_encode = span->duration();
55     }
56     if (span->is_outer()) {
57         if (span->m_orphaned) {
58             tracer->add_orphan(span);
59         } else {
60             tracer->check_threshold(span);
61         }
62     }
63 }
64 }
65 
wrap()66 lcbtrace_TRACER *ThresholdLoggingTracer::wrap()
67 {
68     if (m_wrapper) {
69         return m_wrapper;
70     }
71     m_wrapper = new lcbtrace_TRACER();
72     m_wrapper->version = 0;
73     m_wrapper->flags = LCBTRACE_F_THRESHOLD;
74     m_wrapper->cookie = this;
75     m_wrapper->destructor = tlt_destructor;
76     m_wrapper->v.v0.report = tlt_report;
77     return m_wrapper;
78 }
79 
convert(lcbtrace_SPAN *span)80 QueueEntry ThresholdLoggingTracer::convert(lcbtrace_SPAN *span)
81 {
82     QueueEntry orphan;
83     orphan.duration = span->duration();
84     Json::Value entry;
85     char *value, *value2;
86     size_t nvalue, nvalue2;
87 
88     entry["operation_name"] = std::string(span->m_opname);
89     if (lcbtrace_span_get_tag_str(span, LCBTRACE_TAG_OPERATION_ID, &value, &nvalue) == LCB_SUCCESS) {
90         entry["last_operation_id"] = std::string(value, value + nvalue);
91     }
92     if (lcbtrace_span_get_tag_str(span, LCBTRACE_TAG_LOCAL_ID, &value, &nvalue) == LCB_SUCCESS) {
93         entry["last_local_id"] = std::string(value, value + nvalue);
94     }
95     if (lcbtrace_span_get_tag_str(span, LCBTRACE_TAG_LOCAL_ADDRESS, &value, &nvalue) == LCB_SUCCESS) {
96         if (lcbtrace_span_get_tag_str(span, LCBTRACE_TAG_LOCAL_PORT, &value2, &nvalue2) == LCB_SUCCESS) {
97             std::string address(value, value + nvalue);
98             address.append(":");
99             address.append(value2, nvalue2);
100             entry["last_local_socket"] = address;
101         }
102     }
103     if (lcbtrace_span_get_tag_str(span, LCBTRACE_TAG_PEER_ADDRESS, &value, &nvalue) == LCB_SUCCESS) {
104         if (lcbtrace_span_get_tag_str(span, LCBTRACE_TAG_PEER_PORT, &value2, &nvalue2) == LCB_SUCCESS) {
105             std::string address(value, value + nvalue);
106             address.append(":");
107             address.append(value2, nvalue2);
108             entry["last_remote_socket"] = address;
109         }
110     }
111     if (span->service() == LCBTRACE_THRESHOLD_KV) {
112         entry["last_server_duration_us"] = (Json::UInt64)span->m_last_server;
113         entry["total_server_duration_us"] = (Json::UInt64)span->m_total_server;
114     }
115     if (span->m_encode > 0) {
116         entry["encode_duration_us"] = (Json::UInt64)span->m_encode;
117     }
118     entry["total_duration_us"] = (Json::UInt64)orphan.duration;
119     entry["last_dispatch_duration_us"] = (Json::UInt64)span->m_last_dispatch;
120     entry["total_dispatch_duration_us"] = (Json::UInt64)span->m_total_dispatch;
121     orphan.payload = Json::FastWriter().write(entry);
122     return orphan;
123 }
124 
add_orphan(lcbtrace_SPAN *span)125 void ThresholdLoggingTracer::add_orphan(lcbtrace_SPAN *span)
126 {
127     m_orphans.push(convert(span));
128 }
129 
check_threshold(lcbtrace_SPAN *span)130 void ThresholdLoggingTracer::check_threshold(lcbtrace_SPAN *span)
131 {
132     if (span->is_outer()) {
133         if (span->service() == LCBTRACE_THRESHOLD__MAX) {
134             return;
135         }
136         if (span->duration() > m_settings->tracer_threshold[span->service()]) {
137 
138             auto it = m_queues.find(span->service_str());
139             if (it != m_queues.end()) {
140                 // found the queue, so push this in.
141                 it->second.push(convert(span));
142             } else {
143                 // add a new queue, then push this in.
144                 auto pr = m_queues.emplace(span->service_str(), FixedSpanQueue{m_threshold_queue_size});
145                 pr.first->second.push(convert(span));
146             }
147         }
148     }
149 }
150 
flush_queue(FixedSpanQueue &queue, const char *message, const char *service, bool warn = false)151 void ThresholdLoggingTracer::flush_queue(FixedSpanQueue &queue, const char *message, const char *service,
152                                          bool warn = false)
153 {
154     Json::Value entries;
155     if (nullptr != service) {
156         entries["service"] = service;
157     }
158     entries["count"] = (Json::UInt)queue.size();
159     Json::Value top;
160     while (!queue.empty()) {
161         Json::Value entry;
162         if (Json::Reader().parse(queue.top().payload, entry)) {
163             top.append(entry);
164         }
165         queue.pop();
166     }
167     entries["top"] = top;
168     std::string doc = Json::FastWriter().write(entries);
169     if (!doc.empty() && doc[doc.size() - 1] == '\n') {
170         doc[doc.size() - 1] = '\0';
171     }
172     if (warn) {
173         lcb_log(LOGARGS(this, WARN), "%s: %s", message, doc.c_str());
174     } else {
175         lcb_log(LOGARGS(this, INFO), "%s: %s", message, doc.c_str());
176     }
177 }
178 
do_flush_orphans()179 void ThresholdLoggingTracer::do_flush_orphans()
180 {
181     if (m_orphans.empty()) {
182         return;
183     }
184     flush_queue(m_orphans, "Orphan responses observed", nullptr, true);
185 }
186 
do_flush_threshold()187 void ThresholdLoggingTracer::do_flush_threshold()
188 {
189     for (auto &element : m_queues) {
190         if (!element.second.empty()) {
191             flush_queue(element.second, "Operations over threshold", element.first.c_str());
192         }
193     }
194 }
195 
flush_orphans()196 void ThresholdLoggingTracer::flush_orphans()
197 {
198     lcb_U32 tv = m_settings->tracer_orphaned_queue_flush_interval;
199     if (tv == 0) {
200         m_oflush.cancel();
201     } else {
202         m_oflush.rearm(tv);
203     }
204     do_flush_orphans();
205 }
206 
flush_threshold()207 void ThresholdLoggingTracer::flush_threshold()
208 {
209     lcb_U32 tv = m_settings->tracer_threshold_queue_flush_interval;
210     if (tv == 0) {
211         m_tflush.cancel();
212     } else {
213         m_tflush.rearm(tv);
214     }
215     do_flush_threshold();
216 }
217 
ThresholdLoggingTracer(lcb_INSTANCE *instance)218 ThresholdLoggingTracer::ThresholdLoggingTracer(lcb_INSTANCE *instance)
219     : m_wrapper(nullptr), m_settings(instance->settings),
220       m_threshold_queue_size(LCBT_SETTING(instance, tracer_threshold_queue_size)),
221       m_orphans(LCBT_SETTING(instance, tracer_orphaned_queue_size)), m_oflush(instance->iotable, this),
222       m_tflush(instance->iotable, this)
223 {
224     lcb_U32 tv = m_settings->tracer_orphaned_queue_flush_interval;
225     if (tv > 0) {
226         m_oflush.rearm(tv);
227     }
228     tv = m_settings->tracer_threshold_queue_flush_interval;
229     if (tv > 0) {
230         m_tflush.rearm(tv);
231     }
232 }
233