1// Copyright (c) 2014 Couchbase, Inc.
2// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
3// except in compliance with the License. You may obtain a copy of the License at
4//   http://www.apache.org/licenses/LICENSE-2.0
5// Unless required by applicable law or agreed to in writing, software distributed under the
6// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7// either express or implied. See the License for the specific language governing permissions
8// and limitations under the License.
9package manager
10
11import (
12	"bytes"
13	"encoding/json"
14	"errors"
15	"fmt"
16	"io"
17	"io/ioutil"
18	"math"
19	"net/http"
20	"os"
21	"path"
22	"sort"
23	"strconv"
24	"strings"
25	"sync"
26	"time"
27
28	"github.com/couchbase/cbauth"
29	"github.com/couchbase/indexing/secondary/common"
30	"github.com/couchbase/indexing/secondary/logging"
31	"github.com/couchbase/indexing/secondary/manager/client"
32	mc "github.com/couchbase/indexing/secondary/manager/common"
33	"github.com/couchbase/indexing/secondary/planner"
34	"github.com/couchbase/indexing/secondary/security"
35)
36
37///////////////////////////////////////////////////////
38// Type Definition
39///////////////////////////////////////////////////////
40
41//
42// Index create / drop
43//
44
// RequestType names the index DDL operation carried in IndexRequest.Type.
type RequestType string

// Request type values declared for IndexRequest.Type.
const (
	CREATE RequestType = "create"
	DROP   RequestType = "drop"
	BUILD  RequestType = "build"
)
52
53type IndexRequest struct {
54	Version  uint64                 `json:"version,omitempty"`
55	Type     RequestType            `json:"type,omitempty"`
56	Index    common.IndexDefn       `json:"index,omitempty"`
57	IndexIds client.IndexIdList     `json:indexIds,omitempty"`
58	Plan     map[string]interface{} `json:plan,omitempty"`
59}
60
// IndexResponse is a JSON reply shape carrying a result code plus optional
// error and message text.
// NOTE(review): presumably what sendIndexResponse / sendIndexResponseWithError
// emit, with Code set to RESP_SUCCESS or RESP_ERROR — those helpers are not
// visible in this part of the file; confirm there.
type IndexResponse struct {
	Version uint64 `json:"version,omitempty"`
	Code    string `json:"code,omitempty"`
	Error   string `json:"error,omitempty"`
	Message string `json:"message,omitempty"`
}
67
68//
69// Index Backup / Restore
70//
71
// LocalIndexMetadata is the index metadata of a single indexer node, as built
// by getLocalIndexMetadata and served by /getLocalIndexMetadata.
//
//   - IndexerId / NodeUUID come from the local metadata repository.
//   - StorageMode is the index type string derived from the current storage mode.
//   - Timestamp is time.Now().UnixNano() taken when the metadata is assembled.
//   - LocalSettings holds local key/value settings; getLocalIndexMetadata
//     stores the "excludeNode" value there when it can be read.
//   - IndexTopologies / IndexDefinitions are the topologies and definitions
//     known to the node.
type LocalIndexMetadata struct {
	IndexerId        string             `json:"indexerId,omitempty"`
	NodeUUID         string             `json:"nodeUUID,omitempty"`
	StorageMode      string             `json:"storageMode,omitempty"`
	Timestamp        int64              `json:"timestamp,omitempty"`
	LocalSettings    map[string]string  `json:"localSettings,omitempty"`
	IndexTopologies  []IndexTopology    `json:"topologies,omitempty"`
	IndexDefinitions []common.IndexDefn `json:"definitions,omitempty"`
}
81
// ClusterIndexMetadata aggregates LocalIndexMetadata across the cluster.
// getIndexMetadata fills Metadata with one entry per node that exposes the
// index HTTP service; convertIndexMetadataRequest decodes it from a request
// body that must contain a top-level "metadata" key.
type ClusterIndexMetadata struct {
	Metadata []LocalIndexMetadata `json:"metadata,omitempty"`
}
85
// BackupResponse is the JSON reply of /getIndexMetadata
// (handleIndexMetadataRequest): Code is RESP_SUCCESS with Result populated, or
// RESP_ERROR with Error set to the failure text.
//
// Note: encoding/json's omitempty has no effect on a struct-typed field, so
// Result is always encoded.
type BackupResponse struct {
	Version uint64               `json:"version,omitempty"`
	Code    string               `json:"code,omitempty"`
	Error   string               `json:"error,omitempty"`
	Result  ClusterIndexMetadata `json:"result,omitempty"`
}
92
// RestoreResponse is a JSON reply shape with a result code and optional error.
// NOTE(review): presumably the reply of /restoreIndexMetadata — the handler is
// not visible in this part of the file; confirm there.
type RestoreResponse struct {
	Version uint64 `json:"version,omitempty"`
	Code    string `json:"code,omitempty"`
	Error   string `json:"error,omitempty"`
}
98
99//
100// Index Status
101//
102
// IndexStatusResponse is the JSON reply of /getIndexStatus
// (handleIndexStatusRequest). On success Code is RESP_SUCCESS and Status holds
// the sorted index statuses. On failure Code is RESP_ERROR, Error carries a
// fixed message, FailedNodes lists the nodes that could not be queried, and
// Status still carries whatever statuses were collected.
type IndexStatusResponse struct {
	Version     uint64        `json:"version,omitempty"`
	Code        string        `json:"code,omitempty"`
	Error       string        `json:"error,omitempty"`
	FailedNodes []string      `json:"failedNodes,omitempty"`
	Status      []IndexStatus `json:"status,omitempty"`
}
110
// IndexStatus describes one index instance as reported by getIndexStatus.
//
// Field notes (as populated by getIndexStatus / consolideIndexStatus):
//   - Name is the display name built from the definition name and replica id;
//     IndexName is the plain definition name.
//   - Status is a human-readable state such as "Created", "Building", "Ready",
//     "Replicating", "Paused", "Warmup", "Error" or "Not Available"; building
//     and created states may carry an "(Upgrading)" / "(Downgrading)" suffix.
//   - Hosts holds the mgmt addresses of the nodes hosting the instance.
//   - Completion is read from the "<bucket>:<name>:build_progress" stat and
//     Progress from the "<instId>:completion_progress" stat.
//   - PartitionMap maps a mgmt address to the partition ids hosted there;
//     NumPartition is the number of partitions counted for the instance.
//   - NodeUUID is the reporting node's UUID; it is cleared when statuses are
//     consolidated across nodes.
//   - Stale is set when the metadata or stats used for the entry were not the
//     latest available for the node.
//   - LastScanTime is "NA" or a time.UnixDate formatted timestamp.
type IndexStatus struct {
	DefnId       common.IndexDefnId `json:"defnId,omitempty"`
	InstId       common.IndexInstId `json:"instId,omitempty"`
	Name         string             `json:"name,omitempty"`
	Bucket       string             `json:"bucket,omitempty"`
	IsPrimary    bool               `json:"isPrimary,omitempty"`
	SecExprs     []string           `json:"secExprs,omitempty"`
	WhereExpr    string             `json:"where,omitempty"`
	IndexType    string             `json:"indexType,omitempty"`
	Status       string             `json:"status,omitempty"`
	Definition   string             `json:"definition"`
	Hosts        []string           `json:"hosts,omitempty"`
	Error        string             `json:"error,omitempty"`
	Completion   int                `json:"completion"`
	Progress     float64            `json:"progress"`
	Scheduled    bool               `json:"scheduled"`
	Partitioned  bool               `json:"partitioned"`
	NumPartition int                `json:"numPartition"`
	PartitionMap map[string][]int   `json:"partitionMap"`
	NodeUUID     string             `json:"nodeUUID,omitempty"`
	NumReplica   int                `json:"numReplica"`
	IndexName    string             `json:"indexName"`
	ReplicaId    int                `json:"replicaId"`
	Stale        bool               `json:"stale"`
	LastScanTime string             `json:"lastScanTime,omitempty"`
}
137
// indexStatusSorter is the slice type handed to sort.Sort by
// handleIndexStatusRequest; its sort.Interface methods are defined outside
// this part of the file.
type indexStatusSorter []IndexStatus
139
140//
141// Response
142//
143
// Values placed in the Code field of the response types in this file.
const (
	RESP_SUCCESS string = "success"
	RESP_ERROR   string = "error"
)
148
149//
150// Internal data structure
151//
152
// requestHandlerContext holds the state shared by the HTTP handlers in this
// file.
//
//   - initializer guards the one-time setup in registerRequestHandler;
//     finalizer guards the one-time shutdown in Close.
//   - mgr / clusterUrl are set by registerRequestHandler on every call.
//   - metaDir / statsDir are the "meta" and "stats" directories under
//     <storage_dir>/cache.
//   - metaCh / statsCh are buffered channels (capacity 100) onto which
//     getIndexStatus pushes the per-host metadata and stats it collected.
//     NOTE(review): presumably drained by runPersistor — not visible here.
//   - metaCache / statsCache are maps created at registration; their readers
//     and writers, and the role of mutex, are not visible in this part of the
//     file.
//   - doneCh is closed by Close.
type requestHandlerContext struct {
	initializer sync.Once
	finalizer   sync.Once
	mgr         *IndexManager
	clusterUrl  string

	metaDir    string
	statsDir   string
	metaCh     chan map[string]*LocalIndexMetadata
	statsCh    chan map[string]*common.Statistics
	metaCache  map[string]*LocalIndexMetadata
	statsCache map[string]*common.Statistics

	mutex  sync.RWMutex
	doneCh chan bool
}
169
// handlerContext is the package-level handler state; registerRequestHandler
// initializes it and binds its methods to the HTTP routes.
var handlerContext requestHandlerContext
171
172///////////////////////////////////////////////////////
173// Registration
174///////////////////////////////////////////////////////
175
176func registerRequestHandler(mgr *IndexManager, clusterUrl string, mux *http.ServeMux, config common.Config) {
177
178	handlerContext.initializer.Do(func() {
179		defer func() {
180			if r := recover(); r != nil {
181				logging.Warnf("error encountered when registering http createIndex handler : %v.  Ignored.\n", r)
182			}
183		}()
184
185		mux.HandleFunc("/createIndex", handlerContext.createIndexRequest)
186		mux.HandleFunc("/createIndexRebalance", handlerContext.createIndexRequestRebalance)
187		mux.HandleFunc("/dropIndex", handlerContext.dropIndexRequest)
188		mux.HandleFunc("/buildIndex", handlerContext.buildIndexRequest)
189		mux.HandleFunc("/getLocalIndexMetadata", handlerContext.handleLocalIndexMetadataRequest)
190		mux.HandleFunc("/getIndexMetadata", handlerContext.handleIndexMetadataRequest)
191		mux.HandleFunc("/restoreIndexMetadata", handlerContext.handleRestoreIndexMetadataRequest)
192		mux.HandleFunc("/getIndexStatus", handlerContext.handleIndexStatusRequest)
193		mux.HandleFunc("/getIndexStatement", handlerContext.handleIndexStatementRequest)
194		mux.HandleFunc("/planIndex", handlerContext.handleIndexPlanRequest)
195		mux.HandleFunc("/settings/storageMode", handlerContext.handleIndexStorageModeRequest)
196		mux.HandleFunc("/settings/planner", handlerContext.handlePlannerRequest)
197		mux.HandleFunc("/listReplicaCount", handlerContext.handleListLocalReplicaCountRequest)
198		mux.HandleFunc("/getCachedLocalIndexMetadata", handlerContext.handleCachedLocalIndexMetadataRequest)
199		mux.HandleFunc("/getCachedStats", handlerContext.handleCachedStats)
200
201		cacheDir := path.Join(config["storage_dir"].String(), "cache")
202		handlerContext.metaDir = path.Join(cacheDir, "meta")
203		handlerContext.statsDir = path.Join(cacheDir, "stats")
204
205		os.MkdirAll(handlerContext.metaDir, 0755)
206		os.MkdirAll(handlerContext.statsDir, 0755)
207
208		handlerContext.metaCh = make(chan map[string]*LocalIndexMetadata, 100)
209		handlerContext.statsCh = make(chan map[string]*common.Statistics, 100)
210		handlerContext.doneCh = make(chan bool)
211
212		handlerContext.metaCache = make(map[string]*LocalIndexMetadata)
213		handlerContext.statsCache = make(map[string]*common.Statistics)
214
215		go handlerContext.runPersistor()
216	})
217
218	handlerContext.mgr = mgr
219	handlerContext.clusterUrl = clusterUrl
220}
221
222func (m *requestHandlerContext) Close() {
223	m.finalizer.Do(func() {
224		close(m.doneCh)
225	})
226}
227
228///////////////////////////////////////////////////////
229// Create / Drop Index
230///////////////////////////////////////////////////////
231
232func (m *requestHandlerContext) createIndexRequest(w http.ResponseWriter, r *http.Request) {
233
234	m.doCreateIndex(w, r, false)
235
236}
237
238func (m *requestHandlerContext) createIndexRequestRebalance(w http.ResponseWriter, r *http.Request) {
239
240	m.doCreateIndex(w, r, true)
241
242}
243
244func (m *requestHandlerContext) doCreateIndex(w http.ResponseWriter, r *http.Request, isRebalReq bool) {
245
246	creds, ok := doAuth(r, w)
247	if !ok {
248		return
249	}
250
251	// convert request
252	request := m.convertIndexRequest(r)
253	if request == nil {
254		sendIndexResponseWithError(http.StatusBadRequest, w, "Unable to convert request for create index")
255		return
256	}
257
258	permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!create", request.Index.Bucket)
259	if !isAllowed(creds, []string{permission}, w) {
260		return
261	}
262
263	indexDefn := request.Index
264
265	if indexDefn.DefnId == 0 {
266		defnId, err := common.NewIndexDefnId()
267		if err != nil {
268			sendIndexResponseWithError(http.StatusInternalServerError, w, fmt.Sprintf("Fail to generate index definition id %v", err))
269			return
270		}
271		indexDefn.DefnId = defnId
272	}
273
274	if len(indexDefn.Using) != 0 && strings.ToLower(string(indexDefn.Using)) != "gsi" {
275		if common.IndexTypeToStorageMode(indexDefn.Using) != common.GetStorageMode() {
276			sendIndexResponseWithError(http.StatusInternalServerError, w, fmt.Sprintf("Storage Mode Mismatch %v", indexDefn.Using))
277			return
278		}
279	}
280
281	// call the index manager to handle the DDL
282	logging.Debugf("RequestHandler::createIndexRequest: invoke IndexManager for create index bucket %s name %s",
283		indexDefn.Bucket, indexDefn.Name)
284
285	if err := m.mgr.HandleCreateIndexDDL(&indexDefn, isRebalReq); err == nil {
286		// No error, return success
287		sendIndexResponse(w)
288	} else {
289		// report failure
290		sendIndexResponseWithError(http.StatusInternalServerError, w, fmt.Sprintf("%v", err))
291	}
292
293}
294
295func (m *requestHandlerContext) dropIndexRequest(w http.ResponseWriter, r *http.Request) {
296
297	creds, ok := doAuth(r, w)
298	if !ok {
299		return
300	}
301
302	// convert request
303	request := m.convertIndexRequest(r)
304	if request == nil {
305		sendIndexResponseWithError(http.StatusBadRequest, w, "Unable to convert request for drop index")
306		return
307	}
308
309	permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!drop", request.Index.Bucket)
310	if !isAllowed(creds, []string{permission}, w) {
311		return
312	}
313
314	// call the index manager to handle the DDL
315	indexDefn := request.Index
316
317	if indexDefn.RealInstId == 0 {
318		if err := m.mgr.HandleDeleteIndexDDL(indexDefn.DefnId); err == nil {
319			// No error, return success
320			sendIndexResponse(w)
321		} else {
322			// report failure
323			sendIndexResponseWithError(http.StatusInternalServerError, w, fmt.Sprintf("%v", err))
324		}
325	} else if indexDefn.InstId != 0 {
326		if err := m.mgr.DropOrPruneInstance(indexDefn, true); err == nil {
327			// No error, return success
328			sendIndexResponse(w)
329		} else {
330			// report failure
331			sendIndexResponseWithError(http.StatusInternalServerError, w, fmt.Sprintf("%v", err))
332		}
333	} else {
334		// report failure
335		sendIndexResponseWithError(http.StatusInternalServerError, w, fmt.Sprintf("Missing index inst id for defn %v", indexDefn.DefnId))
336	}
337}
338
339func (m *requestHandlerContext) buildIndexRequest(w http.ResponseWriter, r *http.Request) {
340
341	creds, ok := doAuth(r, w)
342	if !ok {
343		return
344	}
345
346	// convert request
347	request := m.convertIndexRequest(r)
348	if request == nil {
349		sendIndexResponseWithError(http.StatusBadRequest, w, "Unable to convert request for build index")
350		return
351	}
352
353	permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!build", request.Index.Bucket)
354	if !isAllowed(creds, []string{permission}, w) {
355		return
356	}
357
358	// call the index manager to handle the DDL
359	indexIds := request.IndexIds
360	if err := m.mgr.HandleBuildIndexDDL(indexIds); err == nil {
361		// No error, return success
362		sendIndexResponse(w)
363	} else {
364		// report failure
365		sendIndexResponseWithError(http.StatusInternalServerError, w, fmt.Sprintf("%v", err))
366	}
367}
368
369func (m *requestHandlerContext) convertIndexRequest(r *http.Request) *IndexRequest {
370
371	req := &IndexRequest{}
372
373	buf := new(bytes.Buffer)
374	if _, err := buf.ReadFrom(r.Body); err != nil {
375		logging.Debugf("RequestHandler::convertIndexRequest: unable to read request body, err %v", err)
376		return nil
377	}
378
379	if err := json.Unmarshal(buf.Bytes(), req); err != nil {
380		logging.Debugf("RequestHandler::convertIndexRequest: unable to unmarshall request body. Buf = %s, err %v", logging.TagStrUD(buf), err)
381		return nil
382	}
383
384	return req
385}
386
387//////////////////////////////////////////////////////
388// Index Status
389///////////////////////////////////////////////////////
390
391func (m *requestHandlerContext) handleIndexStatusRequest(w http.ResponseWriter, r *http.Request) {
392
393	creds, ok := doAuth(r, w)
394	if !ok {
395		return
396	}
397
398	bucket := m.getBucket(r)
399
400	getAll := false
401	val := r.FormValue("getAll")
402	if len(val) != 0 && val == "true" {
403		getAll = true
404	}
405
406	list, failedNodes, err := m.getIndexStatus(creds, bucket, getAll)
407	if err == nil && len(failedNodes) == 0 {
408		sort.Sort(indexStatusSorter(list))
409		resp := &IndexStatusResponse{Code: RESP_SUCCESS, Status: list}
410		send(http.StatusOK, w, resp)
411	} else {
412		logging.Debugf("RequestHandler::handleIndexStatusRequest: failed nodes %v", failedNodes)
413		sort.Sort(indexStatusSorter(list))
414		resp := &IndexStatusResponse{Code: RESP_ERROR, Error: "Fail to retrieve cluster-wide metadata from index service",
415			Status: list, FailedNodes: failedNodes}
416		send(http.StatusInternalServerError, w, resp)
417	}
418}
419
420func (m *requestHandlerContext) getBucket(r *http.Request) string {
421
422	return r.FormValue("bucket")
423}
424
// getIndexStatus collects the status of every index instance the caller may
// list, across all nodes that expose the index HTTP service.
//
// Parameters:
//   - creds: caller credentials; definitions whose bucket fails the
//     "cluster.bucket[<bucket>].n1ql.index!list" check are left out.
//   - bucket: when non-empty, only definitions of that bucket are reported.
//   - getAll: when false, the per-node entries are merged per instance by
//     consolideIndexStatus before being returned.
//
// Returns the statuses, the mgmt addresses of nodes whose index service
// address, metadata or stats could not be obtained, and an error. The error is
// non-nil only when the cluster info cache is unavailable; node failures are
// reported through the second return value with a nil error.
//
// Side effect: the per-host metadata and stats maps gathered during the call
// are sent on m.metaCh and m.statsCh before returning.
func (m *requestHandlerContext) getIndexStatus(creds cbauth.Creds, bucket string, getAll bool) ([]IndexStatus, []string, error) {

	var cinfo *common.ClusterInfoCache
	cinfo = m.mgr.cinfoClient.GetClusterInfoCache()

	if cinfo == nil {
		return nil, nil, errors.New("ClusterInfoCache unavailable in IndexManager")
	}

	// The read lock is held until the function returns, i.e. across the
	// per-node calls and the channel sends below.
	cinfo.RLock()
	defer cinfo.RUnlock()

	// find all nodes that has a index http service
	nids := cinfo.GetNodesByServiceType(common.INDEX_HTTP_SERVICE)

	numReplicas := make(map[common.IndexDefnId]common.Counter)
	defns := make(map[common.IndexDefnId]common.IndexDefn)
	list := make([]IndexStatus, 0)
	failedNodes := make([]string, 0)
	metaToCache := make(map[string]*LocalIndexMetadata)
	statsToCache := make(map[string]*common.Statistics)

	defnToHostMap := make(map[common.IndexDefnId][]string)
	isInstanceDeferred := make(map[common.IndexInstId]bool)

	// mergeCounter folds the replica counter seen on one node into the
	// per-definition counter; a merge error is logged and the counter is left
	// unchanged.
	mergeCounter := func(defnId common.IndexDefnId, counter common.Counter) {
		if current, ok := numReplicas[defnId]; ok {
			newValue, merged, err := current.MergeWith(counter)
			if err != nil {
				logging.Errorf("Fail to merge replica count. Error: %v", err)
				return
			}

			if merged {
				numReplicas[defnId] = newValue
			}

			return
		}

		if counter.IsValid() {
			numReplicas[defnId] = counter
		}
	}

	// addHost records hostAddr as hosting defnId, ignoring duplicates.
	addHost := func(defnId common.IndexDefnId, hostAddr string) {
		if hostList, ok := defnToHostMap[defnId]; ok {
			for _, host := range hostList {
				if strings.Compare(hostAddr, host) == 0 {
					return
				}
			}
		}
		defnToHostMap[defnId] = append(defnToHostMap[defnId], hostAddr)
	}

	for _, nid := range nids {

		// A node without a resolvable mgmt address is skipped entirely; it is
		// not added to failedNodes.
		mgmtAddr, err := cinfo.GetServiceAddress(nid, "mgmt")
		if err != nil {
			logging.Errorf("RequestHandler::getIndexStatus: Error from GetServiceAddress (mgmt) for node id %v. Error = %v", nid, err)
			continue
		}

		addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE)
		if err == nil {

			u, err := security.GetURL(addr)
			if err != nil {
				logging.Debugf("RequestHandler::getIndexStatus: Fail to parse URL %v", addr)
				failedNodes = append(failedNodes, mgmtAddr)
				continue
			}

			// The host gets a nil entry up front; it is replaced only when
			// the node returned its latest metadata.
			// NOTE(review): how the consumer of metaCh treats a nil entry is
			// not visible here — confirm in the persistor.
			stale := false
			metaToCache[u.Host] = nil
			localMeta, latest, err := m.getLocalMetadataForNode(addr, u.Host, cinfo)
			if localMeta == nil || err != nil {
				logging.Debugf("RequestHandler::getIndexStatus: Error while retrieving %v with auth %v", addr+"/getLocalIndexMetadata", err)
				failedNodes = append(failedNodes, mgmtAddr)
				continue
			}

			if !latest {
				stale = true
			} else {
				metaToCache[u.Host] = localMeta
			}

			// Same nil-then-replace scheme for the stats.
			statsToCache[u.Host] = nil
			stats, latest, err := m.getStatsForNode(addr, u.Host, cinfo)
			if stats == nil || err != nil {
				logging.Debugf("RequestHandler::getIndexStatus: Error while retrieving %v with auth %v", addr+"/stats?async=true", err)
				failedNodes = append(failedNodes, mgmtAddr)
				continue
			}

			if !latest {
				stale = true
			} else {
				statsToCache[u.Host] = stats
			}

			for _, defn := range localMeta.IndexDefinitions {

				if len(bucket) != 0 && bucket != defn.Bucket {
					continue
				}

				// nil writer: a denied definition is silently left out
				// rather than answered here.
				permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!list", defn.Bucket)
				if !isAllowed(creds, []string{permission}, nil) {
					continue
				}

				mergeCounter(defn.DefnId, defn.NumReplica2)

				if topology := findTopologyByBucket(localMeta.IndexTopologies, defn.Bucket); topology != nil {

					instances := topology.GetIndexInstancesByDefn(defn.DefnId)
					for _, instance := range instances {

						state, errStr := topology.GetStatusByInst(defn.DefnId, common.IndexInstId(instance.InstId))

						// Instances in CREATED, DELETED or NIL state are not reported.
						if state != common.INDEX_STATE_CREATED &&
							state != common.INDEX_STATE_DELETED &&
							state != common.INDEX_STATE_NIL {

							stateStr := "Not Available"
							switch state {
							case common.INDEX_STATE_READY:
								stateStr = "Created"
							case common.INDEX_STATE_INITIAL:
								stateStr = "Building"
							case common.INDEX_STATE_CATCHUP:
								stateStr = "Building"
							case common.INDEX_STATE_ACTIVE:
								stateStr = "Ready"
							}

							if instance.RState == uint32(common.REBAL_PENDING) && state != common.INDEX_STATE_READY {
								stateStr = "Replicating"
							}

							// A recorded previous storage mode marks a ForestDB <-> Plasma
							// transition; it is surfaced as a suffix on the building and
							// created states.
							if state == common.INDEX_STATE_INITIAL || state == common.INDEX_STATE_CATCHUP {
								if len(instance.OldStorageMode) != 0 {

									if instance.OldStorageMode == common.ForestDB && instance.StorageMode == common.PlasmaDB {
										stateStr = "Building (Upgrading)"
									}

									if instance.StorageMode == common.ForestDB && instance.OldStorageMode == common.PlasmaDB {
										stateStr = "Building (Downgrading)"
									}
								}
							}

							if state == common.INDEX_STATE_READY {
								if len(instance.OldStorageMode) != 0 {

									if instance.OldStorageMode == common.ForestDB && instance.StorageMode == common.PlasmaDB {
										stateStr = "Created (Upgrading)"
									}

									if instance.StorageMode == common.ForestDB && instance.OldStorageMode == common.PlasmaDB {
										stateStr = "Created (Downgrading)"
									}
								}
							}

							// The node-wide indexer state overrides the per-instance state.
							if indexerState, ok := stats.ToMap()["indexer_state"]; ok {
								if indexerState == "Paused" {
									stateStr = "Paused"
								} else if indexerState == "Bootstrap" || indexerState == "Warmup" {
									stateStr = "Warmup"
								}
							}

							// An instance error takes precedence over every other state.
							if len(errStr) != 0 {
								stateStr = "Error"
							}

							name := common.FormatIndexInstDisplayName(defn.Name, int(instance.ReplicaId))

							// NOTE(review): the float64 type assertions on the stat values
							// below are unchecked and panic if a stat holds another type.
							completion := int(0)
							key := fmt.Sprintf("%v:%v:build_progress", defn.Bucket, name)
							if progress, ok := stats.ToMap()[key]; ok {
								completion = int(progress.(float64))
							}

							// The stat's numeric value is reinterpreted as the bit pattern
							// of a float64 rather than used directly.
							progress := float64(0)
							key = fmt.Sprintf("%v:completion_progress", instance.InstId)
							if stat, ok := stats.ToMap()[key]; ok {
								progress = math.Float64frombits(uint64(stat.(float64)))
							}

							// The stat is treated as nanoseconds since the Unix epoch;
							// zero is reported as "NA".
							lastScanTime := "NA"
							key = fmt.Sprintf("%v:%v:last_known_scan_time", defn.Bucket, name)
							if scanTime, ok := stats.ToMap()[key]; ok {
								nsecs := int64(scanTime.(float64))
								if nsecs != 0 {
									lastScanTime = time.Unix(0, nsecs).Format(time.UnixDate)
								}
							}

							partitionMap := make(map[string][]int)
							for _, partnDef := range instance.Partitions {
								partitionMap[mgmtAddr] = append(partitionMap[mgmtAddr], int(partnDef.PartId))
							}

							addHost(defn.DefnId, mgmtAddr)
							isInstanceDeferred[common.IndexInstId(instance.InstId)] = defn.Deferred
							// defn is the loop's copy; this value is what ends up in defns below.
							defn.NumPartitions = instance.NumPartitions

							status := IndexStatus{
								DefnId:       defn.DefnId,
								InstId:       common.IndexInstId(instance.InstId),
								Name:         name,
								Bucket:       defn.Bucket,
								IsPrimary:    defn.IsPrimary,
								SecExprs:     defn.SecExprs,
								WhereExpr:    defn.WhereExpr,
								IndexType:    string(defn.Using),
								Status:       stateStr,
								Error:        errStr,
								Hosts:        []string{mgmtAddr},
								Definition:   common.IndexStatement(defn, int(instance.NumPartitions), -1, true),
								Completion:   completion,
								Progress:     progress,
								Scheduled:    instance.Scheduled,
								Partitioned:  common.IsPartitioned(defn.PartitionScheme),
								NumPartition: len(instance.Partitions),
								PartitionMap: partitionMap,
								NodeUUID:     localMeta.NodeUUID,
								NumReplica:   int(defn.GetNumReplica()),
								IndexName:    defn.Name,
								ReplicaId:    int(instance.ReplicaId),
								Stale:        stale,
								LastScanTime: lastScanTime,
							}

							list = append(list, status)
						}
					}
				}
				defns[defn.DefnId] = defn
			}
		} else {
			logging.Debugf("RequestHandler::getIndexStatus: Error from GetServiceAddress (indexHttp) for node id %v. Error = %v", nid, err)
			failedNodes = append(failedNodes, mgmtAddr)
			continue
		}
	}

	//Fix replica count
	for i, index := range list {
		if counter, ok := numReplicas[index.DefnId]; ok {
			numReplica, exist := counter.Value()
			if exist {
				list[i].NumReplica = int(numReplica)
			}
		}
	}

	// Fix index definition so that the "nodes" field inside
	// "with" clause show the current set of nodes on which
	// the index resides.
	//
	// If the index resides on different nodes, the "nodes" clause
	// is populated on UI irrespective of whether the index is
	// explicitly defined with "nodes" clause or not
	//
	// If the index resides only on one node, the "nodes" clause is
	// populated on UI only if the index definition is explicitly
	// defined with "nodes" clause
	for i, index := range list {
		defnId := index.DefnId
		defn := defns[defnId]
		if len(defnToHostMap[defnId]) > 1 || defn.Nodes != nil {
			defn.Nodes = defnToHostMap[defnId]
			// The deferred field will be set to true by default for a rebalanced index
			// For the non-rebalanced index, it can either be true or false depending on
			// how it was created
			defn.Deferred = isInstanceDeferred[index.InstId]
			list[i].Definition = common.IndexStatement(defn, int(defn.NumPartitions), index.NumReplica, true)
		}
	}

	if !getAll {
		list = m.consolideIndexStatus(list)
	}

	// persist local meta and stats to disk cache
	// These sends block once the channel buffer is full.
	m.metaCh <- metaToCache
	m.statsCh <- statsToCache

	return list, failedNodes, nil
}
722
723func (m *requestHandlerContext) consolideIndexStatus(statuses []IndexStatus) []IndexStatus {
724
725	statusMap := make(map[common.IndexInstId]IndexStatus)
726
727	for _, status := range statuses {
728		if s2, ok := statusMap[status.InstId]; !ok {
729			status.NodeUUID = ""
730			statusMap[status.InstId] = status
731		} else {
732			s2.Status = m.consolideStateStr(s2.Status, status.Status)
733			s2.Hosts = append(s2.Hosts, status.Hosts...)
734			s2.Completion = (s2.Completion + status.Completion) / 2
735			s2.Progress = (s2.Progress + status.Progress) / 2.0
736			s2.NumPartition += status.NumPartition
737			s2.NodeUUID = ""
738			if len(status.Error) != 0 {
739				s2.Error = fmt.Sprintf("%v %v", s2.Error, status.Error)
740			}
741
742			for host, partitions := range status.PartitionMap {
743				s2.PartitionMap[host] = partitions
744			}
745			s2.Stale = s2.Stale || status.Stale
746
747			statusMap[status.InstId] = s2
748		}
749	}
750
751	result := make([]IndexStatus, 0, len(statuses))
752	for _, status := range statusMap {
753		result = append(result, status)
754	}
755
756	return result
757}
758
759func (m *requestHandlerContext) consolideStateStr(str1 string, str2 string) string {
760
761	if str1 == "Paused" || str2 == "Paused" {
762		return "Paused"
763	}
764
765	if str1 == "Warmup" || str2 == "Warmup" {
766		return "Warmup"
767	}
768
769	if strings.HasPrefix(str1, "Created") || strings.HasPrefix(str2, "Created") {
770		if str1 == str2 {
771			return str1
772		}
773		return "Created"
774	}
775
776	if strings.HasPrefix(str1, "Building") || strings.HasPrefix(str2, "Building") {
777		if str1 == str2 {
778			return str1
779		}
780		return "Building"
781	}
782
783	if str1 == "Replicating" || str2 == "Replicating" {
784		return "Replicating"
785	}
786
787	// must be ready
788	return str1
789}
790
791//////////////////////////////////////////////////////
792// Index Statement
793///////////////////////////////////////////////////////
794
795func (m *requestHandlerContext) handleIndexStatementRequest(w http.ResponseWriter, r *http.Request) {
796
797	creds, ok := doAuth(r, w)
798	if !ok {
799		return
800	}
801
802	bucket := m.getBucket(r)
803
804	list, err := m.getIndexStatement(creds, bucket)
805	if err == nil {
806		sort.Strings(list)
807		send(http.StatusOK, w, list)
808	} else {
809		send(http.StatusInternalServerError, w, err.Error())
810	}
811}
812
813func (m *requestHandlerContext) getIndexStatement(creds cbauth.Creds, bucket string) ([]string, error) {
814
815	indexes, failedNodes, err := m.getIndexStatus(creds, bucket, false)
816	if err != nil {
817		return nil, err
818	}
819	if len(failedNodes) != 0 {
820		return nil, errors.New(fmt.Sprintf("Failed to connect to indexer nodes %v", failedNodes))
821	}
822
823	defnMap := make(map[common.IndexDefnId]bool)
824	statements := ([]string)(nil)
825	for _, index := range indexes {
826		if _, ok := defnMap[index.DefnId]; !ok {
827			defnMap[index.DefnId] = true
828			statements = append(statements, index.Definition)
829		}
830	}
831
832	return statements, nil
833}
834
835///////////////////////////////////////////////////////
836// ClusterIndexMetadata
837///////////////////////////////////////////////////////
838
839func (m *requestHandlerContext) handleIndexMetadataRequest(w http.ResponseWriter, r *http.Request) {
840
841	creds, ok := doAuth(r, w)
842	if !ok {
843		return
844	}
845
846	bucket := m.getBucket(r)
847
848	meta, err := m.getIndexMetadata(creds, bucket)
849	if err == nil {
850		resp := &BackupResponse{Code: RESP_SUCCESS, Result: *meta}
851		send(http.StatusOK, w, resp)
852	} else {
853		logging.Debugf("RequestHandler::handleIndexMetadataRequest: err %v", err)
854		resp := &BackupResponse{Code: RESP_ERROR, Error: err.Error()}
855		send(http.StatusInternalServerError, w, resp)
856	}
857}
858
859func (m *requestHandlerContext) getIndexMetadata(creds cbauth.Creds, bucket string) (*ClusterIndexMetadata, error) {
860
861	cinfo, err := m.mgr.FetchNewClusterInfoCache()
862	if err != nil {
863		return nil, err
864	}
865
866	// find all nodes that has a index http service
867	nids := cinfo.GetNodesByServiceType(common.INDEX_HTTP_SERVICE)
868
869	clusterMeta := &ClusterIndexMetadata{Metadata: make([]LocalIndexMetadata, len(nids))}
870
871	for i, nid := range nids {
872
873		addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE)
874		if err == nil {
875
876			url := "/getLocalIndexMetadata"
877			if len(bucket) != 0 {
878				url += "?bucket=" + bucket
879			}
880
881			resp, err := getWithAuth(addr + url)
882			if err != nil {
883				logging.Debugf("RequestHandler::getIndexMetadata: Error while retrieving %v with auth %v", addr+"/getLocalIndexMetadata", err)
884				return nil, errors.New(fmt.Sprintf("Fail to retrieve index definition from url %s", addr))
885			}
886			defer resp.Body.Close()
887
888			localMeta := new(LocalIndexMetadata)
889			status := convertResponse(resp, localMeta)
890			if status == RESP_ERROR {
891				return nil, errors.New(fmt.Sprintf("Fail to retrieve local metadata from url %s.", addr))
892			}
893
894			newLocalMeta := LocalIndexMetadata{
895				IndexerId:   localMeta.IndexerId,
896				NodeUUID:    localMeta.NodeUUID,
897				StorageMode: localMeta.StorageMode,
898			}
899
900			for _, topology := range localMeta.IndexTopologies {
901				permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!list", topology.Bucket)
902				if isAllowed(creds, []string{permission}, nil) {
903					newLocalMeta.IndexTopologies = append(newLocalMeta.IndexTopologies, topology)
904				}
905			}
906
907			for _, defn := range localMeta.IndexDefinitions {
908				permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!list", defn.Bucket)
909				if isAllowed(creds, []string{permission}, nil) {
910					newLocalMeta.IndexDefinitions = append(newLocalMeta.IndexDefinitions, defn)
911				}
912			}
913
914			clusterMeta.Metadata[i] = newLocalMeta
915
916		} else {
917			return nil, errors.New(fmt.Sprintf("Fail to retrieve http endpoint for index node"))
918		}
919	}
920
921	return clusterMeta, nil
922}
923
924func (m *requestHandlerContext) convertIndexMetadataRequest(r *http.Request) *ClusterIndexMetadata {
925	var check map[string]interface{}
926
927	meta := &ClusterIndexMetadata{}
928
929	buf := new(bytes.Buffer)
930	if _, err := buf.ReadFrom(r.Body); err != nil {
931		logging.Debugf("RequestHandler::convertIndexRequest: unable to read request body, err %v", err)
932		return nil
933	}
934
935	logging.Debugf("requestHandler.convertIndexMetadataRequest(): input %v", string(buf.Bytes()))
936
937	if err := json.Unmarshal(buf.Bytes(), &check); err != nil {
938		logging.Debugf("RequestHandler::convertIndexMetadataRequest: unable to unmarshall request body. Buf = %s, err %v", buf, err)
939		return nil
940	} else if _, ok := check["metadata"]; !ok {
941		logging.Debugf("RequestHandler::convertIndexMetadataRequest: invalid shape of request body. Buf = %s, err %v", buf, err)
942		return nil
943	}
944
945	if err := json.Unmarshal(buf.Bytes(), meta); err != nil {
946		logging.Debugf("RequestHandler::convertIndexMetadataRequest: unable to unmarshall request body. Buf = %s, err %v", buf, err)
947		return nil
948	}
949
950	return meta
951}
952
953///////////////////////////////////////////////////////
954// LocalIndexMetadata
955///////////////////////////////////////////////////////
956
957func (m *requestHandlerContext) handleLocalIndexMetadataRequest(w http.ResponseWriter, r *http.Request) {
958
959	creds, ok := doAuth(r, w)
960	if !ok {
961		return
962	}
963
964	bucket := m.getBucket(r)
965
966	meta, err := m.getLocalIndexMetadata(creds, bucket)
967	if err == nil {
968		send(http.StatusOK, w, meta)
969	} else {
970		logging.Debugf("RequestHandler::handleLocalIndexMetadataRequest: err %v", err)
971		sendHttpError(w, " Unable to retrieve index metadata", http.StatusInternalServerError)
972	}
973}
974
975func (m *requestHandlerContext) getLocalIndexMetadata(creds cbauth.Creds, bucket string) (meta *LocalIndexMetadata, err error) {
976
977	repo := m.mgr.getMetadataRepo()
978
979	meta = &LocalIndexMetadata{IndexTopologies: nil, IndexDefinitions: nil}
980	indexerId, err := repo.GetLocalIndexerId()
981	if err != nil {
982		return nil, err
983	}
984	meta.IndexerId = string(indexerId)
985
986	nodeUUID, err := repo.GetLocalNodeUUID()
987	if err != nil {
988		return nil, err
989	}
990	meta.NodeUUID = string(nodeUUID)
991
992	meta.StorageMode = string(common.StorageModeToIndexType(common.GetStorageMode()))
993	meta.LocalSettings = make(map[string]string)
994
995	meta.Timestamp = time.Now().UnixNano()
996
997	if exclude, err := m.mgr.GetLocalValue("excludeNode"); err == nil {
998		meta.LocalSettings["excludeNode"] = exclude
999	}
1000
1001	iter, err := repo.NewIterator()
1002	if err != nil {
1003		return nil, err
1004	}
1005	defer iter.Close()
1006
1007	var defn *common.IndexDefn
1008	_, defn, err = iter.Next()
1009	for err == nil {
1010		if len(bucket) == 0 || bucket == defn.Bucket {
1011			permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!list", defn.Bucket)
1012			if isAllowed(creds, []string{permission}, nil) {
1013				meta.IndexDefinitions = append(meta.IndexDefinitions, *defn)
1014			}
1015		}
1016		_, defn, err = iter.Next()
1017	}
1018
1019	iter1, err := repo.NewTopologyIterator()
1020	if err != nil {
1021		return nil, err
1022	}
1023	defer iter1.Close()
1024
1025	var topology *IndexTopology
1026	topology, err = iter1.Next()
1027	for err == nil {
1028		if len(bucket) == 0 || bucket == topology.Bucket {
1029			permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!list", topology.Bucket)
1030			if isAllowed(creds, []string{permission}, nil) {
1031				meta.IndexTopologies = append(meta.IndexTopologies, *topology)
1032			}
1033		}
1034		topology, err = iter1.Next()
1035	}
1036
1037	return meta, nil
1038}
1039
1040///////////////////////////////////////////////////////
1041// Cached LocalIndexMetadata and Stats
1042///////////////////////////////////////////////////////
1043
1044func (m *requestHandlerContext) handleCachedLocalIndexMetadataRequest(w http.ResponseWriter, r *http.Request) {
1045
1046	creds, ok := doAuth(r, w)
1047	if !ok {
1048		return
1049	}
1050
1051	host := r.FormValue("host")
1052	host = strings.Trim(host, "\"")
1053
1054	meta, err := m.getLocalMetadataFromDisk(host)
1055	if meta != nil && err == nil {
1056		newMeta := *meta
1057		newMeta.IndexDefinitions = make([]common.IndexDefn, 0, len(meta.IndexDefinitions))
1058		newMeta.IndexTopologies = make([]IndexTopology, 0, len(meta.IndexTopologies))
1059
1060		for _, defn := range meta.IndexDefinitions {
1061			permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!list", defn.Bucket)
1062			if isAllowed(creds, []string{permission}, nil) {
1063				newMeta.IndexDefinitions = append(newMeta.IndexDefinitions, defn)
1064			}
1065		}
1066
1067		for _, topology := range meta.IndexTopologies {
1068			permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!list", topology.Bucket)
1069			if isAllowed(creds, []string{permission}, nil) {
1070				newMeta.IndexTopologies = append(newMeta.IndexTopologies, topology)
1071			}
1072		}
1073
1074		send(http.StatusOK, w, newMeta)
1075
1076	} else {
1077		logging.Debugf("RequestHandler::handleCachedLocalIndexMetadataRequest: err %v", err)
1078		sendHttpError(w, " Unable to retrieve index metadata", http.StatusInternalServerError)
1079	}
1080}
1081
1082func (m *requestHandlerContext) handleCachedStats(w http.ResponseWriter, r *http.Request) {
1083
1084	_, ok := doAuth(r, w)
1085	if !ok {
1086		return
1087	}
1088
1089	host := r.FormValue("host")
1090	host = strings.Trim(host, "\"")
1091
1092	stats, err := m.getIndexStatsFromDisk(host)
1093	if stats != nil && err == nil {
1094		send(http.StatusOK, w, stats)
1095	} else {
1096		logging.Debugf("RequestHandler::handleCachedLocalIndexMetadataRequest: err %v", err)
1097		sendHttpError(w, " Unable to retrieve index metadata", http.StatusInternalServerError)
1098	}
1099}
1100
1101///////////////////////////////////////////////////////
1102// Restore
1103///////////////////////////////////////////////////////
1104
1105//
1106// Restore semantic:
1107// 1) Each index is associated with the <IndexDefnId, IndexerId>.  IndexDefnId is unique for each index defnition,
1108//    and IndexerId is unique among the index nodes.  Note that IndexDefnId cannot be reused.
1109// 2) Index defn exists for the given <IndexDefnId, IndexerId> in current repository.  No action will be applied during restore.
1110// 3) Index defn is deleted or missing in current repository.  Index Defn restored from backup if bucket exists.
1111//    - Index defn of the same <bucket, name> exists.   It will rename the index to <index name>_restore_<seqNo>
1112//    - Bucket does not exist.   It will restore an index defn with a non-existent bucket.
1113//
1114func (m *requestHandlerContext) handleRestoreIndexMetadataRequest(w http.ResponseWriter, r *http.Request) {
1115
1116	creds, ok := doAuth(r, w)
1117	if !ok {
1118		return
1119	}
1120
1121	// convert backup image into runtime data structure
1122	image := m.convertIndexMetadataRequest(r)
1123	if image == nil {
1124		send(http.StatusBadRequest, w, &RestoreResponse{Code: RESP_ERROR, Error: "Unable to process request input"})
1125		return
1126	}
1127
1128	for _, localMeta := range image.Metadata {
1129		for _, topology := range localMeta.IndexTopologies {
1130			permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!create", topology.Bucket)
1131			if !isAllowed(creds, []string{permission}, w) {
1132				return
1133			}
1134		}
1135
1136		for _, defn := range localMeta.IndexDefinitions {
1137			permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!create", defn.Bucket)
1138			if !isAllowed(creds, []string{permission}, w) {
1139				return
1140			}
1141		}
1142	}
1143
1144	// Restore
1145	bucket := m.getBucket(r)
1146	logging.Infof("restore to target bucket %v", bucket)
1147
1148	context := createRestoreContext(image, m.clusterUrl, bucket)
1149	hostIndexMap, err := context.computeIndexLayout()
1150	if err != nil {
1151		send(http.StatusInternalServerError, w, &RestoreResponse{Code: RESP_ERROR, Error: fmt.Sprintf("Unable to restore metadata.  Error=%v", err)})
1152	}
1153
1154	for host, indexes := range hostIndexMap {
1155		for _, index := range indexes {
1156			if !m.makeCreateIndexRequest(*index, host) {
1157				send(http.StatusInternalServerError, w, &RestoreResponse{Code: RESP_ERROR, Error: "Unable to restore metadata."})
1158			}
1159		}
1160	}
1161
1162	send(http.StatusOK, w, &RestoreResponse{Code: RESP_SUCCESS})
1163}
1164
1165func (m *requestHandlerContext) makeCreateIndexRequest(defn common.IndexDefn, host string) bool {
1166
1167	// deferred build for restore
1168	defn.Deferred = true
1169
1170	req := IndexRequest{Version: uint64(1), Type: CREATE, Index: defn}
1171	body, err := json.Marshal(&req)
1172	if err != nil {
1173		logging.Errorf("requestHandler.makeCreateIndexRequest(): cannot marshall create index request %v", err)
1174		return false
1175	}
1176
1177	bodybuf := bytes.NewBuffer(body)
1178
1179	resp, err := postWithAuth(host+"/createIndex", "application/json", bodybuf)
1180	if err != nil {
1181		logging.Errorf("requestHandler.makeCreateIndexRequest(): create index request fails for %v/createIndex. Error=%v", host, err)
1182		return false
1183	}
1184	defer resp.Body.Close()
1185
1186	response := new(IndexResponse)
1187	status := convertResponse(resp, response)
1188	if status == RESP_ERROR || response.Code == RESP_ERROR {
1189		logging.Errorf("requestHandler.makeCreateIndexRequest(): create index request fails. Error=%v", response.Error)
1190		return false
1191	}
1192
1193	return true
1194}
1195
1196//////////////////////////////////////////////////////
1197// Planner
1198///////////////////////////////////////////////////////
1199
1200func (m *requestHandlerContext) handleIndexPlanRequest(w http.ResponseWriter, r *http.Request) {
1201
1202	_, ok := doAuth(r, w)
1203	if !ok {
1204		return
1205	}
1206
1207	stmts, err := m.getIndexPlan(r)
1208
1209	if err == nil {
1210		send(http.StatusOK, w, stmts)
1211	} else {
1212		sendHttpError(w, err.Error(), http.StatusInternalServerError)
1213	}
1214}
1215
1216func (m *requestHandlerContext) getIndexPlan(r *http.Request) (string, error) {
1217
1218	plan, err := planner.RetrievePlanFromCluster(m.clusterUrl, nil)
1219	if err != nil {
1220		return "", errors.New(fmt.Sprintf("Fail to retreive index information from cluster.   Error=%v", err))
1221	}
1222
1223	specs, err := m.convertIndexPlanRequest(r)
1224	if err != nil {
1225		return "", errors.New(fmt.Sprintf("Fail to read index spec from request.   Error=%v", err))
1226	}
1227
1228	solution, err := planner.ExecutePlanWithOptions(plan, specs, true, "", "", 0, -1, -1, false, true)
1229	if err != nil {
1230		return "", errors.New(fmt.Sprintf("Fail to plan index.   Error=%v", err))
1231	}
1232
1233	return planner.CreateIndexDDL(solution), nil
1234}
1235
1236func (m *requestHandlerContext) convertIndexPlanRequest(r *http.Request) ([]*planner.IndexSpec, error) {
1237
1238	var specs []*planner.IndexSpec
1239
1240	buf := new(bytes.Buffer)
1241	if _, err := buf.ReadFrom(r.Body); err != nil {
1242		logging.Debugf("RequestHandler::convertIndexPlanRequest: unable to read request body, err %v", err)
1243		return nil, err
1244	}
1245
1246	logging.Debugf("requestHandler.convertIndexPlanRequest(): input %v", string(buf.Bytes()))
1247
1248	if err := json.Unmarshal(buf.Bytes(), &specs); err != nil {
1249		logging.Debugf("RequestHandler::convertIndexPlanRequest: unable to unmarshall request body. Buf = %s, err %v", buf, err)
1250		return nil, err
1251	}
1252
1253	return specs, nil
1254}
1255
1256//////////////////////////////////////////////////////
1257// Storage Mode
1258///////////////////////////////////////////////////////
1259
1260func (m *requestHandlerContext) handleIndexStorageModeRequest(w http.ResponseWriter, r *http.Request) {
1261
1262	creds, ok := doAuth(r, w)
1263	if !ok {
1264		return
1265	}
1266
1267	if !isAllowed(creds, []string{"cluster.settings!write"}, w) {
1268		return
1269	}
1270
1271	// Override the storage mode for the local indexer.  Override will not take into effect until
1272	// indexer has restarted manually by administrator.   During indexer bootstrap, it will upgrade/downgrade
1273	// individual index to the override storage mode.
1274	value := r.FormValue("downgrade")
1275	if len(value) != 0 {
1276		downgrade, err := strconv.ParseBool(value)
1277		if err == nil {
1278			if downgrade {
1279				if common.GetStorageMode() == common.StorageMode(common.PLASMA) {
1280
1281					nodeUUID, err := m.mgr.getMetadataRepo().GetLocalNodeUUID()
1282					if err != nil {
1283						logging.Infof("RequestHandler::handleIndexStorageModeRequest: unable to identify nodeUUID.  Cannot downgrade.")
1284						send(http.StatusOK, w, "Unable to identify nodeUUID.  Cannot downgrade.")
1285						return
1286					}
1287
1288					mc.PostIndexerStorageModeOverride(string(nodeUUID), common.ForestDB)
1289					logging.Infof("RequestHandler::handleIndexStorageModeRequest: set override storage mode to forestdb")
1290					send(http.StatusOK, w, "downgrade storage mode to forestdb after indexer restart.")
1291				} else {
1292					logging.Infof("RequestHandler::handleIndexStorageModeRequest: local storage mode is not plasma.  Cannot downgrade.")
1293					send(http.StatusOK, w, "Indexer storage mode is not plasma.  Cannot downgrade.")
1294				}
1295			} else {
1296				nodeUUID, err := m.mgr.getMetadataRepo().GetLocalNodeUUID()
1297				if err != nil {
1298					logging.Infof("RequestHandler::handleIndexStorageModeRequest: unable to identify nodeUUID. Cannot disable storage mode downgrade.")
1299					send(http.StatusOK, w, "Unable to identify nodeUUID.  Cannot disable storage mode downgrade.")
1300					return
1301				}
1302
1303				mc.PostIndexerStorageModeOverride(string(nodeUUID), "")
1304				logging.Infof("RequestHandler::handleIndexStorageModeRequst: unset storage mode override")
1305				send(http.StatusOK, w, "storage mode downgrade is disabled")
1306			}
1307		} else {
1308			sendHttpError(w, err.Error(), http.StatusBadRequest)
1309		}
1310	} else {
1311		sendHttpError(w, "missing argument `override`", http.StatusBadRequest)
1312	}
1313}
1314
1315//////////////////////////////////////////////////////
1316// Planner
1317///////////////////////////////////////////////////////
1318
1319func (m *requestHandlerContext) handlePlannerRequest(w http.ResponseWriter, r *http.Request) {
1320
1321	creds, ok := doAuth(r, w)
1322	if !ok {
1323		return
1324	}
1325
1326	if !isAllowed(creds, []string{"cluster.settings!write"}, w) {
1327		return
1328	}
1329
1330	// Override the storage mode for the local indexer.  Override will not take into effect until
1331	// indexer has restarted manually by administrator.   During indexer bootstrap, it will upgrade/downgrade
1332	// individual index to the override storage mode.
1333	value := r.FormValue("excludeNode")
1334	if value == "in" || value == "out" || value == "inout" || len(value) == 0 {
1335		m.mgr.SetLocalValue("excludeNode", value)
1336		send(http.StatusOK, w, "OK")
1337	} else {
1338		sendHttpError(w, "value must be in, out or inout", http.StatusBadRequest)
1339	}
1340}
1341
1342//////////////////////////////////////////////////////
1343// Alter Index
1344///////////////////////////////////////////////////////
1345
1346func (m *requestHandlerContext) handleListLocalReplicaCountRequest(w http.ResponseWriter, r *http.Request) {
1347
1348	creds, ok := doAuth(r, w)
1349	if !ok {
1350		return
1351	}
1352
1353	result, err := m.getLocalReplicaCount(creds)
1354	if err == nil {
1355		send(http.StatusOK, w, result)
1356	} else {
1357		logging.Debugf("RequestHandler::handleListReplicaCountRequest: err %v", err)
1358		sendHttpError(w, " Unable to retrieve index metadata", http.StatusInternalServerError)
1359	}
1360}
1361
1362func (m *requestHandlerContext) getLocalReplicaCount(creds cbauth.Creds) (map[common.IndexDefnId]common.Counter, error) {
1363
1364	result := make(map[common.IndexDefnId]common.Counter)
1365
1366	repo := m.mgr.getMetadataRepo()
1367	iter, err := repo.NewIterator()
1368	if err != nil {
1369		return nil, err
1370	}
1371	defer iter.Close()
1372
1373	var defn *common.IndexDefn
1374	permissions := make(map[string]bool)
1375
1376	_, defn, err = iter.Next()
1377	for err == nil {
1378		if _, ok := permissions[defn.Bucket]; !ok {
1379			permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!list", defn.Bucket)
1380			if !isAllowed(creds, []string{permission}, nil) {
1381				return nil, fmt.Errorf("Permission denied on reading metadata for bucket %v", defn.Bucket)
1382			}
1383			permissions[defn.Bucket] = true
1384		}
1385
1386		var numReplica *common.Counter
1387		numReplica, err = GetLatestReplicaCount(defn)
1388		if err != nil {
1389			return nil, fmt.Errorf("Fail to retreive replica count.  Error: %v", err)
1390		}
1391
1392		result[defn.DefnId] = *numReplica
1393		_, defn, err = iter.Next()
1394	}
1395
1396	return result, nil
1397}
1398
1399///////////////////////////////////////////////////////
1400// Utility
1401///////////////////////////////////////////////////////
1402
1403func sendIndexResponseWithError(status int, w http.ResponseWriter, msg string) {
1404	res := &IndexResponse{Code: RESP_ERROR, Error: msg}
1405	send(status, w, res)
1406}
1407
1408func sendIndexResponse(w http.ResponseWriter) {
1409	result := &IndexResponse{Code: RESP_SUCCESS}
1410	send(http.StatusOK, w, result)
1411}
1412
1413func send(status int, w http.ResponseWriter, res interface{}) {
1414
1415	header := w.Header()
1416	header["Content-Type"] = []string{"application/json"}
1417
1418	if buf, err := json.Marshal(res); err == nil {
1419		w.WriteHeader(status)
1420		logging.Tracef("RequestHandler::sendResponse: sending response back to caller. %v", logging.TagStrUD(buf))
1421		w.Write(buf)
1422	} else {
1423		// note : buf is nil if err != nil
1424		logging.Debugf("RequestHandler::sendResponse: fail to marshall response back to caller. %s", err)
1425		sendHttpError(w, "RequestHandler::sendResponse: Unable to marshall response", http.StatusInternalServerError)
1426	}
1427}
1428
// sendHttpError replies to the request with the given reason text and HTTP
// status code by delegating to http.Error.
func sendHttpError(w http.ResponseWriter, reason string, code int) {
	http.Error(w, reason, code)
}
1432
1433func convertResponse(r *http.Response, resp interface{}) string {
1434
1435	buf := new(bytes.Buffer)
1436	if _, err := buf.ReadFrom(r.Body); err != nil {
1437		logging.Debugf("RequestHandler::convertResponse: unable to read request body, err %v", err)
1438		return RESP_ERROR
1439	}
1440
1441	if err := json.Unmarshal(buf.Bytes(), resp); err != nil {
1442		logging.Debugf("convertResponse: unable to unmarshall response body. Buf = %s, err %v", buf, err)
1443		return RESP_ERROR
1444	}
1445
1446	return RESP_SUCCESS
1447}
1448
1449func doAuth(r *http.Request, w http.ResponseWriter) (cbauth.Creds, bool) {
1450
1451	creds, valid, err := common.IsAuthValid(r)
1452	if err != nil {
1453		sendIndexResponseWithError(http.StatusInternalServerError, w, err.Error())
1454		return nil, false
1455	} else if valid == false {
1456		w.WriteHeader(401)
1457		w.Write([]byte("401 Unauthorized\n"))
1458		return nil, false
1459	}
1460
1461	return creds, true
1462}
1463
1464func isAllowed(creds cbauth.Creds, permissions []string, w http.ResponseWriter) bool {
1465
1466	allow := false
1467	err := error(nil)
1468
1469	for _, permission := range permissions {
1470		allow, err = creds.IsAllowed(permission)
1471		if allow && err == nil {
1472			break
1473		}
1474	}
1475
1476	if err != nil {
1477		if w != nil {
1478			sendIndexResponseWithError(http.StatusInternalServerError, w, err.Error())
1479		}
1480		return false
1481	}
1482
1483	if !allow {
1484		if w != nil {
1485			w.WriteHeader(http.StatusUnauthorized)
1486			w.Write([]byte(http.StatusText(http.StatusUnauthorized)))
1487		}
1488		return false
1489	}
1490
1491	return true
1492}
1493
1494func getWithAuth(url string) (*http.Response, error) {
1495	params := &security.RequestParams{Timeout: time.Duration(10) * time.Second}
1496	return security.GetWithAuth(url, params)
1497}
1498
1499func postWithAuth(url string, bodyType string, body io.Reader) (*http.Response, error) {
1500	params := &security.RequestParams{Timeout: time.Duration(10) * time.Second}
1501	return security.PostWithAuth(url, bodyType, body, params)
1502}
1503
1504func findTopologyByBucket(topologies []IndexTopology, bucket string) *IndexTopology {
1505
1506	for _, topology := range topologies {
1507		if topology.Bucket == bucket {
1508			return &topology
1509		}
1510	}
1511
1512	return nil
1513}
1514
1515///////////////////////////////////////////////////////
1516// indexStatusSorter
1517///////////////////////////////////////////////////////
1518
1519func (s indexStatusSorter) Len() int {
1520	return len(s)
1521}
1522
1523func (s indexStatusSorter) Swap(i, j int) {
1524	s[i], s[j] = s[j], s[i]
1525}
1526
1527func (s indexStatusSorter) Less(i, j int) bool {
1528	if s[i].Name < s[j].Name {
1529		return true
1530	}
1531
1532	if s[i].Name > s[j].Name {
1533		return false
1534	}
1535
1536	return s[i].Bucket < s[j].Bucket
1537}
1538
1539///////////////////////////////////////////////////////
1540// retrieve / persist cached local index metadata
1541///////////////////////////////////////////////////////
1542
1543func (m *requestHandlerContext) getLocalMetadataForNode(addr string, host string, cinfo *common.ClusterInfoCache) (*LocalIndexMetadata, bool, error) {
1544
1545	meta, err := m.getLocalMetadataFromREST(addr, host)
1546	if err == nil {
1547		return meta, true, nil
1548	}
1549
1550	if cinfo.GetClusterVersion() >= common.INDEXER_65_VERSION {
1551		var latest *LocalIndexMetadata
1552		nids := cinfo.GetNodesByServiceType(common.INDEX_HTTP_SERVICE)
1553		for _, nid := range nids {
1554			addr, err1 := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE)
1555			if err1 == nil {
1556				cached, err1 := m.getCachedLocalMetadataFromREST(addr, host)
1557				if cached != nil && err1 == nil {
1558					if latest == nil || cached.Timestamp > latest.Timestamp {
1559						latest = cached
1560					}
1561				}
1562			}
1563		}
1564
1565		if latest != nil {
1566			return latest, false, nil
1567		}
1568	}
1569
1570	return nil, false, err
1571}
1572
1573func (m *requestHandlerContext) getLocalMetadataFromREST(addr string, hostname string) (*LocalIndexMetadata, error) {
1574
1575	resp, err := getWithAuth(addr + "/getLocalIndexMetadata")
1576	defer func() {
1577		if resp != nil && resp.Body != nil {
1578			resp.Body.Close()
1579		}
1580	}()
1581
1582	if err == nil {
1583		localMeta := new(LocalIndexMetadata)
1584		if status := convertResponse(resp, localMeta); status == RESP_SUCCESS {
1585
1586			m.mutex.Lock()
1587			filename := host2file(hostname)
1588			if _, ok := m.metaCache[filename]; ok {
1589				logging.Debugf("getLocalMetadataFromREST: remove metadata form in-memory cache %v", filename)
1590				delete(m.metaCache, filename)
1591			}
1592			m.mutex.Unlock()
1593
1594			return localMeta, nil
1595		}
1596
1597		err = fmt.Errorf("Fail to unmarshal response from %v", hostname)
1598	}
1599
1600	return nil, err
1601}
1602
1603func (m *requestHandlerContext) getCachedLocalMetadataFromREST(addr string, host string) (*LocalIndexMetadata, error) {
1604
1605	resp, err := getWithAuth(fmt.Sprintf("%v/getCachedLocalIndexMetadata?host=\"%v\"", addr, host))
1606	defer func() {
1607		if resp != nil && resp.Body != nil {
1608			resp.Body.Close()
1609		}
1610	}()
1611
1612	if err == nil {
1613		localMeta := new(LocalIndexMetadata)
1614		if status := convertResponse(resp, localMeta); status == RESP_SUCCESS {
1615			return localMeta, nil
1616		}
1617
1618		err = fmt.Errorf("Fail to unmarshal response from %v", host)
1619	}
1620
1621	return nil, err
1622}
1623
1624func (m *requestHandlerContext) getLocalMetadataFromDisk(hostname string) (*LocalIndexMetadata, error) {
1625
1626	filename := host2file(hostname)
1627
1628	m.mutex.RLock()
1629	if meta, ok := m.metaCache[filename]; ok && meta != nil {
1630		logging.Debugf("getLocalMetadataFromDisk(): found metadata from in-memory cache %v", filename)
1631		m.mutex.RUnlock()
1632		return meta, nil
1633	}
1634	m.mutex.RUnlock()
1635
1636	filepath := path.Join(m.metaDir, filename)
1637
1638	content, err := ioutil.ReadFile(filepath)
1639	if err != nil {
1640		logging.Errorf("getLocalMetadataFromDisk(): fail to read metadata from file %v.  Error %v", filepath, err)
1641		return nil, err
1642	}
1643
1644	localMeta := new(LocalIndexMetadata)
1645	if err := json.Unmarshal(content, localMeta); err != nil {
1646		logging.Errorf("getLocalMetadataFromDisk(): fail to unmarshal metadata from file %v.  Error %v", filepath, err)
1647		return nil, err
1648	}
1649
1650	m.mutex.Lock()
1651	logging.Debugf("getLocalMetadataFromDisk(): save metadata to in-memory cache %v", filename)
1652	m.metaCache[filename] = localMeta
1653	m.mutex.Unlock()
1654
1655	return localMeta, nil
1656}
1657
1658func (m *requestHandlerContext) saveLocalMetadataToDisk(hostname string, meta *LocalIndexMetadata) error {
1659
1660	filename := host2file(hostname)
1661	filepath := path.Join(m.metaDir, filename)
1662	temp := path.Join(m.metaDir, filename+".tmp")
1663
1664	content, err := json.Marshal(meta)
1665	if err != nil {
1666		logging.Errorf("saveLocalMetadatasToDisk(): fail to marshal metadata to file %v.  Error %v", filepath, err)
1667		return err
1668	}
1669
1670	err = ioutil.WriteFile(temp, content, 0755)
1671	if err != nil {
1672		logging.Errorf("saveLocalMetadataToDisk(): fail to save metadata to file %v.  Error %v", temp, err)
1673		return err
1674	}
1675
1676	err = os.Rename(temp, filepath)
1677	if err != nil {
1678		logging.Errorf("saveLocalMetadataToDisk(): fail to rename metadata to file %v.  Error %v", filepath, err)
1679		return err
1680	}
1681
1682	logging.Debugf("saveLocalMetadataToDisk(): successfully written metadata to disk for %v", filename)
1683
1684	return nil
1685}
1686
1687func (m *requestHandlerContext) cleanupLocalMetadataOnDisk(hostnames []string) {
1688
1689	filenames := make([]string, len(hostnames))
1690	for i, hostname := range hostnames {
1691		filenames[i] = host2file(hostname)
1692	}
1693
1694	files, err := ioutil.ReadDir(m.metaDir)
1695	if err != nil {
1696		logging.Errorf("cleanupLocalMetadataOnDisk(): fail to read directory %v.  Error %v", m.metaDir, err)
1697		return
1698	}
1699
1700	for _, file := range files {
1701		filename := file.Name()
1702
1703		found := false
1704		for _, filename2 := range filenames {
1705			if filename2 == filename {
1706				found = true
1707			}
1708		}
1709
1710		if !found {
1711			filepath := path.Join(m.metaDir, filename)
1712			if err := os.RemoveAll(filepath); err != nil {
1713				logging.Errorf("cleanupLocalMetadataOnDisk(): fail to remove file %v.  Error %v", filepath, err)
1714			}
1715
1716			logging.Debugf("cleanupLocalMetadataOnDisk(): succesfully removing file %v from cache.", filepath)
1717
1718			m.mutex.Lock()
1719			if _, ok := m.metaCache[filename]; ok {
1720				logging.Debugf("cleanupMetadataFromDisk: remove metadata form in-memory cache %v", filename)
1721				delete(m.metaCache, filename)
1722			}
1723			m.mutex.Unlock()
1724		}
1725	}
1726}
1727
1728///////////////////////////////////////////////////////
1729// retrieve / persist cached index stats
1730///////////////////////////////////////////////////////
1731
1732func (m *requestHandlerContext) getStatsForNode(addr string, host string, cinfo *common.ClusterInfoCache) (*common.Statistics, bool, error) {
1733
1734	stats, err := m.getStatsFromREST(addr, host)
1735	if err == nil {
1736		return stats, true, nil
1737	}
1738
1739	if cinfo.GetClusterVersion() >= common.INDEXER_65_VERSION {
1740		var latest *common.Statistics
1741		nids := cinfo.GetNodesByServiceType(common.INDEX_HTTP_SERVICE)
1742		for _, nid := range nids {
1743			addr, err1 := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE)
1744			if err1 == nil {
1745				cached, err1 := m.getCachedStatsFromREST(addr, host)
1746				if cached != nil && err1 == nil {
1747					if latest == nil {
1748						latest = cached
1749						continue
1750					}
1751
1752					ts1 := latest.Get("timestamp")
1753					if ts1 == nil {
1754						latest = cached
1755						continue
1756					}
1757
1758					ts2 := cached.Get("timestamp")
1759					if ts2 == nil {
1760						continue
1761					}
1762
1763					t1, ok1 := ts1.(float64)
1764					t2, ok2 := ts2.(float64)
1765
1766					if ok1 && ok2 {
1767						if t2 > t1 {
1768							latest = cached
1769						}
1770					}
1771				}
1772			}
1773		}
1774
1775		if latest != nil {
1776			return latest, false, nil
1777		}
1778	}
1779
1780	return nil, false, err
1781}
1782
1783func (m *requestHandlerContext) getStatsFromREST(addr string, hostname string) (*common.Statistics, error) {
1784
1785	resp, err := getWithAuth(addr + "/stats?async=true")
1786	defer func() {
1787		if resp != nil && resp.Body != nil {
1788			resp.Body.Close()
1789		}
1790	}()
1791
1792	if err == nil {
1793		stats := new(common.Statistics)
1794		if status := convertResponse(resp, stats); status == RESP_SUCCESS {
1795
1796			m.mutex.Lock()
1797			filename := host2file(hostname)
1798			if _, ok := m.statsCache[filename]; ok {
1799				logging.Debugf("getStatsFromREST: remove stats from in-memory cache %v", filename)
1800				delete(m.statsCache, filename)
1801			}
1802			m.mutex.Unlock()
1803
1804			return stats, nil
1805		}
1806
1807		err = fmt.Errorf("Fail to unmarshal response from %v", hostname)
1808	}
1809
1810	return nil, err
1811}
1812
1813func (m *requestHandlerContext) getCachedStatsFromREST(addr string, host string) (*common.Statistics, error) {
1814
1815	resp, err := getWithAuth(fmt.Sprintf("%v/getCachedStats?host=\"%v\"", addr, host))
1816	defer func() {
1817		if resp != nil && resp.Body != nil {
1818			resp.Body.Close()
1819		}
1820	}()
1821
1822	if err == nil {
1823		stats := new(common.Statistics)
1824		if status := convertResponse(resp, stats); status == RESP_SUCCESS {
1825			return stats, nil
1826		}
1827
1828		err = fmt.Errorf("Fail to unmarshal response from %v", host)
1829	}
1830
1831	return nil, err
1832}
1833
1834func (m *requestHandlerContext) getIndexStatsFromDisk(hostname string) (*common.Statistics, error) {
1835
1836	filename := host2file(hostname)
1837
1838	m.mutex.RLock()
1839	if stats, ok := m.statsCache[filename]; ok && stats != nil {
1840		logging.Debugf("getIndexStatsFromDisk(): found stats from in-memory cache %v", filename)
1841		m.mutex.RUnlock()
1842		return stats, nil
1843	}
1844	m.mutex.RUnlock()
1845
1846	filepath := path.Join(m.statsDir, filename)
1847
1848	content, err := ioutil.ReadFile(filepath)
1849	if err != nil {
1850		logging.Errorf("getIndexStatsFromDisk(): fail to read stats from file %v.  Error %v", filepath, err)
1851		return nil, err
1852	}
1853
1854	stats := new(common.Statistics)
1855	if err := json.Unmarshal(content, stats); err != nil {
1856		logging.Errorf("getIndexStatsFromDisk(): fail to unmarshal stats from file %v.  Error %v", filepath, err)
1857		return nil, err
1858	}
1859
1860	m.mutex.Lock()
1861	m.statsCache[filename] = stats
1862	logging.Debugf("getIndexStatsFromDisk(): save stats to in-memory cache %v", filename)
1863	m.mutex.Unlock()
1864
1865	return stats, nil
1866}
1867
1868func (m *requestHandlerContext) saveIndexStatsToDisk(hostname string, stats *common.Statistics) error {
1869
1870	filename := host2file(hostname)
1871	filepath := path.Join(m.statsDir, filename)
1872	temp := path.Join(m.statsDir, filename+".tmp")
1873
1874	content, err := json.Marshal(stats)
1875	if err != nil {
1876		logging.Errorf("saveIndexStatsToDisk(): fail to marshal stats to file %v.  Error %v", filepath, err)
1877		return err
1878	}
1879
1880	err = ioutil.WriteFile(temp, content, 0755)
1881	if err != nil {
1882		logging.Errorf("saveIndexStatsToDisk(): fail to save stats to file %v.  Error %v", temp, err)
1883		return err
1884	}
1885
1886	err = os.Rename(temp, filepath)
1887	if err != nil {
1888		logging.Errorf("saveIndexStatsToDisk(): fail to rename stats to file %v.  Error %v", filepath, err)
1889		return err
1890	}
1891
1892	logging.Debugf("saveIndexStatsToDisk(): successfully written stats to disk for %v", filename)
1893
1894	return nil
1895}
1896
1897func (m *requestHandlerContext) cleanupIndexStatsOnDisk(hostnames []string) {
1898
1899	filenames := make([]string, len(hostnames))
1900	for i, hostname := range hostnames {
1901		filenames[i] = host2file(hostname)
1902	}
1903
1904	files, err := ioutil.ReadDir(m.statsDir)
1905	if err != nil {
1906		logging.Errorf("cleanupStatsOnDisk(): fail to read directory %v.  Error %v", m.statsDir, err)
1907		return
1908	}
1909
1910	for _, file := range files {
1911		filename := file.Name()
1912
1913		found := false
1914		for _, filename2 := range filenames {
1915			if filename2 == filename {
1916				found = true
1917			}
1918		}
1919
1920		if !found {
1921			filepath := path.Join(m.statsDir, filename)
1922			if err := os.RemoveAll(filepath); err != nil {
1923				logging.Errorf("cleanupStatsOnDisk(): fail to remove file %v.  Error %v", filepath, err)
1924			}
1925
1926			logging.Debugf("cleanupIndexStatsOnDisk(): succesfully removing file %v from cache.", filepath)
1927
1928			m.mutex.Lock()
1929			if _, ok := m.statsCache[filename]; ok {
1930				logging.Debugf("cleanupStatsOnDisk: remove stats from in-memory cache %v", filename)
1931				delete(m.statsCache, filename)
1932			}
1933			m.mutex.Unlock()
1934		}
1935	}
1936}
1937
1938///////////////////////////////////////////////////////
1939// persistor
1940///////////////////////////////////////////////////////
1941
1942func (m *requestHandlerContext) runPersistor() {
1943
1944	updateMeta := func(v map[string]*LocalIndexMetadata) {
1945		hostnames := make([]string, 0, len(v))
1946
1947		for host, meta := range v {
1948			if meta != nil {
1949				m.saveLocalMetadataToDisk(host, meta)
1950			}
1951			hostnames = append(hostnames, host)
1952		}
1953
1954		m.cleanupLocalMetadataOnDisk(hostnames)
1955	}
1956
1957	updateStats := func(v map[string]*common.Statistics) {
1958		hostnames := make([]string, 0, len(v))
1959
1960		for host, stats := range v {
1961			if stats != nil {
1962				m.saveIndexStatsToDisk(host, stats)
1963			}
1964			hostnames = append(hostnames, host)
1965		}
1966
1967		m.cleanupIndexStatsOnDisk(hostnames)
1968	}
1969
1970	for {
1971		select {
1972		case v, ok := <-m.metaCh:
1973			if !ok {
1974				return
1975			}
1976
1977			for len(m.metaCh) > 0 {
1978				v = <-m.metaCh
1979			}
1980
1981			updateMeta(v)
1982
1983		case v, ok := <-m.statsCh:
1984			if !ok {
1985				return
1986			}
1987
1988			for len(m.statsCh) > 0 {
1989				v = <-m.statsCh
1990			}
1991
1992			updateStats(v)
1993
1994		case <-m.doneCh:
1995			logging.Infof("request_handler persistor exits")
1996			return
1997		}
1998	}
1999}
2000
// host2file derives a cache file name from hostname by replacing every "." and
// ":" with "_".
func host2file(hostname string) string {
	return strings.NewReplacer(".", "_", ":", "_").Replace(hostname)
}
2008