//  Copyright (c) 2014 Couchbase, Inc.
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the
//  License. You may obtain a copy of the License at
//    http://www.apache.org/licenses/LICENSE-2.0
//  Unless required by applicable law or agreed to in writing,
//  software distributed under the License is distributed on an "AS
//  IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
//  express or implied. See the License for the specific language
//  governing permissions and limitations under the License.

package cbgt

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"math"
	"os"
	"strings"
	"sync"
)

const PINDEX_META_FILENAME string = "PINDEX_META"
const pindexPathSuffix string = ".pindex"

// A PIndex represents a partition of an index, or an "index
// partition".  A logical index definition will be split into one or
// more pindexes.
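//
// For illustration only, the persisted PINDEX_META document for a
// pindex might look like the following (every field value here is a
// hypothetical placeholder, including the index and source types):
//
//	{
//	  "name": "myIndex_0",
//	  "uuid": "5a3b7cf8",
//	  "indexType": "fulltext-index",
//	  "indexName": "myIndex",
//	  "indexUUID": "9f8e7d6c",
//	  "indexParams": "",
//	  "sourceType": "couchbase",
//	  "sourceName": "beer-sample",
//	  "sourceUUID": "",
//	  "sourceParams": "",
//	  "sourcePartitions": "0,1,2,3"
//	}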
type PIndex struct {
	Name             string     `json:"name"`
	UUID             string     `json:"uuid"`
	IndexType        string     `json:"indexType"`
	IndexName        string     `json:"indexName"`
	IndexUUID        string     `json:"indexUUID"`
	IndexParams      string     `json:"indexParams"`
	SourceType       string     `json:"sourceType"`
	SourceName       string     `json:"sourceName"`
	SourceUUID       string     `json:"sourceUUID"`
	SourceParams     string     `json:"sourceParams"`
	SourcePartitions string     `json:"sourcePartitions"`
	Path             string     `json:"-"` // Transient, not persisted.
	Impl             PIndexImpl `json:"-"` // Transient, not persisted.
	Dest             Dest       `json:"-"` // Transient, not persisted.

	sourcePartitionsMap map[string]bool // Non-persisted memoization.

	m      sync.Mutex
	closed bool
}

// Close shuts down a pindex, optionally removing its stored files.
func (p *PIndex) Close(remove bool) error {
	p.m.Lock()
	if p.closed {
		p.m.Unlock()
		return nil
	}

	p.closed = true
	p.m.Unlock()

	if p.Dest != nil {
		err := p.Dest.Close()
		if err != nil {
			return err
		}
	}

	if remove {
		os.RemoveAll(p.Path)
	}

	return nil
}

// Clone returns a copy of the current PIndex.
func (p *PIndex) Clone() *PIndex {
	if p != nil {
		p.m.Lock()
		pi := &PIndex{
			Name:                p.Name,
			UUID:                p.UUID,
			IndexName:           p.IndexName,
			IndexParams:         p.IndexParams,
			IndexType:           p.IndexType,
			IndexUUID:           p.IndexUUID,
			SourceType:          p.SourceType,
			SourceName:          p.SourceName,
			SourceUUID:          p.SourceUUID,
			SourceParams:        p.SourceParams,
			SourcePartitions:    p.SourcePartitions,
			sourcePartitionsMap: p.sourcePartitionsMap,
			Path:                p.Path,
			Impl:                p.Impl,
			Dest:                p.Dest,
			closed:              p.closed,
		}
		p.m.Unlock()
		return pi
	}
	return nil
}

func restartPIndex(mgr *Manager, pindex *PIndex) {
	pindex.m.Lock()
	closed := pindex.closed
	pindex.m.Unlock()

	if !closed {
		mgr.ClosePIndex(pindex)
	}

	mgr.Kick("restart-pindex")
}

// NewPIndex creates a pindex, including its backend implementation
// structures and its files.
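//
// A minimal usage sketch; the names, the "fulltext-index" index type,
// the "couchbase" source type, and the dataDir variable below are all
// hypothetical placeholders:
//
//	pindex, err := NewPIndex(mgr, "myIndex_0", "pindexUUID",
//		"fulltext-index", "myIndex", "indexUUID", "{}",
//		"couchbase", "beer-sample", "", "", "0,1,2,3",
//		PIndexPath(dataDir, "myIndex_0"))
//	if err != nil {
//		// handle/log the creation error
//	}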
func NewPIndex(mgr *Manager, name, uuid,
	indexType, indexName, indexUUID, indexParams,
	sourceType, sourceName, sourceUUID, sourceParams, sourcePartitions string,
	path string) (*PIndex, error) {
	var pindex *PIndex

	restart := func() {
		go restartPIndex(mgr, pindex)
	}

	impl, dest, err := NewPIndexImpl(indexType, indexParams, path, restart)
	if err != nil {
		os.RemoveAll(path)
		return nil, fmt.Errorf("pindex: new indexType: %s, indexParams: %s,"+
			" path: %s, err: %s", indexType, indexParams, path, err)
	}

	pindex = &PIndex{
		Name:             name,
		UUID:             uuid,
		IndexType:        indexType,
		IndexName:        indexName,
		IndexUUID:        indexUUID,
		IndexParams:      indexParams,
		SourceType:       sourceType,
		SourceName:       sourceName,
		SourceUUID:       sourceUUID,
		SourceParams:     sourceParams,
		SourcePartitions: sourcePartitions,
		Path:             path,
		Impl:             impl,
		Dest:             dest,
	}
	pindex.sourcePartitionsMap = map[string]bool{}
	for _, partition := range strings.Split(sourcePartitions, ",") {
		pindex.sourcePartitionsMap[partition] = true
	}

	buf, err := json.Marshal(pindex)
	if err != nil {
		dest.Close()
		os.RemoveAll(path)
		return nil, err
	}

	err = ioutil.WriteFile(path+string(os.PathSeparator)+PINDEX_META_FILENAME,
		buf, 0600)
	if err != nil {
		dest.Close()
		os.RemoveAll(path)
		return nil, fmt.Errorf("pindex: could not save PINDEX_META_FILENAME,"+
			" path: %s, err: %v", path, err)
	}

	return pindex, nil
}

// OpenPIndex reopens a previously created pindex.  The path argument
// must be a directory for the pindex.
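//
// For illustration (the data directory and pindex name below are
// hypothetical):
//
//	pindex, err := OpenPIndex(mgr, PIndexPath(dataDir, "myIndex_0"))
//	if err != nil {
//		// the PINDEX_META file could not be read, parsed or reopened
//	}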
func OpenPIndex(mgr *Manager, path string) (*PIndex, error) {
	buf, err := ioutil.ReadFile(path +
		string(os.PathSeparator) + PINDEX_META_FILENAME)
	if err != nil {
		return nil, fmt.Errorf("pindex: could not load PINDEX_META_FILENAME,"+
			" path: %s, err: %v", path, err)
	}

	pindex := &PIndex{}
	err = json.Unmarshal(buf, pindex)
	if err != nil {
		return nil, fmt.Errorf("pindex: could not parse pindex json,"+
			" path: %s, err: %v", path, err)
	}

	restart := func() {
		go restartPIndex(mgr, pindex)
	}

	impl, dest, err := OpenPIndexImplUsing(pindex.IndexType, path,
		pindex.IndexParams, restart)
	if err != nil {
		return nil, fmt.Errorf("pindex: could not open indexType: %s,"+
			" path: %s, err: %v", pindex.IndexType, path, err)
	}

	pindex.Path = path
	pindex.Impl = impl
	pindex.Dest = dest

	pindex.sourcePartitionsMap = map[string]bool{}
	for _, partition := range strings.Split(pindex.SourcePartitions, ",") {
		pindex.sourcePartitionsMap[partition] = true
	}

	return pindex, nil
}

// PIndexPath computes the storage path for a pindex.
func PIndexPath(dataDir, pindexName string) string {
	// TODO: Need path security checks / mapping here; ex: "../etc/pswd"
	return dataDir + string(os.PathSeparator) + pindexName + pindexPathSuffix
}

// ParsePIndexPath retrieves the pindex name from a pindex path.
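//
// For example, on a Unix-like system (with a hypothetical pindex name):
//
//	p := PIndexPath("/data", "myIndex_0")   // "/data/myIndex_0.pindex"
//	name, ok := ParsePIndexPath("/data", p) // "myIndex_0", true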
func ParsePIndexPath(dataDir, pindexPath string) (string, bool) {
	if !strings.HasSuffix(pindexPath, pindexPathSuffix) {
		return "", false
	}
	prefix := dataDir + string(os.PathSeparator)
	if !strings.HasPrefix(pindexPath, prefix) {
		return "", false
	}
	pindexName := pindexPath[len(prefix):]
	pindexName = pindexName[0 : len(pindexName)-len(pindexPathSuffix)]
	return pindexName, true
}

// ---------------------------------------------------------

// RemotePlanPIndex associations are returned by CoveringPIndexes().
type RemotePlanPIndex struct {
	PlanPIndex *PlanPIndex
	NodeDef    *NodeDef
}

// PlanPIndexFilter is used to filter out nodes being considered by
// CoveringPIndexes().
type PlanPIndexFilter func(*PlanPIndexNode) bool

// CoveringPIndexesSpec represents the arguments for computing the
// covering pindexes for an index.  See also CoveringPIndexesEx().
type CoveringPIndexesSpec struct {
	IndexName            string
	IndexUUID            string
	PlanPIndexFilterName string // See PlanPIndexFilters.
}

// CoveringPIndexes represents a non-overlapping, disjoint set of
// PIndexes that cover all the partitions of an index.
type CoveringPIndexes struct {
	LocalPIndexes      []*PIndex
	RemotePlanPIndexes []*RemotePlanPIndex
	MissingPIndexNames []string
}

// PlanPIndexFilters represents the registered PlanPIndexFilter funcs,
// and should only be modified during process initialization (init()).
var PlanPIndexFilters = map[string]PlanPIndexFilter{
	"ok":      PlanPIndexNodeOk,
	"canRead": PlanPIndexNodeCanRead,
}
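
// As an illustrative sketch only, an application could register an
// additional filter during its init(); the "primaryOnly" name and the
// priority-based rule below are hypothetical:
//
//	func init() {
//		PlanPIndexFilters["primaryOnly"] = func(n *PlanPIndexNode) bool {
//			return n != nil && n.Priority <= 0
//		}
//	}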

// ---------------------------------------------------------

// CoveringPIndexes returns a non-overlapping, disjoint set (or cut)
// of PIndexes (either local or remote) that cover all the partitions
// of an index so that the caller can perform scatter/gather queries,
// etc.  Only PlanPIndexes on wanted nodes that pass the
// planPIndexFilter will be returned.
//
// TODO: Perhaps need a tighter check around indexUUID, as the current
// implementation might have a race where old pindexes with a matching
// (but outdated) indexUUID might be chosen.
//
// TODO: This implementation currently always favors the local node's
// pindex, but should it?  Perhaps a remote node is more up-to-date
// than the local pindex?
//
// TODO: Should we favor the most up-to-date node rather than the
// first one that we run into here?  But, perhaps the most up-to-date
// node is also the most overloaded?  Or, perhaps the planner may be
// trying to rebalance away the most up-to-date node, and hitting it
// with load just makes the rebalance take longer?
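//
// For illustration (the index name and wantKind below are hypothetical):
//
//	locals, remotes, err := mgr.CoveringPIndexes("myIndex", "",
//		PlanPIndexFilters["canRead"], "queries")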
func (mgr *Manager) CoveringPIndexes(indexName, indexUUID string,
	planPIndexFilter PlanPIndexFilter, wantKind string) (
	localPIndexes []*PIndex,
	remotePlanPIndexes []*RemotePlanPIndex,
	err error) {
	var missingPIndexNames []string

	localPIndexes, remotePlanPIndexes, missingPIndexNames, err =
		mgr.CoveringPIndexesEx(CoveringPIndexesSpec{
			IndexName: indexName,
			IndexUUID: indexUUID,
		}, planPIndexFilter, false)
	if err == nil && len(missingPIndexNames) > 0 {
		return nil, nil, fmt.Errorf("pindex:"+
			" %s may have been disabled; no nodes are enabled/allocated"+
			" to serve %s for the index partition(s)",
			wantKind, wantKind)
	}

	return localPIndexes, remotePlanPIndexes, err
}

// CoveringPIndexesBestEffort is similar to CoveringPIndexes, but does
// not error if there are missing/disabled nodes for some of the
// pindexes.
func (mgr *Manager) CoveringPIndexesBestEffort(indexName, indexUUID string,
	planPIndexFilter PlanPIndexFilter, wantKind string) (
	localPIndexes []*PIndex,
	remotePlanPIndexes []*RemotePlanPIndex,
	missingPIndexNames []string,
	err error) {
	return mgr.CoveringPIndexesEx(CoveringPIndexesSpec{
		IndexName: indexName,
		IndexUUID: indexUUID,
	}, planPIndexFilter, false)
}

// CoveringPIndexesEx returns a non-overlapping, disjoint set (or cut)
// of PIndexes (either local or remote) that cover all the partitions
// of an index so that the caller can perform scatter/gather queries.
//
// If the planPIndexFilter param is nil, then the
// spec.PlanPIndexFilterName is used.
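//
// When planPIndexFilter is nil and noCache is false, results are cached
// per spec.  For illustration (the index name is hypothetical):
//
//	spec := CoveringPIndexesSpec{
//		IndexName:            "myIndex",
//		PlanPIndexFilterName: "canRead",
//	}
//	locals, remotes, missing, err := mgr.CoveringPIndexesEx(spec, nil, false)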
func (mgr *Manager) CoveringPIndexesEx(spec CoveringPIndexesSpec,
	planPIndexFilter PlanPIndexFilter, noCache bool) (
	[]*PIndex, []*RemotePlanPIndex, []string, error) {
	var ver uint64

	ppf := planPIndexFilter
	if ppf == nil {
		if !noCache {
			var cp *CoveringPIndexes

			mgr.m.Lock()
			if mgr.coveringCache != nil {
				cp = mgr.coveringCache[spec]
			}
			// Capture the cache version even when the cache map does not
			// exist yet, so a freshly computed result can be cached below.
			ver = mgr.coveringCacheVerLOCKED()
			mgr.m.Unlock()

			if cp != nil {
				return cp.LocalPIndexes, cp.RemotePlanPIndexes, cp.MissingPIndexNames, nil
			}
		}

		ppf = PlanPIndexFilters[spec.PlanPIndexFilterName]
	}

	localPIndexes, remotePlanPIndexes, missingPIndexNames, err :=
		mgr.coveringPIndexesEx(spec.IndexName, spec.IndexUUID, ppf)
	if err != nil {
		return nil, nil, nil, err
	}

	if planPIndexFilter == nil && !noCache {
		cp := &CoveringPIndexes{
			LocalPIndexes:      localPIndexes,
			RemotePlanPIndexes: remotePlanPIndexes,
			MissingPIndexNames: missingPIndexNames,
		}

		mgr.m.Lock()
		if ver == mgr.coveringCacheVerLOCKED() {
			if mgr.coveringCache == nil {
				mgr.coveringCache = map[CoveringPIndexesSpec]*CoveringPIndexes{}
			}
			mgr.coveringCache[spec] = cp
		}
		mgr.m.Unlock()
	}

	return localPIndexes, remotePlanPIndexes, missingPIndexNames, err
}

func (mgr *Manager) coveringPIndexesEx(indexName, indexUUID string,
	planPIndexFilter PlanPIndexFilter) (
	localPIndexes []*PIndex,
	remotePlanPIndexes []*RemotePlanPIndex,
	missingPIndexNames []string,
	err error) {
	nodeDefs, err := mgr.GetNodeDefs(NODE_DEFS_WANTED, false)
	if err != nil {
		return nil, nil, nil,
			fmt.Errorf("pindex: could not get wanted nodeDefs,"+
				" err: %v", err)
	}

	// nodeDoesPIndexes returns the node's NodeDef and true if the node
	// serves pindexes, i.e., it has no tags or has the "pindex" tag.
	nodeDoesPIndexes := func(nodeUUID string) (*NodeDef, bool) {
		nodeDef, ok := nodeDefs.NodeDefs[nodeUUID]
		if ok && nodeDef.UUID == nodeUUID {
			if len(nodeDef.Tags) <= 0 {
				return nodeDef, true
			}
			for _, tag := range nodeDef.Tags {
				if tag == "pindex" {
					return nodeDef, true
				}
			}
		}
		return nil, false
	}
	}

	_, allPlanPIndexes, err := mgr.GetPlanPIndexes(false)
	if err != nil {
		return nil, nil, nil,
			fmt.Errorf("pindex: could not retrieve allPlanPIndexes,"+
				" err: %v", err)
	}

	planPIndexes, exists := allPlanPIndexes[indexName]
	if !exists || len(planPIndexes) <= 0 {
		return nil, nil, nil,
			fmt.Errorf("pindex: no planPIndexes for indexName: %s",
				indexName)
	}

	localPIndexes = make([]*PIndex, 0, len(planPIndexes))
	remotePlanPIndexes = make([]*RemotePlanPIndex, 0, len(planPIndexes))
	missingPIndexNames = make([]string, 0)

	_, pindexes := mgr.CurrentMaps()

	selfUUID := mgr.UUID()

	for _, planPIndex := range planPIndexes {
		lowestNodePriority := math.MaxInt64
		var lowestNode *NodeDef

		// look through each of the nodes
		for nodeUUID, planPIndexNode := range planPIndex.Nodes {
			// if node is local, do additional checks
			nodeLocal := nodeUUID == selfUUID
			nodeLocalOK := false
			if nodeLocal {
				localPIndex, exists := pindexes[planPIndex.Name]
				if exists &&
					localPIndex != nil &&
					localPIndex.Name == planPIndex.Name &&
					localPIndex.IndexName == indexName &&
					(indexUUID == "" || localPIndex.IndexUUID == indexUUID) {
					nodeLocalOK = true
				}
			}

			// node does pindexes and it is wanted
			if nodeDef, ok := nodeDoesPIndexes(nodeUUID); ok &&
				planPIndexFilter(planPIndexNode) {
				if planPIndexNode.Priority < lowestNodePriority {
					// candidate node has lower priority
					if !nodeLocal || nodeLocalOK {
						lowestNode = nodeDef
						lowestNodePriority = planPIndexNode.Priority
					}
				} else if planPIndexNode.Priority == lowestNodePriority {
					if nodeLocal && nodeLocalOK {
						// same priority, but prefer local nodes
						lowestNode = nodeDef
						lowestNodePriority = planPIndexNode.Priority
					}
				}
			}
		}

		// now add the node we found to the correct list
		if lowestNode == nil {
			// couldn't find anyone with this pindex
			missingPIndexNames = append(missingPIndexNames, planPIndex.Name)
		} else if lowestNode.UUID == selfUUID {
			// lowest priority is local
			localPIndex := pindexes[planPIndex.Name]
			localPIndexes = append(localPIndexes, localPIndex)
		} else {
			// lowest priority is remote
			remotePlanPIndexes =
				append(remotePlanPIndexes, &RemotePlanPIndex{
					PlanPIndex: planPIndex,
					NodeDef:    lowestNode,
				})
		}
	}

	return localPIndexes, remotePlanPIndexes, missingPIndexNames, nil
}

// coveringCacheVerLOCKED computes a CAS-like number that can be
// quickly compared to see if any inputs to the covering pindexes
// computation have changed.
func (mgr *Manager) coveringCacheVerLOCKED() uint64 {
	return mgr.stats.TotRefreshLastNodeDefs +
		mgr.stats.TotRefreshLastPlanPIndexes +
		mgr.stats.TotRegisterPIndex +
		mgr.stats.TotUnregisterPIndex
}