1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2018-2020 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #ifndef LCB_TRACING_INTERNAL_H
19 #define LCB_TRACING_INTERNAL_H
20 
21 #include <libcouchbase/tracing.h>
22 #include "rnd.h"
23 
24 #ifdef __cplusplus
25 
26 #include <queue>
27 #include <map>
28 #include <string>
29 #include <memory>
30 
31 LCB_INTERNAL_API
32 void lcbtrace_span_add_system_tags(lcbtrace_SPAN *span, const lcb_settings *settings, lcbtrace_THRESHOLDOPTS svc);
33 LIBCOUCHBASE_API
34 void lcbtrace_span_add_tag_str_nocopy(lcbtrace_SPAN *span, const char *name, const char *value);
35 void lcbtrace_span_add_host_and_port(lcbtrace_SPAN *span, lcbio_CONNINFO *info);
36 
37 namespace lcb
38 {
39 namespace trace
40 {
41 
42 class Span
43 {
44   public:
45     Span(lcbtrace_TRACER *tracer, const char *opname, uint64_t start, lcbtrace_REF_TYPE ref, lcbtrace_SPAN *other,
46          void *external_span);
47     ~Span();
48 
49     void finish(uint64_t finish);
duration() const50     uint64_t duration() const
51     {
52         return m_finish - m_start;
53     }
54 
55     void add_tag(const char *name, const std::string &value);
56     void add_tag(const char *name, int copy, const char *value, int copy_value);
57     void add_tag(const char *name, int copy_key, const char *value, size_t value_len, int copy_value);
58     void add_tag(const char *name, int copy, uint64_t value);
59     void add_tag(const char *name, int copy, double value);
60     void add_tag(const char *name, int copy, bool value);
61 
62     void service(lcbtrace_THRESHOLDOPTS svc);
63     lcbtrace_THRESHOLDOPTS service() const;
64 
65     void increment_dispatch(uint64_t dispatch_time);
66     void increment_server(uint64_t server_time);
67     lcbtrace_SPAN *find_outer_or_this();
68 
69     const char *service_str() const;
70 
71     void *external_span() const;
72     void external_span(void *ext);
73 
74     bool is_outer() const;
75     void is_outer(bool outer);
76     bool is_dispatch() const;
77     void is_dispatch(bool dispatch);
78     bool is_encode() const;
79     void is_encode(bool encode);
80     bool should_finish() const;
81     void should_finish(bool finish);
82 
83     lcbtrace_TRACER *m_tracer;
84     std::string m_opname;
85     uint64_t m_span_id;
86     uint64_t m_start;
87     uint64_t m_finish{0};
88     bool m_orphaned;
89     Span *m_parent;
90     void *m_extspan;
91     sllist_root m_tags{};
92     bool m_is_outer{false};
93     bool m_is_dispatch{false};
94     bool m_is_encode{false};
95     bool m_should_finish{true};
96     lcbtrace_THRESHOLDOPTS m_svc{LCBTRACE_THRESHOLD__MAX};
97     const char *m_svc_string{nullptr};
98     uint64_t m_total_dispatch{0};
99     uint64_t m_last_dispatch{0};
100     uint64_t m_total_server{0};
101     uint64_t m_last_server{0};
102     uint64_t m_encode{0};
103 };
104 
105 struct ReportedSpan {
106     uint64_t duration;
107     std::string payload;
108 
operator <lcb::trace::ReportedSpan109     bool operator<(const ReportedSpan &rhs) const
110     {
111         return duration < rhs.duration;
112     }
113 };
114 
115 template <typename T>
116 class FixedQueue : private std::priority_queue<T>
117 {
118   public:
FixedQueue(size_t capacity)119     explicit FixedQueue(size_t capacity) : m_capacity(capacity) {}
120 
push(const T &item)121     void push(const T &item)
122     {
123         std::priority_queue<T>::push(item);
124         if (this->size() > m_capacity) {
125             this->c.pop_back();
126         }
127     }
128     using std::priority_queue<T>::empty;
129     using std::priority_queue<T>::top;
130     using std::priority_queue<T>::pop;
131     using std::priority_queue<T>::size;
132 
133   private:
134     size_t m_capacity;
135 };
136 
137 typedef ReportedSpan QueueEntry;
138 typedef FixedQueue<QueueEntry> FixedSpanQueue;
139 class ThresholdLoggingTracer
140 {
141     lcbtrace_TRACER *m_wrapper;
142     lcb_settings *m_settings;
143     size_t m_threshold_queue_size;
144 
145     FixedSpanQueue m_orphans;
146     std::map<std::string, FixedSpanQueue> m_queues;
147 
148     void flush_queue(FixedSpanQueue &queue, const char *message, const char *service, bool warn);
149     QueueEntry convert(lcbtrace_SPAN *span);
150 
151   public:
152     explicit ThresholdLoggingTracer(lcb_INSTANCE *instance);
153 
154     lcbtrace_TRACER *wrap();
155     void add_orphan(lcbtrace_SPAN *span);
156     void check_threshold(lcbtrace_SPAN *span);
157 
158     void flush_orphans();
159     void flush_threshold();
160     void do_flush_orphans();
161     void do_flush_threshold();
162 
163     lcb::io::Timer<ThresholdLoggingTracer, &ThresholdLoggingTracer::flush_orphans> m_oflush;
164     lcb::io::Timer<ThresholdLoggingTracer, &ThresholdLoggingTracer::flush_threshold> m_tflush;
165 };
166 
167 template <typename COMMAND>
start_kv_span(const lcb_settings *settings, const mc_PACKET *packet, std::shared_ptr<COMMAND> cmd)168 lcbtrace_SPAN *start_kv_span(const lcb_settings *settings, const mc_PACKET *packet, std::shared_ptr<COMMAND> cmd)
169 {
170     if (settings == nullptr || settings->tracer == nullptr) {
171         return nullptr;
172     }
173     lcbtrace_SPAN *span;
174     lcbtrace_SPAN *parent_span = cmd->parent_span();
175     if (parent_span != nullptr && parent_span->is_outer() && settings->tracer->flags & LCBTRACE_F_THRESHOLD) {
176         span = parent_span;
177         span->should_finish(false);
178     } else {
179         lcbtrace_REF ref;
180         ref.type = LCBTRACE_REF_CHILD_OF;
181         ref.span = parent_span;
182         bool is_dispatch = (parent_span != nullptr && parent_span->is_outer());
183         span = lcbtrace_span_start((settings)->tracer,
184                                    is_dispatch ? LCBTRACE_OP_DISPATCH_TO_SERVER : cmd->operation_name().c_str(),
185                                    LCBTRACE_NOW, &ref);
186         span->should_finish(true);
187         span->is_outer(!is_dispatch);
188     }
189     span->is_dispatch(true);
190     std::string operation_id = std::to_string(packet->opaque);
191     lcbtrace_span_add_tag_str(span, LCBTRACE_TAG_OPERATION_ID, operation_id.c_str());
192     lcbtrace_span_add_system_tags(span, settings, LCBTRACE_THRESHOLD_KV);
193     span->add_tag(LCBTRACE_TAG_SCOPE, cmd->collection().scope());
194     span->add_tag(LCBTRACE_TAG_COLLECTION, cmd->collection().collection());
195     span->add_tag(LCBTRACE_TAG_OPERATION, cmd->operation_name());
196     return span;
197 }
198 
199 void finish_kv_span(const mc_PIPELINE *pipeline, const mc_PACKET *request_pkt, const MemcachedResponse *response_pkt);
200 
201 template <typename COMMAND>
start_kv_span_with_durability(const lcb_settings *settings, const mc_PACKET *packet, std::shared_ptr<COMMAND> cmd)202 lcbtrace_SPAN *start_kv_span_with_durability(const lcb_settings *settings, const mc_PACKET *packet,
203                                              std::shared_ptr<COMMAND> cmd)
204 {
205     lcbtrace_SPAN *span = start_kv_span(settings, packet, cmd);
206     if (span != nullptr && cmd->durability_level() != LCB_DURABILITYLEVEL_NONE) {
207         span->add_tag(LCBTRACE_TAG_DURABILITY, 0, dur_level_to_string(cmd->durability_level()), 0);
208     }
209     return span;
210 }
211 
212 template <typename COMMAND>
start_http_span(const lcb_settings *settings, const COMMAND *cmd)213 lcbtrace_SPAN *start_http_span(const lcb_settings *settings, const COMMAND *cmd)
214 {
215     if (settings == nullptr || settings->tracer == nullptr) {
216         return nullptr;
217     }
218     lcbtrace_SPAN *span;
219     lcbtrace_SPAN *parent_span = cmd->parent_span();
220     if (parent_span != nullptr && parent_span->is_outer() && settings->tracer->flags & LCBTRACE_F_THRESHOLD) {
221         span = parent_span;
222         span->should_finish(false);
223     } else {
224         lcbtrace_REF ref;
225         ref.type = LCBTRACE_REF_CHILD_OF;
226         ref.span = parent_span;
227         bool is_dispatch = (parent_span != nullptr && parent_span->is_outer());
228         span = lcbtrace_span_start((settings)->tracer,
229                                    is_dispatch ? LCBTRACE_OP_DISPATCH_TO_SERVER : cmd->operation_name().c_str(),
230                                    LCBTRACE_NOW, &ref);
231         span->should_finish(true);
232         span->is_outer(!is_dispatch);
233     }
234     span->is_dispatch(true);
235     lcbtrace_span_add_tag_str(span, LCBTRACE_TAG_OPERATION_ID, cmd->client_context_id().c_str());
236     lcbtrace_span_add_system_tags(span, settings, cmd->service());
237     span->add_tag(LCBTRACE_TAG_OPERATION, cmd->operation_name());
238     return span;
239 }
240 
241 template <typename COMMAND>
start_http_span_with_statement(const lcb_settings *settings, const COMMAND *cmd, const std::string &statement)242 lcbtrace_SPAN *start_http_span_with_statement(const lcb_settings *settings, const COMMAND *cmd,
243                                               const std::string &statement)
244 {
245     lcbtrace_SPAN *span = start_http_span(settings, cmd);
246     if (span != nullptr && !statement.empty()) {
247         span->add_tag(LCBTRACE_TAG_STATEMENT, statement);
248     }
249     return span;
250 }
251 
252 template <typename COMMAND>
finish_http_span(lcbtrace_SPAN *span, const COMMAND *cmd)253 void finish_http_span(lcbtrace_SPAN *span, const COMMAND *cmd)
254 {
255     if (span != nullptr) {
256         span->find_outer_or_this()->add_tag(LCBTRACE_TAG_RETRIES, 0, (uint64_t)cmd->retries());
257         if (span->should_finish()) {
258             lcbtrace_span_finish(span, LCBTRACE_NOW);
259         }
260     }
261 }
262 
263 } // namespace trace
264 } // namespace lcb
265 
266 extern "C" {
267 #endif /* __cplusplus */
268 LCB_INTERNAL_API
269 void lcbtrace_span_set_orphaned(lcbtrace_SPAN *span, int val);
270 
271 const char *dur_level_to_string(lcb_DURABILITY_LEVEL dur_level);
272 #ifdef __cplusplus
273 }
274 #endif /* __cplusplus*/
275 #endif /* LCB_TRACING_INTERNAL_H */
276