1// Copyright (c) 2014 Couchbase, Inc.
2// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
3// except in compliance with the License. You may obtain a copy of the License at
4//   http://www.apache.org/licenses/LICENSE-2.0
5// Unless required by applicable law or agreed to in writing, software distributed under the
6// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7// either express or implied. See the License for the specific language governing permissions
8// and limitations under the License.
9
10package client
11
12import (
13	"github.com/couchbase/cbauth/metakv"
14	"github.com/couchbase/indexing/secondary/common"
15	"github.com/couchbase/indexing/secondary/logging"
16	"math"
17	"sync"
18	"sync/atomic"
19	"time"
20)
21
22type ClientSettings struct {
23	numReplica     int32
24	numPartition   int32
25	backfillLimit  int32
26	scanLagPercent uint64
27	scanLagItem    uint64
28	prune_replica  int32
29	queueSize      uint64
30	concurrency    uint32
31	config         common.Config
32	cancelCh       chan struct{}
33
34	storageMode string
35	mutex       sync.RWMutex
36
37	needRefresh bool
38}
39
40func NewClientSettings(needRefresh bool) *ClientSettings {
41
42	s := &ClientSettings{
43		config:      nil,
44		cancelCh:    make(chan struct{}, 1),
45		needRefresh: needRefresh,
46	}
47
48	if needRefresh {
49		config, err := common.GetSettingsConfig(common.SystemConfig)
50		if err != nil {
51			logging.Errorf("ClientSettings: Fail to initialize metakv for reading latest indexer setting (%v).  Will use default indexer setting.", err)
52		} else {
53			s.config = config
54		}
55	}
56
57	if s.config == nil {
58		s.config = common.SystemConfig.Clone()
59	}
60
61	if needRefresh {
62		go func() {
63			fn := func(r int, err error) error {
64				if r > 0 {
65					logging.Errorf("ClientSettings: metakv notifier failed (%v)..Restarting %v", err, r)
66				}
67				err = metakv.RunObserveChildren(common.IndexingSettingsMetaDir, s.metaKVCallback, s.cancelCh)
68				return err
69			}
70			rh := common.NewRetryHelper(200, time.Second, 2, fn)
71			err := rh.Run()
72			if err != nil {
73				logging.Errorf("ClientSettings: metakv notifier failed even after max retries.")
74			}
75		}()
76	}
77
78	s.handleSettings(s.config)
79
80	return s
81}
82
83func (s *ClientSettings) Close() {
84
85	close(s.cancelCh)
86}
87
88func (s *ClientSettings) metaKVCallback(path string, value []byte, rev interface{}) error {
89
90	if path == common.IndexingSettingsMetaPath {
91		logging.Infof("New settings received: \n%s", string(value))
92
93		config := s.config.Clone()
94		config.Update(value)
95		s.config = config
96
97		s.handleSettings(s.config)
98	}
99
100	return nil
101}
102
103func (s *ClientSettings) handleSettings(config common.Config) {
104
105	numReplica := int32(config["indexer.settings.num_replica"].Int())
106	if numReplica >= 0 {
107		atomic.StoreInt32(&s.numReplica, numReplica)
108	} else {
109		logging.Errorf("ClientSettings: invalid setting value for num_replica=%v", numReplica)
110	}
111
112	numPartition := int32(config["indexer.numPartitions"].Int())
113	if numPartition > 0 {
114		atomic.StoreInt32(&s.numPartition, numPartition)
115	} else {
116		logging.Errorf("ClientSettings: invalid setting value for numPartitions=%v", numPartition)
117	}
118
119	backfillLimit := int32(config["queryport.client.settings.backfillLimit"].Int())
120	if backfillLimit >= 0 {
121		atomic.StoreInt32(&s.backfillLimit, backfillLimit)
122	} else {
123		logging.Errorf("ClientSettings: invalid setting value for backfillLimit=%v", backfillLimit)
124	}
125
126	scanLagPercent := config["queryport.client.scanLagPercent"].Float64()
127	if scanLagPercent >= 0 {
128		atomic.StoreUint64(&s.scanLagPercent, math.Float64bits(scanLagPercent))
129	} else {
130		logging.Errorf("ClientSettings: invalid setting value for scanLagPercent=%v", scanLagPercent)
131	}
132
133	scanLagItem := config["queryport.client.scanLagItem"].Int()
134	if scanLagItem >= 0 {
135		atomic.StoreUint64(&s.scanLagItem, uint64(scanLagItem))
136	} else {
137		logging.Errorf("ClientSettings: invalid setting value for scanLagItem=%v", scanLagItem)
138	}
139
140	prune_replica := config["queryport.client.disable_prune_replica"].Bool()
141	if prune_replica {
142		atomic.StoreInt32(&s.prune_replica, int32(1))
143	} else {
144		atomic.StoreInt32(&s.prune_replica, int32(0))
145	}
146
147	queueSize := config["queryport.client.scan.queue_size"].Int()
148	if queueSize >= 0 {
149		atomic.StoreUint64(&s.queueSize, uint64(queueSize))
150	} else {
151		logging.Errorf("ClientSettings: invalid setting value for queueSize=%v", queueSize)
152	}
153
154	concurrency := config["queryport.client.scan.max_concurrency"].Int()
155	if concurrency >= 0 {
156		atomic.StoreUint32(&s.concurrency, uint32(concurrency))
157	} else {
158		logging.Errorf("ClientSettings: invalid setting value for max_concurrency=%v", concurrency)
159	}
160
161	storageMode := config["indexer.settings.storage_mode"].String()
162	if len(storageMode) != 0 {
163		func() {
164			s.mutex.Lock()
165			defer s.mutex.Unlock()
166			s.storageMode = storageMode
167		}()
168	}
169
170	if s.needRefresh {
171		logLevel := config["queryport.client.log_level"].String()
172		level := logging.Level(logLevel)
173		logging.SetLogLevel(level)
174	}
175}
176
177func (s *ClientSettings) NumReplica() int32 {
178	return atomic.LoadInt32(&s.numReplica)
179}
180
181func (s *ClientSettings) NumPartition() int32 {
182	return atomic.LoadInt32(&s.numPartition)
183}
184
185func (s *ClientSettings) StorageMode() string {
186
187	s.mutex.RLock()
188	defer s.mutex.RUnlock()
189
190	return s.storageMode
191}
192
193func (s *ClientSettings) BackfillLimit() int32 {
194	return atomic.LoadInt32(&s.backfillLimit)
195}
196
197func (s *ClientSettings) ScanLagPercent() float64 {
198	bits := atomic.LoadUint64(&s.scanLagPercent)
199	return math.Float64frombits(bits)
200}
201
202func (s *ClientSettings) ScanLagItem() uint64 {
203	return atomic.LoadUint64(&s.scanLagItem)
204}
205
206func (s *ClientSettings) DisablePruneReplica() bool {
207	if atomic.LoadInt32(&s.prune_replica) == 1 {
208		return true
209	}
210	return false
211}
212
213func (s *ClientSettings) ScanQueueSize() uint64 {
214	return atomic.LoadUint64(&s.queueSize)
215}
216
217func (s *ClientSettings) MaxConcurrency() uint32 {
218	return atomic.LoadUint32(&s.concurrency)
219}
220