1package gocb
2
3import (
4	"encoding/json"
5	"sort"
6	"sync"
7	"sync/atomic"
8	"time"
9
10	"github.com/opentracing/opentracing-go"
11	otlog "github.com/opentracing/opentracing-go/log"
12)
13
// defaultThresholdLogTracer is a package-level ThresholdLoggingTracer declared at
// its zero value; settings left at zero are replaced with defaults by
// startLoggerRoutine, which runs on the first AddRef.
// NOTE(review): where this instance gets installed is not visible in this file.
var defaultThresholdLogTracer ThresholdLoggingTracer
15
// thresholdLogGroup holds the slowest operations recorded for a single service
// since the group was last logged.
//
// name is the service label written to the log output, floor is the minimum
// duration an operation must reach to be recorded, ops is the retained set kept
// in ascending order of duration (its capacity bounds how many are kept), and
// lock guards ops.
type thresholdLogGroup struct {
	name  string
	floor time.Duration
	ops   []*thresholdLogSpan
	lock  sync.RWMutex
}
22
23func (g *thresholdLogGroup) init(name string, floor time.Duration, size uint32) {
24	g.name = name
25	g.floor = floor
26	g.ops = make([]*thresholdLogSpan, 0, size)
27}
28
29func (g *thresholdLogGroup) recordOp(span *thresholdLogSpan) {
30	if span.duration < g.floor {
31		return
32	}
33
34	// Preemptively check that we actually need to be inserted using a read lock first
35	// this is a performance improvement measure to avoid locking the mutex all the time.
36	g.lock.RLock()
37	if len(g.ops) == cap(g.ops) && span.duration < g.ops[0].duration {
38		// we are at capacity and we are faster than the fastest slow op
39		g.lock.RUnlock()
40		return
41	}
42	g.lock.RUnlock()
43
44	g.lock.Lock()
45	if len(g.ops) == cap(g.ops) && span.duration < g.ops[0].duration {
46		// we are at capacity and we are faster than the fastest slow op
47		g.lock.Unlock()
48		return
49	}
50
51	l := len(g.ops)
52	i := sort.Search(l, func(i int) bool { return span.duration < g.ops[i].duration })
53
54	// i represents the slot where it should be inserted
55
56	if len(g.ops) < cap(g.ops) {
57		if i == l {
58			g.ops = append(g.ops, span)
59		} else {
60			g.ops = append(g.ops, nil)
61			copy(g.ops[i+1:], g.ops[i:])
62			g.ops[i] = span
63		}
64	} else {
65		if i == 0 {
66			g.ops[i] = span
67		} else {
68			copy(g.ops[0:i-1], g.ops[1:i])
69			g.ops[i-1] = span
70		}
71	}
72
73	g.lock.Unlock()
74}
75
// thresholdLogItem is the JSON representation of one logged operation.  The
// *Us fields are durations expressed in whole microseconds.  Every field is
// omitted from the output when it holds its zero value.
// NOTE(review): LastLocalAddress is never populated by the code in this file.
type thresholdLogItem struct {
	OperationName          string `json:"operation_name,omitempty"`
	TotalTimeUs            uint64 `json:"total_us,omitempty"`
	EncodeDurationUs       uint64 `json:"encode_us,omitempty"`
	DispatchDurationUs     uint64 `json:"dispatch_us,omitempty"`
	ServerDurationUs       uint64 `json:"server_us,omitempty"`
	DecodeDurationUs       uint64 `json:"decode_us,omitempty"`
	LastRemoteAddress      string `json:"last_remote_address,omitempty"`
	LastLocalAddress       string `json:"last_local_address,omitempty"`
	LastDispatchDurationUs uint64 `json:"last_dispatch_us,omitempty"`
}
87
// thresholdLogService is the JSON document logged for one service group:
// Service is the group name, Top lists the recorded operations and Count is
// the number of entries in Top.
type thresholdLogService struct {
	Service string             `json:"service"`
	Count   int                `json:"count"`
	Top     []thresholdLogItem `json:"top"`
}
93
94func (g *thresholdLogGroup) logRecordedRecords() {
95	// capacity is static for the group, no need for a lock
96	sampleSize := cap(g.ops)
97
98	// Preallocate space to copy the ops into...
99	oldOps := make([]*thresholdLogSpan, sampleSize)
100
101	g.lock.Lock()
102	// Escape early if we have no ops to log...
103	if len(g.ops) == 0 {
104		g.lock.Unlock()
105		return
106	}
107
108	// Copy out our ops so we can cheaply print them out without blocking
109	// our ops from actually being recorded in other goroutines (which would
110	// effectively slow down the op pipeline for logging).
111
112	oldOps = oldOps[0:len(g.ops)]
113	copy(oldOps, g.ops)
114	g.ops = g.ops[:0]
115
116	g.lock.Unlock()
117
118	jsonData := thresholdLogService{
119		Service: g.name,
120	}
121
122	for i := len(oldOps) - 1; i >= 0; i-- {
123		op := oldOps[i]
124
125		jsonData.Top = append(jsonData.Top, thresholdLogItem{
126			OperationName:          op.opName,
127			TotalTimeUs:            uint64(op.duration / time.Microsecond),
128			DispatchDurationUs:     uint64(op.totalDispatchDuration / time.Microsecond),
129			ServerDurationUs:       uint64(op.totalServerDuration / time.Microsecond),
130			EncodeDurationUs:       uint64(op.totalEncodeDuration / time.Microsecond),
131			DecodeDurationUs:       uint64(op.totalDecodeDuration / time.Microsecond),
132			LastRemoteAddress:      op.lastDispatchPeer,
133			LastDispatchDurationUs: uint64(op.lastDispatchDuration / time.Microsecond),
134		})
135	}
136
137	jsonData.Count = len(jsonData.Top)
138
139	jsonBytes, err := json.Marshal(jsonData)
140	if err != nil {
141		logDebugf("Failed to generate threshold logging service JSON: %s", err)
142	}
143
144	logInfof("  %s", jsonBytes)
145}
146
// ThresholdLoggingTracer is a specialized Tracer implementation which will automatically
// log operations which fall outside of a set of thresholds.  Note that this tracer is
// only safe for use within the Couchbase SDK; use by external event sources is
// likely to fail.
//
// TickInterval is the period between log flushes, SampleSize is the number of
// operations retained per service between flushes, and each *Floor field is the
// minimum duration an operation of that service must reach to be recorded.
// Settings left at zero are given defaults when the logger routine starts:
// TickInterval 10s, SampleSize 10, KvFloor 500ms, and 1s for ViewsFloor,
// QueryFloor, SearchFloor and AnalyticsFloor.
//
// The unexported fields hold runtime state: killCh stops the logger routine,
// refCount counts AddRef/DecRef users, nextTick is the time of the next
// scheduled flush, and the *Group fields hold the recorded operations per service.
// EXPERIMENTAL
type ThresholdLoggingTracer struct {
	TickInterval   time.Duration
	SampleSize     uint32
	KvFloor        time.Duration
	ViewsFloor     time.Duration
	QueryFloor     time.Duration
	SearchFloor    time.Duration
	AnalyticsFloor time.Duration

	killCh         chan struct{}
	refCount       int32
	nextTick       time.Time
	kvGroup        thresholdLogGroup
	viewsGroup     thresholdLogGroup
	queryGroup     thresholdLogGroup
	searchGroup    thresholdLogGroup
	analyticsGroup thresholdLogGroup
}
170
171// AddRef is used internally to keep track of the number of Cluster instances referring to it.
172// This is used to correctly shut down the aggregation routines once there are no longer any
173// instances tracing to it.
174func (t *ThresholdLoggingTracer) AddRef() int32 {
175	newRefCount := atomic.AddInt32(&t.refCount, 1)
176	if newRefCount == 1 {
177		t.startLoggerRoutine()
178	}
179	return newRefCount
180}
181
182// DecRef is the counterpart to AddRef (see AddRef for more information).
183func (t *ThresholdLoggingTracer) DecRef() int32 {
184	newRefCount := atomic.AddInt32(&t.refCount, -1)
185	if newRefCount == 0 {
186		t.killCh <- struct{}{}
187	}
188	return newRefCount
189}
190
191func (t *ThresholdLoggingTracer) logRecordedRecords() {
192	logInfof("Threshold Log:")
193
194	t.kvGroup.logRecordedRecords()
195	t.viewsGroup.logRecordedRecords()
196	t.queryGroup.logRecordedRecords()
197	t.searchGroup.logRecordedRecords()
198	t.analyticsGroup.logRecordedRecords()
199}
200
201func (t *ThresholdLoggingTracer) startLoggerRoutine() {
202	if t.TickInterval == 0 {
203		t.TickInterval = 10 * time.Second
204	}
205	if t.SampleSize == 0 {
206		t.SampleSize = 10
207	}
208	if t.KvFloor == 0 {
209		t.KvFloor = 500 * time.Millisecond
210	}
211	if t.ViewsFloor == 0 {
212		t.ViewsFloor = 1 * time.Second
213	}
214	if t.QueryFloor == 0 {
215		t.QueryFloor = 1 * time.Second
216	}
217	if t.SearchFloor == 0 {
218		t.SearchFloor = 1 * time.Second
219	}
220	if t.AnalyticsFloor == 0 {
221		t.AnalyticsFloor = 1 * time.Second
222	}
223
224	t.kvGroup.init("kv", t.KvFloor, t.SampleSize)
225	t.viewsGroup.init("views", t.ViewsFloor, t.SampleSize)
226	t.queryGroup.init("query", t.QueryFloor, t.SampleSize)
227	t.searchGroup.init("search", t.SearchFloor, t.SampleSize)
228	t.analyticsGroup.init("analytics", t.AnalyticsFloor, t.SampleSize)
229
230	if t.killCh == nil {
231		t.killCh = make(chan struct{})
232	}
233
234	if t.nextTick.IsZero() {
235		t.nextTick = time.Now().Add(t.TickInterval)
236	}
237
238	go t.loggerRoutine()
239}
240
241func (t *ThresholdLoggingTracer) loggerRoutine() {
242	for {
243		select {
244		case <-time.After(t.nextTick.Sub(time.Now())):
245			t.nextTick = t.nextTick.Add(t.TickInterval)
246			t.logRecordedRecords()
247		case <-t.killCh:
248			t.logRecordedRecords()
249			return
250		}
251	}
252}
253
254func (t *ThresholdLoggingTracer) recordOp(span *thresholdLogSpan) {
255	switch span.serviceName {
256	case "kv":
257		t.kvGroup.recordOp(span)
258	case "views":
259		t.viewsGroup.recordOp(span)
260	case "n1ql":
261		t.queryGroup.recordOp(span)
262	case "fts":
263		t.searchGroup.recordOp(span)
264	case "cbas":
265		t.analyticsGroup.recordOp(span)
266	}
267}
268
269// StartSpan belongs to the Tracer interface.
270func (t *ThresholdLoggingTracer) StartSpan(operationName string, opts ...opentracing.StartSpanOption) opentracing.Span {
271	span := &thresholdLogSpan{
272		tracer:    t,
273		opName:    operationName,
274		startTime: time.Now(),
275	}
276
277	for _, opt := range opts {
278		switch opt := opt.(type) {
279		case opentracing.SpanReference:
280			if opt.Type == opentracing.ChildOfRef {
281				if context, ok := opt.ReferencedContext.(*thresholdLogSpanContext); ok {
282					span.parent = context.span
283				}
284			}
285		case opentracing.Tag:
286			span.SetTag(opt.Key, opt.Value)
287		}
288	}
289
290	return span
291}
292
// Inject belongs to the Tracer interface.
// It is a no-op: nothing is written to the carrier and nil is always returned.
func (t *ThresholdLoggingTracer) Inject(sp opentracing.SpanContext, format interface{}, carrier interface{}) error {
	return nil
}
297
// Extract belongs to the Tracer interface.
// The carrier is never inspected; it always returns a nil context together
// with opentracing.ErrSpanContextNotFound.
func (t *ThresholdLoggingTracer) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) {
	return nil, opentracing.ErrSpanContextNotFound
}
302
// thresholdLogSpan is the Span implementation handed out by
// ThresholdLoggingTracer.
//
// tracer, opName and startTime are set when the span is started; parent is set
// when the span was started as a ChildOf another span of this tracer.
// serviceName, peerAddress and serverDuration come from the
// "couchbase.service", "peer.address" and "server_duration" tags.  duration is
// computed in Finish.  The total* fields accumulate the server, dispatch,
// encode and decode time of this span and of the children that have finished
// into it, and lastDispatchPeer/lastDispatchDuration describe the most recently
// finished "dispatch" span rolled up into this one.
type thresholdLogSpan struct {
	tracer                *ThresholdLoggingTracer
	parent                *thresholdLogSpan
	opName                string
	startTime             time.Time
	serviceName           string
	peerAddress           string
	serverDuration        time.Duration
	duration              time.Duration
	totalServerDuration   time.Duration
	totalDispatchDuration time.Duration
	totalEncodeDuration   time.Duration
	totalDecodeDuration   time.Duration
	lastDispatchPeer      string
	lastDispatchDuration  time.Duration
}
319
// Context belongs to the Span interface.
// It returns a new thresholdLogSpanContext wrapping this span.
func (n *thresholdLogSpan) Context() opentracing.SpanContext {
	return &thresholdLogSpanContext{n}
}
323
// SetBaggageItem belongs to the Span interface.
// Baggage is not stored; the call is ignored and the span itself is returned.
func (n *thresholdLogSpan) SetBaggageItem(key, val string) opentracing.Span {
	return n
}
327
// BaggageItem belongs to the Span interface.
// Baggage is not stored, so this always returns the empty string.
func (n *thresholdLogSpan) BaggageItem(key string) string {
	return ""
}
331
332func (n *thresholdLogSpan) SetTag(key string, value interface{}) opentracing.Span {
333	var ok bool
334
335	switch key {
336	case "server_duration":
337		if n.serverDuration, ok = value.(time.Duration); !ok {
338			logDebugf("Failed to cast span server_duration tag")
339		}
340	case "couchbase.service":
341		if n.serviceName, ok = value.(string); !ok {
342			logDebugf("Failed to cast span couchbase.service tag")
343		}
344	case "peer.address":
345		if n.peerAddress, ok = value.(string); !ok {
346			logDebugf("Failed to cast span peer.address tag")
347		}
348	}
349	return n
350}
351
// LogFields belongs to the Span interface.
// It is a no-op; the fields are discarded.
func (n *thresholdLogSpan) LogFields(fields ...otlog.Field) {

}
355
// LogKV belongs to the Span interface.
// It is a no-op; the key/value pairs are discarded.
func (n *thresholdLogSpan) LogKV(keyVals ...interface{}) {

}
359
360func (n *thresholdLogSpan) Finish() {
361	n.duration = time.Now().Sub(n.startTime)
362
363	n.totalServerDuration += n.serverDuration
364	if n.opName == "dispatch" {
365		n.totalDispatchDuration += n.duration
366		n.lastDispatchPeer = n.peerAddress
367		n.lastDispatchDuration = n.duration
368	}
369	if n.opName == "encode" {
370		n.totalEncodeDuration += n.duration
371	}
372	if n.opName == "decode" {
373		n.totalDecodeDuration += n.duration
374	}
375
376	if n.parent != nil {
377		n.parent.totalServerDuration += n.totalServerDuration
378		n.parent.totalDispatchDuration += n.totalDispatchDuration
379		n.parent.totalEncodeDuration += n.totalEncodeDuration
380		n.parent.totalDecodeDuration += n.totalDecodeDuration
381		if n.lastDispatchPeer != "" || n.lastDispatchDuration > 0 {
382			n.parent.lastDispatchPeer = n.lastDispatchPeer
383			n.parent.lastDispatchDuration = n.lastDispatchDuration
384		}
385	}
386
387	if n.serviceName != "" {
388		n.tracer.recordOp(n)
389	}
390}
391
// FinishWithOptions belongs to the Span interface.
// The supplied options are ignored; it behaves exactly like Finish.
func (n *thresholdLogSpan) FinishWithOptions(opts opentracing.FinishOptions) {
	n.Finish()
}
395
// SetOperationName belongs to the Span interface.
// It replaces the span's operation name and returns the span.  Finish compares
// this name against "dispatch", "encode" and "decode" when attributing time.
func (n *thresholdLogSpan) SetOperationName(operationName string) opentracing.Span {
	n.opName = operationName
	return n
}
400
// Tracer belongs to the Span interface.
// It returns the ThresholdLoggingTracer stored on the span.
func (n *thresholdLogSpan) Tracer() opentracing.Tracer {
	return n.tracer
}
404
// LogEvent belongs to the Span interface.
// It is a no-op; the event is discarded.
func (n *thresholdLogSpan) LogEvent(event string) {

}
408
// LogEventWithPayload belongs to the Span interface.
// It is a no-op; the event and its payload are discarded.
func (n *thresholdLogSpan) LogEventWithPayload(event string, payload interface{}) {

}
412
// Log belongs to the Span interface.
// It is a no-op; the log data is discarded.
func (n *thresholdLogSpan) Log(data opentracing.LogData) {

}
416
// thresholdLogSpanContext is the SpanContext returned by
// thresholdLogSpan.Context.  It carries a pointer to the span so that StartSpan
// can recover the parent span from a ChildOf reference.
type thresholdLogSpanContext struct {
	span *thresholdLogSpan
}
420
// ForeachBaggageItem belongs to the SpanContext interface.
// No baggage is stored, so the handler is never invoked.
func (n *thresholdLogSpanContext) ForeachBaggageItem(handler func(k, v string) bool) {
}
423