//  Copyright (c) 2014 Couchbase, Inc.
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the
//  License. You may obtain a copy of the License at
//    http://www.apache.org/licenses/LICENSE-2.0
//  Unless required by applicable law or agreed to in writing,
//  software distributed under the License is distributed on an "AS
//  IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
//  express or implied. See the License for the specific language
//  governing permissions and limitations under the License.

package cbft

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"runtime"
	"runtime/debug"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
	"unicode"

	log "github.com/couchbase/clog"

	"github.com/couchbase/cbgt"
	"github.com/couchbase/cbgt/rest"
	"github.com/couchbase/moss"
	"github.com/dustin/go-jsonpointer"
)

// Dump stats to the log once every LogEveryNStats stats requests
// (roughly every 5min with the default of 300). This interval is
// configurable through the ENV_OPTION: logStatsEvery
var LogEveryNStats = 300

var SourcePartitionSeqsSleepDefault = 10 * time.Second
var SourcePartitionSeqsCacheTimeoutDefault = 60 * time.Second

// Atomic counters to keep track of the number of active
// http and https limit listeners.
var TotHTTPLimitListenersOpened uint64
var TotHTTPLimitListenersClosed uint64
var TotHTTPSLimitListenersOpened uint64
var TotHTTPSLimitListenersClosed uint64

// Atomic stat that tracks current memory acquired, not including
// HeapIdle (memory reclaimed); updated every second; used by the
// app_herder to track memory consumption by the process.
var currentMemoryUsed uint64

// Optional callback invoked when current memory used has dropped
// since the last sampling.
var OnMemoryUsedDropped func(curMemoryUsed, prevMemoryUsed uint64)

// PartitionSeqsProvider represents a source object that can provide
// partition seqs, such as some pindex or dest implementations.
type PartitionSeqsProvider interface {
	// Returned map is keyed by partition id.
	PartitionSeqs() (map[string]cbgt.UUIDSeq, error)
}

var statsNamePrefix = []byte("\"")
var statsNameSuffix = []byte("\":")

// NsStatsHandler is a REST handler that provides stats/metrics for
// consumption by ns_server
type NsStatsHandler struct {
	statsCount int64
	mgr        *cbgt.Manager
}

func NewNsStatsHandler(mgr *cbgt.Manager) *NsStatsHandler {
	return &NsStatsHandler{mgr: mgr}
}

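// NSIndexStats is keyed by "sourceName:indexName", with a sub-map of
// stat name => value; the empty-string key holds process-level stats.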
type NSIndexStats map[string]map[string]interface{}

// MarshalJSON formats the index stats using the
// colon-separated convention found in other
// stats consumed by ns_server
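// For example, {"b:idx": {"doc_count": 1}} flattens to
// {"b:idx:doc_count": 1} (names here are illustrative only).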
func (n NSIndexStats) MarshalJSON() ([]byte, error) {
	rv := make(map[string]interface{})
	for k, nsis := range n {
		for nsik, nsiv := range nsis {
			if k == "" {
				rv[nsik] = nsiv
			} else {
				rv[k+":"+nsik] = nsiv
			}
		}
	}
	return json.Marshal(rv)
}

var statkeys = []string{
	// manual
	"num_pindexes",

	// pindex
	"doc_count",
	"timer_batch_store_count",

	// kv store
	"batch_merge_count",
	"iterator_next_count",
	"iterator_seek_count",
	"reader_get_count",
	"reader_multi_get_count",
	"reader_prefix_iterator_count",
	"reader_range_iterator_count",
	"writer_execute_batch_count",

	// feed
	"timer_opaque_set_count",
	"timer_rollback_count",
	"timer_data_update_count",
	"timer_data_delete_count",
	"timer_snapshot_start_count",
	"timer_opaque_get_count",

	// --------------------------------------------
	// stats from "FTS Stats" spec, see:
	// https://docs.google.com/spreadsheets/d/1w8P68gLBIs0VUN4egUuUH6U_92_5xi9azfvH8pPw21s/edit#gid=104567684

	"num_mutations_to_index", // per-index stat.
	// "doc_count",           // per-index stat (see above).
	"total_bytes_indexed", // per-index stat.
	"num_recs_to_persist", // per-index stat.

	"num_bytes_used_disk", // per-index stat.
	"num_files_on_disk",   // per-index stat.
	"num_bytes_live_data", // per-index stat, not in spec
	// "num_bytes_used_ram" -- PROCESS-LEVEL stat.

	"num_pindexes_actual", // per-index stat.
	"num_pindexes_target", // per-index stat.

	"num_root_memorysegments", // per-index stat.
	"num_root_filesegments",   // per-index stat.

	"num_persister_nap_pause_completed", // per-index stat.
	"num_persister_nap_merger_break",    // per-index stat.

	"total_compactions",              // per-index stat.
	"total_compaction_written_bytes", // per-index stat.

	// "total_gc"   -- PROCESS-LEVEL stat.
	// "pct_cpu_gc" -- PROCESS-LEVEL stat.

	"total_queries",             // per-index stat.
	"avg_queries_latency",       // per-index stat.
	"total_request_time",        // per-index stat.
	"total_queries_slow",        // per-index stat.
	"total_queries_timeout",     // per-index stat.
	"total_queries_error",       // per-index stat.
	"total_bytes_query_results", // per-index stat.
	"total_term_searchers",      // per-index stat.
}
// NewIndexStat ensures that all index stats have the same shape and
// zero values, to prevent N/A from showing up in the ns_server UI
func NewIndexStat() map[string]interface{} {
	rv := make(map[string]interface{})
	for _, key := range statkeys {
		rv[key] = float64(0)
	}
	return rv
}

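// ServeHTTP responds with the per-index and process-level stats in the
// NSIndexStats format expected by ns_server.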
func (h *NsStatsHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	currentStatsCount := atomic.AddInt64(&h.statsCount, 1)
	initNsServerCaching(h.mgr)

	rd := <-recentInfoCh
	if rd.err != nil {
		rest.ShowError(w, req, fmt.Sprintf("could not retrieve defs: %v", rd.err),
			http.StatusInternalServerError)
		return
	}

	indexDefsMap := rd.indexDefsMap
	planPIndexes := rd.planPIndexes

	nodeUUID := h.mgr.UUID()

	nsIndexStats := make(NSIndexStats, len(indexDefsMap))

	indexNameToSourceName := map[string]string{}

	indexNameToPlanPIndexes := map[string][]*cbgt.PlanPIndex{}
	if planPIndexes != nil {
		for _, planPIndex := range planPIndexes.PlanPIndexes {
			// Only focus on the planPIndex entries for this node.
			if planPIndex.Nodes[nodeUUID] != nil {
				indexNameToPlanPIndexes[planPIndex.IndexName] =
					append(indexNameToPlanPIndexes[planPIndex.IndexName], planPIndex)
			}
		}
	}

	indexQueryPathStats := MapRESTPathStats[RESTIndexQueryPath]

	// Keyed by indexName, sub-key is source partition id.
	indexNameToSourcePartitionSeqs := map[string]map[string]cbgt.UUIDSeq{}

	// Keyed by indexName, sub-key is source partition id.
	indexNameToDestPartitionSeqs := map[string]map[string]cbgt.UUIDSeq{}

	for indexName, indexDef := range indexDefsMap {
		nsIndexStat := NewIndexStat()
		nsIndexStats[indexDef.SourceName+":"+indexName] = nsIndexStat

		indexNameToSourceName[indexName] = indexDef.SourceName

		focusStats := indexQueryPathStats.FocusStats(indexName)
		if focusStats != nil {
			totalQueries := atomic.LoadUint64(&focusStats.TotClientRequest)
			nsIndexStat["total_queries"] = totalQueries
			if totalQueries > 0 {
				nsIndexStat["avg_queries_latency"] =
					float64((atomic.LoadUint64(&focusStats.TotClientRequestTimeNS) /
						totalQueries)) / 1000000.0 // Convert from nanosecs to millisecs.
			}
			nsIndexStat["total_request_time"] =
				atomic.LoadUint64(&focusStats.TotRequestTimeNS)
			nsIndexStat["total_queries_slow"] =
				atomic.LoadUint64(&focusStats.TotRequestSlow)
			nsIndexStat["total_queries_timeout"] =
				atomic.LoadUint64(&focusStats.TotRequestTimeout)
			nsIndexStat["total_queries_error"] =
				atomic.LoadUint64(&focusStats.TotRequestErr)
			nsIndexStat["total_bytes_query_results"] =
				atomic.LoadUint64(&focusStats.TotResponseBytes)
			nsIndexStat["num_pindexes_target"] =
				uint64(len(indexNameToPlanPIndexes[indexName]))
		}

		feedType, exists := cbgt.FeedTypes[indexDef.SourceType]
		if !exists || feedType == nil || feedType.PartitionSeqs == nil {
			continue
		}

		partitionSeqs := GetSourcePartitionSeqs(SourceSpec{
			SourceType:   indexDef.SourceType,
			SourceName:   indexDef.SourceName,
			SourceUUID:   indexDef.SourceUUID,
			SourceParams: indexDef.SourceParams,
			Server:       h.mgr.Server(),
		})
		if partitionSeqs != nil {
			indexNameToSourcePartitionSeqs[indexName] = partitionSeqs
		}
	}

	feeds, pindexes := h.mgr.CurrentMaps()

	for _, pindex := range pindexes {
		nsIndexName := pindex.SourceName + ":" + pindex.IndexName
		nsIndexStat, ok := nsIndexStats[nsIndexName]
		if ok {
			// manually track num pindexes
			oldValue, ok := nsIndexStat["num_pindexes_actual"]
			if ok {
				switch oldValue := oldValue.(type) {
				case float64:
					oldValue += float64(1)

					nsIndexStat["num_pindexes_actual"] = oldValue

					// TODO: Former name was num_pindexes, need to remove one day.
					nsIndexStat["num_pindexes"] = oldValue
				}
			}

			// automatically process all the pindex dest stats
			err := addPIndexStats(pindex, nsIndexStat)
			if err != nil {
				rest.ShowError(w, req,
					fmt.Sprintf("error processing PIndex stats: %v", err),
					http.StatusInternalServerError)
				return
			}

			dest := pindex.Dest
			if dest != nil {
				destForwarder, ok := dest.(*cbgt.DestForwarder)
				if !ok {
					continue
				}

				partitionSeqsProvider, ok :=
					destForwarder.DestProvider.(PartitionSeqsProvider)
				if !ok {
					continue
				}

				partitionSeqs, err := partitionSeqsProvider.PartitionSeqs()
				if err == nil {
					m := indexNameToDestPartitionSeqs[pindex.IndexName]
					if m == nil {
						m = map[string]cbgt.UUIDSeq{}
						indexNameToDestPartitionSeqs[pindex.IndexName] = m
					}

					for partitionId, uuidSeq := range partitionSeqs {
						m[partitionId] = uuidSeq
					}
				}
			}
		}
	}

	for _, feed := range feeds {
		sourceName := indexNameToSourceName[feed.IndexName()]
		nsIndexName := sourceName + ":" + feed.IndexName()
		nsIndexStat, ok := nsIndexStats[nsIndexName]
		if ok {
			// automatically process all the feed stats
			err := addFeedStats(feed, nsIndexStat)
			if err != nil {
				rest.ShowError(w, req,
					fmt.Sprintf("error processing Feed stats: %v", err),
					http.StatusInternalServerError)
				return
			}
		}
	}

	for indexName, indexDef := range indexDefsMap {
		nsIndexStat, ok := nsIndexStats[indexDef.SourceName+":"+indexName]
		if ok {
			src := indexNameToSourcePartitionSeqs[indexName]
			if src == nil {
				continue
			}

			dst := indexNameToDestPartitionSeqs[indexName]
			if dst == nil {
				continue
			}

			var totSeq uint64
			var curSeq uint64

			for partitionId, dstUUIDSeq := range dst {
				srcUUIDSeq, exists := src[partitionId]
				if exists {
					totSeq += srcUUIDSeq.Seq
					curSeq += dstUUIDSeq.Seq
				}
			}
			if totSeq >= curSeq {
				nsIndexStat["num_mutations_to_index"] = totSeq - curSeq
			}
		}
	}

	topLevelStats := map[string]interface{}{}

	// (Sys - HeapReleased) is the best estimate the cbft process can make
	// of its RSS; it accounts for memory that has been acquired by the
	// process minus the amount that has been released back to the OS.
	topLevelStats["num_bytes_used_ram"] = rd.memStats.Sys - rd.memStats.HeapReleased
	topLevelStats["total_gc"] = rd.memStats.NumGC
	topLevelStats["pct_cpu_gc"] = rd.memStats.GCCPUFraction
	topLevelStats["tot_remote_http"] = atomic.LoadUint64(&totRemoteHttp)
	topLevelStats["tot_remote_http2"] = atomic.LoadUint64(&totRemoteHttp2)
	topLevelStats["tot_queryreject_on_memquota"] =
		atomic.LoadUint64(&totQueryRejectOnNotEnoughQuota)

	topLevelStats["tot_http_limitlisteners_opened"] =
		atomic.LoadUint64(&TotHTTPLimitListenersOpened)
	topLevelStats["tot_http_limitlisteners_closed"] =
		atomic.LoadUint64(&TotHTTPLimitListenersClosed)
	topLevelStats["tot_https_limitlisteners_opened"] =
		atomic.LoadUint64(&TotHTTPSLimitListenersOpened)
	topLevelStats["tot_https_limitlisteners_closed"] =
		atomic.LoadUint64(&TotHTTPSLimitListenersClosed)

	topLevelStats["batch_bytes_added"] = atomic.LoadUint64(&BatchBytesAdded)
	topLevelStats["batch_bytes_removed"] = atomic.LoadUint64(&BatchBytesRemoved)

	topLevelStats["tot_batches_flushed_on_maxops"] = atomic.LoadUint64(&TotBatchesFlushedOnMaxOps)
	topLevelStats["tot_batches_flushed_on_timer"] = atomic.LoadUint64(&TotBatchesFlushedOnTimer)

	nsIndexStats[""] = topLevelStats

	if LogEveryNStats != 0 && currentStatsCount%int64(LogEveryNStats) == 0 {
		go func() {
			var buf bytes.Buffer
			err := rest.WriteManagerStatsJSON(h.mgr, &buf, "")
			if err != nil {
				log.Warnf("error formatting managerStatsJSON for logs: %v", err)
			} else {
				log.Printf("managerStats: %s", buf.String())
			}

			statsJSON, err := json.MarshalIndent(nsIndexStats, "", "    ")
			if err != nil {
				log.Warnf("error formatting JSON for logs: %v", err)
				return
			}
			log.Printf("stats: %s", string(statsJSON))
		}()
	}

	rest.MustEncode(w, nsIndexStats)
}

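// addFeedStats merges a feed's JSON stats into nsIndexStat.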
func addFeedStats(feed cbgt.Feed, nsIndexStat map[string]interface{}) error {
	buffer := new(bytes.Buffer)
	err := feed.Stats(buffer)
	if err != nil {
		return err
	}
	return massageStats(buffer, nsIndexStat)
}

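// addPIndexStats merges a pindex's BleveDest stats into nsIndexStat.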
func addPIndexStats(pindex *cbgt.PIndex, nsIndexStat map[string]interface{}) error {
	if destForwarder, ok := pindex.Dest.(*cbgt.DestForwarder); ok {
		if bp, ok := destForwarder.DestProvider.(*BleveDest); ok {
			bpsm, err := bp.StatsMap()
			if err != nil {
				return err
			}
			return extractStats(bpsm, nsIndexStat)
		}
	}
	return nil
}

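// updateStat adds val into an existing float64 entry in nsIndexStat;
// stat names not pre-seeded by NewIndexStat are ignored.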
func updateStat(name string, val float64, nsIndexStat map[string]interface{}) {
	oldValue, ok := nsIndexStat[name]
	if ok {
		switch oldValue := oldValue.(type) {
		case float64:
			oldValue += val
			nsIndexStat[name] = oldValue
		}
	}
}

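// extractStats walks a BleveDest stats map and pulls out the subset of
// stats that ns_server consumes, handling the different layouts used by
// metrics-wrapped, moss-based, and scorch indexes.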
func extractStats(bpsm, nsIndexStat map[string]interface{}) error {
	// common stats across different index types
	v := jsonpointer.Get(bpsm, "/DocCount")
	if vuint64, ok := v.(uint64); ok {
		updateStat("doc_count", float64(vuint64), nsIndexStat)
	}
	v = jsonpointer.Get(bpsm, "/bleveIndexStats/index/term_searchers_started")
	if vuint64, ok := v.(uint64); ok {
		updateStat("total_term_searchers", float64(vuint64), nsIndexStat)
	}
	v = jsonpointer.Get(bpsm, "/bleveIndexStats/index/num_plain_text_bytes_indexed")
	if vuint64, ok := v.(uint64); ok {
		updateStat("total_bytes_indexed", float64(vuint64), nsIndexStat)
	}

	v = jsonpointer.Get(bpsm, "/bleveIndexStats/index/kv")
	if _, ok := v.(map[string]interface{}); ok {
		// see if metrics are enabled; they would always be at the top level
		v = jsonpointer.Get(bpsm, "/bleveIndexStats/index/kv/metrics")
		if metrics, ok := v.(map[string]interface{}); ok {
			extractMetricsStats(metrics, nsIndexStat)
			// if we found metrics, look for moss one level deeper
			v = jsonpointer.Get(bpsm, "/bleveIndexStats/index/kv/kv/moss")
			if mossStats, ok := v.(*moss.CollectionStats); ok {
				extractMossStats(mossStats, nsIndexStat)
				// if we found moss, look for kv one level deeper
				v = jsonpointer.Get(bpsm, "/bleveIndexStats/index/kv/kv/kv")
				if kvStats, ok := v.(map[string]interface{}); ok {
					extractKVStats(kvStats, nsIndexStat)
				}
			} else {
				// no moss at this level, but still look for kv stats
				v = jsonpointer.Get(bpsm, "/bleveIndexStats/index/kv/kv")
				if kvStats, ok := v.(map[string]interface{}); ok {
					extractKVStats(kvStats, nsIndexStat)
				}
			}
		} else {
			// maybe no metrics, look for moss at this level
			v = jsonpointer.Get(bpsm, "/bleveIndexStats/index/kv/moss")
			if mossStats, ok := v.(*moss.CollectionStats); ok {
				extractMossStats(mossStats, nsIndexStat)
				// if we found moss, look for kv one level deeper
				v = jsonpointer.Get(bpsm, "/bleveIndexStats/index/kv/kv")
				if kvStats, ok := v.(map[string]interface{}); ok {
					extractKVStats(kvStats, nsIndexStat)
				}
			} else {
				// maybe no metrics or moss, look for kv here
				v = jsonpointer.Get(bpsm, "/bleveIndexStats/index/kv")
				if kvStats, ok := v.(map[string]interface{}); ok {
					extractKVStats(kvStats, nsIndexStat)
				}
			}
		}
	} else {
		// scorch stats are available at bleveIndexStats/index
		v = jsonpointer.Get(bpsm, "/bleveIndexStats/index")
		if sstats, ok := v.(map[string]interface{}); ok {
			extractScorchStats(sstats, nsIndexStat)
		}
	}

	return nil
}

var metricStats = map[string]string{
	"/batch_merge/count":            "batch_merge_count",
	"/iterator_next/count":          "iterator_next_count",
	"/iterator_seek/count":          "iterator_seek_count",
	"/reader_get/count":             "reader_get_count",
	"/reader_multi_get/count":       "reader_multi_get_count",
	"/reader_prefix_iterator/count": "reader_prefix_iterator_count",
	"/reader_range_iterator/count":  "reader_range_iterator_count",
	"/writer_execute_batch/count":   "writer_execute_batch_count",
}

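// extractMetricsStats copies the counters listed in metricStats from the
// metrics KVStore stats into nsIndexStat.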
func extractMetricsStats(metrics, nsIndexStat map[string]interface{}) error {
	for path, statname := range metricStats {
		v := jsonpointer.Get(metrics, path)
		if vint64, ok := v.(int64); ok {
			updateStat(statname, float64(vint64), nsIndexStat)
		}
	}
	return nil
}

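// extractMossStats maps moss collection stats into nsIndexStat.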
func extractMossStats(mossStats *moss.CollectionStats, nsIndexStat map[string]interface{}) error {
	updateStat("num_recs_to_persist", float64(mossStats.CurDirtyOps), nsIndexStat)
	return nil
}

var kvStats = map[string]string{
	// From mossStore...
	"/num_bytes_used_disk":            "num_bytes_used_disk",
	"/total_compaction_written_bytes": "total_compaction_written_bytes",
}

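// extractKVStats copies the lower-level KV store stats listed in kvStats
// into nsIndexStat.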
func extractKVStats(kvs, nsIndexStat map[string]interface{}) error {
	for path, statname := range kvStats {
		v := jsonpointer.Get(kvs, path)
		if vint, ok := v.(int); ok {
			updateStat(statname, float64(vint), nsIndexStat)
		} else if vuint64, ok := v.(uint64); ok {
			updateStat(statname, float64(vuint64), nsIndexStat)
		}
	}
	return nil
}

var scorchStats = map[string]string{
	"/num_bytes_used_disk":               "num_bytes_used_disk",
	"/num_files_on_disk":                 "num_files_on_disk",
	"/total_compaction_written_bytes":    "total_compaction_written_bytes",
	"/num_recs_to_persist":               "num_recs_to_persist",
	"/num_root_memorysegments":           "num_root_memorysegments",
	"/num_root_filesegments":             "num_root_filesegments",
	"/num_persister_nap_pause_completed": "num_persister_nap_pause_completed",
	"/num_persister_nap_merger_break":    "num_persister_nap_merger_break",
}

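// extractScorchStats copies the scorch stats listed in scorchStats
// into nsIndexStat.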
func extractScorchStats(sstats, nsIndexStat map[string]interface{}) error {
	for path, statname := range scorchStats {
		v := jsonpointer.Get(sstats, path)
		if vuint64, ok := v.(uint64); ok {
			updateStat(statname, float64(vuint64), nsIndexStat)
		}
	}

	return nil
}

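// massageStats scans a feed's JSON stats for count-like pointers and
// accumulates their values into the matching nsIndexStat entries.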
func massageStats(buffer *bytes.Buffer, nsIndexStat map[string]interface{}) error {
	statsBytes := buffer.Bytes()
	pointers, err := jsonpointer.ListPointers(statsBytes)
	if err != nil {
		return err
	}

	countPointers := make([]string, 0)
	for _, pointer := range pointers {
		if strings.HasSuffix(pointer, "/count") ||
			strings.HasSuffix(pointer, "/DocCount") ||
			matchAnyFixedSuffixes(pointer) {
			countPointers = append(countPointers, pointer)
		}
	}

	countValueMap, err := jsonpointer.FindMany(statsBytes, countPointers)
	if err != nil {
		return err
	}

	for k, v := range countValueMap {
		statName := convertStatName(k)
		var statValue float64
		err := json.Unmarshal(v, &statValue)
		if err != nil {
			return err
		}
		oldValue, ok := nsIndexStat[statName]
		if ok {
			switch oldValue := oldValue.(type) {
			case float64:
				oldValue += statValue
				nsIndexStat[statName] = oldValue
			}
		}
	}

	return nil
}

var fixedSuffixToStatNameMapping = map[string]string{
	"compacts":                     "total_compactions",
	"term_searchers_started":       "total_term_searchers",
	"estimated_space_used":         "num_bytes_used_disk",
	"CurDirtyOps":                  "num_recs_to_persist",
	"num_plain_text_bytes_indexed": "total_bytes_indexed",
}

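// matchAnyFixedSuffixes reports whether the pointer ends with one of the
// specially mapped stat suffixes above.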
func matchAnyFixedSuffixes(pointer string) bool {
	for k := range fixedSuffixToStatNameMapping {
		if strings.HasSuffix(pointer, "/"+k) {
			return true
		}
	}
	return false
}

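// convertStatName converts a jsonpointer stat path into an ns_server
// stat name, e.g. "/basic/DocCount" becomes "doc_count", while suffixes
// in fixedSuffixToStatNameMapping map directly to their fixed names.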
func convertStatName(key string) string {
	lastSlash := strings.LastIndex(key, "/")
	if lastSlash < 0 {
		return "unknown"
	}
	statSuffix := key[lastSlash+1:]
	if fixedStatName, ok := fixedSuffixToStatNameMapping[statSuffix]; ok {
		return fixedStatName
	}
	statNameStart := strings.LastIndex(key[:lastSlash], "/")
	if statNameStart < 0 {
		return "unknown"
	}
	statPrefix := key[statNameStart+1 : lastSlash]
	if statPrefix == "basic" {
		statPrefix = ""
	}
	statName := camelToUnderscore(statSuffix)
	if statPrefix != "" {
		statName = camelToUnderscore(statPrefix) + "_" + statName
	}
	return statName
}

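// camelToUnderscore converts a CamelCase name to snake_case,
// e.g. "DocCount" becomes "doc_count".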
func camelToUnderscore(name string) string {
	rv := ""
	for i, r := range name {
		if unicode.IsUpper(r) && i != 0 {
			rv += "_"
			rv += string(unicode.ToLower(r))
		} else if unicode.IsUpper(r) && i == 0 {
			rv += string(unicode.ToLower(r))
		} else {
			rv += string(r)
		}
	}
	return rv
}

// ---------------------------------------------------

// NsStatusHandler is a REST handler that provides status for
// consumption by ns_server
type NsStatusHandler struct {
	mgr       *cbgt.Manager
	serverURL *url.URL
}

func NewNsStatusHandler(mgr *cbgt.Manager, server string) (*NsStatusHandler, error) {
	serverURL, err := url.Parse(server)
	if err != nil {
		return nil, err
	}

	return &NsStatusHandler{
		mgr:       mgr,
		serverURL: serverURL,
	}, nil
}

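// NsHostsForIndex returns the sorted list of nsHostPort entries for the
// nodes that hold plan pindexes of the named index.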
func NsHostsForIndex(name string, planPIndexes *cbgt.PlanPIndexes,
	nodeDefs *cbgt.NodeDefs) []string {
	if planPIndexes == nil {
		return nil
	}

	v := struct{}{}

	// find the node UUIDs related to this index
	nodes := make(map[string]struct{})
	for _, planPIndex := range planPIndexes.PlanPIndexes {
		if planPIndex.IndexName == name {
			for planPIndexNodeUUID := range planPIndex.Nodes {
				nodes[planPIndexNodeUUID] = v
			}
		}
	}

	// look for all these nodes in the nodes wanted
	nodeExtras := make(map[string]string) // Keyed by node UUID.
	for _, nodeDef := range nodeDefs.NodeDefs {
		_, ok := nodes[nodeDef.UUID]
		if ok {
			extras, err := ParseExtras(nodeDef.Extras)
			if err == nil {
				nodeExtras[nodeDef.UUID] = extras["nsHostPort"]
			}
		}
	}

	// build slice of node extras
	nodeStrings := make(sort.StringSlice, 0, len(nodeExtras))
	for _, nodeExtra := range nodeExtras {
		nodeStrings = append(nodeStrings, nodeExtra)
	}

	// sort slice for stability
	sort.Sort(nodeStrings)

	return nodeStrings
}

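// ServeHTTP responds with the per-index hosts/bucket status document
// that ns_server polls.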
func (h *NsStatusHandler) ServeHTTP(
	w http.ResponseWriter, req *http.Request) {
	initNsServerCaching(h.mgr)

	rd := <-recentInfoCh
	if rd.err != nil {
		rest.ShowError(w, req, fmt.Sprintf("could not retrieve defs: %v", rd.err),
			http.StatusInternalServerError)
		return
	}

	indexDefsMap := rd.indexDefsMap
	nodeDefs := rd.nodeDefs
	planPIndexes := rd.planPIndexes

	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	w.Write(cbgt.JsonOpenBrace)
	w.Write(statsNamePrefix)
	w.Write([]byte("status"))
	w.Write(statsNameSuffix)
	w.Write([]byte("["))

	indexDefNames := make(sort.StringSlice, 0, len(indexDefsMap))
	for indexDefName := range indexDefsMap {
		indexDefNames = append(indexDefNames, indexDefName)
	}

	sort.Sort(indexDefNames)

	for i, indexDefName := range indexDefNames {
		indexDef := indexDefsMap[indexDefName]
		if i > 0 {
			w.Write(cbgt.JsonComma)
		}

		rest.MustEncode(w, struct {
			Hosts  []string `json:"hosts"`
			Bucket string   `json:"bucket"`
			Name   string   `json:"name"`
		}{
			Bucket: indexDef.SourceName,
			Name:   indexDefName,
			Hosts:  NsHostsForIndex(indexDefName, planPIndexes, nodeDefs),
		})
	}

	w.Write([]byte("],"))
	w.Write(statsNamePrefix)
	w.Write([]byte("code"))
	w.Write(statsNameSuffix)
	w.Write([]byte("\"success\""))
	w.Write(cbgt.JsonCloseBrace)
}

// ---------------------------------------------------------------

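// SourceSpec identifies a feed source whose partition seqs are cached.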
type SourceSpec struct {
	SourceType   string
	SourceName   string
	SourceUUID   string
	SourceParams string
	Server       string
}

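// SourcePartitionSeqs is a cache entry of the latest partition seqs for
// a source, along with when it was last refreshed and last requested.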
type SourcePartitionSeqs struct {
	PartitionSeqs map[string]cbgt.UUIDSeq
	LastUpdated   time.Time
	LastUsed      time.Time
}

var mapSourcePartitionSeqs = map[SourceSpec]*SourcePartitionSeqs{}
var mapSourcePartitionSeqsM sync.Mutex
var runSourcePartitionSeqsOnce sync.Once

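// initNsServerCaching lazily starts the background goroutines that keep
// the partition seqs and recent-info caches warm.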
func initNsServerCaching(mgr *cbgt.Manager) {
	runSourcePartitionSeqsOnce.Do(func() {
		go RunSourcePartitionSeqs(mgr.Options(), nil)
		go RunRecentInfoCache(mgr)
	})
}

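// GetSourcePartitionSeqs returns the cached partition seqs for the given
// source spec (possibly nil if not yet fetched) and marks the entry as
// recently used so the background refresher keeps it alive.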
func GetSourcePartitionSeqs(sourceSpec SourceSpec) map[string]cbgt.UUIDSeq {
	mapSourcePartitionSeqsM.Lock()
	s := mapSourcePartitionSeqs[sourceSpec]
	if s == nil {
		s = &SourcePartitionSeqs{}
		mapSourcePartitionSeqs[sourceSpec] = s
	}
	s.LastUsed = time.Now()
	rv := s.PartitionSeqs
	mapSourcePartitionSeqsM.Unlock()
	return rv
}

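// RunSourcePartitionSeqs periodically refreshes the partition seqs cache
// for every tracked source spec and evicts entries that have not been
// used for longer than the cache timeout.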
func RunSourcePartitionSeqs(options map[string]string, stopCh chan struct{}) {
	sourcePartitionSeqsSleep := SourcePartitionSeqsSleepDefault
	v, exists := options["sourcePartitionSeqsSleepMS"]
	if exists {
		sourcePartitionSeqsSleepMS, err := strconv.Atoi(v)
		if err != nil {
			log.Warnf("ns_server: parse sourcePartitionSeqsSleepMS: %q,"+
				" err: %v", v, err)
		} else {
			sourcePartitionSeqsSleep = time.Millisecond *
				time.Duration(sourcePartitionSeqsSleepMS)
		}
	}

	sourcePartitionSeqsCacheTimeout := SourcePartitionSeqsCacheTimeoutDefault
	v, exists = options["sourcePartitionSeqsCacheTimeoutMS"]
	if exists {
		sourcePartitionSeqsCacheTimeoutMS, err := strconv.Atoi(v)
		if err != nil {
			log.Warnf("ns_server: parse sourcePartitionSeqsCacheTimeoutMS: %q,"+
				" err: %v", v, err)
		} else {
			sourcePartitionSeqsCacheTimeout = time.Millisecond *
				time.Duration(sourcePartitionSeqsCacheTimeoutMS)
		}
	}

	m := &mapSourcePartitionSeqsM

	for {
		select {
		case <-stopCh:
			return
		case <-time.After(sourcePartitionSeqsSleep):
			// NO-OP.
		}

		m.Lock()
		var sourceSpecs []SourceSpec // Snapshot the wanted sourceSpecs.
		for sourceSpec := range mapSourcePartitionSeqs {
			sourceSpecs = append(sourceSpecs, sourceSpec)
		}
		m.Unlock()

		for _, sourceSpec := range sourceSpecs {
			select {
			case <-stopCh:
				return
			default:
				// NO-OP.
			}

			m.Lock()
			s := SourcePartitionSeqs{}
			v, exists := mapSourcePartitionSeqs[sourceSpec]
			if exists && v != nil {
				s = *v // Copy fields.
			}
			m.Unlock()

			if s.LastUpdated.After(s.LastUsed) {
				if s.LastUpdated.Sub(s.LastUsed) > sourcePartitionSeqsCacheTimeout {
					m.Lock()
					delete(mapSourcePartitionSeqs, sourceSpec)
					m.Unlock()
				}

				continue
			}

			if s.LastUsed.Sub(s.LastUpdated) > sourcePartitionSeqsSleep {
				next := &SourcePartitionSeqs{}
				*next = s // Copy fields.

				feedType, exists := cbgt.FeedTypes[sourceSpec.SourceType]
				if exists && feedType != nil && feedType.PartitionSeqs != nil {
					partitionSeqs, err := feedType.PartitionSeqs(
						sourceSpec.SourceType,
						sourceSpec.SourceName,
						sourceSpec.SourceUUID,
						sourceSpec.SourceParams,
						sourceSpec.Server, options)
					if err != nil {
						log.Warnf("ns_server: retrieve partition seqs: %v", err)
					} else {
						next.PartitionSeqs = partitionSeqs
					}
				}

				next.LastUpdated = time.Now()

				m.Lock()
				curr, exists := mapSourcePartitionSeqs[sourceSpec]
				if exists && curr != nil {
					next.LastUsed = curr.LastUsed
				}
				mapSourcePartitionSeqs[sourceSpec] = next
				m.Unlock()
			}
		}
	}
}

// ---------------------------------------------------------------

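// recentInfo is a point-in-time snapshot of the defs, plans and memstats
// that the ns_server handlers share, refreshed by RunRecentInfoCache.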
type recentInfo struct {
	indexDefs    *cbgt.IndexDefs
	indexDefsMap map[string]*cbgt.IndexDef
	nodeDefs     *cbgt.NodeDefs
	planPIndexes *cbgt.PlanPIndexes
	memStats     runtime.MemStats
	err          error
}

var recentInfoCh = make(chan *recentInfo)

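// FetchCurMemoryUsed returns the most recently sampled memory usage.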
func FetchCurMemoryUsed() uint64 {
	return atomic.LoadUint64(&currentMemoryUsed)
}

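// UpdateCurMemoryUsed re-samples the runtime memstats and refreshes the
// currentMemoryUsed counter.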
func UpdateCurMemoryUsed() uint64 {
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	return setCurMemoryUsedWith(&memStats)
}

func setCurMemoryUsedWith(memStats *runtime.MemStats) uint64 {
	// (Sys - HeapIdle) best represents the amount of memory that the go
	// process is consuming at the moment and that is not reusable; the go
	// process holds on to an idle memory component that can be reused;
	// HeapIdle includes memory that is idle and the part that has been released.
	curMemoryUsed := memStats.Sys - memStats.HeapIdle
	atomic.StoreUint64(&currentMemoryUsed, curMemoryUsed)
	return curMemoryUsed
}

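// RunRecentInfoCache keeps recentInfoCh fed with a fresh recentInfo
// snapshot, rebuilding it on cfg changes or at least once a second, and
// also drives memory sampling, optional GC triggering, and memstats logging.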
func RunRecentInfoCache(mgr *cbgt.Manager) {
	cfg := mgr.Cfg()

	cfgChangedCh := make(chan struct{}, 10)

	go func() { // Debounce cfg events to feed into the cfgChangedCh.
		ech := make(chan cbgt.CfgEvent)
		cfg.Subscribe(cbgt.PLAN_PINDEXES_KEY, ech)
		cfg.Subscribe(cbgt.PLAN_PINDEXES_DIRECTORY_STAMP, ech)

		for {
			<-ech // First, wait for a cfg event.

			debounceTimeCh := time.After(500 * time.Millisecond)

		DEBOUNCE_LOOP:
			for {
				select {
				case <-ech:
					// NO-OP when there are more, spammy cfg events.

				case <-debounceTimeCh:
					break DEBOUNCE_LOOP
				}
			}

			cfgChangedCh <- struct{}{}
		}
	}()

	tickCh := time.Tick(1 * time.Second)

	memStatsLoggingInterval, _ := strconv.Atoi(mgr.Options()["memStatsLoggingInterval"])
	logMemStatCh := time.Tick(time.Duration(memStatsLoggingInterval) * time.Second)

	var prevMemoryUsed uint64
	var curMemoryUsed uint64

	for {
		var nodeDefs *cbgt.NodeDefs
		var planPIndexes *cbgt.PlanPIndexes

		indexDefs, indexDefsMap, err := mgr.GetIndexDefs(false)
		if err == nil {
			nodeDefs, _, err = cbgt.CfgGetNodeDefs(cfg, cbgt.NODE_DEFS_WANTED)
			if err == nil {
				planPIndexes, _, err = cbgt.CfgGetPlanPIndexes(cfg)
			}
		}

		rd := &recentInfo{
			indexDefs:    indexDefs,
			indexDefsMap: indexDefsMap,
			nodeDefs:     nodeDefs,
			planPIndexes: planPIndexes,
			err:          err,
		}

		runtime.ReadMemStats(&rd.memStats)

		prevMemoryUsed = curMemoryUsed
		curMemoryUsed = setCurMemoryUsedWith(&rd.memStats)

		if curMemoryUsed < prevMemoryUsed &&
			OnMemoryUsedDropped != nil {
			OnMemoryUsedDropped(curMemoryUsed, prevMemoryUsed)
		}

		// Check against the memory quota to decide whether golang's GC
		// needs to be triggered.
		ftsMemoryQuota, _ := strconv.Atoi(mgr.Options()["ftsMemoryQuota"])
		gcMinThreshold, _ := strconv.Atoi(mgr.Options()["gcMinThreshold"])
		gcTriggerPct, _ := strconv.Atoi(mgr.Options()["gcTriggerPct"])

		if gcTriggerPct > 0 {
			allocedBytes := rd.memStats.Alloc
			if allocedBytes > uint64(gcMinThreshold) &&
				allocedBytes >= uint64(gcTriggerPct*ftsMemoryQuota/100) {
				// Invoke golang's garbage collector through runtime/debug's
				// FreeOSMemory API, which forces a garbage collection followed
				// by an attempt to return as much memory to the operating
				// system as possible.
				log.Printf("runtime/debug.FreeOSMemory() start..")
				debug.FreeOSMemory()
				log.Printf("runtime/debug.FreeOSMemory() done.")
			}
		}

	REUSE_CACHE:
		for {
			select {
			case <-cfgChangedCh:
				break REUSE_CACHE

			case <-tickCh:
				break REUSE_CACHE

			case <-logMemStatCh:
				logMemStatInfo(&rd.memStats)

			case recentInfoCh <- rd:
				if rd.err != nil {
					break REUSE_CACHE
				}
			}
		}
	}
}

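// logMemStatInfo logs a condensed view of the runtime memory stats.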
func logMemStatInfo(ms *runtime.MemStats) {
	if ms != nil {
		log.Printf("memstats:: TotalAlloc: %+v, Alloc: %+v, Sys: %+v", ms.TotalAlloc, ms.Alloc, ms.Sys)
		log.Printf("memstats:: Lookups: %+v, Mallocs: %+v, Frees: %+v", ms.Lookups, ms.Mallocs, ms.Frees)
		log.Printf("memstats:: LastGC: %+v, NextGC: %+v, NumGC: %+v", ms.LastGC, ms.NextGC, ms.NumGC)
		log.Printf("memstats:: PauseTotalNs: %+v, EnableGC: %+v, GCCPUFraction: %+v", ms.PauseTotalNs, ms.EnableGC, ms.GCCPUFraction)
		log.Printf("heapstats:: HeapAlloc: %+v, HeapSys: %+v, HeapIdle: %+v", ms.HeapAlloc, ms.HeapSys, ms.HeapIdle)
		log.Printf("heapstats:: HeapInuse: %+v, HeapReleased: %+v, HeapObjects: %+v", ms.HeapInuse, ms.HeapReleased, ms.HeapObjects)
		log.Printf("stackstats:: StackInuse: %+v, StackSys: %+v, MSpanInuse: %+v", ms.StackInuse, ms.StackSys, ms.MSpanInuse)
		log.Printf("stackstats:: MSpanSys: %+v, MCacheInuse: %+v, GCSys: %+v", ms.MSpanSys, ms.MCacheInuse, ms.GCSys)
	}
}

// ---------------------------------------------------------------

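// NsSearchResultRedirct redirects a search-result document link back to
// the ns_server documents UI for the underlying source bucket.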
type NsSearchResultRedirct struct {
	mgr *cbgt.Manager
}

func NsSearchResultRedirctHandler(mgr *cbgt.Manager) (*NsSearchResultRedirct, error) {
	return &NsSearchResultRedirct{
		mgr: mgr,
	}, nil
}

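// ServeHTTP looks up the pindex named in the request and redirects the
// caller to the source bucket's documents page for the requested doc ID.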
func (h *NsSearchResultRedirct) ServeHTTP(
	w http.ResponseWriter, req *http.Request) {
	allPlanPIndexes, _, err := h.mgr.GetPlanPIndexes(false)
	if err != nil {
		rest.ShowError(w, req, fmt.Sprintf("could not get plan pindexes: %v", err),
			http.StatusInternalServerError)
		return
	}

	pIndexName := rest.PIndexNameLookup(req)
	planPIndex, ok := allPlanPIndexes.PlanPIndexes[pIndexName]
	if !ok {
		rest.ShowError(w, req, fmt.Sprintf("no pindex named: %s", pIndexName),
			http.StatusBadRequest)
		return
	}

	docID := rest.DocIDLookup(req)
	source := planPIndex.SourceName
	http.Redirect(w, req, "/ui/index.html#!/buckets/documents/"+docID+"?bucket="+source, http.StatusMovedPermanently)
}