1//  Copyright (c) 2019 Couchbase, Inc.
2//  Licensed under the Apache License, Version 2.0 (the "License");
3//  you may not use this file except in compliance with the
4//  License. You may obtain a copy of the License at
5//    http://www.apache.org/licenses/LICENSE-2.0
6//  Unless required by applicable law or agreed to in writing,
7//  software distributed under the License is distributed on an "AS
8//  IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
9//  express or implied. See the License for the specific language
10//  governing permissions and limitations under the License.
11
12package n1fty
13
14import (
15	"fmt"
16	"io/ioutil"
17	"os"
18	"path"
19	"strings"
20	"sync"
21	"sync/atomic"
22	"time"
23
24	"github.com/couchbase/cbgt"
25	"github.com/couchbase/n1fty/util"
26	"github.com/couchbase/query/datastore"
27	"github.com/couchbase/query/errors"
28	"github.com/couchbase/query/logging"
29)
30
// Keys looked up in the configuration map by validateConfig,
// processConfig and cleanupTmpFiles.
const (
	backfillSpaceDir   = "query_tmpspace_dir"
	backfillSpaceLimit = "query_tmpspace_limit"
	searchTimeoutMS    = "searchTimeoutMS"
)

// metakvMetaDir has the same value that init assigns to
// cbgt.CfgMetaKvPrefix.
const metakvMetaDir = "/fts/cbgt/cfg/"

// backfillPrefix is the marker searched for in file names when stale
// temporary files are cleaned up, and the prefix used when probing for
// the default temporary directory.
const backfillPrefix = "search-results"

var (
	// 5120 is the default tmp space limit shared between gsi/n1fty;
	// the n1fty default is half of that value.
	defaultBackfillLimit = int64(5120 / 2)

	// defaultSearchTimeoutMS is used when no searchTimeoutMS value
	// has been configured.
	defaultSearchTimeoutMS = int64(120000) // 2min
)
41
42// ftsConfig is the metakv config listener which helps the
43// n1fty indexer to refresh it's config information like
44// index/node definitions.
45type ftsConfig struct {
46	cfg     cbgt.Cfg
47	eventCh chan cbgt.CfgEvent
48
49	version uint64 // version for the metakv config changes
50
51	m           sync.Mutex
52	subscribers map[string]datastore.Indexer
53}
54
55var once sync.Once
56
57var srvConfig *ftsConfig
58
59func init() {
60	var err error
61	srvConfig = &ftsConfig{
62		eventCh:     make(chan cbgt.CfgEvent),
63		subscribers: make(map[string]datastore.Indexer),
64		version:     1,
65	}
66	cbgt.CfgMetaKvPrefix = "/fts/cbgt/cfg/"
67	srvConfig.cfg, err = cbgt.NewCfgMetaKv("", make(map[string]string))
68	if err != nil {
69		logging.Infof("n1fty: ftsConfig err: %v", err)
70	}
71
72	go srvConfig.Listen()
73}
74
75func (c *ftsConfig) Listen() {
76	for {
77		select {
78		case <-c.eventCh:
79			// first bump the version so that the subscribers can
80			// verify the updated version with their cached one.
81			atomic.AddUint64(&c.version, 1)
82
83			c.m.Lock()
84			for _, i := range c.subscribers {
85				if indexer, ok := i.(*FTSIndexer); ok {
86					indexer.refresh(true /* config mutex acquired */)
87				}
88			}
89			c.m.Unlock()
90		}
91	}
92}
93
94func (c *ftsConfig) initConfig() {
95	once.Do(func() {
96		c.cfg.Subscribe(cbgt.INDEX_DEFS_KEY, c.eventCh)
97		c.cfg.Subscribe(cbgt.CfgNodeDefsKey(cbgt.NODE_DEFS_KNOWN), c.eventCh)
98	})
99}
100
101func (c *ftsConfig) subscribe(key string, i datastore.Indexer) {
102	c.m.Lock()
103	c.subscribeLOCKED(key, i)
104	c.m.Unlock()
105}
106
// subscribeLOCKED stores indexer i under key, replacing any entry
// already registered for that key. It does no locking itself; the
// subscribe wrapper calls it with c.m held.
func (c *ftsConfig) subscribeLOCKED(key string, i datastore.Indexer) {
	c.subscribers[key] = i
}
110
111func (c *ftsConfig) unSubscribe(key string) {
112	c.m.Lock()
113	delete(c.subscribers, key)
114	c.m.Unlock()
115}
116
// getVersion atomically reads and returns the current config version.
func (c *ftsConfig) getVersion() uint64 {
	return atomic.LoadUint64(&c.version)
}
120
// bumpVersion atomically increments the config version by one.
func (c *ftsConfig) bumpVersion() {
	atomic.AddUint64(&c.version, 1)
}
124
// Cfg extends datastore.IndexConfig with a getter that returns the
// configuration as a plain map.
type Cfg interface {
	datastore.IndexConfig
	GetConfig() map[string]interface{}
}
129
// clientConfig is used by the query to pass on the config values
// related to the backfill. It is the instance returned by GetConfig.
var clientConfig n1ftyConfig
133
// n1ftyConfig is the implementation of the datastore.IndexConfig
// interface.
type n1ftyConfig struct {
	// config holds the current settings; the values stored in and
	// loaded from it in this file are of type map[string]interface{}.
	config atomic.Value
}
138
// GetConfig returns the package-level clientConfig as a
// datastore.IndexConfig. The returned error is always nil.
func GetConfig() (datastore.IndexConfig, errors.Error) {
	return &clientConfig, nil
}
142
143func setConfig(nf *n1ftyConfig, conf map[string]interface{}) errors.Error {
144	err := nf.validateConfig(conf)
145	if err != nil {
146		return err
147	}
148
149	nf.processConfig(conf)
150	// make local copy so caller doesn't accidentally modify
151	newConf := nf.GetConfig()
152	if newConf == nil {
153		newConf = make(map[string]interface{})
154	}
155	for k, v := range conf {
156		newConf[k] = v
157	}
158
159	nf.config.Store(newConf)
160	return nil
161}
162
// SetConfig validates conf and merges it into the stored
// configuration by delegating to setConfig. It returns the error
// reported by setConfig.
func (c *n1ftyConfig) SetConfig(conf map[string]interface{}) errors.Error {
	return setConfig(c, conf)
}
166
167// SetParam should not be called concurrently with SetConfig
168func (c *n1ftyConfig) SetParam(name string, val interface{}) errors.Error {
169	conf := c.config.Load().(map[string]interface{})
170
171	if conf != nil {
172		tempconf := make(map[string]interface{})
173		tempconf[name] = val
174		err := c.validateConfig(tempconf)
175		if err != nil {
176			return err
177		}
178		c.processConfig(tempconf)
179		logging.Infof("n1ftyConfig - Setting param %v %v", name, val)
180		conf[name] = val
181	} else {
182		conf = make(map[string]interface{})
183		conf[name] = val
184		return c.SetConfig(conf)
185	}
186	return nil
187}
188
189func (c *n1ftyConfig) validateConfig(conf map[string]interface{}) errors.Error {
190	if conf == nil {
191		return nil
192	}
193
194	if v, ok := conf[backfillSpaceDir]; ok {
195		if _, ok1 := v.(string); !ok1 {
196			err := fmt.Errorf("n1fty Invalid Config.. key: %v, val: %v",
197				backfillSpaceDir, v)
198			return util.N1QLError(err, err.Error())
199		}
200	}
201
202	if v, ok := conf[backfillSpaceLimit]; ok {
203		if _, ok1 := v.(int64); !ok1 {
204			err := fmt.Errorf("n1fty Invalid Config.. key: %v, val: %v",
205				backfillSpaceLimit, v)
206			return util.N1QLError(err, err.Error())
207		}
208	}
209
210	if v, ok := conf[searchTimeoutMS]; ok {
211		if _, ok1 := v.(int64); !ok1 {
212			err := fmt.Errorf("n1fty Invalid Config.. key: %v, val: %v",
213				searchTimeoutMS, v)
214			return util.N1QLError(err, err.Error())
215		}
216	}
217
218	return nil
219}
220
221func (c *n1ftyConfig) processConfig(conf map[string]interface{}) {
222	var olddir interface{}
223	var newdir interface{}
224
225	if conf != nil {
226		newdir, _ = conf[backfillSpaceDir]
227	}
228
229	prevconf := clientConfig.GetConfig()
230
231	if prevconf != nil {
232		olddir, _ = prevconf[backfillSpaceDir]
233	}
234
235	if olddir == nil {
236		olddir = getDefaultTmpDir()
237	}
238
239	// cleanup any stale files
240	if olddir != newdir {
241		cleanupTmpFiles(olddir.(string))
242		if newdir != nil {
243			cleanupTmpFiles(newdir.(string))
244		}
245	}
246}
247
248// best effort cleanup as tmpdir may change during restart
249func cleanupTmpFiles(olddir string) {
250	files, err := ioutil.ReadDir(olddir)
251	if err != nil {
252		return
253	}
254
255	searchTimeout := defaultSearchTimeoutMS
256	conf := clientConfig.GetConfig()
257	if conf != nil {
258		if val, ok := conf[searchTimeoutMS]; ok {
259			searchTimeout = val.(int64)
260		}
261	}
262
263	for _, file := range files {
264		fname := path.Join(olddir, file.Name())
265		mtime := file.ModTime()
266		since := (time.Since(mtime).Seconds() * 1000) * 2 // twice the long search
267		if (strings.Contains(fname, backfillPrefix)) &&
268			int64(since) > searchTimeout {
269			logging.Infof("n1fty: removing old file %v, last modified @ %v",
270				fname, mtime)
271			os.Remove(fname)
272		}
273	}
274}
275
276func (c *n1ftyConfig) GetConfig() map[string]interface{} {
277	conf := c.config.Load()
278	if conf != nil {
279		return conf.(map[string]interface{})
280	}
281	return nil
282}
283
284func getDefaultTmpDir() string {
285	file, err := ioutil.TempFile("", backfillPrefix)
286	if err != nil {
287		return ""
288	}
289	defaultDir := path.Dir(file.Name())
290	os.Remove(file.Name())
291	return defaultDir
292}
293
294// GetIndexDefs gets the latest indexDefs from configs
295func GetIndexDefs(cfg cbgt.Cfg) (*cbgt.IndexDefs, error) {
296	indexDefs, _, err := cbgt.CfgGetIndexDefs(cfg)
297	if err != nil {
298		return nil, fmt.Errorf("indexDefs err: %v", err)
299	}
300	return indexDefs, nil
301}
302
303// GetNodeDefs gets the latest nodeDefs from configs
304func GetNodeDefs(cfg cbgt.Cfg) (*cbgt.NodeDefs, error) {
305	nodeDefs, _, err := cbgt.CfgGetNodeDefs(cfg, "known")
306	if err != nil {
307		return nil, fmt.Errorf("nodeDefs err: %v", err)
308	}
309	return nodeDefs, nil
310}
311