1package utils
2
3import (
4	"bytes"
5	"crypto/x509"
6	"encoding/binary"
7	"encoding/json"
8	"errors"
9	"expvar"
10	"fmt"
11	"github.com/couchbase/cbauth"
12	"github.com/couchbase/go-couchbase"
13	"github.com/couchbase/gojsonsm"
14	mc "github.com/couchbase/gomemcached"
15	mcc "github.com/couchbase/gomemcached/client"
16	"github.com/couchbase/goutils/scramsha"
17	base "github.com/couchbase/goxdcr/base"
18	"github.com/couchbase/goxdcr/log"
19	"github.com/couchbase/goxdcr/metadata"
20	"github.com/golang/snappy"
21	gocb "gopkg.in/couchbase/gocb.v1"
22	"io"
23	"io/ioutil"
24	"net"
25	"net/http"
26	"net/url"
27	"reflect"
28	"strconv"
29	"strings"
30	"syscall"
31	"time"
32	"unicode/utf8"
33)
34
// Sentinel errors exposed by this package.
var (
	// NonExistentBucketError reports that a bucket does not exist.
	NonExistentBucketError = errors.New("Bucket doesn't exist")
	// BucketRecreatedError reports that a bucket was deleted and then recreated.
	BucketRecreatedError = errors.New("Bucket has been deleted and recreated")
	// TargetMayNotSupportScramShaError reports that the target may not support ScramSha.
	TargetMayNotSupportScramShaError = errors.New("Target may not support ScramSha")
)
38
39func (f *HELOFeatures) NumberOfActivatedFeatures() int {
40	var result int
41	if f.Xattribute {
42		result++
43	}
44	if f.CompressionType != base.CompressionTypeNone {
45		result++
46	}
47	if f.Xerror {
48		result++
49	}
50	return result
51}
52
// Utilities is the receiver for the helper methods in this file.
type Utilities struct {
	// logger_utils is the logger used when a method is not handed a logger of its own.
	logger_utils *log.CommonLogger
}
56
57/**
58 * NOTE - ideally we want to be able to pass in utility interfaces so we can do much
59 * better unit testing with mocks. This constructor should be used in main() and then
60 * passed down level by levels.
61 * Currently, this method is being called in many places, and each place that is using
62 * this method should ideally be using a passed in interface from another parent level.
63 */
64func NewUtilities() *Utilities {
65	retVar := &Utilities{
66		logger_utils: log.NewLogger("Utils", log.DefaultLoggerContext),
67	}
68	return retVar
69}
70
71/**
72 * This utils file contains both regular non-REST related utilities as well as REST related.
73 * The first section is non-REST related utility functions
74 */
75
// BucketBasicStats holds the subset of a bucket's basic statistics that is
// decoded from JSON here.
type BucketBasicStats struct {
	// ItemCount is read from the "itemCount" JSON field.
	ItemCount int `json:"itemCount"`
}
79
// CouchBucket is a minimal JSON view of a bucket: its name and basic stats.
// Only used by unit test
// TODO: replace with go-couchbase bucket stats API
type CouchBucket struct {
	Name string           `json:"name"`
	Stat BucketBasicStats `json:"basicStats"`
}
86
// GetNonExistentBucketError returns the package-level NonExistentBucketError
// sentinel, making it reachable through a Utilities value.
func (u *Utilities) GetNonExistentBucketError() error {
	return NonExistentBucketError
}
90
// GetBucketRecreatedError returns the package-level BucketRecreatedError
// sentinel, making it reachable through a Utilities value.
func (u *Utilities) GetBucketRecreatedError() error {
	return BucketRecreatedError
}
94
95func (u *Utilities) loggerForFunc(logger *log.CommonLogger) *log.CommonLogger {
96	var l *log.CommonLogger
97	if logger != nil {
98		l = logger
99	} else {
100		l = u.logger_utils
101	}
102	return l
103}
104
105func (u *Utilities) ValidateSettings(defs base.SettingDefinitions,
106	settings metadata.ReplicationSettingsMap,
107	logger *log.CommonLogger) error {
108	var l *log.CommonLogger = u.loggerForFunc(logger)
109
110	if l.GetLogLevel() >= log.LogLevelDebug {
111		l.Debugf("Start validate setting=%v, defs=%v", settings.CloneAndRedact(), defs)
112	}
113	var err *base.SettingsError = nil
114	for key, def := range defs {
115		val, ok := settings[key]
116		if !ok && def.Required {
117			if err == nil {
118				err = base.NewSettingsError()
119			}
120			err.Add(key, errors.New("required, but not supplied"))
121		} else {
122			if val != nil && def.Data_type != reflect.PtrTo(reflect.TypeOf(val)) {
123				if err == nil {
124					err = base.NewSettingsError()
125				}
126				err.Add(key, errors.New(fmt.Sprintf("expected type is %v, supplied type is %v",
127					def.Data_type, reflect.TypeOf(val))))
128			}
129		}
130	}
131	if err != nil {
132		l.Infof("setting validation result = %v", *err)
133		return *err
134	}
135	return nil
136}
137
138func (u *Utilities) RecoverPanic(err *error) {
139	if r := recover(); r != nil {
140		*err = errors.New(fmt.Sprint(r))
141	}
142}
143
144func (u *Utilities) LocalPool(localConnectStr string) (couchbase.Pool, error) {
145	localURL := fmt.Sprintf("http://%s", localConnectStr)
146	client, err := couchbase.ConnectWithAuth(localURL, cbauth.NewAuthHandler(nil))
147	if err != nil {
148		return couchbase.Pool{}, u.NewEnhancedError(fmt.Sprintf("Error connecting to couchbase. url=%v", u.UrlForLog(localURL)), err)
149	}
150	return client.GetPool("default")
151}
152
153// Get bucket in local cluster
154func (u *Utilities) LocalBucket(localConnectStr, bucketName string) (*couchbase.Bucket, error) {
155	u.logger_utils.Debugf("Getting local bucket name=%v\n", bucketName)
156
157	pool, err := u.LocalPool(localConnectStr)
158	if err != nil {
159		return nil, err
160	}
161
162	bucket, err := pool.GetBucket(bucketName)
163	if err != nil {
164		return nil, u.NewEnhancedError(fmt.Sprintf("Error getting bucket, %v, from pool.", bucketName), err)
165	}
166
167	u.logger_utils.Debugf("Got local bucket successfully name=%v\n", bucket.Name)
168	return bucket, err
169}
170
171func (u *Utilities) UnwrapError(infos map[string]interface{}) (err error) {
172	if infos != nil && len(infos) > 0 {
173		err = infos["error"].(error)
174	}
175	return err
176}
177
178// returns an enhanced error with erroe message being "msg + old error message"
179func (u *Utilities) NewEnhancedError(msg string, err error) error {
180	return errors.New(msg + "\n err = " + err.Error())
181}
182
183func (u *Utilities) GetMapFromExpvarMap(expvarMap *expvar.Map) map[string]interface{} {
184	regMap := make(map[string]interface{})
185
186	expvarMap.Do(func(keyValue expvar.KeyValue) {
187		valueStr := keyValue.Value.String()
188		// first check if valueStr is an integer
189		valueInt, err := strconv.Atoi(valueStr)
190		if err == nil {
191			regMap[keyValue.Key] = valueInt
192		} else {
193			// then check if valueStr is a float
194			valueFloat, err := strconv.ParseFloat(valueStr, 64)
195			if err == nil {
196				regMap[keyValue.Key] = valueFloat
197			} else {
198				// should never happen
199				u.logger_utils.Errorf("Invalid value in expvarMap. Only float and integer values are supported")
200			}
201		}
202	})
203	return regMap
204}
205
206//convert the format returned by go-memcached StatMap - map[string]string to map[uint16]uint64
207func (u *Utilities) ParseHighSeqnoStat(vbnos []uint16, stats_map map[string]string, highseqno_map map[uint16]uint64) error {
208	var err error
209	for _, vbno := range vbnos {
210		stats_key := fmt.Sprintf(base.VBUCKET_HIGH_SEQNO_STAT_KEY_FORMAT, vbno)
211		highseqnostr, ok := stats_map[stats_key]
212		if !ok || highseqnostr == "" {
213			err = fmt.Errorf("Can't find high seqno for vbno=%v in stats map. Source topology may have changed.\n", vbno)
214			return err
215		}
216		highseqno, err := strconv.ParseUint(highseqnostr, 10, 64)
217		if err != nil {
218			u.logger_utils.Warnf("high seqno for vbno=%v in stats map is not a valid uint64. high seqno=%v\n", vbno, highseqnostr)
219			err = fmt.Errorf("high seqno for vbno=%v in stats map is not a valid uint64. high seqno=%v\n", vbno, highseqnostr)
220			return err
221		}
222		highseqno_map[vbno] = highseqno
223	}
224	return nil
225}
226
227//convert the format returned by go-memcached StatMap - map[string]string to map[uint16][]uint64
228func (u *Utilities) ParseHighSeqnoAndVBUuidFromStats(vbnos []uint16, stats_map map[string]string, high_seqno_and_vbuuid_map map[uint16][]uint64) {
229	for _, vbno := range vbnos {
230		high_seqno_stats_key := fmt.Sprintf(base.VBUCKET_HIGH_SEQNO_STAT_KEY_FORMAT, vbno)
231		highseqnostr, ok := stats_map[high_seqno_stats_key]
232		if !ok {
233			u.logger_utils.Warnf("Can't find high seqno for vbno=%v in stats map. Source topology may have changed.\n", vbno)
234			continue
235		}
236		high_seqno, err := strconv.ParseUint(highseqnostr, 10, 64)
237		if err != nil {
238			u.logger_utils.Warnf("high seqno for vbno=%v in stats map is not a valid uint64. high seqno=%v\n", vbno, highseqnostr)
239			continue
240		}
241
242		vbuuid_stats_key := fmt.Sprintf(base.VBUCKET_UUID_STAT_KEY_FORMAT, vbno)
243		vbuuidstr, ok := stats_map[vbuuid_stats_key]
244		if !ok {
245			u.logger_utils.Warnf("Can't find vbuuid for vbno=%v in stats map. Source topology may have changed.\n", vbno)
246			continue
247		}
248		vbuuid, err := strconv.ParseUint(vbuuidstr, 10, 64)
249		if err != nil {
250			u.logger_utils.Warnf("vbuuid for vbno=%v in stats map is not a valid uint64. vbuuid=%v\n", vbno, vbuuidstr)
251			continue
252		}
253
254		high_seqno_and_vbuuid_map[vbno] = []uint64{high_seqno, vbuuid}
255	}
256}
257
258// encode data in a map into a byte array, which can then be used as
259// the body part of a http request
260// so far only five types are supported: string, int, bool, LogLevel, []byte
261// which should be sufficient for all cases at hand
262func (u *Utilities) EncodeMapIntoByteArray(data map[string]interface{}) ([]byte, error) {
263	if len(data) == 0 {
264		return nil, nil
265	}
266
267	params := make(url.Values)
268	for key, val := range data {
269		var strVal string
270		switch val.(type) {
271		case string:
272			strVal = val.(string)
273		case int:
274			strVal = strconv.FormatInt(int64(val.(int)), base.ParseIntBase)
275		case bool:
276			strVal = strconv.FormatBool(val.(bool))
277		case log.LogLevel:
278			strVal = val.(log.LogLevel).String()
279		case []byte:
280			strVal = string(val.([]byte))
281		default:
282			return nil, base.IncorrectValueTypeInMapError(key, val, "string/int/bool/LogLevel/[]byte")
283		}
284		params.Add(key, strVal)
285	}
286
287	return []byte(params.Encode()), nil
288}
289
290func (u *Utilities) UrlForLog(urlStr string) string {
291	result, err := url.Parse(urlStr)
292	if err == nil {
293		if result.User != nil {
294			result.User = url.UserPassword(result.User.Username(), "xxxx")
295		}
296		return result.String()
297	} else {
298		return urlStr
299	}
300}
301
302func filterExpressionGetXattrHelper(bucket *gocb.Bucket, docId string, docCas gocb.Cas) ([]byte, error) {
303	var xattrMap map[string]interface{}
304	var xtoc interface{}
305	var xattrSlice []byte
306
307	xattrMap = make(map[string]interface{})
308	frag, err := bucket.LookupIn(docId).GetEx(base.XattributeToc, gocb.SubdocFlagXattr).Execute()
309	if err != nil {
310		return nil, err
311	}
312
313	if frag.Cas() != docCas {
314		return nil, base.ErrorInvalidCAS
315	}
316
317	err = frag.Content(base.XattributeToc, &xtoc)
318	if err != nil {
319		return nil, err
320	}
321
322	tocList := xtoc.([]interface{})
323	for _, tocEntry := range tocList {
324		if entry, ok := tocEntry.(string); ok {
325			frag, err := bucket.LookupIn(docId).GetEx(entry, gocb.SubdocFlagXattr).Execute()
326			if err != nil {
327				return nil, err
328			}
329
330			if frag.Cas() != docCas {
331				return nil, base.ErrorInvalidCAS
332			}
333
334			var value interface{}
335			frag.Content(entry, &value)
336			xattrMap[entry] = value
337		}
338	}
339	xattrSlice, err = json.Marshal(xattrMap)
340	if err != nil {
341		return nil, err
342	}
343
344	return xattrSlice, nil
345}
346
347func filterExpressionGetDocVal(bucket *gocb.Bucket, docId string) ([]byte, gocb.Cas, error) {
348	var retrievedDocVal interface{}
349	docCas, err := bucket.Get(docId, &retrievedDocVal)
350	if err != nil {
351		return nil, docCas, err
352	}
353
354	valMap, ok := retrievedDocVal.(map[string]interface{})
355	if !ok {
356		err = fmt.Errorf("Retrieved document (%v) value is not a valid key-value map", docId)
357		return nil, docCas, err
358	}
359
360	bodySlice, err := json.Marshal(valMap)
361	if err != nil {
362		return nil, docCas, err
363	}
364
365	return bodySlice, docCas, err
366}
367
368// Called by UI to run a test on a specific document. This cannot be unit-tested as the authentication will fail
369// Queries ns_server REST endpoint for a document content
370// ns_server returns the document in a special json format, so then massage the data into gojsonsm compatible format
371// and pass it through gojsonsm for testing
372func (u *Utilities) FilterExpressionMatchesDoc(expression, docId, bucketName, addr string, port uint16) (result bool, err error) {
373	nsServerDocContent := make(map[string]interface{})
374	hostAddr := base.GetHostAddr(addr, port)
375	var statusCode int
376
377	matcher, err := base.ValidateAndGetAdvFilter(expression)
378	if err != nil {
379		return
380	}
381
382	retryOp := func() error {
383		err, statusCode = u.QueryRestApi(hostAddr, base.DefaultPoolBucketsPath+bucketName+base.DocsPath+docId, false /*preservePathEncoding*/, base.MethodGet, "" /*contentType*/, nil, /*body*/
384			0 /*timeout*/, &nsServerDocContent, u.logger_utils)
385
386		if err != nil {
387			return err
388		} else if statusCode != http.StatusOK {
389			return fmt.Errorf("Err returned: %v along with http status code: %v", err, statusCode)
390		} else {
391			return nil
392		}
393	}
394
395	err = u.ExponentialBackoffExecutor("filterTesterXattrRetriever", base.BucketInfoOpWaitTime, base.BucketInfoOpMaxRetry, base.BucketInfoOpRetryFactor, retryOp)
396	if err != nil {
397		if strings.Contains(err.Error(), base.ErrorResourceDoesNotExist.Error()) {
398			err = fmt.Errorf("Specified document not found")
399		}
400		return
401	}
402
403	return u.processNsServerDocForFiltering(matcher, nsServerDocContent, docId)
404}
405
406func (u *Utilities) processNsServerDocForFiltering(matcher gojsonsm.Matcher, nsServerDocContent map[string]interface{}, docId string) (result bool, err error) {
407	// Take care of the body - it shows up as a string of pre-formatted json
408	// i.e. the whole {"k":"v"} as the value of specified key below
409	processedJson := make(map[string]interface{})
410	if body, ok := nsServerDocContent[base.BucketDocBodyKey].(string); ok {
411		marshaledBytes := []byte(body)
412		err = json.Unmarshal(marshaledBytes, &processedJson)
413		if err != nil {
414			err = fmt.Errorf("Unable to process document body: %v", err)
415			return
416		}
417	}
418
419	// Add document key in
420	processedJson[base.InternalKeyKey] = docId
421
422	// Add xattribute
423	if xattrs, ok := nsServerDocContent[base.BucketDocXattrKey].(map[string]interface{}); ok {
424		processedJson[base.InternalKeyXattr] = xattrs
425	}
426
427	byteSlice, err := json.Marshal(processedJson)
428	if err != nil {
429		err = fmt.Errorf("Unable to marshal postprocessed data for matching: %v", err)
430		return
431	}
432
433	matched, status, err := base.MatchWrapper(matcher, byteSlice)
434	if u.logger_utils.GetLogLevel() >= log.LogLevelDebug && status&gojsonsm.MatcherCollateUsed > 0 {
435		u.logger_utils.Debugf("Matcher used collate to determine outcome (%v) for document %v%v%v", matched, base.UdTagBegin, docId, base.UdTagEnd)
436	}
437	return matched, err
438}
439
440// given a matches map, convert the indices from byte index to rune index
441func (u *Utilities) convertByteIndexToRuneIndex(key string, matches [][]int) ([][]int, error) {
442	convertedMatches := make([][]int, 0)
443	if len(key) == 0 || len(matches) == 0 {
444		return matches, nil
445	}
446
447	// parse key and build a byte index to rune index map
448	indexMap := make(map[int]int)
449	byteIndex := 0
450	runeIndex := 0
451	keyBytes := []byte(key)
452	keyLen := len(key)
453	for {
454		indexMap[byteIndex] = runeIndex
455		if byteIndex < keyLen {
456			_, runeLen := utf8.DecodeRune(keyBytes[byteIndex:])
457			byteIndex += runeLen
458			runeIndex++
459		} else {
460			break
461		}
462	}
463
464	if u.logger_utils.GetLogLevel() >= log.LogLevelDebug {
465		u.logger_utils.Debugf("key=%v, indexMap=%v%v%v\n", base.UdTagBegin, key, base.UdTagEnd, indexMap)
466	}
467
468	var ok bool
469	for _, match := range matches {
470		convertedMatch := make([]int, 2)
471		convertedMatch[0], ok = indexMap[match[0]]
472		if !ok {
473			// should not happen
474			errMsg := u.InvalidRuneIndexErrorMessage(key, match[0])
475			u.logger_utils.Errorf(errMsg)
476			return nil, errors.New(errMsg)
477		}
478		convertedMatch[1], ok = indexMap[match[1]]
479		if !ok {
480			// should not happen
481			errMsg := u.InvalidRuneIndexErrorMessage(key, match[1])
482			u.logger_utils.Errorf(errMsg)
483			return nil, errors.New(errMsg)
484		}
485		convertedMatches = append(convertedMatches, convertedMatch)
486	}
487
488	return convertedMatches, nil
489}
490
// InvalidRuneIndexErrorMessage builds the message used when a byte index in a
// match for key does not fall on the starting byte of a rune.
func (u *Utilities) InvalidRuneIndexErrorMessage(key string, index int) string {
	return fmt.Sprintf("byte index, %v, in match for key, %v, is not a starting index for a rune", index, key)
}
494
// LocalBucketUUID returns the UUID of bucketName on the local cluster reached
// through local_connStr. It delegates to u.BucketUUID, passing empty strings,
// base.HttpAuthMechPlain, and nil/false for the remaining arguments.
// NOTE(review): those arguments are presumably the remote credentials and TLS
// settings, which a local lookup does not need — confirm against BucketUUID.
func (u *Utilities) LocalBucketUUID(local_connStr string, bucketName string, logger *log.CommonLogger) (string, error) {
	return u.BucketUUID(local_connStr, bucketName, "", "", base.HttpAuthMechPlain, nil, false, nil, nil, logger)
}
498
// LocalBucketPassword returns the password of bucketName on the local cluster
// reached through local_connStr. It delegates to u.BucketPassword, passing
// empty strings, base.HttpAuthMechPlain, and nil/false for the remaining
// arguments.
// NOTE(review): those arguments are presumably the remote credentials and TLS
// settings, which a local lookup does not need — confirm against BucketPassword.
func (u *Utilities) LocalBucketPassword(local_connStr string, bucketName string, logger *log.CommonLogger) (string, error) {
	return u.BucketPassword(local_connStr, bucketName, "", "", base.HttpAuthMechPlain, nil, false, nil, nil, logger)
}
502
// ReplicationStatusNotFoundError builds the error reported when no replication
// status can be found for topic.
func (u *Utilities) ReplicationStatusNotFoundError(topic string) error {
	return fmt.Errorf("Cannot find replication status for topic %v", topic)
}
506
// BucketNotFoundError builds the error reported when bucketName cannot be
// found. The bucket name is quoted with backticks in the message.
func (u *Utilities) BucketNotFoundError(bucketName string) error {
	return fmt.Errorf("Bucket `%v` not found.", bucketName)
}
510
511// creates a local memcached connection.
512// always use plain auth
513func (u *Utilities) GetMemcachedConnectionWFeatures(serverAddr, bucketName, userAgent string,
514	keepAlivePeriod time.Duration, features HELOFeatures, logger *log.CommonLogger) (mcc.ClientIface, HELOFeatures, error) {
515	var respondedFeatures HELOFeatures
516
517	logger.Infof("GetMemcachedConnection serverAddr=%v, bucketName=%v\n", serverAddr, bucketName)
518	if serverAddr == "" {
519		err := fmt.Errorf("Failed to get memcached connection because serverAddr is empty. bucketName=%v, userAgent=%v", bucketName, userAgent)
520		logger.Warnf(err.Error())
521		return nil, respondedFeatures, err
522	}
523	username, password, err := cbauth.GetMemcachedServiceAuth(serverAddr)
524	if u.logger_utils.GetLogLevel() >= log.LogLevelDebug {
525		logger.Debugf("memcached auth: username=%v%v%v, password=%v%v%v, err=%v\n", base.UdTagBegin, username, base.UdTagEnd, base.UdTagBegin, password, base.UdTagEnd, err)
526	}
527	if err != nil {
528		return nil, respondedFeatures, err
529	}
530
531	return u.GetRemoteMemcachedConnectionWFeatures(serverAddr, username, password, bucketName, userAgent, true /*plainAuth*/, keepAlivePeriod, features, logger)
532}
533
534func (u *Utilities) GetMemcachedConnection(serverAddr, bucketName, userAgent string,
535	keepAlivePeriod time.Duration, logger *log.CommonLogger) (mcc.ClientIface, error) {
536	var noFeatures HELOFeatures
537	clientIface, _, err := u.GetMemcachedConnectionWFeatures(serverAddr, bucketName, userAgent, keepAlivePeriod, noFeatures, logger)
538	return clientIface, err
539}
540
// GetMemcachedRawConn opens a memcached connection by delegating to
// base.NewConn with the given address, credentials, bucket, auth mode and
// keep-alive period. No HELO is sent here.
func (u *Utilities) GetMemcachedRawConn(serverAddr, username, password, bucketName string, plainAuth bool,
	keepAlivePeriod time.Duration, logger *log.CommonLogger) (mcc.ClientIface, error) {
	conn, err := base.NewConn(serverAddr, username, password, bucketName, plainAuth, keepAlivePeriod, logger)
	if err != nil {
		// NOTE(review): returning a literal nil here presumably keeps a typed
		// nil pointer from being wrapped in the interface result — confirm
		// against base.NewConn's return type before simplifying.
		return nil, err
	}
	return conn, err
}
549
550func (u *Utilities) GetRemoteMemcachedConnectionWFeatures(serverAddr, username, password, bucketName, userAgent string,
551	plainAuth bool, keepAlivePeriod time.Duration, features HELOFeatures, logger *log.CommonLogger) (mcc.ClientIface, HELOFeatures, error) {
552	var err error
553	var conn mcc.ClientIface
554	var respondedFeatures HELOFeatures
555
556	getRemoteMcConnOp := func() error {
557		conn, err = u.GetMemcachedRawConn(serverAddr, username, password, bucketName, plainAuth, keepAlivePeriod, logger)
558		if err != nil {
559			logger.Warnf("Failed to construct memcached client for %v, err=%v\n", serverAddr, err)
560			return err
561		}
562
563		respondedFeatures, err = u.SendHELOWithFeatures(conn, userAgent, base.HELOTimeout, base.HELOTimeout, features, logger)
564
565		if err != nil {
566			conn.Close()
567			logger.Warnf("Failed to send HELO for %v, err=%v\n", conn, err)
568			return err
569		}
570		return nil
571	}
572
573	opErr := u.ExponentialBackoffExecutor("GetRemoteMemcachedConnection", base.RemoteMcRetryWaitTime, base.MaxRemoteMcRetry,
574		base.RemoteMcRetryFactor, getRemoteMcConnOp)
575
576	if opErr != nil {
577		logger.Errorf(opErr.Error())
578		return nil, respondedFeatures, err
579	}
580
581	return conn, respondedFeatures, err
582}
583
584func (u *Utilities) GetRemoteMemcachedConnection(serverAddr, username, password, bucketName, userAgent string,
585	plainAuth bool, keepAlivePeriod time.Duration, logger *log.CommonLogger) (mcc.ClientIface, error) {
586	var noFeatureEnabled HELOFeatures
587	conn, _, err := u.GetRemoteMemcachedConnectionWFeatures(serverAddr, username, password, bucketName, userAgent, plainAuth, keepAlivePeriod,
588		noFeatureEnabled, logger)
589	return conn, err
590}
591
592// send helo with specified user agent string to memcached
593// the helo is purely informational, for the identification of the client
594// unsuccessful response is not treated as errors
595func (u *Utilities) SendHELO(client mcc.ClientIface, userAgent string, readTimeout, writeTimeout time.Duration,
596	logger *log.CommonLogger) (err error) {
597	var allFeaturesDisabled HELOFeatures
598	heloReq := u.ComposeHELORequest(userAgent, allFeaturesDisabled)
599
600	var response *mc.MCResponse
601	response, err = u.sendHELORequest(client, heloReq, userAgent, readTimeout, writeTimeout, logger)
602	if err != nil {
603		logger.Errorf("Received error response from HELO command. userAgent=%v, err=%v.", userAgent, err)
604	} else if response.Status != mc.SUCCESS {
605		logger.Warnf("Received unexpected response from HELO command. userAgent=%v, response status=%v.", userAgent, response.Status)
606	} else {
607		logger.Infof("Successfully sent HELO command with userAgent=%v", userAgent)
608	}
609	return
610}
611
612// send helo to memcached with data type (including xattr) feature enabled
613// used exclusively by xmem nozzle
614// we need to know whether data type is indeed enabled from helo response
615// unsuccessful response is treated as errors
616func (u *Utilities) SendHELOWithFeatures(client mcc.ClientIface, userAgent string, readTimeout, writeTimeout time.Duration, requestedFeatures HELOFeatures, logger *log.CommonLogger) (respondedFeatures HELOFeatures, err error) {
617	// Initially set initial respondedFeatures to None since no compression negotiated should not be invalid
618	respondedFeatures.CompressionType = base.CompressionTypeNone
619
620	heloReq := u.ComposeHELORequest(userAgent, requestedFeatures)
621
622	var response *mc.MCResponse
623	response, err = u.sendHELORequest(client, heloReq, userAgent, readTimeout, writeTimeout, logger)
624	if err != nil {
625		logger.Errorf("Received error response from HELO command. userAgent=%v, err=%v.", userAgent, err)
626	} else if response.Status != mc.SUCCESS {
627		errMsg := fmt.Sprintf("Received unexpected response from HELO command. userAgent=%v, response status=%v.", userAgent, response.Status)
628		logger.Error(errMsg)
629		err = errors.New(errMsg)
630	} else {
631		// helo succeeded. parse response body for features enabled
632		bodyLen := len(response.Body)
633		if (bodyLen & 1) != 0 {
634			// body has to have even number of bytes
635			logger.Errorf("Received response body with odd number of bytes from HELO command. userAgent=%v, (redacted) response body=%v%v%v.", userAgent, base.UdTagBegin, response.Body, base.UdTagEnd)
636			err = errors.New(fmt.Sprintf("Received response body with odd number of bytes from HELO command. userAgent=%v.", userAgent))
637			return
638		}
639		pos := 0
640		for {
641			if pos >= bodyLen {
642				break
643			}
644			feature := binary.BigEndian.Uint16(response.Body[pos : pos+2])
645			if feature == base.HELO_FEATURE_XATTR {
646				respondedFeatures.Xattribute = true
647			}
648			if feature == base.HELO_FEATURE_SNAPPY {
649				respondedFeatures.CompressionType = base.CompressionTypeSnappy
650			}
651			if feature == base.HELO_FEATURE_XERROR {
652				respondedFeatures.Xerror = true
653			}
654			pos += 2
655		}
656		logger.Infof("Successfully sent HELO command with userAgent=%v. attributes=%v", userAgent, respondedFeatures)
657	}
658	return
659}
660
661func (u *Utilities) sendHELORequest(client mcc.ClientIface, heloReq *mc.MCRequest, userAgent string, readTimeout, writeTimeout time.Duration,
662	logger *log.CommonLogger) (response *mc.MCResponse, err error) {
663
664	conn := client.Hijack()
665	conn.(net.Conn).SetWriteDeadline(time.Now().Add(writeTimeout))
666	_, err = conn.Write(heloReq.Bytes())
667	conn.(net.Conn).SetWriteDeadline(time.Time{})
668	if err != nil {
669		logger.Warnf("Error sending HELO command. userAgent=%v, err=%v.", userAgent, err)
670		return
671	}
672
673	conn.(net.Conn).SetReadDeadline(time.Now().Add(readTimeout))
674	response, err = client.Receive()
675	conn.(net.Conn).SetReadDeadline(time.Time{})
676	return
677}
678
679// compose a HELO command
680func (u *Utilities) ComposeHELORequest(userAgent string, features HELOFeatures) *mc.MCRequest {
681	var value []byte
682	var numOfFeatures = features.NumberOfActivatedFeatures()
683	var sliceIndex int
684	bytesToAllocate := base.HELO_BYTES_PER_FEATURE * (numOfFeatures + 1) // TCP_NO_DELAY is included by default
685	value = make([]byte, bytesToAllocate)
686
687	// tcp no delay - [0:2]
688	binary.BigEndian.PutUint16(value[sliceIndex:sliceIndex+base.HELO_BYTES_PER_FEATURE], base.HELO_FEATURE_TCP_NO_DELAY)
689	sliceIndex += base.HELO_BYTES_PER_FEATURE
690
691	// Xattribute
692	if features.Xattribute {
693		binary.BigEndian.PutUint16(value[sliceIndex:sliceIndex+base.HELO_BYTES_PER_FEATURE], base.HELO_FEATURE_XATTR)
694		sliceIndex += base.HELO_BYTES_PER_FEATURE
695	}
696
697	// Compression
698	if features.CompressionType == base.CompressionTypeSnappy {
699		binary.BigEndian.PutUint16(value[sliceIndex:sliceIndex+base.HELO_BYTES_PER_FEATURE], base.HELO_FEATURE_SNAPPY)
700		sliceIndex += base.HELO_BYTES_PER_FEATURE
701	}
702
703	// Xerror
704	if features.Xerror {
705		binary.BigEndian.PutUint16(value[sliceIndex:sliceIndex+base.HELO_BYTES_PER_FEATURE], base.HELO_FEATURE_XERROR)
706		sliceIndex += base.HELO_BYTES_PER_FEATURE
707	}
708
709	return &mc.MCRequest{
710		Key:    []byte(userAgent),
711		Opcode: mc.HELLO,
712		Body:   value,
713	}
714}
715
716func (u *Utilities) GetIntSettingFromSettings(settings metadata.ReplicationSettingsMap, settingName string) (int, error) {
717	settingObj := u.GetSettingFromSettings(settings, settingName)
718	if settingObj == nil {
719		return -1, nil
720	}
721
722	setting, ok := settingObj.(int)
723	if !ok {
724		return -1, fmt.Errorf("Setting %v is of wrong type", settingName)
725	}
726
727	return setting, nil
728}
729
730func (u *Utilities) GetStringSettingFromSettings(settings metadata.ReplicationSettingsMap, settingName string) (string, error) {
731	settingObj := u.GetSettingFromSettings(settings, settingName)
732	if settingObj == nil {
733		return "", nil
734	}
735
736	setting, ok := settingObj.(string)
737	if !ok {
738		return "", fmt.Errorf("Setting %v is of wrong type", settingName)
739	}
740
741	return setting, nil
742}
743
744func (u *Utilities) GetSettingFromSettings(settings metadata.ReplicationSettingsMap, settingName string) interface{} {
745	if settings == nil {
746		return nil
747	}
748
749	setting, ok := settings[settingName]
750	if !ok {
751		return nil
752	}
753
754	return setting
755}
756
757func (u *Utilities) GetMemcachedClient(serverAddr, bucketName string, kv_mem_clients map[string]mcc.ClientIface,
758	userAgent string, keepAlivePeriod time.Duration, logger *log.CommonLogger) (mcc.ClientIface, error) {
759	client, ok := kv_mem_clients[serverAddr]
760	if ok {
761		return client, nil
762	} else {
763		if bucketName == "" {
764			err := fmt.Errorf("Failed to get memcached client because of unexpected empty bucketName. serverAddr=%v, userAgent=%v", serverAddr, userAgent)
765			logger.Warnf(err.Error())
766			return nil, err
767		}
768
769		var client, err = u.GetMemcachedConnection(serverAddr, bucketName, userAgent, keepAlivePeriod, logger)
770		if err == nil {
771			kv_mem_clients[serverAddr] = client
772			return client, nil
773		} else {
774			return nil, err
775		}
776	}
777}
778
779func (u *Utilities) GetServerVBucketsMap(connStr, bucketName string, bucketInfo map[string]interface{}) (map[string][]uint16, error) {
780	vbucketServerMapObj, ok := bucketInfo[base.VBucketServerMapKey]
781	if !ok {
782		return nil, fmt.Errorf("Error getting vbucket server map from bucket info. connStr=%v, bucketName=%v, bucketInfo=%v\n", connStr, bucketName, bucketInfo)
783	}
784	vbucketServerMap, ok := vbucketServerMapObj.(map[string]interface{})
785	if !ok {
786		return nil, fmt.Errorf("Vbucket server map is of wrong type. connStr=%v, bucketName=%v, vbucketServerMap=%v\n", connStr, bucketName, vbucketServerMapObj)
787	}
788
789	// get server list
790	serverListObj, ok := vbucketServerMap[base.ServerListKey]
791	if !ok {
792		return nil, fmt.Errorf("Error getting server list from vbucket server map. connStr=%v, bucketName=%v, vbucketServerMap=%v\n", connStr, bucketName, vbucketServerMap)
793	}
794	serverList, ok := serverListObj.([]interface{})
795	if !ok {
796		return nil, fmt.Errorf("Server list is of wrong type. connStr=%v, bucketName=%v, serverList=%v\n", connStr, bucketName, serverListObj)
797	}
798
799	servers := make([]string, len(serverList))
800	for index, serverName := range serverList {
801		serverNameStr, ok := serverName.(string)
802		if !ok {
803			return nil, fmt.Errorf("Server name is of wrong type. connStr=%v, bucketName=%v, serverName=%v\n", connStr, bucketName, serverName)
804		}
805		servers[index] = serverNameStr
806	}
807
808	// get vbucket "map"
809	vbucketMapObj, ok := vbucketServerMap[base.VBucketMapKey]
810	if !ok {
811		return nil, fmt.Errorf("Error getting vbucket map from vbucket server map. connStr=%v, bucketName=%v, vbucketServerMap=%v\n", connStr, bucketName, vbucketServerMap)
812	}
813	vbucketMap, ok := vbucketMapObj.([]interface{})
814	if !ok {
815		return nil, fmt.Errorf("Vbucket map is of wrong type. connStr=%v, bucketName=%v, vbucketMap=%v\n", connStr, bucketName, vbucketMapObj)
816	}
817
818	serverVBMap := make(map[string][]uint16)
819
820	for vbno, indexListObj := range vbucketMap {
821		indexList, ok := indexListObj.([]interface{})
822		if !ok {
823			return nil, fmt.Errorf("Index list is of wrong type. connStr=%v, bucketName=%v, indexList=%v\n", connStr, bucketName, indexListObj)
824		}
825		if len(indexList) == 0 {
826			return nil, fmt.Errorf("Index list is empty. connStr=%v, bucketName=%v, vbno=%v\n", connStr, bucketName, vbno)
827		}
828		indexFloat, ok := indexList[0].(float64)
829		if !ok {
830			return nil, fmt.Errorf("Master index is of wrong type. connStr=%v, bucketName=%v, index=%v\n", connStr, bucketName, indexList[0])
831		}
832		indexInt := int(indexFloat)
833		if indexInt >= len(servers) {
834			return nil, fmt.Errorf("Master index is out of range. connStr=%v, bucketName=%v, index=%v\n", connStr, bucketName, indexInt)
835		} else if indexInt < 0 {
836			// During rebalancing or topology changes, it's possible ns_server may return a -1 for index. Callers should treat it as an transient error.
837			return nil, fmt.Errorf(fmt.Sprintf("%v connStr=%v, bucketName=%v, index=%v\n", base.ErrorMasterNegativeIndex, connStr, bucketName, indexInt))
838		}
839
840		server := servers[indexInt]
841		var vbList []uint16
842		vbList, ok = serverVBMap[server]
843		if !ok {
844			vbList = make([]uint16, 0)
845		}
846		vbList = append(vbList, uint16(vbno))
847		serverVBMap[server] = vbList
848	}
849	return serverVBMap, nil
850}
851
852func (u *Utilities) GetRemoteServerVBucketsMap(connStr, bucketName string, bucketInfo map[string]interface{}, useExternal bool) (kvVbMap map[string][]uint16, err error) {
853	kvVbMap, err = u.GetServerVBucketsMap(connStr, bucketName, bucketInfo)
854	if err != nil {
855		return
856	}
857	if useExternal {
858		u.TranslateKvVbMap(kvVbMap, bucketInfo)
859	}
860	return
861}
862
863// get bucket type setting from bucket info
864func (u *Utilities) GetBucketTypeFromBucketInfo(bucketName string, bucketInfo map[string]interface{}) (string, error) {
865	bucketType := ""
866	bucketTypeObj, ok := bucketInfo[base.BucketTypeKey]
867	if !ok {
868		return "", fmt.Errorf("Error looking up bucket type of bucket %v", bucketName)
869	} else {
870		bucketType, ok = bucketTypeObj.(string)
871		if !ok {
872			return "", fmt.Errorf("bucketType on bucket %v is of wrong type.", bucketName)
873		}
874	}
875	return bucketType, nil
876}
877
878// check if a bucket belongs to an elastic search (es) cluster by looking for "authType" field in bucket info.
879// if not found, cluster is es
880func (u *Utilities) CheckWhetherClusterIsESBasedOnBucketInfo(bucketInfo map[string]interface{}) bool {
881	_, ok := bucketInfo[base.AuthTypeKey]
882	return !ok
883}
884
885// get conflict resolution type setting from bucket info
886func (u *Utilities) GetConflictResolutionTypeFromBucketInfo(bucketName string, bucketInfo map[string]interface{}) (string, error) {
887	conflictResolutionType := base.ConflictResolutionType_Seqno
888	conflictResolutionTypeObj, ok := bucketInfo[base.ConflictResolutionTypeKey]
889	if ok {
890		conflictResolutionType, ok = conflictResolutionTypeObj.(string)
891		if !ok {
892			return "", fmt.Errorf("ConflictResolutionType on bucket %v is of wrong type.", bucketName)
893		}
894	}
895	return conflictResolutionType, nil
896}
897
898// get EvictionPolicy setting from bucket info
899func (u *Utilities) GetEvictionPolicyFromBucketInfo(bucketName string, bucketInfo map[string]interface{}) (string, error) {
900	evictionPolicy := ""
901	evictionPolicyObj, ok := bucketInfo[base.EvictionPolicyKey]
902	if ok {
903		evictionPolicy, ok = evictionPolicyObj.(string)
904		if !ok {
905			return "", fmt.Errorf("EvictionPolicy on bucket %v is of wrong type.", bucketName)
906		}
907	}
908	return evictionPolicy, nil
909}
910
911/**
912 * The second section is couchbase REST related utility functions
913 */
914// This method is used to get the SSL port for target nodes - will use alternate fields if requested
915func (u *Utilities) GetMemcachedSSLPortMap(connStr, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool,
916	clientCertificate []byte, clientKey []byte, bucket string, logger *log.CommonLogger, useExternal bool) (base.SSLPortMap, error) {
917	ret := make(base.SSLPortMap)
918
919	logger.Infof("GetMemcachedSSLPort, connStr=%v\n", connStr)
920	bucketInfo, err := u.GetClusterInfo(connStr, base.BPath+bucket, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger)
921	if err != nil {
922		return nil, err
923	}
924
925	nodesExt, ok := bucketInfo[base.NodeExtKey]
926	if !ok {
927		return nil, u.BucketInfoParseError(bucketInfo, logger)
928	}
929
930	nodesExtArray, ok := nodesExt.([]interface{})
931	if !ok {
932		return nil, u.BucketInfoParseError(bucketInfo, logger)
933	}
934
935	var hostName string
936	for _, nodeExt := range nodesExtArray {
937		var portNumberToUse uint16
938		nodeExtMap, ok := nodeExt.(map[string]interface{})
939		if !ok {
940			return nil, u.BucketInfoParseError(bucketInfo, logger)
941		}
942
943		// note that this is the only place where nodeExtMap contains a hostname without port
944		// instead of a host address with port
945		hostName, err = u.getHostNameWithoutPortFromNodeInfo(connStr, nodeExtMap, logger)
946		if err != nil {
947			return nil, u.BucketInfoParseError(bucketInfo, logger)
948		}
949
950		// Internal key
951		service, ok := nodeExtMap[base.ServicesKey]
952		if !ok {
953			return nil, u.BucketInfoParseError(bucketInfo, logger)
954		}
955
956		services_map, ok := service.(map[string]interface{})
957		if !ok {
958			return nil, u.BucketInfoParseError(bucketInfo, logger)
959		}
960
961		kv_port, ok := services_map[base.KVPortKey]
962		if !ok {
963			// the node may not have kv services. skip the node
964			continue
965		}
966		kvPortFloat, ok := kv_port.(float64)
967		if !ok {
968			return nil, u.BucketInfoParseError(bucketInfo, logger)
969		}
970
971		hostAddr := base.GetHostAddr(hostName, uint16(kvPortFloat))
972
973		kv_ssl_port, ok := services_map[base.KVSSLPortKey]
974		if !ok {
975			return nil, u.BucketInfoParseError(bucketInfo, logger)
976		}
977
978		kvSSLPortFloat, ok := kv_ssl_port.(float64)
979		if !ok {
980			return nil, u.BucketInfoParseError(bucketInfo, logger)
981		}
982		portNumberToUse = uint16(kvSSLPortFloat)
983
984		// Since this is a call intended for targets, get the external info if requested
985		if useExternal {
986			externalHostAddr, externalKVPort, externalKVPortErr, externalSSLPort, externalSSLPortErr := u.GetExternalAddressAndKvPortsFromNodeInfo(nodeExtMap)
987			if len(externalHostAddr) == 0 {
988				return nil, base.ErrorTargetNoAltHostName
989			}
990			if externalKVPortErr == nil {
991				// External address and port both exist
992				hostAddr = base.GetHostAddr(externalHostAddr, uint16(externalKVPort))
993			} else if externalKVPortErr == base.ErrorNoPortNumber {
994				// External address exists, but port does not. Use internal host's port number
995				hostAddr = base.GetHostAddr(externalHostAddr, uint16(kvPortFloat))
996			}
997			if externalSSLPortErr == nil {
998				portNumberToUse = uint16(externalSSLPort)
999			}
1000		}
1001
1002		ret[hostAddr] = portNumberToUse
1003	}
1004	logger.Infof("memcached ssl port map=%v\n", ret)
1005
1006	return ret, nil
1007}
1008
1009func (u *Utilities) BucketInfoParseError(bucketInfo map[string]interface{}, logger *log.CommonLogger) error {
1010	errMsg := "Error parsing memcached ssl port of remote cluster."
1011	detailedErrMsg := errMsg + fmt.Sprintf("bucketInfo=%v", bucketInfo)
1012	logger.Errorf(detailedErrMsg)
1013	return fmt.Errorf(errMsg)
1014}
1015
1016// The input is a non-https address, potentially with or without a port
1017// Returns 2 pairs of strings:
1018// 1. Hostname:internalSSLPort
1019// 2. Hostname:externalSSLPort (or "" if no external port)
1020func (u *Utilities) HttpsRemoteHostAddr(hostAddr string, logger *log.CommonLogger) (string, string, error) {
1021	// Extract hostname to be combined with SSL port
1022	hostName := base.GetHostName(hostAddr)
1023
1024	internalSSLPort, internalErr, externalSSLPort, externalErr := u.GetRemoteSSLPorts(hostAddr, logger)
1025	if internalErr != nil {
1026		return "", "", internalErr
1027	}
1028
1029	internalHostPort := base.GetHostAddr(hostName, internalSSLPort)
1030	var externalHostPort string
1031	if externalErr == nil {
1032		externalHostPort = base.GetHostAddr(hostName, externalSSLPort)
1033	}
1034
1035	return internalHostPort, externalHostPort, nil
1036}
1037
1038func (u *Utilities) GetRemoteSSLPorts(hostAddr string, logger *log.CommonLogger) (internalSSLPort uint16, internalSSLErr error, externalSSLPort uint16, externalSSLErr error) {
1039	externalSSLErr = base.ErrorNoPortNumber
1040
1041	portInfo := make(map[string]interface{})
1042	err, statusCode := u.QueryRestApiWithAuth(hostAddr, base.SSLPortsPath, false, "", "", base.HttpAuthMechPlain, nil, false, nil, nil, base.MethodGet, "", nil, 0, &portInfo, nil, false, logger)
1043	if err == nil && statusCode == http.StatusUnauthorized {
1044		// SSLPorts request normally do not require any user credentials
1045		// the only place unauthorized error could be returned is when target is elasticsearch cluster
1046		// treat this case differently so that a more specific error message can be returned to user
1047		internalSSLErr = base.ErrorUnauthorized
1048		return
1049	}
1050	if err != nil || statusCode != http.StatusOK {
1051		internalSSLErr = fmt.Errorf("Failed on calling %v on host %v, err=%v, statusCode=%v", base.SSLPortsPath, hostAddr, err, statusCode)
1052		return
1053	}
1054
1055	sslPort, ok := portInfo[base.SSLPortKey]
1056	if !ok {
1057		errMsg := "Failed to parse port info. ssl port is missing."
1058		logger.Errorf("%v. portInfo=%v", errMsg, portInfo)
1059		internalSSLErr = fmt.Errorf(errMsg)
1060		return
1061	}
1062
1063	sslPortFloat, ok := sslPort.(float64)
1064	if !ok {
1065		internalSSLErr = fmt.Errorf("ssl port is of wrong type. Expected type: float64; Actual type: %s", reflect.TypeOf(sslPort))
1066		return
1067	}
1068	internalSSLPort = uint16(sslPortFloat)
1069
1070	portNumber, externalSSLErr := u.getExternalSSLMgtPort(portInfo)
1071	if externalSSLErr == nil {
1072		externalSSLPort = (uint16)(portNumber)
1073	}
1074	return
1075}
1076
1077func (u *Utilities) GetClusterInfoWStatusCode(hostAddr, path, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (map[string]interface{}, error, int) {
1078	clusterInfo := make(map[string]interface{})
1079	err, statusCode := u.QueryRestApiWithAuth(hostAddr, path, false, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, base.MethodGet, "", nil, 0, &clusterInfo, nil, false, logger)
1080	if err != nil || statusCode != http.StatusOK {
1081		return nil, fmt.Errorf("Failed on calling host=%v, path=%v, err=%v, statusCode=%v", hostAddr, path, err, statusCode), statusCode
1082	}
1083	return clusterInfo, nil, statusCode
1084}
1085
1086func (u *Utilities) GetClusterInfo(hostAddr, path, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (map[string]interface{}, error) {
1087	clusterInfo, err, _ := u.GetClusterInfoWStatusCode(hostAddr, path, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger)
1088	return clusterInfo, err
1089}
1090
1091func (u *Utilities) GetClusterUUID(hostAddr, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (string, error) {
1092	clusterInfo, err := u.GetClusterInfo(hostAddr, base.PoolsPath, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger)
1093	if err != nil {
1094		return "", err
1095	}
1096	clusterUUIDObj, ok := clusterInfo[base.UUIDKey]
1097	if !ok {
1098		return "", fmt.Errorf("Cannot find uuid key in cluster info. hostAddr=%v, clusterInfo=%v\n", hostAddr, clusterInfo)
1099	}
1100	clusterUUID, ok := clusterUUIDObj.(string)
1101	if !ok {
1102		// cluster uuid is "[]" for unintialized cluster
1103		_, ok = clusterUUIDObj.([]interface{})
1104		if ok {
1105			return "", fmt.Errorf("cluster %v is not initialized. clusterUUIDObj=%v\n", hostAddr, clusterUUIDObj)
1106		} else {
1107			return "", fmt.Errorf("uuid key in cluster info is not of string type. hostAddr=%v, clusterUUIDObj=%v\n", hostAddr, clusterUUIDObj)
1108		}
1109	}
1110	return clusterUUID, nil
1111}
1112
1113// get a list of node infos with full info
1114// this api calls xxx/pools/nodes, which returns full node info including clustercompatibility, etc.
1115// the catch is that this xxx/pools/nodes is not supported by elastic search cluster
1116func (u *Utilities) GetNodeListWithFullInfo(hostAddr, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) ([]interface{}, error) {
1117	clusterInfo, err := u.GetClusterInfo(hostAddr, base.NodesPath, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger)
1118	if err != nil {
1119		return nil, err
1120	}
1121
1122	return u.GetNodeListFromInfoMap(clusterInfo, logger)
1123
1124}
1125
1126// get a list of node infos with minimum info
1127// this api calls xxx/pools/default, which returns a subset of node info such as hostname
1128// this api can/needs to be used when connecting to elastic search cluster, which supports xxx/pools/default
1129func (u *Utilities) GetNodeListWithMinInfo(hostAddr, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) ([]interface{}, error) {
1130	clusterInfo, err := u.GetClusterInfo(hostAddr, base.DefaultPoolPath, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger)
1131	if err != nil {
1132		return nil, err
1133	}
1134
1135	return u.GetNodeListFromInfoMap(clusterInfo, logger)
1136
1137}
1138
1139func (u *Utilities) GetClusterUUIDAndNodeListWithMinInfo(hostAddr, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (string, []interface{}, error) {
1140	defaultPoolInfo, err := u.GetClusterInfo(hostAddr, base.DefaultPoolPath, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger)
1141	if err != nil {
1142		return "", nil, err
1143	}
1144
1145	return u.GetClusterUUIDAndNodeListWithMinInfoFromDefaultPoolInfo(defaultPoolInfo, logger)
1146
1147}
1148
1149func (u *Utilities) GetClusterUUIDAndNodeListWithMinInfoFromDefaultPoolInfo(defaultPoolInfo map[string]interface{}, logger *log.CommonLogger) (string, []interface{}, error) {
1150	clusterUUID, err := u.GetClusterUUIDFromDefaultPoolInfo(defaultPoolInfo, logger)
1151	if err != nil {
1152		return "", nil, err
1153	}
1154
1155	nodeList, err := u.GetNodeListFromInfoMap(defaultPoolInfo, logger)
1156
1157	return clusterUUID, nodeList, err
1158
1159}
1160
1161// get bucket info
1162// a specialized case of GetClusterInfo
1163func (u *Utilities) GetBucketInfo(hostAddr, bucketName, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (map[string]interface{}, error) {
1164	bucketInfo := make(map[string]interface{})
1165	err, statusCode := u.QueryRestApiWithAuth(hostAddr, base.DefaultPoolBucketsPath+bucketName, false, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, base.MethodGet, "", nil, 0, &bucketInfo, nil, false, logger)
1166	if err == nil && statusCode == http.StatusOK {
1167		return bucketInfo, nil
1168	}
1169	if statusCode == http.StatusNotFound {
1170		return nil, u.GetNonExistentBucketError()
1171	} else {
1172		logger.Errorf("Failed to get bucket info for bucket '%v'. host=%v, err=%v, statusCode=%v", bucketName, hostAddr, err, statusCode)
1173		return nil, fmt.Errorf("Failed to get bucket info.")
1174	}
1175}
1176
1177// get bucket uuid
1178func (u *Utilities) BucketUUID(hostAddr, bucketName, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (string, error) {
1179	bucketInfo, err := u.GetBucketInfo(hostAddr, bucketName, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger)
1180	if err != nil {
1181		return "", err
1182	}
1183
1184	return u.GetBucketUuidFromBucketInfo(bucketName, bucketInfo, logger)
1185}
1186
1187// get bucket password
1188func (u *Utilities) BucketPassword(hostAddr, bucketName, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (string, error) {
1189	bucketInfo, err := u.GetBucketInfo(hostAddr, bucketName, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger)
1190	if err != nil {
1191		return "", err
1192	}
1193
1194	return u.GetBucketPasswordFromBucketInfo(bucketName, bucketInfo, logger)
1195}
1196
1197func (u *Utilities) GetLocalBuckets(hostAddr string, logger *log.CommonLogger) (map[string]string, error) {
1198	return u.GetBuckets(hostAddr, "", "", base.HttpAuthMechPlain, nil, false, nil, nil, logger)
1199}
1200
1201// return a map of buckets
1202// key = bucketName, value = bucketUUID
1203func (u *Utilities) GetBuckets(hostAddr, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (map[string]string, error) {
1204	bucketListInfo := make([]interface{}, 0)
1205	err, statusCode := u.QueryRestApiWithAuth(hostAddr, base.DefaultPoolBucketsPath, false, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, base.MethodGet, "", nil, 0, &bucketListInfo, nil, false, logger)
1206	if err != nil || statusCode != http.StatusOK {
1207		return nil, fmt.Errorf("Failed on calling host=%v, path=%v, err=%v, statusCode=%v", hostAddr, base.DefaultPoolBucketsPath, err, statusCode)
1208	}
1209
1210	return u.GetBucketsFromInfoMap(bucketListInfo, logger)
1211}
1212
1213func (u *Utilities) GetBucketsFromInfoMap(bucketListInfo []interface{}, logger *log.CommonLogger) (map[string]string, error) {
1214	buckets := make(map[string]string)
1215	for _, bucketInfo := range bucketListInfo {
1216		bucketInfoMap, ok := bucketInfo.(map[string]interface{})
1217		if !ok {
1218			errMsg := fmt.Sprintf("bucket info is not of map type.  bucket info=%v", bucketInfo)
1219			logger.Error(errMsg)
1220			return nil, errors.New(errMsg)
1221		}
1222		bucketNameInfo, ok := bucketInfoMap[base.BucketNameKey]
1223		if !ok {
1224			errMsg := fmt.Sprintf("bucket info does not contain bucket name.  bucket info=%v", bucketInfoMap)
1225			logger.Error(errMsg)
1226			return nil, errors.New(errMsg)
1227		}
1228		bucketName, ok := bucketNameInfo.(string)
1229		if !ok {
1230			errMsg := fmt.Sprintf("bucket name is not of string type.  bucket name=%v", bucketNameInfo)
1231			logger.Error(errMsg)
1232			return nil, errors.New(errMsg)
1233		}
1234		bucketUUIDInfo, ok := bucketInfoMap[base.UUIDKey]
1235		if !ok {
1236			errMsg := fmt.Sprintf("bucket info does not contain bucket uuid.  bucket info=%v", bucketInfoMap)
1237			logger.Error(errMsg)
1238			return nil, errors.New(errMsg)
1239		}
1240		bucketUUID, ok := bucketUUIDInfo.(string)
1241		if !ok {
1242			errMsg := fmt.Sprintf("bucket uuid is not of string type.  bucket uuid=%v", bucketUUIDInfo)
1243			logger.Error(errMsg)
1244			return nil, errors.New(errMsg)
1245		}
1246		buckets[bucketName] = bucketUUID
1247	}
1248
1249	return buckets, nil
1250}
1251
1252func (u *Utilities) BucketValidationInfo(hostAddr, bucketName, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte,
1253	logger *log.CommonLogger) (bucketInfo map[string]interface{}, bucketType string, bucketUUID string, bucketConflictResolutionType string,
1254	bucketEvictionPolicy string, bucketKVVBMap map[string][]uint16, err error) {
1255
1256	return u.bucketValidationInfoInternal(hostAddr, bucketName, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger, false /*external*/)
1257}
1258
1259func (u *Utilities) RemoteBucketValidationInfo(hostAddr, bucketName, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte,
1260	logger *log.CommonLogger, useExternal bool) (bucketInfo map[string]interface{}, bucketType string, bucketUUID string, bucketConflictResolutionType string,
1261	bucketEvictionPolicy string, bucketKVVBMap map[string][]uint16, err error) {
1262
1263	return u.bucketValidationInfoInternal(hostAddr, bucketName, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger, useExternal)
1264}
1265
// bucketValidationInfoInternal fetches the bucket info of bucketName from hostAddr and
// gets a number of fields in bucket for validation purpose
// 1. bucket type
// 2. bucket uuid
// 3. bucket conflict resolution type
// 4. bucket eviction policy
// 5. bucket server vb map
// When remote is true, the server vb map is additionally passed through TranslateKvVbMap.
// The whole fetch-and-extract sequence is handed to ExponentialBackoffExecutor; the error
// returned by that executor is what the caller receives in err.
func (u *Utilities) bucketValidationInfoInternal(hostAddr, bucketName, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte,
	logger *log.CommonLogger, remote bool) (bucketInfo map[string]interface{}, bucketType string, bucketUUID string, bucketConflictResolutionType string,
	bucketEvictionPolicy string, bucketKVVBMap map[string][]uint16, err error) {

	// The closure assigns directly to the named results of the enclosing function, so
	// whatever an invocation managed to set stays visible to the caller.
	bucketValidationInfoOp := func() error {
		bucketInfo, err = u.GetBucketInfo(hostAddr, bucketName, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger)
		if err != nil {
			// returned as-is, without additional context
			return err
		}

		// Each extraction failure below is wrapped with the name of the offending setting
		bucketType, err = u.GetBucketTypeFromBucketInfo(bucketName, bucketInfo)
		if err != nil {
			err = fmt.Errorf("Error retrieving BucketType setting on bucket %v. err=%v", bucketName, err)
			return err
		}
		bucketUUID, err = u.GetBucketUuidFromBucketInfo(bucketName, bucketInfo, logger)
		if err != nil {
			err = fmt.Errorf("Error retrieving UUID setting on bucket %v. err=%v", bucketName, err)
			return err
		}
		bucketConflictResolutionType, err = u.GetConflictResolutionTypeFromBucketInfo(bucketName, bucketInfo)
		if err != nil {
			err = fmt.Errorf("Error retrieving ConflictResolutionType setting on bucket %v. err=%v", bucketName, err)
			return err
		}
		bucketEvictionPolicy, err = u.GetEvictionPolicyFromBucketInfo(bucketName, bucketInfo)
		if err != nil {
			err = fmt.Errorf("Error retrieving EvictionPolicy setting on bucket %v. err=%v", bucketName, err)
			return err
		}
		bucketKVVBMap, err = u.GetServerVBucketsMap(hostAddr, bucketName, bucketInfo)
		if err != nil {
			err = fmt.Errorf("Error retrieving server vb map on bucket %v. err=%v", bucketName, err)
			return err
		}

		// Callers in this file pass their useExternal flag as remote
		if remote {
			u.TranslateKvVbMap(bucketKVVBMap, bucketInfo)
		}
		return nil
	}

	// err is overwritten with the executor's own result here
	err = u.ExponentialBackoffExecutor("BucketValidationInfo", base.BucketInfoOpWaitTime, base.BucketInfoOpMaxRetry, base.BucketInfoOpRetryFactor, bucketValidationInfoOp)
	return
}
1317
1318func (u *Utilities) GetBucketUuidFromBucketInfo(bucketName string, bucketInfo map[string]interface{}, logger *log.CommonLogger) (string, error) {
1319	bucketUUID := ""
1320	bucketUUIDObj, ok := bucketInfo[base.UUIDKey]
1321	if !ok {
1322		return "", fmt.Errorf("Error looking up uuid of bucket %v", bucketName)
1323	} else {
1324		bucketUUID, ok = bucketUUIDObj.(string)
1325		if !ok {
1326			return "", fmt.Errorf("Uuid of bucket %v is of wrong type", bucketName)
1327		}
1328	}
1329	return bucketUUID, nil
1330}
1331
1332func (u *Utilities) GetClusterUUIDFromDefaultPoolInfo(defaultPoolInfo map[string]interface{}, logger *log.CommonLogger) (string, error) {
1333	bucketsObj, ok := defaultPoolInfo[base.BucketsKey]
1334	if !ok {
1335		errMsg := fmt.Sprintf("Cannot find buckets key in default pool info. defaultPoolInfo=%v\n", defaultPoolInfo)
1336		logger.Error(errMsg)
1337		return "", errors.New(errMsg)
1338	}
1339	bucketsInfo, ok := bucketsObj.(map[string]interface{})
1340	if !ok {
1341		errMsg := fmt.Sprintf("buckets in default pool info is not of map type. buckets=%v\n", bucketsObj)
1342		logger.Error(errMsg)
1343		return "", errors.New(errMsg)
1344	}
1345	uriObj, ok := bucketsInfo[base.URIKey]
1346	if !ok {
1347		errMsg := fmt.Sprintf("Cannot find uri key in buckets info. bucketsInfo=%v\n", bucketsInfo)
1348		logger.Error(errMsg)
1349		return "", errors.New(errMsg)
1350	}
1351	uri, ok := uriObj.(string)
1352	if !ok {
1353		errMsg := fmt.Sprintf("uri in buckets info is not of string type. uri=%v\n", uriObj)
1354		logger.Error(errMsg)
1355		return "", errors.New(errMsg)
1356	}
1357
1358	return u.GetClusterUUIDFromURI(uri)
1359}
1360
1361func (u *Utilities) GetClusterUUIDFromURI(uri string) (string, error) {
1362	// uri is in the form of /pools/default/buckets?uuid=d5dea23aa7ee3771becb3fcdb46ff956
1363	searchKey := base.UUIDKey + "="
1364	index := strings.LastIndex(uri, searchKey)
1365	if index < 0 {
1366		return "", fmt.Errorf("uri does not contain uuid. uri=%v", uri)
1367	}
1368	return uri[index+len(searchKey):], nil
1369}
1370
1371func (u *Utilities) GetClusterCompatibilityFromBucketInfo(bucketInfo map[string]interface{}, logger *log.CommonLogger) (int, error) {
1372	nodeList, err := u.GetNodeListFromInfoMap(bucketInfo, logger)
1373	if err != nil {
1374		return 0, err
1375	}
1376
1377	clusterCompatibility, err := u.GetClusterCompatibilityFromNodeList(nodeList)
1378	if err != nil {
1379		logger.Error(err.Error())
1380		return 0, err
1381	}
1382
1383	return clusterCompatibility, nil
1384}
1385
1386func (u *Utilities) GetBucketPasswordFromBucketInfo(bucketName string, bucketInfo map[string]interface{}, logger *log.CommonLogger) (string, error) {
1387	bucketPassword := ""
1388	bucketPasswordObj, ok := bucketInfo[base.SASLPasswordKey]
1389	if !ok {
1390		return "", fmt.Errorf("Error looking up password of bucket %v", bucketName)
1391	} else {
1392		bucketPassword, ok = bucketPasswordObj.(string)
1393		if !ok {
1394			return "", fmt.Errorf("Password of bucket %v is of wrong type", bucketName)
1395		}
1396	}
1397	return bucketPassword, nil
1398}
1399
1400func (u *Utilities) GetNodeListFromInfoMap(infoMap map[string]interface{}, logger *log.CommonLogger) ([]interface{}, error) {
1401	// get node list from the map
1402	nodes, ok := infoMap[base.NodesKey]
1403	if !ok {
1404		errMsg := fmt.Sprintf("info map contains no nodes. info map=%v", infoMap)
1405		logger.Error(errMsg)
1406		return nil, errors.New(errMsg)
1407	}
1408
1409	nodeList, ok := nodes.([]interface{})
1410	if !ok {
1411		errMsg := fmt.Sprintf("nodes is not of list type. type of nodes=%v", reflect.TypeOf(nodes))
1412		logger.Error(errMsg)
1413		return nil, errors.New(errMsg)
1414	}
1415
1416	// only return the nodes that are active
1417	activeNodeList := make([]interface{}, 0)
1418	for _, node := range nodeList {
1419		nodeInfoMap, ok := node.(map[string]interface{})
1420		if !ok {
1421			errMsg := fmt.Sprintf("node info is not of map type. type=%v", reflect.TypeOf(node))
1422			logger.Error(errMsg)
1423			return nil, errors.New(errMsg)
1424		}
1425		clusterMembershipObj, ok := nodeInfoMap[base.ClusterMembershipKey]
1426		if !ok {
1427			// this could happen when target is elastic search cluster (or maybe very old couchbase cluster?)
1428			// consider the node to be "active" to be safe
1429			errMsg := fmt.Sprintf("node info map does not contain cluster membership. node info map=%v ", nodeInfoMap)
1430			logger.Debug(errMsg)
1431			activeNodeList = append(activeNodeList, node)
1432			continue
1433		}
1434		clusterMembership, ok := clusterMembershipObj.(string)
1435		if !ok {
1436			// play safe and return the node as active
1437			errMsg := fmt.Sprintf("cluster membership is not string type. type=%v ", reflect.TypeOf(clusterMembershipObj))
1438			logger.Warn(errMsg)
1439			activeNodeList = append(activeNodeList, node)
1440			continue
1441		}
1442		if clusterMembership == "" || clusterMembership == base.ClusterMembership_Active {
1443			activeNodeList = append(activeNodeList, node)
1444		}
1445	}
1446
1447	return activeNodeList, nil
1448}
1449
1450func (u *Utilities) GetClusterCompatibilityFromNodeList(nodeList []interface{}) (int, error) {
1451	if len(nodeList) > 0 {
1452		firstNode, ok := nodeList[0].(map[string]interface{})
1453		if !ok {
1454			return 0, fmt.Errorf("node info is of wrong type. node info=%v", nodeList[0])
1455		}
1456		clusterCompatibility, ok := firstNode[base.ClusterCompatibilityKey]
1457		if !ok {
1458			return 0, fmt.Errorf("Can't get cluster compatibility info. node info=%v\n If replicating to ElasticSearch node, use XDCR v1.", nodeList[0])
1459		}
1460		clusterCompatibilityFloat, ok := clusterCompatibility.(float64)
1461		if !ok {
1462			return 0, fmt.Errorf("cluster compatibility is not of int type. type=%v", reflect.TypeOf(clusterCompatibility))
1463		}
1464		return int(clusterCompatibilityFloat), nil
1465	}
1466
1467	return 0, fmt.Errorf("node list is empty")
1468}
1469
1470// Used externally only - returns a list of nodes for management access
1471// if needHttps is true, returns both http addresses and https addresses
1472// if needHttps is false, returns http addresses and empty https addresses
1473func (u *Utilities) GetRemoteNodeAddressesListFromNodeList(nodeList []interface{}, connStr string, needHttps bool, logger *log.CommonLogger, useExternal bool) (base.StringPairList, error) {
1474	nodeAddressesList := make(base.StringPairList, len(nodeList))
1475	var hostAddr string
1476	var hostHttpsAddr string
1477	var err error
1478	index := 0
1479
1480	for _, node := range nodeList {
1481		nodeInfoMap, ok := node.(map[string]interface{})
1482		if !ok {
1483			errMsg := fmt.Sprintf("node info is not of map type. type of node info=%v", reflect.TypeOf(node))
1484			logger.Error(errMsg)
1485			return nil, errors.New(errMsg)
1486		}
1487
1488		hostAddr, err = u.GetHostAddrFromNodeInfo(connStr, nodeInfoMap, false /*isHttps*/, logger, useExternal)
1489		if err != nil {
1490			errMsg := fmt.Sprintf("cannot get hostname from node info %v", nodeInfoMap)
1491			logger.Error(errMsg)
1492			return nil, errors.New(errMsg)
1493		}
1494
1495		if needHttps {
1496			hostHttpsAddr, err = u.GetHostAddrFromNodeInfo(connStr, nodeInfoMap, true /*isHttps*/, logger, useExternal)
1497			if err != nil {
1498				errMsg := fmt.Sprintf("cannot get https hostname from node info %v", nodeInfoMap)
1499				logger.Error(errMsg)
1500				return nil, errors.New(errMsg)
1501			}
1502		} else {
1503			hostHttpsAddr = ""
1504		}
1505
1506		nodeAddressesList[index] = base.StringPair{hostAddr, hostHttpsAddr}
1507		index++
1508	}
1509	return nodeAddressesList, nil
1510}
1511
1512func (u *Utilities) GetHttpsMgtPortFromNodeInfo(nodeInfo map[string]interface{}) (int, error) {
1513	portsObjRaw, portsObjExists := nodeInfo[base.PortsKey]
1514	if !portsObjExists {
1515		return -1, base.ErrorNoPortNumber
1516	}
1517
1518	portsObj, ok := portsObjRaw.(map[string]interface{})
1519	if !ok {
1520		return -1, base.ErrorNoPortNumber
1521	}
1522
1523	sslPort, ok := portsObj[base.SSLPortKey]
1524	if !ok {
1525		return -1, base.ErrorNoPortNumber
1526	}
1527
1528	sslPortFloat, ok := sslPort.(float64)
1529	if !ok {
1530		return -1, base.ErrorNoPortNumber
1531	}
1532	return int(sslPortFloat), nil
1533}
1534
1535func (u *Utilities) GetHostAddrFromNodeInfo(connStr string, nodeInfo map[string]interface{}, isHttps bool, logger *log.CommonLogger, useExternal bool) (string, error) {
1536	// Internal node information
1537	hostAddr, err := u.getAdminHostAddrFromNodeInfo(connStr, nodeInfo, logger)
1538	if err != nil {
1539		errMsg := fmt.Sprintf("cannot get hostname from node info %v", nodeInfo)
1540		logger.Error(errMsg)
1541		return "", errors.New(errMsg)
1542	}
1543
1544	if isHttps {
1545		sslPort, err := u.GetHttpsMgtPortFromNodeInfo(nodeInfo)
1546		if err != nil {
1547			return "", err
1548		}
1549		hostName := base.GetHostName(hostAddr)
1550		hostAddr = base.GetHostAddr(hostName, uint16(sslPort))
1551	}
1552
1553	if useExternal {
1554		// If external info exists, replace accordingly - hostAddr is currently pointing to internalNode's info
1555		if externalAddr, externalMgtPort, externalErr := u.GetExternalMgtHostAndPort(nodeInfo, isHttps); externalErr == nil {
1556			hostAddr = base.GetHostAddr(externalAddr, (uint16)(externalMgtPort))
1557		} else if externalErr == base.ErrorNoPortNumber {
1558			// Extract original internal node management port from above
1559			hostPort, portErr := base.GetPortNumber(hostAddr)
1560			if portErr == nil {
1561				// Combine externalHost:internalPort
1562				hostAddr = base.GetHostAddr(externalAddr, (uint16)(hostPort))
1563			} else {
1564				// Original internal address did not have port number, so continue to just have externalAddr[:noPort]
1565				hostAddr = externalAddr
1566			}
1567		}
1568	}
1569
1570	return hostAddr, nil
1571}
1572
1573// Returns:
1574// 1. External IP
1575// 2. External kv port (if applicable, -1 if not found)
1576// 3. Returns nil if port exists - ErrorNoPortNumber if kv (direct) port doesn't exist
1577// 4. External KvSSL port (if applicable, -1 if not found)
1578// 5. Returns nil if SSL port exists - ErrorNoPortNumber if SSL port doesn't exist
1579// Any other errors are considered bad op
1580func (u *Utilities) GetExternalAddressAndKvPortsFromNodeInfo(nodeInfo map[string]interface{}) (string, int, error, int, error) {
1581	var hostAddr string
1582	var portNumber int
1583	var sslPortNumber int
1584	var portErr error
1585	var sslPortErr error
1586
1587	alternateObjRaw, alternateExists := nodeInfo[base.AlternateKey]
1588	if !alternateExists {
1589		return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist
1590	}
1591
1592	alternateObj, ok := (alternateObjRaw).(map[string]interface{})
1593	if !ok {
1594		u.logger_utils.Errorf("GetExternalAddressAndKvPortsFromNodeInfo: Unable to convert alternateObj to map[string]interface{}")
1595		return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist
1596	}
1597
1598	externalObjRaw, externalExists := alternateObj[base.ExternalKey]
1599	if !externalExists {
1600		return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist
1601	}
1602
1603	externalObj, ok := (externalObjRaw).(map[string]interface{})
1604	if !ok {
1605		u.logger_utils.Errorf("GetExternalAddressAndKvPortsFromNodeInfo: Unable to convert externalObj to map[string]interface{}")
1606		return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist
1607	}
1608
1609	hostAddrObjRaw, hostAddrObjExists := externalObj[base.HostNameKey]
1610	if !hostAddrObjExists {
1611		return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist
1612	}
1613
1614	hostAddr, ok = (hostAddrObjRaw).(string)
1615	if !ok {
1616		u.logger_utils.Errorf("GetExternalAddressAndKvPortsFromNodeInfo: Unable to convert hostAddrObj to string")
1617		return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist
1618	} else if len(hostAddr) == 0 {
1619		u.logger_utils.Errorf("GetExternalAddressAndKvPortsFromNodeInfo: Empty Hostname")
1620		return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist
1621	}
1622
1623	portsObjRaw, portsObjExists := externalObj[base.PortsKey]
1624	if !portsObjExists {
1625		return hostAddr, -1, base.ErrorNoPortNumber, -1, base.ErrorNoPortNumber
1626	}
1627
1628	portErr = base.ErrorNoPortNumber
1629	sslPortErr = base.ErrorNoPortNumber
1630	portNumber = -1
1631	sslPortNumber = -1
1632	portsObj, ok := portsObjRaw.(map[string]interface{})
1633	if !ok {
1634		u.logger_utils.Warnf("Unable to convert portsObj to map[string]interface{}")
1635	} else {
1636		// Get the External kv port (internally "direct") port if it's there
1637		// KV team wants clients to use nodeServices, which means that "direct" is used as an internal naming convention
1638		// The alternate address fields use "kv" as what "direct" means to traditional XDCR
1639		kvPortFloat, kvPortExists := portsObj[base.KVPortKey]
1640		if kvPortExists {
1641			kvPortIntCheck, ok := kvPortFloat.(float64)
1642			if ok {
1643				portNumber = (int)(kvPortIntCheck)
1644				portErr = nil
1645			}
1646		}
1647		// Get the SSL port if it is there
1648		sslPort, sslPortExists := portsObj[base.KVSSLPortKey]
1649		if sslPortExists {
1650			sslPortIntCheck, ok := sslPort.(float64)
1651			if ok {
1652				sslPortNumber = (int)(sslPortIntCheck)
1653				sslPortErr = nil
1654			}
1655		}
1656	}
1657	return hostAddr, portNumber, portErr, sslPortNumber, sslPortErr
1658}
1659
1660func (u *Utilities) GetExternalMgtHostAndPort(nodeInfo map[string]interface{}, isHttps bool) (string, int, error) {
1661	var hostAddr string
1662	var portErr error = base.ErrorNoPortNumber
1663	var portNumber int = -1
1664
1665	alternateObjRaw, alternateExists := nodeInfo[base.AlternateKey]
1666	if !alternateExists {
1667		return "", -1, base.ErrorResourceDoesNotExist
1668	}
1669
1670	alternateObj, ok := alternateObjRaw.(map[string]interface{})
1671	if !ok {
1672		u.logger_utils.Errorf("GetExternalMgtHostAndPort: unable to cast alternateObj to map[string]interface{}")
1673		fmt.Printf("GetExternalMgtHostAndPort: unable to cast alternateObj to map[string]interface{}\n")
1674		return "", -1, base.ErrorResourceDoesNotExist
1675	}
1676
1677	externalObjRaw, externalExists := alternateObj[base.ExternalKey]
1678	if !externalExists {
1679		fmt.Printf("externalObjRaw does not exist\n")
1680		return "", -1, base.ErrorResourceDoesNotExist
1681	}
1682
1683	externalObj, ok := externalObjRaw.(map[string]interface{})
1684	if !ok {
1685		u.logger_utils.Errorf("GetExternalMgtHostAndPort: unable to cast externalObj to map[string]interface{}")
1686		fmt.Printf("GetExternalMgtHostAndPort: unable to cast externalObj to map[string]interface{}\n")
1687		return "", -1, base.ErrorResourceDoesNotExist
1688	}
1689
1690	hostAddrObj, hostAddrObjExists := externalObj[base.HostNameKey]
1691	if !hostAddrObjExists {
1692		return "", -1, base.ErrorResourceDoesNotExist
1693	}
1694
1695	hostAddr, ok = hostAddrObj.(string)
1696	if !ok {
1697		u.logger_utils.Errorf("GetExternalMgtHostAndPort: unable to cast hostAddr to string")
1698		return "", -1, base.ErrorResourceDoesNotExist
1699	} else if len(hostAddr) == 0 {
1700		u.logger_utils.Errorf("GetExternalMgtHostAndPort: empty hostAddr")
1701		return "", -1, base.ErrorResourceDoesNotExist
1702	}
1703
1704	portsObjRaw, portsObjExists := externalObj[base.PortsKey]
1705	if !portsObjExists {
1706		return hostAddr, portNumber, portErr
1707	}
1708
1709	portsObj, ok := portsObjRaw.(map[string]interface{})
1710	if !ok {
1711		u.logger_utils.Errorf("GetExternalMgtHostAndPort: unable to cast portsObj to map[string]interface{}")
1712		return hostAddr, portNumber, portErr
1713	}
1714
1715	var portKey string
1716	if isHttps {
1717		portKey = base.SSLMgtPortKey
1718	} else {
1719		portKey = base.MgtPortKey
1720	}
1721
1722	mgmtObjRaw, mgmtObjExists := portsObj[portKey]
1723	if !mgmtObjExists {
1724		return hostAddr, portNumber, portErr
1725	}
1726
1727	mgmtObj, ok := mgmtObjRaw.(float64)
1728	if !ok {
1729		u.logger_utils.Errorf("GetExternalMgtHostAndPort: unable to cast mgmtObj to float64")
1730		return hostAddr, portNumber, portErr
1731	}
1732
1733	portNumber = (int)(mgmtObj)
1734	portErr = nil
1735	return hostAddr, portNumber, portErr
1736}
1737
1738// Returns remote node's SSL management port if it exists
1739func (u *Utilities) getExternalSSLMgtPort(nodeInfo map[string]interface{}) (int, error) {
1740	alternateObjRaw, alternateExists := nodeInfo[base.AlternateKey]
1741	if !alternateExists {
1742		return -1, base.ErrorResourceDoesNotExist
1743	}
1744
1745	alternateObj, ok := alternateObjRaw.(map[string]interface{})
1746	if !ok {
1747		u.logger_utils.Errorf("getExternalSSLMgtPort: unable to cast alternateObj to map[string]interface{}")
1748		return -1, base.ErrorResourceDoesNotExist
1749	}
1750
1751	externalObjRaw, externalExists := alternateObj[base.ExternalKey]
1752	if !externalExists {
1753		return -1, base.ErrorResourceDoesNotExist
1754	}
1755
1756	externalObj, ok := externalObjRaw.(map[string]interface{})
1757	if !ok {
1758		u.logger_utils.Errorf("getExternalSSLMgtPort: unable to cast externalObj to map[string]interface{}")
1759		return -1, base.ErrorResourceDoesNotExist
1760	}
1761
1762	portsObjRaw, portsObjExists := externalObj[base.PortsKey]
1763	if !portsObjExists {
1764		u.logger_utils.Warnf("Unable to convert portsObj to map[string]interface{}")
1765		return -1, base.ErrorNoPortNumber
1766	}
1767
1768	portsObj, ok := portsObjRaw.(map[string]interface{})
1769	if !ok {
1770		return -1, base.ErrorNoPortNumber
1771	}
1772
1773	mgmtSSLObjRaw, mgmtSSLExists := portsObj[base.SSLMgtPortKey]
1774	if !mgmtSSLExists {
1775		return -1, base.ErrorNoPortNumber
1776	}
1777
1778	mgmtSSLObj, ok := mgmtSSLObjRaw.(float64)
1779	if !ok {
1780		u.logger_utils.Warnf("Unable to convert portsObj to float64")
1781		return -1, base.ErrorNoPortNumber
1782	}
1783
1784	return (int)(mgmtSSLObj), nil
1785}
1786
1787// Returns:
1788// 1. External Hostname
1789// 2. capi port
1790// 3. capi port error
1791// 4. capi SSL port
1792// 5. capi SSL port error
1793func (u *Utilities) getExternalHostAndCapiPorts(nodeInfo map[string]interface{}) (string, int, error, int, error) {
1794	var hostAddr string
1795	var capiPort int = -1
1796	var capiSSLPort int = -1
1797	var capiPortErr error = base.ErrorNoPortNumber
1798	var capiSSLPortErr error = base.ErrorNoPortNumber
1799
1800	alternateObjRaw, alternateExists := nodeInfo[base.AlternateKey]
1801	if !alternateExists {
1802		return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist
1803	}
1804
1805	alternateObj, ok := (alternateObjRaw).(map[string]interface{})
1806	if !ok {
1807		u.logger_utils.Errorf("getExternalHostAndCapiPorts: Unable to convert alternateObj to map[string]interface{}")
1808		return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist
1809	}
1810
1811	externalObjRaw, externalExists := alternateObj[base.ExternalKey]
1812	if !externalExists {
1813		return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist
1814	}
1815
1816	externalObj, ok := (externalObjRaw).(map[string]interface{})
1817	if !ok {
1818		u.logger_utils.Errorf("getExternalHostAndCapiPorts: Unable to convert externalObj to map[string]interface{}")
1819		return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist
1820	}
1821
1822	hostAddrObjRaw, hostAddrObjExists := externalObj[base.HostNameKey]
1823	if !hostAddrObjExists {
1824		return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist
1825	}
1826
1827	hostAddr, ok = (hostAddrObjRaw).(string)
1828	if !ok {
1829		u.logger_utils.Errorf("getExternalHostAndCapiPorts: Unable to convert hostAddrObj to string")
1830		return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist
1831	} else if len(hostAddr) == 0 {
1832		u.logger_utils.Errorf("getExternalHostAndCapiPorts: Empty Hostname")
1833		return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist
1834	}
1835
1836	portsObjRaw, portsObjExists := externalObj[base.PortsKey]
1837	if !portsObjExists {
1838		return "", -1, base.ErrorNoPortNumber, -1, base.ErrorNoPortNumber
1839	}
1840
1841	portsObj, ok := portsObjRaw.(map[string]interface{})
1842	if !ok {
1843		u.logger_utils.Warnf("Unable to convert portsObj to map[string]interface{}")
1844	} else {
1845		capiPortRaw, capiPortExists := portsObj[base.CapiPortKey]
1846		if capiPortExists {
1847			portNumberFloat, ok := (capiPortRaw).(float64)
1848			if !ok {
1849				u.logger_utils.Warnf("Unable to convert capiPort to float64")
1850			} else {
1851				capiPort = (int)(portNumberFloat)
1852				capiPortErr = nil
1853			}
1854		}
1855		// Get the SSL port if it is there
1856		if sslPort, sslPortExists := portsObj[base.CapiSSLPortKey]; sslPortExists {
1857			sslPortNumberFloat, ok := sslPort.(float64)
1858			if !ok {
1859				u.logger_utils.Warnf("Unable to convert capiSSLPort to float64")
1860			} else {
1861				capiSSLPort = (int)(sslPortNumberFloat)
1862				capiSSLPortErr = nil
1863			}
1864		}
1865	}
1866	return hostAddr, capiPort, capiPortErr, capiSSLPort, capiSSLPortErr
1867}
1868
1869func (u *Utilities) getAdminHostAddrFromNodeInfo(adminHostAddr string, nodeInfo map[string]interface{}, logger *log.CommonLogger) (string, error) {
1870	hostAddr, err := u.getHostAddrFromNodeInfoInternal(adminHostAddr, nodeInfo, logger)
1871	if err == base.ErrorNoHostName {
1872		hostAddr = adminHostAddr
1873		err = nil
1874	}
1875	return hostAddr, err
1876}
1877
1878func (u *Utilities) getHostAddrFromNodeInfoInternal(adminHostAddr string, nodeInfo map[string]interface{}, logger *log.CommonLogger) (string, error) {
1879	var hostAddr string
1880	var ok bool
1881
1882	hostAddrObj, ok := nodeInfo[base.HostNameKey]
1883	if !ok {
1884		logger.Infof("hostname is missing from node info %v. Host name in remote cluster reference, %v, will be used.\n", nodeInfo, adminHostAddr)
1885		return "", base.ErrorNoHostName
1886	} else {
1887		hostAddr, ok = hostAddrObj.(string)
1888		if !ok {
1889			return "", fmt.Errorf("Error getting host address from target cluster %v. host name, %v, is of wrong type\n", adminHostAddr, hostAddrObj)
1890		}
1891	}
1892
1893	return hostAddr, nil
1894}
1895
1896// Note - the translated map should be in the k->v form of:
1897// internalNodeAddress:directPort -> externalNodeAddress:kvPort
1898func (u *Utilities) GetIntExtHostNameKVPortTranslationMap(mapContainingNodesKey map[string]interface{}) (map[string]string, error) {
1899	internalExternalNodesMap := make(map[string]string)
1900	var err error
1901	var directPort int
1902	var nodesList []interface{}
1903
1904	nodesList, err = u.GetNodeListFromInfoMap(mapContainingNodesKey, u.logger_utils)
1905	if err != nil {
1906		return internalExternalNodesMap, err
1907	}
1908
1909	for _, nodeInfoRaw := range nodesList {
1910		nodeInfo, ok := nodeInfoRaw.(map[string]interface{})
1911		if !ok {
1912			u.logger_utils.Warnf("GetIntExtHostNameKVPortTranslationMap unable to cast nodeInfo as map[string]interface{} from: %v", nodeInfoRaw)
1913			// skip this node
1914			continue
1915		}
1916
1917		internalAddressAndPortRaw, internalAddressOk := nodeInfo[base.HostNameKey]
1918		if !internalAddressOk {
1919			u.logger_utils.Warnf("GetIntExtHostNameKVPortTranslationMap unable to retrieve internal host name from %v", nodeInfo)
1920			// skip this node
1921			continue
1922		}
1923
1924		internalAddressAndPort, ok := (internalAddressAndPortRaw).(string)
1925		if !ok {
1926			u.logger_utils.Warnf("GetIntExtHostNameKVPortTranslationMap unable to cast internalAddressAndPort as string: %v", internalAddressAndPortRaw)
1927			// skip this node
1928			continue
1929		}
1930
1931		internalAddress := base.GetHostName(internalAddressAndPort)
1932		// Internally, we care about "direct" field
1933		portsObjRaw, portsExists := nodeInfo[base.PortsKey]
1934		if !portsExists {
1935			u.logger_utils.Warnf("Unable to get port for %v", internalAddress)
1936			// skip this node
1937			continue
1938		}
1939		portsObj, ok := portsObjRaw.(map[string]interface{})
1940		if !ok {
1941			u.logger_utils.Warnf("GetIntExtHostNameKVPortTranslationMap unable to cast portsObj as map[string]interface{} from: %v", portsObjRaw)
1942			// skip this node
1943			continue
1944		}
1945
1946		directPortIface, directPortExists := portsObj[base.DirectPortKey]
1947		if !directPortExists {
1948			u.logger_utils.Warnf("Unable to get direct port for %v", internalAddress)
1949			// skip this node
1950			continue
1951		}
1952		directPortFloat, ok := directPortIface.(float64)
1953		if !ok {
1954			u.logger_utils.Warnf("GetIntExtHostNameKVPortTranslationMap unable to cast directPort as float", directPortIface)
1955			// skip this node
1956			continue
1957		}
1958
1959		directPort = (int)(directPortFloat)
1960		internalAddressAndDirectPort := base.GetHostAddr(internalAddress, (uint16)(directPort))
1961
1962		externalAddress, externalDirectPort, externalErr, _, _ := u.GetExternalAddressAndKvPortsFromNodeInfo(nodeInfo)
1963		if len(externalAddress) > 0 {
1964			if externalErr == nil {
1965				// External address and port both exist
1966				internalExternalNodesMap[internalAddressAndDirectPort] = base.GetHostAddr(externalAddress, (uint16)(externalDirectPort))
1967			} else if externalErr == base.ErrorNoPortNumber {
1968				// External address exists, but port does not. Use internal host's port number
1969				internalExternalNodesMap[internalAddressAndDirectPort] = base.GetHostAddr(externalAddress, (uint16)(directPort))
1970			}
1971		}
1972	}
1973
1974	if len(internalExternalNodesMap) == 0 {
1975		err = base.ErrorResourceDoesNotExist
1976	}
1977	return internalExternalNodesMap, err
1978}
1979
1980func (u *Utilities) GetHostNameFromNodeInfo(adminHostAddr string, nodeInfo map[string]interface{}, logger *log.CommonLogger) (string, error) {
1981	hostAddr, err := u.getAdminHostAddrFromNodeInfo(adminHostAddr, nodeInfo, logger)
1982	if err != nil {
1983		return "", err
1984	}
1985	return base.GetHostName(hostAddr), nil
1986}
1987
1988// this method is called when nodeInfo came from the terse bucket call, pools/default/b/[bucketName]
1989// where hostname in nodeInfo is a host name without port rather than a host address with port
1990func (u *Utilities) getHostNameWithoutPortFromNodeInfo(adminHostAddr string, nodeInfo map[string]interface{}, logger *log.CommonLogger) (string, error) {
1991	hostName, err := u.getHostAddrFromNodeInfoInternal(adminHostAddr, nodeInfo, logger)
1992	if err == base.ErrorNoHostName {
1993		hostName = base.GetHostName(adminHostAddr)
1994		err = nil
1995	}
1996
1997	return hostName, err
1998}
1999
2000//convenient api for rest calls to local cluster
2001func (u *Utilities) QueryRestApi(baseURL string,
2002	path string,
2003	preservePathEncoding bool,
2004	httpCommand string,
2005	contentType string,
2006	body []byte,
2007	timeout time.Duration,
2008	out interface{},
2009	logger *log.CommonLogger) (error, int) {
2010	return u.QueryRestApiWithAuth(baseURL, path, preservePathEncoding, "", "", base.HttpAuthMechPlain, nil, false, nil, nil, httpCommand, contentType, body, timeout, out, nil, false, logger)
2011}
2012
2013func (u *Utilities) EnforcePrefix(prefix string, str string) string {
2014	var ret_str string = str
2015	if !strings.HasPrefix(str, prefix) {
2016		ret_str = prefix + str
2017	}
2018	return ret_str
2019}
2020
2021func (u *Utilities) RemovePrefix(prefix string, str string) string {
2022	ret_str := strings.Replace(str, prefix, "", 1)
2023	return ret_str
2024}
2025
2026//this expect the baseURL doesn't contain username and password
2027func (u *Utilities) QueryRestApiWithAuth(
2028	baseURL string,
2029	path string,
2030	preservePathEncoding bool,
2031	username string,
2032	password string,
2033	authMech base.HttpAuthMech,
2034	certificate []byte,
2035	san_in_certificate bool,
2036	clientCertificate []byte,
2037	clientKey []byte,
2038	httpCommand string,
2039	contentType string,
2040	body []byte,
2041	timeout time.Duration,
2042	out interface{},
2043	client *http.Client,
2044	keep_client_alive bool,
2045	logger *log.CommonLogger) (err error, statusCode int) {
2046	var http_client *http.Client
2047	if authMech != base.HttpAuthMechScramSha {
2048		var req *http.Request
2049		http_client, req, err = u.prepareForRestCall(baseURL, path, preservePathEncoding, username, password, authMech, certificate, san_in_certificate, clientCertificate, clientKey, httpCommand, contentType, body, client, logger)
2050		if err != nil {
2051			return
2052		}
2053		err, statusCode = u.doRestCall(req, timeout, out, http_client, logger)
2054	} else {
2055		err, statusCode, http_client = u.queryRestApiWithScramShaAuth(baseURL, path, preservePathEncoding, username, password, httpCommand, contentType, body, timeout, out, client, logger)
2056
2057	}
2058	u.cleanupAfterRestCall(keep_client_alive, err, statusCode, http_client, logger)
2059	return
2060}
2061
2062func (u *Utilities) queryRestApiWithScramShaAuth(
2063	baseURL string,
2064	path string,
2065	preservePathEncoding bool,
2066	username string,
2067	password string,
2068	httpCommand string,
2069	contentType string,
2070	body []byte,
2071	timeout time.Duration,
2072	out interface{},
2073	client *http.Client,
2074	logger *log.CommonLogger) (error, int, *http.Client) {
2075
2076	logger.Debugf("SCRAM-SHA authentication for user %v%v%v, baseURL=%v, path=%v\n", base.UdTagBegin, username, base.UdTagEnd, baseURL, path)
2077
2078	URL, err := u.constructURL(baseURL, path, preservePathEncoding, base.HttpAuthMechScramSha)
2079	if err != nil {
2080		return err, 0, nil
2081	}
2082
2083	req, err := scramsha.NewRequest(httpCommand,
2084		// URL.String() is adequate since scramSha is always called with preservePathEncoding set to false
2085		URL.String(),
2086		strings.NewReader(string(body)))
2087	if err != nil {
2088		return err, 0, nil
2089	}
2090
2091	if timeout == 0 {
2092		timeout = base.DefaultHttpTimeout
2093	}
2094	if client == nil {
2095		client = &http.Client{Timeout: timeout}
2096	} else {
2097		client.Timeout = timeout
2098	}
2099
2100	res, err := scramsha.DoScramSha(req, username, password, client)
2101	statusCode := 0
2102	if res != nil {
2103		statusCode = res.StatusCode
2104	}
2105	if err != nil {
2106		return fmt.Errorf("Received error when making SCRAM-SHA connection. baseURL=%v, path=%v, err=%v", baseURL, path, err), statusCode, client
2107	}
2108
2109	err = u.parseResponseBody(res, out, logger)
2110	return err, statusCode, client
2111
2112}
2113
2114func (u *Utilities) prepareForRestCall(baseURL string,
2115	path string,
2116	preservePathEncoding bool,
2117	username string,
2118	password string,
2119	authMech base.HttpAuthMech,
2120	certificate []byte,
2121	san_in_certificate bool,
2122	clientCertificate []byte,
2123	clientKey []byte,
2124	httpCommand string,
2125	contentType string,
2126	body []byte,
2127	client *http.Client,
2128	logger *log.CommonLogger) (*http.Client, *http.Request, error) {
2129	var l *log.CommonLogger = u.loggerForFunc(logger)
2130	var ret_client *http.Client = client
2131
2132	userAuthMode := base.UserAuthModeNone
2133
2134	if len(username) == 0 && len(clientCertificate) == 0 && path != base.SSLPortsPath {
2135		// username and clientCertificate can be both empty only when
2136		// 1. this is a local http call to the same node
2137		// or 2. this is a call to /nodes/self/xdcrSSLPorts on target to retrieve ssl port for subsequent https calls
2138		// treat case 1 separately, since we will need to set local user auth in http request
2139		userAuthMode = base.UserAuthModeLocal
2140	} else {
2141		// for http calls to remote target, set username and password in http request header if
2142		// 1. username has been provided
2143		// and 2. scram sha authentication is not used
2144		if len(username) != 0 && authMech != base.HttpAuthMechScramSha {
2145			userAuthMode = base.UserAuthModeBasic
2146		}
2147	}
2148
2149	req, host, err := u.ConstructHttpRequest(baseURL, path, preservePathEncoding, username, password, authMech, userAuthMode, httpCommand, contentType, body, l)
2150	if err != nil {
2151		return nil, nil, err
2152	}
2153
2154	if ret_client == nil {
2155		ret_client, err = u.GetHttpClient(username, authMech, certificate, san_in_certificate, clientCertificate, clientKey, host, l)
2156		if err != nil {
2157			l.Errorf("Failed to get client for request, err=%v, req=%v\n", err, req)
2158			return nil, nil, err
2159		}
2160	}
2161	return ret_client, req, nil
2162}
2163
2164func (u *Utilities) cleanupAfterRestCall(keep_client_alive bool, err error, statusCode int, client *http.Client, logger *log.CommonLogger) {
2165	if !keep_client_alive || u.IsSeriousNetError(err) || u.isFatalStatusCode(statusCode) {
2166		if client != nil && client.Transport != nil {
2167			transport, ok := client.Transport.(*http.Transport)
2168			if ok {
2169				if u.IsSeriousNetError(err) {
2170					logger.Debugf("Encountered %v, close all idle connections for this http client.\n", err)
2171				}
2172				transport.CloseIdleConnections()
2173			}
2174		}
2175	}
2176}
2177
2178func (u *Utilities) doRestCall(req *http.Request,
2179	timeout time.Duration,
2180	out interface{},
2181	client *http.Client,
2182	logger *log.CommonLogger) (error, int) {
2183	if timeout > 0 {
2184		client.Timeout = timeout
2185	} else if client.Timeout != base.DefaultHttpTimeout {
2186		client.Timeout = base.DefaultHttpTimeout
2187	}
2188
2189	res, err := client.Do(req)
2190	if err == nil && res != nil {
2191		err = u.parseResponseBody(res, out, logger)
2192		return err, res.StatusCode
2193	}
2194
2195	return err, 0
2196
2197}
2198
2199func (u *Utilities) parseResponseBody(res *http.Response, out interface{}, logger *log.CommonLogger) (err error) {
2200	var l *log.CommonLogger = u.loggerForFunc(logger)
2201	if res != nil && res.Body != nil {
2202		defer res.Body.Close()
2203		var bod []byte
2204		if res.ContentLength == 0 {
2205			// If res.Body is empty, json.Unmarshal on an empty Body will return the error "unexpected end of JSON input"
2206			// Return a more specific error here so upstream callers can handle it
2207			err = base.ErrorResourceDoesNotExist
2208			return
2209		}
2210		bod, err = ioutil.ReadAll(io.LimitReader(res.Body, res.ContentLength))
2211		if err != nil {
2212			l.Errorf("Failed to read response body, err=%v\n res=%v\n", err, res)
2213			return
2214		}
2215		if out != nil {
2216			err = json.Unmarshal(bod, out)
2217			if err != nil {
2218				l.Errorf("Failed to unmarshal the response as json, err=%v, bod=%v\n res=%v\n", err, string(bod), res)
2219				out = bod
2220				return
2221			}
2222		}
2223	}
2224	return
2225}
2226
2227//convenient api for rest calls to local cluster
2228func (u *Utilities) InvokeRestWithRetry(baseURL string,
2229	path string,
2230	preservePathEncoding bool,
2231	httpCommand string,
2232	contentType string,
2233	body []byte,
2234	timeout time.Duration,
2235	out interface{},
2236	client *http.Client,
2237	keep_client_alive bool,
2238	logger *log.CommonLogger, num_retry int) (error, int) {
2239	return u.InvokeRestWithRetryWithAuth(baseURL, path, preservePathEncoding, "", "", base.HttpAuthMechPlain, nil, false, nil, nil, true, httpCommand, contentType, body, timeout, out, client, keep_client_alive, logger, num_retry)
2240}
2241
2242func (u *Utilities) InvokeRestWithRetryWithAuth(baseURL string,
2243	path string,
2244	preservePathEncoding bool,
2245	username string,
2246	password string,
2247	authMech base.HttpAuthMech,
2248	certificate []byte,
2249	san_in_certificate bool,
2250	clientCertificate []byte,
2251	clientKey []byte,
2252	insecureSkipVerify bool,
2253	httpCommand string,
2254	contentType string,
2255	body []byte,
2256	timeout time.Duration,
2257	out interface{},
2258	client *http.Client,
2259	keep_client_alive bool,
2260	logger *log.CommonLogger, num_retry int) (err error, statusCode int) {
2261
2262	var http_client *http.Client = nil
2263	var req *http.Request = nil
2264	backoff_time := 500 * time.Millisecond
2265
2266	for i := 0; i < num_retry; i++ {
2267		if authMech != base.HttpAuthMechScramSha {
2268			http_client, req, err = u.prepareForRestCall(baseURL, path, preservePathEncoding, username, password, authMech, certificate, san_in_certificate, clientCertificate, clientKey, httpCommand, contentType, body, client, logger)
2269			if err == nil {
2270				err, statusCode = u.doRestCall(req, timeout, out, http_client, logger)
2271			}
2272
2273			if err == nil {
2274				break
2275			}
2276		} else {
2277			err, statusCode, http_client = u.queryRestApiWithScramShaAuth(baseURL, path, preservePathEncoding, username, password, httpCommand, contentType, body, timeout, out, client, logger)
2278			if err == nil {
2279				break
2280			}
2281		}
2282
2283		logger.Errorf("Received error when making rest call or unmarshalling data. baseURL=%v, path=%v, err=%v, statusCode=%v, num_retry=%v\n", baseURL, path, err, statusCode, i)
2284
2285		//cleanup the idle connection if the error is serious network error
2286		u.cleanupAfterRestCall(true /*keep_client_alive*/, err, statusCode, http_client, logger)
2287
2288		//backoff
2289		backoff_time = backoff_time + backoff_time
2290		time.Sleep(backoff_time)
2291	}
2292
2293	return
2294
2295}
2296
2297func (u *Utilities) GetHttpClient(username string, authMech base.HttpAuthMech, certificate []byte, san_in_certificate bool, clientCertificate, clientKey []byte, ssl_con_str string, logger *log.CommonLogger) (*http.Client, error) {
2298	var client *http.Client
2299	if authMech == base.HttpAuthMechHttps {
2300		caPool := x509.NewCertPool()
2301		ok := caPool.AppendCertsFromPEM(certificate)
2302		if !ok {
2303			return nil, base.InvalidCerfiticateError
2304		}
2305
2306		//using a separate tls connection to verify certificate
2307		//it can be changed in 1.4 when DialTLS is avaialbe in http.Transport
2308		conn, tlsConfig, err := base.MakeTLSConn(ssl_con_str, username, certificate, san_in_certificate, clientCertificate, clientKey, logger)
2309		if err != nil {
2310			return nil, err
2311		}
2312		conn.Close()
2313
2314		tr := &http.Transport{TLSClientConfig: tlsConfig, Dial: base.DialTCPWithTimeout}
2315		client = &http.Client{Transport: tr,
2316			Timeout: base.DefaultHttpTimeout}
2317
2318	} else {
2319		client = &http.Client{Timeout: base.DefaultHttpTimeout}
2320	}
2321	return client, nil
2322}
2323
2324//this expect the baseURL doesn't contain username and password
2325func (u *Utilities) ConstructHttpRequest(
2326	baseURL string,
2327	path string,
2328	preservePathEncoding bool,
2329	username string,
2330	password string,
2331	authMech base.HttpAuthMech,
2332	userAuthMode base.UserAuthMode,
2333	httpCommand string,
2334	contentType string,
2335	body []byte,
2336	logger *log.CommonLogger) (*http.Request, string, error) {
2337	url, err := u.constructURL(baseURL, path, preservePathEncoding, authMech)
2338	if err != nil {
2339		return nil, "", err
2340	}
2341
2342	var l *log.CommonLogger = u.loggerForFunc(logger)
2343
2344	req, err := http.NewRequest(httpCommand, url.String(), bytes.NewBuffer(body))
2345	if err != nil {
2346		return nil, "", err
2347	}
2348
2349	if preservePathEncoding {
2350		// get the original Opaque back
2351		req.URL.Opaque = url.Opaque
2352	}
2353
2354	if contentType == "" {
2355		contentType = base.DefaultContentType
2356	}
2357	req.Header.Set(base.ContentType, contentType)
2358
2359	req.Header.Set(base.UserAgent, base.GoxdcrUserAgent)
2360
2361	switch userAuthMode {
2362	case base.UserAuthModeLocal:
2363		err := cbauth.SetRequestAuth(req)
2364		if err != nil {
2365			l.Errorf("Failed to set authentication to request. err=%v\n req=%v\n", err, req)
2366			return nil, "", err
2367		}
2368	case base.UserAuthModeBasic:
2369		req.SetBasicAuth(username, password)
2370	case base.UserAuthModeNone:
2371		// no op
2372	default:
2373		return nil, "", fmt.Errorf("Invalid userAuthMode %v", userAuthMode)
2374	}
2375
2376	//TODO: log request would log password barely
2377	l.Debugf("http request=%v\n", req)
2378
2379	return req, url.Host, nil
2380}
2381
2382func (u *Utilities) constructURL(baseURL string,
2383	path string,
2384	preservePathEncoding bool,
2385	authMech base.HttpAuthMech) (*url.URL, error) {
2386
2387	var baseURL_new string
2388	if authMech == base.HttpAuthMechHttps {
2389		baseURL_new = u.EnforcePrefix("https://", baseURL)
2390	} else {
2391		baseURL_new = u.EnforcePrefix("http://", baseURL)
2392	}
2393	url, err := couchbase.ParseURL(baseURL_new)
2394	if err != nil {
2395		return nil, err
2396	}
2397
2398	if !preservePathEncoding {
2399		if q := strings.Index(path, "?"); q > 0 {
2400			url.Path = path[:q]
2401			url.RawQuery = path[q+1:]
2402		} else {
2403			url.Path = path
2404		}
2405	} else {
2406		// use url.Opaque to preserve encoding
2407		url.Opaque = "//"
2408
2409		index := strings.Index(baseURL_new, "//")
2410		if index < len(baseURL_new)-2 {
2411			url.Opaque += baseURL_new[index+2:]
2412		}
2413		url.Opaque += path
2414	}
2415	return url, nil
2416}
2417
2418// encode http request into wire format
2419// it differs from HttpRequest.Write() in that it preserves the Content-Length in the header,
2420// and ignores Body in request
2421func (u *Utilities) EncodeHttpRequest(req *http.Request) ([]byte, error) {
2422	reqBytes := make([]byte, 0)
2423	reqBytes = append(reqBytes, []byte(req.Method)...)
2424	reqBytes = append(reqBytes, []byte(" ")...)
2425	reqBytes = append(reqBytes, []byte(req.URL.String())...)
2426	reqBytes = append(reqBytes, []byte(" HTTP/1.1\r\n")...)
2427
2428	hasHost := false
2429	for key, value := range req.Header {
2430		if key == "Host" {
2431			hasHost = true
2432		}
2433		if value != nil && len(value) > 0 {
2434			reqBytes = u.EncodeHttpRequestHeader(reqBytes, key, value[0])
2435		} else {
2436			reqBytes = u.EncodeHttpRequestHeader(reqBytes, key, "")
2437		}
2438	}
2439	if !hasHost {
2440		// ensure that host name is in header
2441		reqBytes = u.EncodeHttpRequestHeader(reqBytes, "Host", req.Host)
2442	}
2443
2444	// add extra "\r\n" as separator for Body
2445	reqBytes = append(reqBytes, []byte("\r\n")...)
2446
2447	if req.Body != nil {
2448		defer req.Body.Close()
2449
2450		bodyBytes, err := ioutil.ReadAll(req.Body)
2451		if err != nil {
2452			return nil, err
2453		}
2454		reqBytes = append(reqBytes, bodyBytes...)
2455	}
2456	return reqBytes, nil
2457}
2458
2459func (u *Utilities) EncodeHttpRequestHeader(reqBytes []byte, key, value string) []byte {
2460	reqBytes = append(reqBytes, []byte(key)...)
2461	reqBytes = append(reqBytes, []byte(": ")...)
2462	reqBytes = append(reqBytes, []byte(value)...)
2463	reqBytes = append(reqBytes, []byte("\r\n")...)
2464	return reqBytes
2465}
2466
2467func (u *Utilities) IsSeriousNetError(err error) bool {
2468	if err == nil {
2469		return false
2470	}
2471
2472	errStr := err.Error()
2473	netError, ok := err.(*net.OpError)
2474	return err == syscall.EPIPE ||
2475		err == io.EOF ||
2476		strings.Contains(errStr, "EOF") ||
2477		strings.Contains(errStr, "use of closed network connection") ||
2478		strings.Contains(errStr, "connection reset by peer") ||
2479		strings.Contains(errStr, "http: can't write HTTP request on broken connection") ||
2480		(ok && (!netError.Temporary() && !netError.Timeout()))
2481}
2482
2483// statusCode that requires connections to be dropped and recreated
2484// basically all statusCodes that are larger than 400 are fatal
2485func (u *Utilities) isFatalStatusCode(statusCode int) bool {
2486	return statusCode > http.StatusBadRequest
2487}
2488
2489func (u *Utilities) NewTCPConn(hostName string) (*net.TCPConn, error) {
2490	conn, err := base.DialTCPWithTimeout(base.NetTCP, hostName)
2491	if err != nil {
2492		return nil, err
2493	}
2494	if conn == nil {
2495		return nil, fmt.Errorf("Failed to set up connection to %v", hostName)
2496	}
2497	tcpConn, ok := conn.(*net.TCPConn)
2498	if !ok {
2499		// should never get here
2500		conn.Close()
2501		return nil, fmt.Errorf("The connection to %v returned is not TCP type", hostName)
2502	}
2503
2504	// same settings as erlang xdcr
2505	err = tcpConn.SetKeepAlive(true)
2506	if err == nil {
2507		err = tcpConn.SetKeepAlivePeriod(base.KeepAlivePeriod)
2508	}
2509	if err == nil {
2510		err = tcpConn.SetNoDelay(false)
2511	}
2512
2513	if err != nil {
2514		tcpConn.Close()
2515		return nil, fmt.Errorf("Error setting options on the connection to %v. err=%v", hostName, err)
2516	}
2517
2518	return tcpConn, nil
2519}
2520
2521/**
2522 * Executes a anonymous function that returns an error. If the error is non nil, retry with exponential backoff.
2523 * Returns base.ErrorFailedAfterRetry + the last recorded error if operation times out, nil otherwise.
2524 * Max retries == the times to retry in additional to the initial try, should the initial try fail
2525 * initialWait == Initial time with which to start
2526 * Factor == exponential backoff factor based off of initialWait
2527 */
2528func (u *Utilities) ExponentialBackoffExecutor(name string, initialWait time.Duration, maxRetries int, factor int, op ExponentialOpFunc) error {
2529	waitTime := initialWait
2530	var opErr error
2531	for i := 0; i <= maxRetries; i++ {
2532		opErr = op()
2533		if opErr == nil {
2534			return nil
2535		} else if i != maxRetries {
2536			u.logger_utils.Warnf("ExponentialBackoffExecutor for %v encountered error (%v). Sleeping %v\n",
2537				name, opErr.Error(), waitTime)
2538			time.Sleep(waitTime)
2539			waitTime *= time.Duration(factor)
2540		}
2541	}
2542	opErr = fmt.Errorf("%v %v Last error: %v", name, base.ErrorFailedAfterRetry.Error(), opErr.Error())
2543	return opErr
2544}
2545
2546/*
2547 * This method has an additional parameter, finCh, than ExponentialBackoffExecutor. When finCh is closed,
2548 * this method can abort earlier.
2549 */
2550func (u *Utilities) ExponentialBackoffExecutorWithFinishSignal(name string, initialWait time.Duration, maxRetries int, factor int, op ExponentialOpFunc2, param interface{}, finCh chan bool) (interface{}, error) {
2551	waitTime := initialWait
2552	var result interface{}
2553	var err error
2554	for i := 0; i <= maxRetries; i++ {
2555		select {
2556		case <-finCh:
2557			err = fmt.Errorf("ExponentialBackoffExecutorWithFinishSignal for %v aborting because of finch closure\n", name)
2558			u.logger_utils.Warnf(err.Error())
2559			return nil, err
2560		default:
2561			result, err = op(param)
2562			if err == nil {
2563				return result, nil
2564			} else if i != maxRetries {
2565				u.logger_utils.Warnf("ExponentialBackoffExecutorWithFinishSignal for %v encountered error (%v). Sleeping %v\n",
2566					name, err.Error(), waitTime)
2567				base.WaitForTimeoutOrFinishSignal(waitTime, finCh)
2568				waitTime *= time.Duration(factor)
2569			}
2570		}
2571	}
2572	err = fmt.Errorf("%v %v Last error: %v", name, base.ErrorFailedAfterRetry.Error(), err.Error())
2573	return nil, err
2574}
2575
2576// Get security related settings from target
2577// 1. whether target supports SAN in certificate
2578// 2. authentication mechanism to use when making http[s] connections to target ns_server
2579// This method used ShortHttpTimeout because it is either called from remote cluster rest API,
2580// where a prompt response is required to keep the rest request from timing out,
2581// or called from remote cluster reference refresh code, where a pre-mature timeout can be tolerated
2582// This method also returns defaultPoolInfo of target for more flexibility
2583// CALLER BEWARE: defaultPoolInfo is returned ONLY when either scram sha or ssl is enabled, so as to avoid unnecessary work
2584func (u *Utilities) GetSecuritySettingsAndDefaultPoolInfo(hostAddr, hostHttpsAddr, username, password string,
2585	certificate []byte, clientCertificate, clientKey []byte, scramShaEnabled bool, logger *log.CommonLogger) (sanInCertificate bool,
2586	httpAuthMech base.HttpAuthMech, defaultPoolInfo map[string]interface{}, err error) {
2587	if !scramShaEnabled && len(certificate) == 0 {
2588		// security settings are irrelevant if we are not using scram sha or ssl
2589		// note that a nil defaultPoolInfo is returned in this case
2590		return false, base.HttpAuthMechPlain, nil, nil
2591	}
2592
2593	if scramShaEnabled {
2594		// if scram sha is enabled, we will first try to connect to target ns_server using scram sha authentication
2595		// even if certificate/clientCert have been provided, we will not use them here because they are not needed by scram sha authentication
2596		defaultPoolInfo, err = u.GetDefaultPoolInfoUsingScramSha(hostAddr, username, password, logger)
2597		if err == nil {
2598			httpAuthMech = base.HttpAuthMechScramSha
2599		} else if err != TargetMayNotSupportScramShaError {
2600			return false, base.HttpAuthMechPlain, nil, err
2601		} else {
2602			if len(certificate) == 0 {
2603				// certificate not provided, cannot fall back to https. return error right away
2604				return false, base.HttpAuthMechPlain, nil, fmt.Errorf("Cannot connect to target %v using \"half\" secure mode. Received unauthorized error when using Scram-Sha authentication. Cannot use https because server certificate has not been provided.", hostAddr)
2605			} else {
2606				// proceed to fall back to https
2607			}
2608		}
2609	}
2610
2611	if defaultPoolInfo == nil {
2612		// if we get here, either scram sha is not enabled, or scram sha is enabled and target ns_server returned 401 error on our scram sha attempt
2613		// either way, it is implied that certificate has been provided. use https to connect to target
2614		defaultPoolInfo, err = u.GetDefaultPoolInfoUsingHttps(hostHttpsAddr, username, password,
2615			certificate, clientCertificate, clientKey, logger)
2616		if err == nil {
2617			httpAuthMech = base.HttpAuthMechHttps
2618		} else {
2619			return false, base.HttpAuthMechPlain, nil, err
2620		}
2621	}
2622
2623	// at this point, we have a valid defaultPoolInfo, an httpAuthMech that worked for the host
2624	// derive httpAuthMech and certificate related settings from defaultPoolInfo
2625
2626	nodeList, err := u.GetNodeListFromInfoMap(defaultPoolInfo, logger)
2627	if err != nil || len(nodeList) == 0 {
2628		err = fmt.Errorf("Can't get nodes information for cluster %v, err=%v", hostAddr, err)
2629		return false, base.HttpAuthMechPlain, nil, err
2630	}
2631
2632	clusterCompatibility, err := u.GetClusterCompatibilityFromNodeList(nodeList)
2633	if err != nil {
2634		return false, base.HttpAuthMechPlain, nil, err
2635	}
2636
2637	targetHasScramShaSupport := base.IsClusterCompatible(clusterCompatibility, base.VersionForHttpScramShaSupport)
2638	if scramShaEnabled && targetHasScramShaSupport && httpAuthMech != base.HttpAuthMechScramSha {
2639		// do not fall back to https if target is vulcan and up
2640		return false, base.HttpAuthMechPlain, nil, fmt.Errorf("Failed to retrieve secruity settings from host=%v using SCRAM-SHA authentication. Please check whether SCRAM-SHA is enabled on target.", hostAddr)
2641	}
2642
2643	if scramShaEnabled && !targetHasScramShaSupport && httpAuthMech == base.HttpAuthMechScramSha {
2644		// Cluster is not ScramSha compatible. We need to fallback to https.
2645		// Before doing so, we will get default pool using https again to make sure it works
2646		defaultPoolInfo, err = u.GetDefaultPoolInfoUsingHttps(hostHttpsAddr, username, password,
2647			certificate, clientCertificate, clientKey, logger)
2648		if err == nil {
2649			httpAuthMech = base.HttpAuthMechHttps
2650		} else {
2651			return false, base.HttpAuthMechPlain, nil, err
2652		}
2653	}
2654	sanInCertificate = base.IsClusterCompatible(clusterCompatibility, base.VersionForSANInCertificateSupport)
2655	return sanInCertificate, httpAuthMech, defaultPoolInfo, nil
2656}
2657
2658func (u *Utilities) GetDefaultPoolInfoUsingScramSha(hostAddr, username, password string, logger *log.CommonLogger) (map[string]interface{}, error) {
2659	defaultPoolInfo := make(map[string]interface{})
2660	err, statusCode := u.QueryRestApiWithAuth(hostAddr, base.DefaultPoolPath, false, username, password, base.HttpAuthMechScramSha, nil /*certificate*/, false /*sanInCertificate*/, nil /*clientCertificate*/, nil /*clientKey*/, base.MethodGet, "", nil, base.ShortHttpTimeout, &defaultPoolInfo, nil, false, logger)
2661	if err == nil && statusCode == http.StatusOK {
2662		// target supports scram sha
2663		return defaultPoolInfo, nil
2664	} else if statusCode == http.StatusUnauthorized {
2665		// unauthorized error could be returned when target ns_server is pre-vulcan and does not support scram sha.
2666		// return a specific error to allow caller to fall back to https
2667		return nil, TargetMayNotSupportScramShaError
2668	} else {
2669		return nil, fmt.Errorf("Failed to retrieve secruity settings from host=%v using scram sha, err=%v, statusCode=%v", hostAddr, err, statusCode)
2670	}
2671}
2672
2673func (u *Utilities) GetDefaultPoolInfoUsingHttps(hostHttpsAddr, username, password string,
2674	certificate []byte, clientCertificate, clientKey []byte, logger *log.CommonLogger) (map[string]interface{}, error) {
2675	defaultPoolInfo := make(map[string]interface{})
2676
2677	// we do not know the correct values of sanInCertificate. set sanInCertificate set to true for better security
2678	err, statusCode := u.QueryRestApiWithAuth(hostHttpsAddr, base.DefaultPoolPath, false, username, password, base.HttpAuthMechHttps, certificate, true /*sanInCertificate*/, clientCertificate, clientKey, base.MethodGet, "", nil, base.ShortHttpTimeout, &defaultPoolInfo, nil, false, logger)
2679	if err == nil && statusCode == http.StatusOK {
2680		return defaultPoolInfo, nil
2681	} else {
2682		if err != nil && strings.Contains(err.Error(), base.NoIpSANErrMsg) {
2683			// if the error is about certificate not containing IP SANs, it could be that the target cluster is of an old version
2684			// make a second try with sanInCertificate set to false
2685			// after we retrieve target cluster version, we will then re-set sanInCertificate to the appropriate value
2686			logger.Debugf("Received certificate validation error from %v. Target may be an old version that does not support SAN in certificates. Retrying connection to target using sanInCertificate = false.", hostHttpsAddr)
2687			err, statusCode = u.QueryRestApiWithAuth(hostHttpsAddr, base.DefaultPoolPath, false, username, password, base.HttpAuthMechHttps, certificate, false /*sanInCertificate*/, clientCertificate, clientKey, base.MethodGet, "", nil, base.ShortHttpTimeout, &defaultPoolInfo, nil, false, logger)
2688			if err == nil && statusCode == http.StatusOK {
2689				return defaultPoolInfo, nil
2690			} else if statusCode == http.StatusUnauthorized {
2691				return nil, u.getUnauthorizedError(username)
2692			} else {
2693				// if the second try still fails, return error
2694				return nil, fmt.Errorf("Failed to retrieve secruity settings from host=%v, err=%v, statusCode=%v", hostHttpsAddr, err, statusCode)
2695			}
2696		} else if statusCode == http.StatusUnauthorized {
2697			return nil, u.getUnauthorizedError(username)
2698		} else {
2699			return nil, fmt.Errorf("Failed to retrieve secruity settings from host=%v, err=%v, statusCode=%v", hostHttpsAddr, err, statusCode)
2700		}
2701	}
2702}
2703
2704// Given the KVVBMap, translate the map so that the server keys are replaced with external server keys, if applicable
2705func (u *Utilities) TranslateKvVbMap(kvVBMap base.BucketKVVbMap, targetBucketInfo map[string]interface{}) {
2706	translationMap, translationErr := u.GetIntExtHostNameKVPortTranslationMap(targetBucketInfo)
2707	if translationErr != nil && translationErr != base.ErrorResourceDoesNotExist {
2708		u.logger_utils.Warnf("Error constructing internal -> external address translation table. err=%v", translationErr)
2709	} else if translationErr == nil {
2710		(base.BucketKVVbMap)(kvVBMap).ReplaceInternalWithExternalHosts(translationMap)
2711	}
2712}
2713
2714func (u *Utilities) ReplaceCouchApiBaseObjWithExternals(couchApiBase string, nodeInfo map[string]interface{}) string {
2715	if len(couchApiBase) == 0 {
2716		return couchApiBase
2717	}
2718
2719	extHost, extCapi, extCapiErr, extCapiSSL, extCapiSSLErr := u.getExternalHostAndCapiPorts(nodeInfo)
2720	if len(extHost) > 0 {
2721		// "couchApiBaseHTTPS": "https://127.0.0.1:19502/b2%2B746a570d364cf609ac11572f8c8c2608",
2722		url, err := url.Parse(couchApiBase)
2723		if err != nil || !url.IsAbs() {
2724			u.logger_utils.Errorf("Unable to parse URL string for CouchApiBase: %v", err)
2725			return couchApiBase
2726		}
2727		var isHttps bool = strings.HasPrefix(couchApiBase, "https")
2728		var leadingHttpString string
2729		if isHttps {
2730			leadingHttpString = "https://"
2731		} else {
2732			leadingHttpString = "http://"
2733		}
2734
2735		// Now strip out the http(s)://host:port/
2736		var leadingPrefix string
2737		leadingHostName := url.Hostname()
2738		leadingPort := url.Port()
2739		if len(leadingPort) > 0 {
2740			leadingPrefix = fmt.Sprintf("%s%s:%s/", leadingHttpString, leadingHostName, leadingPort)
2741		} else {
2742			leadingPrefix = fmt.Sprintf("%s%s/", leadingHttpString, leadingHostName)
2743		}
2744		strippedCouchApiBase := strings.TrimPrefix(couchApiBase, leadingPrefix)
2745
2746		// Now recompile
2747		var recompiledUrl string
2748		var recompiledHostToUse string = extHost
2749		var recompiledPortToUse string
2750		if isHttps && extCapiSSLErr == nil {
2751			recompiledPortToUse = fmt.Sprintf("%v", extCapiSSL)
2752		} else if !isHttps && extCapiErr == nil {
2753			recompiledPortToUse = fmt.Sprintf("%v", extCapi)
2754		} else {
2755			recompiledPortToUse = leadingPort
2756		}
2757		if len(recompiledPortToUse) == 0 {
2758			recompiledUrl = fmt.Sprintf("%s%s/%s", leadingHttpString, recompiledHostToUse, strippedCouchApiBase)
2759		} else {
2760			recompiledUrl = fmt.Sprintf("%s%s:%s/%s", leadingHttpString, recompiledHostToUse, recompiledPortToUse, strippedCouchApiBase)
2761		}
2762		return recompiledUrl
2763	}
2764	return couchApiBase
2765}
2766
2767func (u *Utilities) getUnauthorizedError(username string) error {
2768	errMsg := "Received unauthorized error from target. Please double check user credentials."
2769	// if username has not been specified [implying that client certificate has been provided and is being used]
2770	// unauthorized error could also be returned if target has client cert auth setting set to disable
2771	if len(username) == 0 {
2772		errMsg += " Since client certificate is being used, please ensure that target is version 5.5 and up and has client certificate authentication setting set to \"enable\" or \"mandatory\"."
2773	}
2774
2775	return errors.New(errMsg)
2776}
2777
2778func decompressSnappyBody(incomingBody, key []byte, dp DataPoolIface, slicesToBeReleased *[][]byte, needExtraBytesInBody bool) ([]byte, error, string, int64, int) {
2779	var dpFailedCnt int64
2780	lenOfDecodedData, err := snappy.DecodedLen(incomingBody)
2781	lastBodyPos := lenOfDecodedData - 1
2782	if err != nil {
2783		return nil, base.ErrorCompressionUnableToInflate, fmt.Sprintf("XDCR for key %v%v%v is unable to decode snappy uncompressed size: %v", base.UdTagBegin, string(key), base.UdTagEnd, err), dpFailedCnt, lastBodyPos
2784	}
2785
2786	uncompressedBodySize := uint64(lenOfDecodedData)
2787	if needExtraBytesInBody {
2788		uncompressedBodySize += uint64(len(key) + base.AddFilterKeyExtraBytes)
2789	}
2790	body, err := dp.GetByteSlice(uncompressedBodySize)
2791	if err != nil {
2792		body = make([]byte, 0, uncompressedBodySize)
2793		dpFailedCnt = int64(uncompressedBodySize)
2794	} else {
2795		*slicesToBeReleased = append(*slicesToBeReleased, body)
2796	}
2797
2798	body, err = snappy.Decode(body, incomingBody)
2799	if err != nil {
2800		return nil, base.ErrorCompressionUnableToInflate, fmt.Sprintf("XDCR for key %v%v%v is unable to snappy decompress body value: %v", base.UdTagBegin, string(key), base.UdTagEnd, err), dpFailedCnt, lastBodyPos
2801	}
2802
2803	// Check to make sure the last bracket position is correct
2804	if body[lastBodyPos] != '}' {
2805		return nil, base.ErrorInvalidInput, fmt.Sprintf("XDCR for key %v%v%v after decompression seems to be an invalid JSON", base.UdTagBegin, string(key), base.UdTagEnd), dpFailedCnt, lastBodyPos
2806	}
2807
2808	return body, nil, "", dpFailedCnt, lastBodyPos
2809}
2810
2811func getBodySlice(incomingBody, key []byte, dp DataPoolIface, slicesToBeReleased *[][]byte) ([]byte, error, string, int64, int) {
2812	var dpFailedCnt int64
2813	var incomingBodyLen int = len(incomingBody)
2814	lastBodyPos := incomingBodyLen - 1
2815	bodySize := uint64(incomingBodyLen + len(key) + base.AddFilterKeyExtraBytes)
2816
2817	if incomingBody[lastBodyPos] != '}' {
2818		return nil, base.ErrorInvalidInput, fmt.Sprintf("Document %v%v%v body is not a valid JSON", base.UdTagBegin, string(key), base.UdTagEnd), dpFailedCnt, lastBodyPos
2819	}
2820
2821	body, err := dp.GetByteSlice(bodySize)
2822	if err != nil {
2823		body = make([]byte, 0, bodySize)
2824		dpFailedCnt = int64(bodySize)
2825	} else {
2826		*slicesToBeReleased = append(*slicesToBeReleased, body)
2827	}
2828	copy(body, incomingBody)
2829	return body, nil, "", dpFailedCnt, lastBodyPos
2830}
2831
2832func stripAndPrependXattribute(body []byte, xattrSize uint32, dp DataPoolIface, slicesToBeReleased *[][]byte, endBodyPos int, xattrOnly bool) ([]byte, error, int64, int) {
2833	var dpFailedCnt int64
2834	var actualBodySize int
2835	actualBody := body[xattrSize:]
2836	endBodyPos = endBodyPos - int(xattrSize)
2837	if !xattrOnly {
2838		actualBodySize = len(actualBody)
2839		// Prereq check
2840		if actualBody[0] != '{' {
2841			return nil, base.ErrorInvalidInput, dpFailedCnt, endBodyPos
2842		}
2843	}
2844
2845	// xattrSize is the size of the xAttribute section
2846	// The xattribute section is consisted of uint32 + key + NUL + value + NUL (repeat)
2847	// Functions calling this is converting DCP xattribute pairs encoding to the following:
2848	// {					 <- could be absorbed
2849	// " key " : value ,
2850	// " key2 " : value2 }
2851	// Original DCP stream has 6 extra bytes per KV pair
2852	// Converted has 4 extra bytes per KV pair, so using xattrSize is sufficient
2853
2854	// Get a size of body size + xattr value size + internal Xattr KEY and JSON symbol sizes
2855	bodySize := uint64(int(xattrSize) + actualBodySize + base.AddFilterXattrExtraBytes)
2856	combinedBody, err := dp.GetByteSlice(bodySize)
2857	if err != nil {
2858		combinedBody = make([]byte, 0, bodySize)
2859		dpFailedCnt += int64(bodySize)
2860	} else {
2861		*slicesToBeReleased = append(*slicesToBeReleased, combinedBody)
2862	}
2863
2864	// Non-xattrOnly case:
2865	// ------------------
2866	// Current body looks like (spaces added for readability):
2867	// { key : val }
2868	// Want to insert xattr at the beginning so "combinedBody" looks like:
2869	// { "XdcrInternalXattrKey" : { xattrKey : xattrVal } , key : val }
2870
2871	// xattrOnly case: (essentially just returning)
2872	// --------------------------------------------
2873	// { "XDCRInternalXattrKey" : <ConvertedXattrSection> }
2874
2875	// { XdcrInternalXattrKey :
2876	combinedBodyPos := 0
2877	combinedBody, combinedBodyPos = base.WriteJsonRawMsg(combinedBody, base.CachedInternalKeyXattrByteSlice, combinedBodyPos, base.WriteJsonKey, base.CachedInternalKeyXattrByteSize, combinedBodyPos == 0 /*firstKey*/)
2878
2879	// { XdcrInternalXattrKey : { xattrKey : xattrVal }
2880	// Followed by a uint32, then  -> key -> NUL -> value -> NUL (repeat)
2881	xattrIter, err := base.NewXattrIterator(body)
2882	if err != nil {
2883		return nil, base.ErrorInvalidInput, dpFailedCnt, endBodyPos
2884	}
2885	firstKey := true
2886	for xattrIter.HasNext() == true {
2887		key, value, err := xattrIter.Next()
2888		if err != nil {
2889			return nil, err, dpFailedCnt, endBodyPos
2890		}
2891		combinedBody, combinedBodyPos = base.WriteJsonRawMsg(combinedBody, key, combinedBodyPos, base.WriteJsonKey, len(key), firstKey)
2892		combinedBody, combinedBodyPos = base.WriteJsonRawMsg(combinedBody, value, combinedBodyPos, base.WriteJsonValueNoQuotes, len(value), false)
2893		if firstKey {
2894			firstKey = false
2895		}
2896	}
2897
2898	// Currently:                                     v - combinedBodyPos
2899	// { XdcrInternalXattrKey : { xattrKey : xattrVal }
2900
2901	if xattrOnly {
2902		// Targeted:                                       v - combinedBodyPos
2903		// { XdcrInternalXattrKey : { xattrKey : xattrVal }}
2904		combinedBodyPos++
2905		combinedBody[combinedBodyPos] = '}'
2906
2907		endBodyPos = combinedBodyPos
2908	} else {
2909		// Targeted:                                        v - combinedBodyPos
2910		// { XdcrInternalXattrKey : { xattrKey : xattrVal },
2911		combinedBodyPos++
2912		combinedBody[combinedBodyPos] = ','
2913		// endBodyPos is added instead of combinedBodyPos+1 is because below we are copying actualBody[1:] to skip the first {
2914		endBodyPos += combinedBodyPos
2915		combinedBodyPos++
2916
2917		// ActualBody:
2918		// { key : val }
2919		// targeted combinedBody:
2920		// { XdcrInternalXattrKey : { xattrKey : xattrVal }, key : val }
2921		copy(combinedBody[combinedBodyPos:], actualBody[1:])
2922	}
2923
2924	return combinedBody, nil, dpFailedCnt, endBodyPos
2925}
2926
// processXattributeType selects which parts of a document carrying xattributes get processed.
type processXattributeType int

const (
	// processSkipXattr: process the body but not the xattribute
	processSkipXattr processXattributeType = iota
	// processXattrAndBody: process both the body and the xattribute
	processXattrAndBody
	// processXattrOnly: process just the xattribute, no body
	processXattrOnly
)
2937
2938func processXattribute(body, key []byte, processType processXattributeType, dp DataPoolIface, slicesToBeReleased *[][]byte, endBodyPos int) ([]byte, error, int64, int) {
2939	var pos uint32
2940	//	var separator uint32
2941	var dpFailedCnt int64
2942	var err error
2943
2944	//	first uint32 in the body contains the size of the entire XATTR section
2945	totalXattrSize := binary.BigEndian.Uint32(body[pos : pos+4])
2946	// Couchbase doc size is max of 20MB. Xattribute count against this limit.
2947	// So if total xattr size is greater than this limit, then something is wrong
2948	if totalXattrSize > base.MaxDocSizeByte {
2949		return nil, fmt.Errorf("For document %v%v%v, unable to correctly parse xattribute from DCP packet. Xattr size determined to be %v bytes, which is invalid", base.UdTagBegin, string(key), base.UdTagEnd, totalXattrSize), dpFailedCnt, endBodyPos
2950	}
2951	// Add 4 bytes here to skip the uint32 that was just parsed
2952	totalXattrSize += 4
2953
2954	switch processType {
2955	case processSkipXattr:
2956		newBody := body[totalXattrSize:]
2957		body = newBody
2958		endBodyPos = endBodyPos - int(totalXattrSize)
2959	case processXattrAndBody:
2960		body, err, dpFailedCnt, endBodyPos = stripAndPrependXattribute(body, totalXattrSize, dp, slicesToBeReleased, endBodyPos, false /*xattrOnly*/)
2961	case processXattrOnly:
2962		body, err, dpFailedCnt, endBodyPos = stripAndPrependXattribute(body, totalXattrSize, dp, slicesToBeReleased, endBodyPos, true /*xattrOnly*/)
2963	default:
2964		return nil, base.ErrorInvalidInput, dpFailedCnt, -1
2965	}
2966	return body, err, dpFailedCnt, endBodyPos
2967}
2968
2969func processKeyOnlyForFiltering(key []byte, dp DataPoolIface, slicesToBeReleased *[][]byte) ([]byte, int64) {
2970	var body []byte
2971	var err error
2972	keyLen := len(key)
2973	bodySize := +uint64(keyLen + 2 /*quote surrounding key*/ + 5 /*cruft around the special key*/ + len(base.ReservedWordsMap[base.ExternalKeyKey]))
2974	body, err = dp.GetByteSlice(bodySize)
2975	if err != nil {
2976		// If there is any problem using datapool, just use json.RawMessage directly to allocate new byte slice
2977		body = json.RawMessage(fmt.Sprintf("{\"%v\":\"%v\"}", base.ReservedWordsMap[base.ExternalKeyKey], string(key)))
2978		return body, int64(len(body))
2979	} else {
2980		*slicesToBeReleased = append(*slicesToBeReleased, body)
2981	}
2982	var bodyPos int
2983	body, bodyPos = base.WriteJsonRawMsg(body, base.CachedInternalKeyKeyByteSlice, bodyPos, base.WriteJsonKey, base.CachedInternalKeyKeyByteSize, bodyPos == 0)
2984	body, bodyPos = base.WriteJsonRawMsg(body, key, bodyPos, base.WriteJsonValue /*uprEvent key as value*/,