package client

import "sync"
import "fmt"
import "errors"
import "strings"
import "math/rand"
import "time"
import "unsafe"
import "sync/atomic"
import "encoding/json"
import "net/http"
import "bytes"
import "io/ioutil"
import "io"
import "math"
import "strconv"
import "sort"

import "github.com/couchbase/cbauth"

import "github.com/couchbase/indexing/secondary/logging"
import common "github.com/couchbase/indexing/secondary/common"
import mclient "github.com/couchbase/indexing/secondary/manager/client"

type metadataClient struct {
	cluster  string
	finch    chan bool
	mdClient *mclient.MetadataProvider
	indexers unsafe.Pointer // *indexTopology
	// config
	servicesNotifierRetryTm int
	logtick                 time.Duration
	randomWeight            float64 // value between [0, 1.0)
	equivalenceFactor       float64 // value between [0, 1.0)

	topoChangeLock sync.Mutex
	metaCh         chan bool
	mdNotifyCh     chan bool
	stNotifyCh     chan map[common.IndexInstId]map[common.PartitionId]common.Statistics

	settings *ClientSettings

	refreshLock    sync.Mutex
	refreshCond    *sync.Cond
	refreshCnt     int
	refreshWaitCnt int
}

// sherlock topology management, multi-node & single-partition.
type indexTopology struct {
	version     uint64
	adminports  map[string]common.IndexerId // book-keeping for cluster changes
	topology    map[common.IndexerId][]*mclient.IndexMetadata
	queryports  map[common.IndexerId]string
	replicas    map[common.IndexDefnId][]common.IndexInstId
	equivalents map[common.IndexDefnId][]common.IndexDefnId
	partitions  map[common.IndexDefnId]map[common.PartitionId][]common.IndexInstId
	rw          sync.RWMutex
	loads       map[common.IndexInstId]*loadHeuristics
	// insts could include pending RState inst if there is no corresponding active instance
	insts      map[common.IndexInstId]*mclient.InstanceDefn
	rebalInsts map[common.IndexInstId]*mclient.InstanceDefn
	defns      map[common.IndexDefnId]*mclient.IndexMetadata
	allIndexes []*mclient.IndexMetadata
}

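// newMetaBridgeClient constructs the metadata-bridge client. A minimal usage
// sketch only (the cluster address, config and settings values below are
// assumed for illustration, not prescribed by this file):
//
//	metaCh := make(chan bool, 1)
//	c, err := newMetaBridgeClient("localhost:9000", conf, metaCh, settings)
//	if err != nil {
//		return err
//	}
//	defer c.Close()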
func newMetaBridgeClient(
	cluster string, config common.Config, metaCh chan bool, settings *ClientSettings) (c *metadataClient, err error) {

	b := &metadataClient{
		cluster:    cluster,
		finch:      make(chan bool),
		metaCh:     metaCh,
		mdNotifyCh: make(chan bool, 1),
		stNotifyCh: make(chan map[common.IndexInstId]map[common.PartitionId]common.Statistics, 1),
		settings:   settings,
	}
	b.refreshCond = sync.NewCond(&b.refreshLock)
	b.refreshCnt = 0

	b.servicesNotifierRetryTm = config["servicesNotifierRetryTm"].Int()
	b.logtick = time.Duration(config["logtick"].Int()) * time.Millisecond
	b.randomWeight = config["load.randomWeight"].Float64()
	b.equivalenceFactor = config["load.equivalenceFactor"].Float64()
	// initialize meta-data-provider.
	uuid, err := common.NewUUID()
	if err != nil {
		logging.Errorf("Could not generate UUID in common.NewUUID\n")
		return nil, err
	}
	b.mdClient, err = mclient.NewMetadataProvider(cluster, uuid.Str(), b.mdNotifyCh, b.stNotifyCh, b.settings)
	if err != nil {
		return nil, err
	}

	if err := b.updateIndexerList(false); err != nil {
		logging.Errorf("updateIndexerList(): %v\n", err)
		b.mdClient.Close()
		return nil, err
	}

	go b.watchClusterChanges() // will also update the indexer list
	go b.logstats()
	return b, nil
}

// Sync will update the indexer list.
func (b *metadataClient) Sync() error {
	if err := b.updateIndexerList(true); err != nil {
		logging.Errorf("updateIndexerList(): %v\n", err)
		return err
	}
	return nil
}

// Refresh implements BridgeAccessor{} interface.
func (b *metadataClient) Refresh() ([]*mclient.IndexMetadata, uint64, uint64, error) {

	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))

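	// Only one caller rebuilds the cached topology at a time: the first
	// caller to find the cached version stale increments refreshCnt and runs
	// safeupdate(), while concurrent callers wait on refreshCond and simply
	// pick up the refreshed pointer once it is broadcast.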
	if currmeta.version < b.mdClient.GetMetadataVersion() {
		b.refreshLock.Lock()

		if b.refreshCnt > 0 {
			b.refreshWaitCnt++
			logging.Debugf("Refresh(): wait metadata update.  Refresh Count %v Wait Count %v", b.refreshCnt, b.refreshWaitCnt)
			b.refreshCond.Wait()
			b.refreshWaitCnt--
			b.refreshLock.Unlock()
		} else {
			logging.Debugf("Refresh(): refresh metadata. Refresh Count %v Wait Count %v", b.refreshCnt, b.refreshWaitCnt)
			b.refreshCnt++
			b.refreshLock.Unlock()

			b.safeupdate(nil, false)

			b.refreshLock.Lock()
			b.refreshCnt--
			b.refreshCond.Broadcast()
			b.refreshLock.Unlock()
		}

		currmeta = (*indexTopology)(atomic.LoadPointer(&b.indexers))
	}

	return currmeta.allIndexes, currmeta.version, b.mdClient.GetClusterVersion(), nil
}

// Nodes implements BridgeAccessor{} interface.
func (b *metadataClient) Nodes() ([]*IndexerService, error) {
	// gather Indexer services
	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))
	nodes := make(map[string]*IndexerService)
	for indexerID := range currmeta.topology {
		if indexerID != common.INDEXER_ID_NIL {
			a, q, _, err := b.mdClient.FindServiceForIndexer(indexerID)
			if err == nil {
				nodes[a] = &IndexerService{
					Adminport: a, Queryport: q, Status: "initial",
				}
			}
		}
	}
	// gather indexer status
	for _, indexer := range b.mdClient.CheckIndexerStatus() {
		if node, ok := nodes[indexer.Adminport]; ok && indexer.Connected {
			node.Status = "online"
		}
	}
	services := make([]*IndexerService, 0, len(nodes))
	for _, node := range nodes {
		services = append(services, node)
	}
	return services, nil
}

// GetIndexDefn implements BridgeAccessor{} interface.
func (b *metadataClient) GetIndexDefn(defnID uint64) *common.IndexDefn {
	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))
	if index, ok := currmeta.defns[common.IndexDefnId(defnID)]; ok {
		return index.Definition
	}

	return nil
}

// GetIndexInst implements BridgeAccessor{} interface.
func (b *metadataClient) GetIndexInst(instId uint64) *mclient.InstanceDefn {
	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))
	if inst, ok := currmeta.insts[common.IndexInstId(instId)]; ok {
		return inst
	}

	return nil
}

// GetIndexReplica implements BridgeAccessor{} interface.
func (b *metadataClient) GetIndexReplica(defnId uint64) []*mclient.InstanceDefn {
	var result []*mclient.InstanceDefn
	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))
	for _, instId := range currmeta.replicas[common.IndexDefnId(defnId)] {
		if inst, ok := currmeta.insts[instId]; ok {
			result = append(result, inst)
		}
	}

	return result
}

// CreateIndex implements BridgeAccessor{} interface.
func (b *metadataClient) CreateIndex(
	indexName, bucket, using, exprType, whereExpr string,
	secExprs []string, desc []bool, isPrimary bool,
	scheme common.PartitionScheme, partitionKeys []string,
	planJSON []byte) (uint64, error) {

	plan := make(map[string]interface{})
	if len(planJSON) > 0 {
		err := json.Unmarshal(planJSON, &plan)
		if err != nil {
			return 0, err
		}
	}

	refreshCnt := 0
RETRY:
	defnID, err, needRefresh := b.mdClient.CreateIndexWithPlan(
		indexName, bucket, using, exprType, whereExpr,
		secExprs, desc, isPrimary, scheme, partitionKeys, plan)

	if needRefresh && refreshCnt == 0 {
		fmsg := "GsiClient: Indexer Node List is out-of-date.  Require refresh."
		logging.Debugf(fmsg)
		if err := b.updateIndexerList(false); err != nil {
			logging.Errorf("updateIndexerList(): %v\n", err)
			return uint64(defnID), err
		}
		refreshCnt++
		goto RETRY
	}
	return uint64(defnID), err
}

// BuildIndexes implements BridgeAccessor{} interface.
func (b *metadataClient) BuildIndexes(defnIDs []uint64) error {
	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))

	for _, defnId := range defnIDs {
		if _, ok := currmeta.defns[common.IndexDefnId(defnId)]; !ok {
			return ErrorIndexNotFound
		}
	}

	ids := make([]common.IndexDefnId, len(defnIDs))
	for i, id := range defnIDs {
		ids[i] = common.IndexDefnId(id)
	}
	return b.mdClient.BuildIndexes(ids)
}

// MoveIndex implements BridgeAccessor{} interface.
func (b *metadataClient) MoveIndex(defnID uint64, planJSON map[string]interface{}) error {

	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))

	if _, ok := currmeta.defns[common.IndexDefnId(defnID)]; !ok {
		return ErrorIndexNotFound
	}

	var httpport string
	for indexerId := range currmeta.topology {
		var err error
		if _, _, httpport, err = b.mdClient.FindServiceForIndexer(indexerId); err == nil {
			break
		}
	}

	if httpport == "" {
		return ErrorNoHost
	}

	timeout := time.Duration(0 * time.Second)

	idList := IndexIdList{DefnIds: []uint64{defnID}}
	ir := IndexRequest{IndexIds: idList, Plan: planJSON}
	body, err := json.Marshal(&ir)
	if err != nil {
		return err
	}

	bodybuf := bytes.NewBuffer(body)

	url := "/moveIndexInternal"
	resp, err := postWithAuth(httpport+url, "application/json", bodybuf, timeout)
	if err != nil {
		errStr := fmt.Sprintf("Error communicating with index node %v. Reason %v", httpport, err)
		return errors.New(errStr)
	}
	defer resp.Body.Close()

	response := new(IndexResponse)
	respBody, _ := ioutil.ReadAll(resp.Body)
	if err := json.Unmarshal(respBody, &response); err != nil {
		return err
	}
	if response.Code == RESP_ERROR {
		return errors.New(response.Error)
	}

	return nil
}

// DropIndex implements BridgeAccessor{} interface.
func (b *metadataClient) DropIndex(defnID uint64) error {
	err := b.mdClient.DropIndex(common.IndexDefnId(defnID))
	if err == nil { // cleanup index local cache.
		b.safeupdate(nil, false /*force*/)
	}
	return err
}

// GetScanports implements BridgeAccessor{} interface.
func (b *metadataClient) GetScanports() (queryports []string) {
	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))
	queryports = make([]string, 0)
	for _, queryport := range currmeta.queryports {
		queryports = append(queryports, queryport)
	}
	logging.Debugf("Scan ports %v for all indexes", queryports)
	return queryports
}

// GetScanport implements BridgeAccessor{} interface.
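// It picks one equivalent index for defnID, then a random replica instance
// for each of its partitions, and returns the chosen queryports, instance
// ids, rollback times and partition lists grouped per (indexer, instance)
// pair, along with the total number of partitions.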
func (b *metadataClient) GetScanport(defnID uint64, excludes map[common.IndexDefnId]map[common.PartitionId]map[uint64]bool,
	skips map[common.IndexDefnId]bool) (qp []string,
	targetDefnID uint64, in []uint64, rt []int64, pid [][]common.PartitionId, numPartitions uint32, ok bool) {

	var insts map[common.PartitionId]*mclient.InstanceDefn
	var rollbackTimes map[common.PartitionId]int64

	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))

	defnID = b.pickEquivalent(defnID, skips)
	if defnID == 0 {
		return nil, 0, nil, nil, nil, 0, false
	}

	var replicas [128]uint64
	n := 0
	for _, replicaID := range currmeta.replicas[common.IndexDefnId(defnID)] {
		replicas[n] = uint64(replicaID)
		n++
	}

	insts, rollbackTimes, ok = b.pickRandom(replicas[:n], defnID, excludes[common.IndexDefnId(defnID)])
	if !ok {
		if len(currmeta.equivalents[common.IndexDefnId(defnID)]) > 1 || len(currmeta.replicas[common.IndexDefnId(defnID)]) > 1 {
			// skip this index definition for retry only if there is an equivalent index or a replica
			skips[common.IndexDefnId(defnID)] = true
		}
		return nil, 0, nil, nil, nil, 0, false
	}

	targetDefnID = uint64(defnID)
	numPartitions = uint32(len(insts))

	qpm := make(map[common.IndexerId]map[common.IndexInstId][]common.PartitionId)

	for partnId, inst := range insts {
		indexerId, ok := inst.IndexerId[partnId]
		if !ok {
			return nil, 0, nil, nil, nil, 0, false
		}

		if _, ok := qpm[indexerId]; !ok {
			qpm[indexerId] = make(map[common.IndexInstId][]common.PartitionId)
		}

		if _, ok := qpm[indexerId][inst.InstId]; !ok {
			qpm[indexerId][inst.InstId] = make([]common.PartitionId, 0, numPartitions)
		}

		qpm[indexerId][inst.InstId] = append(qpm[indexerId][inst.InstId], partnId)
	}

	for indexerId, instIds := range qpm {
		for instId, partnIds := range instIds {

			// queryport
			q, ok := currmeta.queryports[indexerId]
			if !ok {
				return nil, 0, nil, nil, nil, 0, false
			}
			qp = append(qp, q)

			// rollback time
			t, ok := rollbackTimes[partnIds[0]]
			if !ok {
				return nil, 0, nil, nil, nil, 0, false
			}
			rt = append(rt, t)

			// instance id
			in = append(in, uint64(instId))

			// partitions
			pid = append(pid, partnIds)
		}
	}

	fmsg := "Scan port %s for index defnID %d of equivalent index defnId %d"
	logging.Debugf(fmsg, qp, targetDefnID, defnID)
	return qp, targetDefnID, in, rt, pid, numPartitions, true
}

// Timeit implements BridgeAccessor{} interface.
func (b *metadataClient) Timeit(instID uint64, partitionId common.PartitionId, value float64) {

	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))

	// currmeta.loads is immutable once constructed
	load, ok := currmeta.loads[common.IndexInstId(instID)]
	if !ok {
		// it should not happen. But if it does, just return.
		return
	}

	load.updateLoad(partitionId, value)
	load.incHit(partitionId)
}

// IsPrimary implements BridgeAccessor{} interface.
func (b *metadataClient) IsPrimary(defnID uint64) bool {
	b.Refresh()
	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))
	if index, ok := currmeta.defns[common.IndexDefnId(defnID)]; ok {
		return index.Definition.IsPrimary
	}
	return false
}

// NumReplica implements BridgeAccessor{} interface.
func (b *metadataClient) NumReplica(defnID uint64) int {
	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))
	return len(currmeta.replicas[common.IndexDefnId(defnID)])
}

// IndexState implements BridgeAccessor{} interface.
func (b *metadataClient) IndexState(defnID uint64) (common.IndexState, error) {
	b.Refresh()
	return b.indexState(defnID)
}

// close this bridge, to be called when a new indexer is added or
// an active indexer leaves the cluster or during system shutdown.
func (b *metadataClient) Close() {
	defer func() { recover() }() // in case async Close is called.
	b.mdClient.Close()
	close(b.finch)
}

//--------------------------------
// local functions to map replicas
//--------------------------------

// compute a map of replicas for each index in 2i.
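// replicaMap maps an index definition to the instance ids of all its
// replicas; partitionMap further breaks that down per partition, listing the
// instances that host each partition.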
func (b *metadataClient) computeReplicas(topo map[common.IndexerId][]*mclient.IndexMetadata) (map[common.IndexDefnId][]common.IndexInstId,
	map[common.IndexDefnId]map[common.PartitionId][]common.IndexInstId) {

	replicaMap := make(map[common.IndexDefnId][]common.IndexInstId)
	partitionMap := make(map[common.IndexDefnId]map[common.PartitionId][]common.IndexInstId)

	for _, indexes := range topo { // go through the indexes for each indexer
		for _, index := range indexes {

			if _, ok := replicaMap[index.Definition.DefnId]; ok {
				continue
			}

			replicaMap[index.Definition.DefnId] = make([]common.IndexInstId, 0, len(index.Instances))
			for _, inst := range index.Instances {
				replicaMap[index.Definition.DefnId] = append(replicaMap[index.Definition.DefnId], inst.InstId)
			}

			partitionMap[index.Definition.DefnId] = make(map[common.PartitionId][]common.IndexInstId)
			for _, inst := range index.Instances {
				for partnId := range inst.IndexerId {
					if _, ok := partitionMap[index.Definition.DefnId][partnId]; !ok {
						partitionMap[index.Definition.DefnId][partnId] = make([]common.IndexInstId, 0, len(index.Instances))
					}
					partitionMap[index.Definition.DefnId][partnId] = append(partitionMap[index.Definition.DefnId][partnId], inst.InstId)
				}
			}
		}
	}
	return replicaMap, partitionMap
}

// compute a map of equivalent indexes for each index in 2i.
func (b *metadataClient) computeEquivalents(topo map[common.IndexerId][]*mclient.IndexMetadata) map[common.IndexDefnId][]common.IndexDefnId {

	equivalentMap := make(map[common.IndexDefnId][]common.IndexDefnId)

	for _, indexes1 := range topo { // go through the indexes for each indexer
		for _, index1 := range indexes1 {

			if _, ok := equivalentMap[index1.Definition.DefnId]; ok { // skip replica
				continue
			}

			// add myself
			seen := make(map[common.IndexDefnId]bool)
			seen[index1.Definition.DefnId] = true
			equivalentMap[index1.Definition.DefnId] = []common.IndexDefnId{index1.Definition.DefnId}

			for _, indexes2 := range topo { // go through the indexes for each indexer

				for _, index2 := range indexes2 {
					if seen[index2.Definition.DefnId] {
						continue
					}
					seen[index2.Definition.DefnId] = true

					if b.equivalentIndex(index1, index2) { // pick equivalents
						equivalentMap[index1.Definition.DefnId] = append(equivalentMap[index1.Definition.DefnId], index2.Definition.DefnId)
					}
				}
			}
		}
	}

	return equivalentMap
}

// compare whether two indexes are equivalent.
func (b *metadataClient) equivalentIndex(
	index1, index2 *mclient.IndexMetadata) bool {
	d1, d2 := index1.Definition, index2.Definition
	if d1.Bucket != d2.Bucket ||
		d1.IsPrimary != d2.IsPrimary ||
		d1.ExprType != d2.ExprType ||
		d1.PartitionScheme != d2.PartitionScheme ||
		d1.HashScheme != d2.HashScheme ||
		d1.WhereExpr != d2.WhereExpr ||
		d1.RetainDeletedXATTR != d2.RetainDeletedXATTR {

		return false
	}

	if len(d1.SecExprs) != len(d2.SecExprs) {
		return false
	}

	for i, s1 := range d1.SecExprs {
		if s1 != d2.SecExprs[i] {
			return false
		}
	}

	if len(d1.PartitionKeys) != len(d2.PartitionKeys) {
		return false
	}

	for i, s1 := range d1.PartitionKeys {
		if s1 != d2.PartitionKeys[i] {
			return false
		}
	}

	if len(d1.Desc) != len(d2.Desc) {
		return false
	}

	for i, b1 := range d1.Desc {
		if b1 != d2.Desc[i] {
			return false
		}
	}

	return true
}

//--------------------------------------
// local functions for stats
//--------------------------------------

// manage load statistics.
type loadHeuristics struct {
	avgLoad       []uint64
	hit           []uint64
	numPartitions int
	stats         unsafe.Pointer
}

type loadStats struct {
	pending       map[common.PartitionId]int64
	rollbackTime  map[common.PartitionId]int64
	statsTime     map[common.PartitionId]int64
	staleCount    map[common.PartitionId]int64
	numPartitions int
}

func newLoadStats(numPartitions int) *loadStats {

	stats := &loadStats{
		pending:       make(map[common.PartitionId]int64), // initialize to maxInt64 -- no stats to compare
		rollbackTime:  make(map[common.PartitionId]int64), // initialize to 0 -- always allow scan
		statsTime:     make(map[common.PartitionId]int64), // time when stats is collected at indexer
		staleCount:    make(map[common.PartitionId]int64),
		numPartitions: numPartitions,
	}

	for i := 0; i < numPartitions+1; i++ {
		stats.pending[common.PartitionId(i)] = math.MaxInt64
	}

	return stats
}

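// Note: the per-partition slices and maps are sized numPartitions+1 because
// slot 0 serves non-partitioned indexes while partitioned indexes use slots
// 1..numPartitions (see partitionRange in pickRandom).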
func newLoadHeuristics(numPartitions int) *loadHeuristics {

	h := &loadHeuristics{
		avgLoad:       make([]uint64, numPartitions+1),
		hit:           make([]uint64, numPartitions+1),
		numPartitions: numPartitions,
		stats:         unsafe.Pointer(newLoadStats(numPartitions)),
	}

	for i := 0; i < numPartitions+1; i++ {
		h.avgLoad[i] = 0
		h.hit[i] = 0
	}

	return h
}

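// updateLoad keeps a running average of scan response times per partition.
// The average is stored as math.Float64bits in a uint64 slot so it can be
// read and written with atomic operations. Each update halves the old value
// into the new sample, e.g. an average of 4ms followed by an 8ms sample
// yields (4+8)/2 = 6ms.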
func (b *loadHeuristics) updateLoad(partitionId common.PartitionId, value float64) {

	avgLoadInt := atomic.LoadUint64(&b.avgLoad[int(partitionId)])
	avgLoad := math.Float64frombits(avgLoadInt)

	// compute incremental average.
	avgLoad = (avgLoad + float64(value)) / 2.0

	avgLoadInt = math.Float64bits(avgLoad)
	atomic.StoreUint64(&b.avgLoad[int(partitionId)], avgLoadInt)
}

func (b *loadHeuristics) getLoad(partitionId common.PartitionId) (float64, bool) {

	avgLoadInt := atomic.LoadUint64(&b.avgLoad[int(partitionId)])
	avgLoad := math.Float64frombits(avgLoadInt)

	return avgLoad, avgLoadInt != 0
}

func (b *loadHeuristics) getAvgLoad() float64 {

	avgLoad, ok := b.getLoad(common.PartitionId(0))
	if ok {
		return avgLoad
	}

	count := 0
	avgLoad = 0.0
	for i := 1; i < b.numPartitions+1; i++ {
		n, ok := b.getLoad(common.PartitionId(i))
		if ok {
			avgLoad += n
			count++
		}
	}

	if count == 0 {
		return 0.0
	}

	return avgLoad / float64(count)
}

func (b *loadHeuristics) incHit(partitionId common.PartitionId) {

	atomic.AddUint64(&b.hit[int(partitionId)], 1)
}

func (b *loadHeuristics) getHit(partitionId common.PartitionId) uint64 {

	return atomic.LoadUint64(&b.hit[int(partitionId)])
}

func (b *loadHeuristics) getAvgHit() uint64 {

	avgHit := b.getHit(common.PartitionId(0))
	if avgHit != 0 {
		return avgHit
	}

	avgHit = 0
	for i := 1; i < b.numPartitions+1; i++ {
		avgHit += b.getHit(common.PartitionId(i))
	}

	return avgHit / uint64(b.numPartitions)
}

func (b *loadHeuristics) updateStats(stats *loadStats) {

	atomic.StorePointer(&b.stats, unsafe.Pointer(stats))
}

func (b *loadHeuristics) getStats() *loadStats {

	return (*loadStats)(atomic.LoadPointer(&b.stats))
}

func (b *loadHeuristics) copyStats() *loadStats {

	stats := (*loadStats)(atomic.LoadPointer(&b.stats))
	newStats := newLoadStats(stats.numPartitions)

	for partnId, pending := range stats.pending {
		newStats.pending[partnId] = pending
	}

	for partnId, rollbackTime := range stats.rollbackTime {
		newStats.rollbackTime[partnId] = rollbackTime
	}

	for partnId, statsTime := range stats.statsTime {
		newStats.statsTime[partnId] = statsTime
	}

	for partnId, staleCount := range stats.staleCount {
		newStats.staleCount[partnId] = staleCount
	}

	return newStats
}

func (b *loadHeuristics) cloneRefresh(curInst *mclient.InstanceDefn, newInst *mclient.InstanceDefn) *loadHeuristics {

	clone := newLoadHeuristics(b.numPartitions)
	for partnId := range newInst.IndexerId {
		if newInst.Versions[partnId] == curInst.Versions[partnId] {
			clone.avgLoad[uint64(partnId)] = atomic.LoadUint64(&b.avgLoad[int(partnId)])
			clone.hit[uint64(partnId)] = atomic.LoadUint64(&b.hit[int(partnId)])
		}
	}

	cloneStats := clone.getStats()
	stats := b.getStats()
	for partnId := range newInst.IndexerId {
		if newInst.Versions[partnId] == curInst.Versions[partnId] {
			cloneStats.updatePendingItem(partnId, stats.getPendingItem(partnId))
			cloneStats.updateRollbackTime(partnId, stats.getRollbackTime(partnId))
			cloneStats.updateStatsTime(partnId, stats.statsTime[partnId])
		}
	}

	return clone
}

func (b *loadStats) getPendingItem(partitionId common.PartitionId) int64 {

	return b.pending[partitionId]
}

func (b *loadStats) getTotalPendingItems() int64 {

	var total int64
	for _, pending := range b.pending {
		total += pending
	}

	return total
}

func (b *loadStats) updatePendingItem(partitionId common.PartitionId, value int64) {

	b.pending[partitionId] = value
}

func (b *loadStats) getRollbackTime(partitionId common.PartitionId) int64 {

	return b.rollbackTime[partitionId]
}

func (b *loadStats) updateRollbackTime(partitionId common.PartitionId, value int64) {

	b.rollbackTime[partitionId] = value
}

func (b *loadStats) updateStatsTime(partitionId common.PartitionId, value int64) {

	if b.statsTime[partitionId] != value {
		b.statsTime[partitionId] = value
		b.staleCount[partitionId] = 0
	} else {
		b.staleCount[partitionId]++
	}
}

func (b *loadStats) isAllStatsCurrent() bool {

	current := true
	for _, stale := range b.staleCount {
		current = current && stale < 10
	}

	return current
}

func (b *loadStats) isStatsCurrent(partitionId common.PartitionId) bool {

	return b.staleCount[partitionId] < 10
}

//-----------------------------------------------
// local functions to pick index for scanning
//-----------------------------------------------

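// pickEquivalent randomly picks one definition out of the equivalence class
// of defnID (which always contains defnID itself; see computeEquivalents),
// skipping any definition marked in skips. It returns 0 when every
// equivalent has been skipped.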
func (b *metadataClient) pickEquivalent(defnID uint64, skips map[common.IndexDefnId]bool) uint64 {

	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))

	if len(skips) == len(currmeta.equivalents[common.IndexDefnId(defnID)]) {
		return uint64(0)
	}

	for {
		n := rand.Intn(len(currmeta.equivalents[common.IndexDefnId(defnID)]))
		candidate := currmeta.equivalents[common.IndexDefnId(defnID)][n]
		if !skips[candidate] {
			return uint64(candidate)
		}
	}
}

// Given the list of replicas for a given index definition, this function randomly picks the partitions
// from the available replicas for scanning.  It filters out any replica partition that has fallen behind
// the other replicas.  It returns:
// 1) a map of partition Id and index instance
// 2) a map of partition Id and rollback timestamp
//
func (b *metadataClient) pickRandom(replicas []uint64, defnID uint64,
	excludes map[common.PartitionId]map[uint64]bool) (map[common.PartitionId]*mclient.InstanceDefn, map[common.PartitionId]int64, bool) {

	//
	// Determine number of partitions and its range
	//
	partitionRange := func(currmeta *indexTopology, defnID uint64, numPartition int) (uint64, uint64) {
		if defn, ok := currmeta.defns[common.IndexDefnId(defnID)]; ok {
			startPartnId := 0
			if common.IsPartitioned(defn.Definition.PartitionScheme) {
				startPartnId = 1
			}
			return uint64(startPartnId), uint64(startPartnId + numPartition)
		}
		// return 0 if the range cannot be found
		return 0, 0
	}

	numPartition := func(currmeta *indexTopology, replicas []uint64) uint32 {
		for _, instId := range replicas {
			if inst, ok := currmeta.insts[common.IndexInstId(instId)]; ok {
				return inst.NumPartitions
			}
		}
		// return 0 if numPartition cannot be found
		return 0
	}

	numValidReplica := func(currmeta *indexTopology, partnId uint64, replicas []uint64, rollbackTimesList []map[common.PartitionId]int64) int {

		var count int
		for n, replica := range replicas {
			_, ok1 := currmeta.insts[common.IndexInstId(replica)]
			rollbackTime, ok2 := rollbackTimesList[n][common.PartitionId(partnId)]
			ok3 := ok2 && rollbackTime != math.MaxInt64
			if ok1 && ok2 && ok3 {
				count++
			}
		}
		return count
	}

	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))
	numPartn := numPartition(currmeta, replicas)
	startPartnId, endPartnId := partitionRange(currmeta, defnID, int(numPartn))

	//
	// Shuffle the replica list
	//
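	// (The shuffle below treats a zero entry in result as an unfilled slot,
	// so it assumes instance ids are never zero.)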
	shuffle := func(replicas []uint64) []uint64 {
		num := len(replicas)
		result := make([]uint64, num)

		for _, replica := range replicas {
			found := false
			for !found {
				n := rand.Intn(num)
				if result[n] == 0 {
					result[n] = replica
					found = true
				}
			}
		}
		return result
	}
	replicas = shuffle(replicas)

	//
	// Filter out inst based on pending item stats.
	//
	rollbackTimesList := b.pruneStaleReplica(replicas, excludes)

	// Filter based on timing of scan responses
	b.filterByTiming(currmeta, replicas, rollbackTimesList, startPartnId, endPartnId)

	//
	// Randomly select an inst after filtering
	//
	chosenInst := make(map[common.PartitionId]*mclient.InstanceDefn)
	chosenTimestamp := make(map[common.PartitionId]int64)

	for partnId := startPartnId; partnId < endPartnId; partnId++ {

		var ok bool
		var inst *mclient.InstanceDefn
		var rollbackTime int64

		for n, replica := range replicas {

			var ok1, ok2, ok3 bool
			inst, ok1 = currmeta.insts[common.IndexInstId(replica)]
			rollbackTime, ok2 = rollbackTimesList[n][common.PartitionId(partnId)]
			ok3 = ok2 && rollbackTime != math.MaxInt64
			ok = ok1 && ok2 && ok3

			if ok {
				break
			}
		}

		if ok {
			// found an indexer that holds the active partition
			chosenInst[common.PartitionId(partnId)] = inst
			chosenTimestamp[common.PartitionId(partnId)] = rollbackTime

			// set the rollback time to 0 if there is only one valid replica
			if numValidReplica(currmeta, partnId, replicas, rollbackTimesList) <= 1 {
				chosenTimestamp[common.PartitionId(partnId)] = 0
			}
		} else {
			// cannot find an indexer that holds an active partition
			// try to find an indexer under rebalancing
			for _, instId := range replicas {
				if inst, ok := currmeta.rebalInsts[common.IndexInstId(instId)]; ok {
					if _, ok := inst.IndexerId[common.PartitionId(partnId)]; ok {
						chosenInst[common.PartitionId(partnId)] = inst
						chosenTimestamp[common.PartitionId(partnId)] = 0
					}
				}
			}
		}
	}

	if len(chosenInst) != int(numPartn) {
		logging.Errorf("PickRandom: Failed to find indexer for all index partitions. Num partition %v.  Partition with instances %v ",
			numPartn, len(chosenInst))
		for n, instId := range replicas {
			for partnId := startPartnId; partnId < endPartnId; partnId++ {
				ts, ok := rollbackTimesList[n][common.PartitionId(partnId)]
				logging.Debugf("PickRandom: inst %v partition %v timestamp %v ok %v",
					instId, partnId, ts, ok)
			}
		}
		return nil, nil, false
	}

	return chosenInst, chosenTimestamp, true
}

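// filterByTiming removes slow replicas from consideration: with probability
// (1 - randomWeight) it drops, for each partition, any replica whose scaled
// average load (load * equivalenceFactor) exceeds the load of the fastest
// replica, by deleting that replica's rollback-time entry. Partitions served
// by a single replica are never pruned.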
func (b *metadataClient) filterByTiming(currmeta *indexTopology, replicas []uint64, rollbackTimes []map[common.PartitionId]int64,
	startPartnId uint64, endPartnId uint64) {

	numRollbackTimes := func(partnId uint64) int {
		count := 0
		for _, partnMap := range rollbackTimes {
			if _, ok := partnMap[common.PartitionId(partnId)]; ok {
				count++
			}
		}
		return count
	}

	if rand.Float64() >= b.randomWeight {
		for partnId := startPartnId; partnId < endPartnId; partnId++ {
			// Do not prune if there is only one replica with this partition
			if numRollbackTimes(partnId) <= 1 {
				continue
			}

			loadList := make([]float64, len(replicas))
			for i, instId := range replicas {
				if load, ok := currmeta.loads[common.IndexInstId(instId)]; ok {
					if n, ok := load.getLoad(common.PartitionId(partnId)); ok {
						loadList[i] = n
					} else {
						loadList[i] = math.MaxFloat64
					}
				} else {
					loadList[i] = math.MaxFloat64
				}
			}

			// compute replica with least load.
			sort.Float64s(loadList)
			leastLoad := loadList[0]

			//
			// Filter inst based on load
			//
			for i, instId := range replicas {
				if load, ok := currmeta.loads[common.IndexInstId(instId)]; ok {
					if n, ok := load.getLoad(common.PartitionId(partnId)); ok {
						equivLoad := n * b.equivalenceFactor
						if equivLoad > leastLoad {
							logging.Verbosef("remove inst %v partition %v from scan due to slow response time (least %v load %v)",
								instId, partnId, leastLoad, equivLoad)
							delete(rollbackTimes[i], common.PartitionId(partnId))
						}
					}
				}
			}
		}
	}
}

//
// This method prunes stale partitions from the given replicas.  For each replica, it returns
// the rollback time of its up-to-date partitions.  Staleness is based on a limit of how far
// a partition has fallen behind the most current partition.
//
// If the index inst does not exist for a replica, it returns an empty map.  Therefore,
// an empty map for a replica could mean the index does not exist, or there is no up-to-date partition.
//
// If there is only one replica or pruning is disabled, it will return rollback time 0 for all
// replicas without pruning.
//
// If there is no stats available for a particular partition (across all replicas), then
// that partition is not pruned.
//
func (b *metadataClient) pruneStaleReplica(replicas []uint64, excludes map[common.PartitionId]map[uint64]bool) []map[common.PartitionId]int64 {

	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))

	resetRollbackTimes := func(rollbackTimeList []map[common.PartitionId]int64) {
		for _, rollbackTimes := range rollbackTimeList {
			for partnId := range rollbackTimes {
				rollbackTimes[partnId] = 0
			}
		}
	}

	isStalePartition := func(minPendings map[common.PartitionId]int64, partnId common.PartitionId) bool {
		if _, ok := minPendings[partnId]; !ok {
			return true
		}
		if uint64(minPendings[partnId]) >= uint64(math.MaxInt64) {
			return true
		}
		return false
	}

	hasStaleStats := func(minPendings map[common.PartitionId]int64) bool {
		for partnId := range minPendings {
			if isStalePartition(minPendings, partnId) {
				return true
			}
		}
		return false
	}

	// If there is only one instance or replica pruning is disabled, then just return.
	if len(replicas) == 1 || b.settings.DisablePruneReplica() {
		_, rollbackTimes, _ := b.getPendingStats(replicas, currmeta, excludes, false)
		resetRollbackTimes(rollbackTimes)
		return rollbackTimes
	}

	// read the progress stats from each index -- exclude indexer that has not refreshed its stats
	pendings, rollbackTimes, minPendings := b.getPendingStats(replicas, currmeta, excludes, true)
	if hasStaleStats(minPendings) {
		// if there is no current progress stats available, then read stats even if it could be stale.
		// This can happen if KV is partitioned away from all indexer nodes, since those indexers cannot
		// refresh progress stats
		pendings, rollbackTimes, minPendings = b.getPendingStats(replicas, currmeta, excludes, false)
	}

	result := make([]map[common.PartitionId]int64, len(replicas))
	for i, instId := range replicas {
		result[i] = make(map[common.PartitionId]int64)

		// find the index inst
		inst, ok := currmeta.insts[common.IndexInstId(instId)]
		if !ok {
			continue
		}

		// prune partitions that exceed the quota
		// If the instance's partition is excluded, then it will not be in the pendings map.  So it is skipped,
		// and this method will not return a rollbackTime for that instance partition.
		for partnId, pending := range pendings[i] {

			// If there is no progress stats available for any replica/partition, then do not filter.
			// Stats are not available when
			// 1) All indexers have not yet been able to send stats to cbq after cluster restart
			// 2) index has just finished building but has not yet sent stats over
			// 3) There is a single instance (no replica) and stats are not available
			// 4) Replica/partition has just been rebalanced but has not yet sent stats over
			if isStalePartition(minPendings, partnId) {
				result[i][partnId] = 0
				continue
			}

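			// The quota below is minPendings[partnId] * (1 + ScanLagPercent), floored at
			// ScanLagItem / NumPartitions; a replica partition stays scannable only while
			// its pending count is within that quota. Illustrative numbers (not defaults
			// from this file): minPending = 1000, ScanLagPercent = 0.2, ScanLagItem =
			// 100000 over 8 partitions gives quota = max(1200, 12500) = 12500.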
			percent := b.settings.ScanLagPercent()
			quota := int64(float64(minPendings[partnId]) * (percent + 1.0))

			// compute the quota per partition
			item := b.settings.ScanLagItem() / uint64(inst.NumPartitions)
			if quota < int64(item) {
				quota = int64(item)
			}

			if pending != int64(math.MaxInt64) && pending <= quota {
				result[i][partnId] = rollbackTimes[i][partnId]
			} else {
				logging.Verbosef("remove inst %v partition %v from scan due to stale item count", instId, partnId)
			}
		}
	}

	return result
}

//
// This method returns the item pending count and rollback time for each partition of every given replica.
// If a replica does not exist, it returns an empty map for item pending and rollback time.
// If the partition/replica is excluded, then item pending and rollback time will be missing in the map.
// If stats is not available or stale, it returns MaxInt64 for item pending and rollback time.
// If the replica/partition is just being rebalanced, it returns MaxInt64 for item pending and 0 for rollback time (see updateTopology).
//
// This method also returns the minPending for each partition across all replicas.
// If no replica has a valid stat for that partition, minPending is MaxInt64.
// If all replicas are excluded for that partition, minPending is also MaxInt64.
//
func (b *metadataClient) getPendingStats(replicas []uint64, currmeta *indexTopology, excludes map[common.PartitionId]map[uint64]bool,
	useCurrent bool) ([]map[common.PartitionId]int64, []map[common.PartitionId]int64,
	map[common.PartitionId]int64) {

	pendings := make([]map[common.PartitionId]int64, len(replicas))
	rollbackTimes := make([]map[common.PartitionId]int64, len(replicas))
	minPending := make(map[common.PartitionId]int64)
	init := make(map[common.PartitionId]bool)

	for i, instId := range replicas {
		rollbackTimes[i] = make(map[common.PartitionId]int64)
		pendings[i] = make(map[common.PartitionId]int64)

		// Get stats from active instance
		inst, ok := currmeta.insts[common.IndexInstId(instId)]
		if !ok {
			continue
		}

		var stats *loadStats
		if load, ok := currmeta.loads[common.IndexInstId(instId)]; ok {
			stats = load.getStats()
		}

		for partnId := range inst.IndexerId {

			if !init[partnId] {
				// minPending can be MaxInt64 if
				// 1) the instance partition is excluded
				// 2) there is no stats for the instance partition
				// 3) there is no current stats for the instance partition
				minPending[partnId] = math.MaxInt64
				init[partnId] = true
			}

			if excludes[partnId][instId] {
				continue
			}

			if stats == nil || (useCurrent && !stats.isStatsCurrent(partnId)) {
				pendings[i][partnId] = math.MaxInt64
				rollbackTimes[i][partnId] = math.MaxInt64
				continue
			}

			rollbackTimes[i][partnId] = stats.getRollbackTime(partnId)
			pendings[i][partnId] = stats.getPendingItem(partnId)

			if pendings[i][partnId] < minPending[partnId] {
				minPending[partnId] = pendings[i][partnId]
			}
		}
	}

	return pendings, rollbackTimes, minPending
}

//---------------------------
// local utility functions
//---------------------------

func (b *metadataClient) logstats() {
	tick := time.NewTicker(b.logtick)
	defer func() {
		tick.Stop()
	}()

loop:
	for {
		<-tick.C

		b.printstats()

		select {
		case _, ok := <-b.finch:
			if !ok {
				break loop
			}
		default:
		}
	}
}

func (b *metadataClient) printstats() {

	s := make([]string, 0, 16)
	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))

	logging.Infof("connected with %v indexers\n", len(currmeta.topology))
	for id, replicas := range currmeta.replicas {
		logging.Infof("index %v has replicas %v: \n", id, replicas)
	}
	for id, equivalents := range currmeta.equivalents {
		logging.Infof("index %v has equivalents %v: \n", id, equivalents)
	}
	// currmeta.loads is immutable
	for id := range currmeta.insts {
		load := currmeta.loads[id]
		s = append(s, fmt.Sprintf(`"%v": %v`, id, load.getAvgLoad()))
	}
	logging.Infof("client load stats {%v}", strings.Join(s, ","))

	s = make([]string, 0, 16)
	for id := range currmeta.insts {
		load := currmeta.loads[id]
		s = append(s, fmt.Sprintf(`"%v": %v`, id, load.getAvgHit()))
	}
	logging.Infof("client hit stats {%v}", strings.Join(s, ","))

	s = make([]string, 0, 16)
	for id := range currmeta.insts {
		load := currmeta.loads[id]
		s = append(s, fmt.Sprintf(`"%v": %v`, id, load.getStats().getTotalPendingItems()))
	}
	logging.Infof("client pending item stats {%v}", strings.Join(s, ","))

	/*
		s = make([]string, 0, 16)
		for id, _ := range currmeta.insts {
			load := currmeta.loads[id]
			s = append(s, fmt.Sprintf(`"%v": %v`, id, load.getStats().getRollbackTime()))
		}
		logging.Infof("client rollback times {%v}", strings.Join(s, ","))
	*/

	s = make([]string, 0, 16)
	for id := range currmeta.insts {
		load := currmeta.loads[id]
		s = append(s, fmt.Sprintf(`"%v": %v`, id, load.getStats().isAllStatsCurrent()))
	}
	logging.Infof("client stats current {%v}", strings.Join(s, ","))
}

// unprotected access to shared structures.
func (b *metadataClient) indexState(defnID uint64) (common.IndexState, error) {
	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))
	if index, ok := currmeta.defns[common.IndexDefnId(defnID)]; ok {
		if index.Error != "" {
			return common.INDEX_STATE_ERROR, errors.New(index.Error)
		}

		return index.State, nil
	}

	return common.INDEX_STATE_ERROR, ErrorIndexNotFound
}

// unprotected access to shared structures.
func (b *metadataClient) indexInstState(instID uint64) (common.IndexState, error) {
	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))

	if inst, ok := currmeta.insts[common.IndexInstId(instID)]; ok {
		if inst.Error != "" {
			return common.INDEX_STATE_ERROR, errors.New(inst.Error)
		}

		return inst.State, nil
	}

	return common.INDEX_STATE_ERROR, ErrorInstanceNotFound
}

//-----------------------------------------------
// local functions for metadata and stats update
//-----------------------------------------------

// update 2i cluster information.
func (b *metadataClient) updateIndexerList(discardExisting bool) error {

	clusterURL, err := common.ClusterAuthUrl(b.cluster)
	if err != nil {
		return err
	}
	cinfo, err := common.NewClusterInfoCache(clusterURL, "default")
	if err != nil {
		return err
	}
	if err := cinfo.Fetch(); err != nil {
		return err
	}

	// updateIndexerList is synchronous, except for the async callback from WatchMetadata() -- when an indexer is
	// not responding fast enough.
	// topoChangeLock protects the updates to index topology made by the async callback.  Index topology contains
	// the assignment between adminport and indexerId -- which is updated by the async callback.  Metadata version cannot
	// enforce serializability of such assignment updates, since they are not part of the metadata tracked by metadataProvider.
	//
	// The adminport-indexerId assignment is used to figure out differences in topology so that gsiClient can call
	// WatchMetadata and UnwatchMetadata properly.  Corruption of this assignment can cause a lot of issues.
	//
	// The lock protects against race conditions in these cases:
	// 1) make sure that updateIndexerList() is finished before any of its callbacks is invoked.
	//    This is to ensure the async callback does not lose its work.
	// 2) make sure that the async callbacks are called sequentially so their changes on adminport-indexerId are cumulative.
	//    This is to ensure the async callback does not lose its work.
	// 3) if there are consecutive topology changes by ns-server, make sure that the async callback will not save a stale
	//    adminport-indexerId assignment by overwriting the assignment created by the second topology change.
	b.topoChangeLock.Lock()
	defer b.topoChangeLock.Unlock()

	// populate indexers' adminport and queryport
	adminports, activeNode, failedNode, unhealthyNode, newNode, err := getIndexerAdminports(cinfo)
	if err != nil {
		return err
	}
	b.mdClient.SetClusterStatus(activeNode, failedNode, unhealthyNode, newNode)

	fmsg := "Refreshing indexer list due to cluster changes or auto-refresh."
	logging.Infof(fmsg)
	logging.Infof("Refreshed Indexer List: %v", adminports)

	var curradmns map[string]common.IndexerId
	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))
	if currmeta != nil {
		curradmns = make(map[string]common.IndexerId)
		for adminport, indexerId := range currmeta.adminports {
			curradmns[adminport] = indexerId
		}
	}

	if discardExisting {
		for _, indexerID := range curradmns {
			b.mdClient.UnwatchMetadata(indexerID, activeNode)
		}
		curradmns = make(map[string]common.IndexerId)
	}

	// watch all indexers
	m := make(map[string]common.IndexerId)
	for _, adminport := range adminports { // add new indexer-nodes if any
		if indexerID, ok := curradmns[adminport]; !ok {
			// This adminport is provided by cluster manager.  Meta client will
			// honor cluster manager to treat this adminport as a healthy node.
			// If the indexer is unavailable during initialization, WatchMetadata()
			// will return after a timeout. A background watcher will keep
			// retrying, since it can be a transient partitioning error.
			// If the retry is eventually successful, this callback will be invoked
			// to update meta_client. The metadata client has to rely on the
			// cluster manager to send a notification if this node is detected
			// to be down, such that the metadata client can stop the
			// background watcher.
			fn := func(ad string, n_id common.IndexerId, o_id common.IndexerId) {
				// following time.Sleep() ensures that bootstrap initialization
				// completes before other async activities into metadataClient{}
				currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))
				for currmeta == nil {
					time.Sleep(10 * time.Millisecond)
					currmeta = (*indexTopology)(atomic.LoadPointer(&b.indexers))
				}
				b.updateIndexer(ad, n_id, o_id)
			}

			// WatchMetadata will "unwatch" an old metadata watcher which
			// shares the same indexer Id (but the adminport may be different).
			indexerID = b.mdClient.WatchMetadata(adminport, fn, activeNode)
			m[adminport] = indexerID
		} else {
			err = b.mdClient.UpdateServiceAddrForIndexer(indexerID, adminport)
			m[adminport] = indexerID
			delete(curradmns, adminport)
		}
	}
	// delete indexer-nodes that got removed from cluster.
	for _, indexerID := range curradmns {
		// check if the indexerId exists in var "m".  In case the
		// adminport changes for the same index node, there would
		// be two adminports mapping to the same indexerId, one
		// in b.adminport (old) and the other in "m" (new).  So
		// make sure not to accidentally unwatch the indexer.
		found := false
		for _, id := range m {
			if indexerID == id {
				found = true
			}
		}
		if !found {
			b.mdClient.UnwatchMetadata(indexerID, activeNode)
		}
	}
	b.safeupdate(m, true /*force*/)
	return err
}

func (b *metadataClient) updateIndexer(
	adminport string, newIndexerId, oldIndexerId common.IndexerId) {

	func() {
		b.topoChangeLock.Lock()
		defer b.topoChangeLock.Unlock()

		adminports := make(map[string]common.IndexerId)
		currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))
		for admnport, indexerId := range currmeta.adminports {
			adminports[admnport] = indexerId
		}

		// updateIndexer is an async callback.  When this is invoked, currmeta.adminports may be different.
		if _, ok := adminports[adminport]; ok {
			logging.Infof(
				"Acknowledged that new indexer is registered.  Indexer = %v, id = %v",
				adminport, newIndexerId)
			adminports[adminport] = newIndexerId
			b.safeupdate(adminports, true /*force*/)
		} else {
			logging.Infof(
				"New indexer registration is skipped.  Indexer may have been rebalanced out (unwatch).  Indexer = %v, id = %v",
				adminport, newIndexerId)
		}
	}()
}

// replicas:
//		Refresh
//		deleteIndex
// loads:
//		Refresh
//		deleteIndex
// topology:
//		Refresh
//		deleteIndex
//
//		Timeit for b.loads

func (b *metadataClient) updateTopology(
	adminports map[string]common.IndexerId, force bool) *indexTopology {

	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))

	// Refresh indexer version
	if force {
		b.mdClient.RefreshIndexerVersion()
	}

	mindexes, version := b.mdClient.ListIndex()
	// detect change in indexer cluster or indexes.
	if force == false && currmeta != nil &&
		(!b.hasIndexersChanged(adminports) &&
			!b.hasIndexesChanged(mindexes, version)) {
		return currmeta
	}

	// create a new topology.
	newmeta := &indexTopology{
		version:     version,
		allIndexes:  mindexes,
		adminports:  make(map[string]common.IndexerId),
		topology:    make(map[common.IndexerId][]*mclient.IndexMetadata),
		replicas:    make(map[common.IndexDefnId][]common.IndexInstId),
		equivalents: make(map[common.IndexDefnId][]common.IndexDefnId),
		queryports:  make(map[common.IndexerId]string),
		insts:       make(map[common.IndexInstId]*mclient.InstanceDefn),
		rebalInsts:  make(map[common.IndexInstId]*mclient.InstanceDefn),
		defns:       make(map[common.IndexDefnId]*mclient.IndexMetadata),
	}

	// adminport/queryport
	for adminport, indexerID := range adminports {
		newmeta.adminports[adminport] = indexerID
		newmeta.topology[indexerID] = make([]*mclient.IndexMetadata, 0, 16)

		_, qp, _, err := b.mdClient.FindServiceForIndexer(indexerID)
		if err == nil {
			// This excludes watcher that is not currently connected
			newmeta.queryports[indexerID] = qp
		}
	}

	// insts/defns
	topologyMap := make(map[common.IndexerId]map[common.IndexDefnId]*mclient.IndexMetadata)
	for _, mindex := range mindexes {
		newmeta.defns[mindex.Definition.DefnId] = mindex

		for _, instance := range mindex.Instances {
			for _, indexerId := range instance.IndexerId {
				if _, ok := topologyMap[indexerId]; !ok {
					topologyMap[indexerId] = make(map[common.IndexDefnId]*mclient.IndexMetadata)
				}
				if _, ok := topologyMap[indexerId][instance.DefnId]; !ok {
					topologyMap[indexerId][instance.DefnId] = mindex
				}
			}
			newmeta.insts[instance.InstId] = instance
		}

		for _, instance := range mindex.InstsInRebalance {
			for _, indexerId := range instance.IndexerId {
				if _, ok := topologyMap[indexerId]; !ok {
					topologyMap[indexerId] = make(map[common.IndexDefnId]*mclient.IndexMetadata)
				}
				if _, ok := topologyMap[indexerId][instance.DefnId]; !ok {
					topologyMap[indexerId][instance.DefnId] = mindex
				}
			}
			newmeta.rebalInsts[instance.InstId] = instance
		}
	}

	// topology
	for indexerId, indexes := range topologyMap {
		for _, index := range indexes {
			indexes2, ok := newmeta.topology[indexerId]
			if !ok {
				fmsg := "indexer node %v not available"
				logging.Fatalf(fmsg, indexerId)
				continue
			}
			newmeta.topology[indexerId] = append(indexes2, index)
		}
	}

	// replicas/replicaInRebal
	newmeta.replicas, newmeta.partitions = b.computeReplicas(newmeta.topology)

	// equivalent index
	newmeta.equivalents = b.computeEquivalents(newmeta.topology)

	// loads - after creation, newmeta.loads is immutable, even though the
	// content (loadHeuristics) is mutable
	newmeta.loads = make(map[common.IndexInstId]*loadHeuristics)

	if currmeta != nil {
		// currmeta.loads is immutable
		for instId, curInst := range currmeta.insts {
			if newInst, ok := newmeta.insts[instId]; ok {
				if load, ok := currmeta.loads[instId]; ok {
					// carry over the stats.  Only the stats for the partitions present
					// in newInst are copied, so if a partition is pruned, its stats
					// are dropped.  A partition can be pruned when
					// 1) it is dropped because the node is rebalanced out
					// 2) it is not available because the indexer is removed due to unwatchMetadata
					newmeta.loads[instId] = load.cloneRefresh(curInst, newInst)
				}
			}
		}
	}

	for instId, inst := range newmeta.insts {
		if _, ok := newmeta.loads[instId]; !ok {
			newmeta.loads[instId] = newLoadHeuristics(int(inst.NumPartitions))
		}
	}

	return newmeta
}

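// safeupdate publishes a freshly built topology by atomically swapping the
// b.indexers pointer with compare-and-swap, retrying if another goroutine won
// the race, so an older topology version is never published over a newer one.
// On a successful swap it nudges metaCh (non-blocking) to wake scan clients.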
1603func (b *metadataClient) safeupdate(
1604	adminports map[string]common.IndexerId, force bool) {
1605
1606	var currmeta, newmeta *indexTopology
1607
1608	done := false
1609	for done == false {
1610		currmeta = (*indexTopology)(atomic.LoadPointer(&b.indexers))
1611
1612		// no need to update if cached metadata is already up to date
1613		if !force && currmeta != nil && b.mdClient.GetMetadataVersion() <= currmeta.version {
1614			return
1615		}
1616
1617		// if adminport is nil, then safeupdate is not triggered by
1618		// topology change.  Get the adminports from currmeta.
1619		if currmeta != nil && adminports == nil {
1620			adminports = currmeta.adminports
1621		}
1622
1623		newmeta = b.updateTopology(adminports, force)
1624		if currmeta == nil {
1625			// This should happen only during bootstrap
1626			atomic.StorePointer(&b.indexers, unsafe.Pointer(newmeta))
1627			logging.Infof("initialized currmeta %v force %v \n", newmeta.version, force)
1628			return
1629		} else if force {
1630			if newmeta.version < currmeta.version {
1631				// This should not happen.  But if it does force to increment metadata version.
1632				b.mdClient.IncrementMetadataVersion()
1633				continue
1634			}
1635		} else if newmeta.version <= currmeta.version {
1636			fmsg := "skip newmeta %v <= %v force %v \n"
1637			logging.Infof(fmsg, newmeta.version, currmeta.version, force)
1638			return
1639		}
1640
1641		logging.Debugf("updateTopology %v \n", newmeta)
1642		oldptr := unsafe.Pointer(currmeta)
1643		newptr := unsafe.Pointer(newmeta)
1644		done = atomic.CompareAndSwapPointer(&b.indexers, oldptr, newptr)
1645
1646		// metaCh should never close
1647		if done && b.metaCh != nil {
1648			select {
1649			// update scan clients
1650			case b.metaCh <- true:
1651			default:
1652			}
1653		}
1654	}
1655	fmsg := "switched currmeta from %v -> %v force %v \n"
1656	logging.Infof(fmsg, currmeta.version, newmeta.version, force)
1657}
1658
1659func (b *metadataClient) hasIndexersChanged(
1660	adminports map[string]common.IndexerId) bool {
1661
1662	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))
1663
1664	for adminport, indexerId := range adminports {
1665		x, ok := currmeta.adminports[adminport]
1666		if !ok || x != indexerId { // new indexer node detected.
1667			return true
1668		}
1669	}
1670	for adminport, indexerId := range currmeta.adminports {
1671		x, ok := adminports[adminport]
1672		if !ok || x != indexerId { // indexer node dropped out.
1673			return true
1674		}
1675	}
1676	return false
1677}

func (b *metadataClient) hasIndexesChanged(
	mindexes []*mclient.IndexMetadata, version uint64) bool {

	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))

	if currmeta.version < version {
		fmsg := "metadata provider version changed %v -> %v\n"
		logging.Infof(fmsg, currmeta.version, version)
		return true
	}

	for _, mindex := range mindexes {
		_, ok := currmeta.replicas[mindex.Definition.DefnId]
		if !ok { // new index detected.
			return true
		}
	}

	for _, iindexes := range currmeta.topology { // iindexes -> []*IndexMetadata
	loop:
		for _, index := range iindexes { // index -> *IndexMetadata
			for _, mindex := range mindexes {
				if mindex.Definition.DefnId == index.Definition.DefnId {
					for _, ix := range index.Instances {
						for _, iy := range mindex.Instances {
							if ix.InstId == iy.InstId && ix.State != iy.State {
								return true
							}
						}
					}
					continue loop
				}
			}
			return true
		}
	}
	return false
}

// updateStats updates the cached per-partition statistics for index instances.
func (b *metadataClient) updateStats(stats map[common.IndexInstId]map[common.PartitionId]common.Statistics) {

	currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))

	for instId, statsByPartitions := range stats {

		load, ok := currmeta.loads[instId]
		if !ok {
			continue
		}

		newStats := load.copyStats()

		for partitionId, partnStats := range statsByPartitions {

			if v := partnStats.Get("num_docs_pending"); v != nil {
				pending := int64(v.(float64))
				if v := partnStats.Get("num_docs_queued"); v != nil {
					queued := int64(v.(float64))
					newStats.updatePendingItem(partitionId, pending+queued)
				}
			}

			if v := partnStats.Get("last_rollback_time"); v != nil {
				if rollback, err := strconv.ParseInt(v.(string), 10, 64); err == nil {

					oldRollbackTime := load.getStats().getRollbackTime(partitionId)
					if rollback != oldRollbackTime {
						logging.Infof("Rollback time has changed for index inst %v. New rollback time %v", instId, rollback)
					}

					newStats.updateRollbackTime(partitionId, rollback)
				} else {
					logging.Errorf("Error converting last_rollback_time %v: %v", v, err)
				}
			}

			if v := partnStats.Get("progress_stat_time"); v != nil {
				if progress, err := strconv.ParseInt(v.(string), 10, 64); err == nil {
					newStats.updateStatsTime(partitionId, progress)
				} else {
					logging.Errorf("Error converting progress_stat_time %v: %v", v, err)
				}
			}
		}

		load.updateStats(newStats)
	}
}
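// The stats above appear to arrive JSON-decoded: counters as float64 and
// timestamps as decimal strings, hence the mix of type assertions and
// strconv.ParseInt. A minimal sketch of that extraction under the same
// assumption (pendingItems is a hypothetical helper, not part of this file):
//
//	func pendingItems(s common.Statistics) (int64, bool) {
//		pending, ok1 := s.Get("num_docs_pending").(float64)
//		queued, ok2 := s.Get("num_docs_queued").(float64)
//		if !ok1 || !ok2 {
//			return 0, false // stat missing or not a number
//		}
//		return int64(pending) + int64(queued), true
//	}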

// return adminports for all known indexers.
func getIndexerAdminports(cinfo *common.ClusterInfoCache) ([]string, int, int, int, int, error) {
	iAdminports := make([]string, 0)
	unhealthyNodes := 0
	for _, node := range cinfo.GetNodesByServiceType("indexAdmin") {
		status, err := cinfo.GetNodeStatus(node)
		if err != nil {
			return nil, 0, 0, 0, 0, err
		}
		logging.Verbosef("node %v status: %q", node, status)
		if status == "healthy" || status == "active" || status == "warmup" {
			adminport, err := cinfo.GetServiceAddress(node, "indexAdmin")
			if err != nil {
				return nil, 0, 0, 0, 0, err
			}
			iAdminports = append(iAdminports, adminport)
		} else {
			unhealthyNodes++
			logging.Warnf("node %v status: %q", node, status)
		}
	}

	return iAdminports, // active, healthy indexer nodes with known ports
		len(cinfo.GetActiveIndexerNodes()), // all active indexer nodes (known ports + unknown ports + unhealthy nodes)
		len(cinfo.GetFailedIndexerNodes()), // failed-over indexer nodes
		unhealthyNodes, // active, unhealthy indexer nodes with known ports
		len(cinfo.GetNewIndexerNodes()), // new indexer nodes
		nil
}
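// A sketch of how the multiple return values might be consumed by a caller
// (the log line is illustrative only):
//
//	adminports, numActive, numFailed, numUnhealthy, numNew, err := getIndexerAdminports(cinfo)
//	if err == nil {
//		logging.Infof("indexer nodes: %v usable, %v active, %v failed over, %v unhealthy, %v new",
//			len(adminports), numActive, numFailed, numUnhealthy, numNew)
//	}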

// FIXME/TODO: based on discussion with John-
//
//    If we cannot watch the metadata due to a network partition we will
//    have an empty list of indexes and cannot query; in other words,
//    the client tolerates the partition and rejects scans until it is
//    healed.
//    i) Alternatively, figure out a way to propagate errors that happen
//       in the watchClusterChanges() go-routine.
//
//    While propagating the error back to the caller we can either
//    1) encourage the caller to Refresh() the client hoping for
//       success, or
//    2) Close() the client and re-create it.
//    (A hypothetical caller-side sketch of both options follows below.)
//
//    Side-effects of partitioning:
//    a) query cannot get indexes from the indexer node -- so n1ql has
//       to fall back to a bucket scan. It is a perf issue.
//    b) The network disconnects after the watcher is up. We have the list
//       of indexes -- but we cannot query them. N1QL should still degrade
//       to a bucket scan.

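// A hypothetical caller-side sketch of the two recovery options above,
// assuming the public client exposes Refresh() and Close() as the note
// describes (gsiClient, tryRefresh and newClient are illustrative names,
// not APIs defined in this file):
//
//	if err := tryRefresh(gsiClient); err != nil { // option 1: Refresh() and retry
//		gsiClient.Close()       // option 2: tear down ...
//		gsiClient = newClient() // ... and re-create the client
//	}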
func (b *metadataClient) watchClusterChanges() {
	selfRestart := func() {
		time.Sleep(time.Duration(b.servicesNotifierRetryTm) * time.Millisecond)
		go b.watchClusterChanges()
	}

	clusterURL, err := common.ClusterAuthUrl(b.cluster)
	if err != nil {
		logging.Errorf("common.ClusterAuthUrl(): %v\n", err)
		selfRestart()
		return
	}
	scn, err := common.NewServicesChangeNotifier(clusterURL, "default")
	if err != nil {
		logging.Errorf("common.NewServicesChangeNotifier(): %v\n", err)
		selfRestart()
		return
	}
	defer scn.Close()

	hasUnhealthyNode := false
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()

	// For observing node services config
	ch := scn.GetNotifyCh()
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				selfRestart()
				return
			} else if err := b.updateIndexerList(false); err != nil {
				logging.Errorf("updateIndexerList(): %v\n", err)
				selfRestart()
				return
			}
		case _, ok := <-b.mdNotifyCh:
			if ok {
				if err := b.updateIndexerList(false); err != nil {
					logging.Errorf("updateIndexerList(): %v\n", err)
					selfRestart()
					return
				}
			}
		case stats, ok := <-b.stNotifyCh:
			if ok {
				b.updateStats(stats)
			}
		case <-ticker.C:
			// refresh indexer version
			b.mdClient.RefreshIndexerVersion()

			cinfo, err := common.FetchNewClusterInfoCache(b.cluster, common.DEFAULT_POOL)
			if err != nil {
				logging.Errorf("common.FetchNewClusterInfoCache(): %v\n", err)
				selfRestart()
				return
			}

			_, _, _, unhealthyNode, _, err := getIndexerAdminports(cinfo)
			if err != nil {
				logging.Errorf("getIndexerAdminports(): %v\n", err)
				selfRestart()
				return
			}

			if unhealthyNode != 0 {
				hasUnhealthyNode = true
				b.mdClient.SetClusterStatus(-1, -1, unhealthyNode, -1)

			} else if hasUnhealthyNode {
				// refresh the indexer list once there are no more unhealthy nodes
				hasUnhealthyNode = false
				if err := b.updateIndexerList(false); err != nil {
					logging.Errorf("updateIndexerList(): %v\n", err)
					selfRestart()
					return
				}
			}
		case <-b.finch:
			return
		}
	}
}
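// watchClusterChanges recovers from transient failures by sleeping and then
// relaunching itself in a fresh goroutine. A minimal sketch of that
// self-restart pattern in isolation (watchOnce and retry are hypothetical):
//
//	func watch(retry time.Duration) {
//		if err := watchOnce(); err != nil {
//			time.Sleep(retry) // give the cluster time to settle
//			go watch(retry)   // re-subscribe from scratch
//		}
//	}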

func postWithAuth(url string, bodyType string, body io.Reader, timeout time.Duration) (*http.Response, error) {

	if !strings.HasPrefix(url, "http://") {
		url = "http://" + url
	}

	req, err := http.NewRequest("POST", url, body)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", bodyType)
	err = cbauth.SetRequestAuthVia(req, nil)
	if err != nil {
		logging.Errorf("Error setting auth %v", err)
		return nil, err
	}

	client := http.Client{Timeout: timeout}
	return client.Do(req)
}

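// Example use of postWithAuth (the endpoint and payload are hypothetical):
//
//	payload, _ := json.Marshal(map[string]interface{}{"keyspace": "default"})
//	resp, err := postWithAuth("127.0.0.1:9102/api/stats", "application/json",
//		bytes.NewBuffer(payload), 120*time.Second)
//	if err == nil {
//		defer resp.Body.Close()
//		data, _ := ioutil.ReadAll(resp.Body)
//		logging.Debugf("response: %s", data)
//	}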