1package utils 2 3import ( 4 "bytes" 5 "crypto/x509" 6 "encoding/binary" 7 "encoding/json" 8 "errors" 9 "expvar" 10 "fmt" 11 "github.com/couchbase/cbauth" 12 "github.com/couchbase/go-couchbase" 13 "github.com/couchbase/gojsonsm" 14 mc "github.com/couchbase/gomemcached" 15 mcc "github.com/couchbase/gomemcached/client" 16 "github.com/couchbase/goutils/scramsha" 17 base "github.com/couchbase/goxdcr/base" 18 "github.com/couchbase/goxdcr/log" 19 "github.com/couchbase/goxdcr/metadata" 20 "github.com/golang/snappy" 21 gocb "gopkg.in/couchbase/gocb.v1" 22 "io" 23 "io/ioutil" 24 "net" 25 "net/http" 26 "net/url" 27 "reflect" 28 "strconv" 29 "strings" 30 "syscall" 31 "time" 32 "unicode/utf8" 33) 34 35var NonExistentBucketError error = errors.New("Bucket doesn't exist") 36var BucketRecreatedError error = errors.New("Bucket has been deleted and recreated") 37var TargetMayNotSupportScramShaError error = errors.New("Target may not support ScramSha") 38 39func (f *HELOFeatures) NumberOfActivatedFeatures() int { 40 var result int 41 if f.Xattribute { 42 result++ 43 } 44 if f.CompressionType != base.CompressionTypeNone { 45 result++ 46 } 47 if f.Xerror { 48 result++ 49 } 50 return result 51} 52 53type Utilities struct { 54 logger_utils *log.CommonLogger 55} 56 57/** 58 * NOTE - ideally we want to be able to pass in utility interfaces so we can do much 59 * better unit testing with mocks. This constructor should be used in main() and then 60 * passed down level by levels. 61 * Currently, this method is being called in many places, and each place that is using 62 * this method should ideally be using a passed in interface from another parent level. 63 */ 64func NewUtilities() *Utilities { 65 retVar := &Utilities{ 66 logger_utils: log.NewLogger("Utils", log.DefaultLoggerContext), 67 } 68 return retVar 69} 70 71/** 72 * This utils file contains both regular non-REST related utilities as well as REST related. 73 * The first section is non-REST related utility functions 74 */ 75 76type BucketBasicStats struct { 77 ItemCount int `json:"itemCount"` 78} 79 80//Only used by unit test 81//TODO: replace with go-couchbase bucket stats API 82type CouchBucket struct { 83 Name string `json:"name"` 84 Stat BucketBasicStats `json:"basicStats"` 85} 86 87func (u *Utilities) GetNonExistentBucketError() error { 88 return NonExistentBucketError 89} 90 91func (u *Utilities) GetBucketRecreatedError() error { 92 return BucketRecreatedError 93} 94 95func (u *Utilities) loggerForFunc(logger *log.CommonLogger) *log.CommonLogger { 96 var l *log.CommonLogger 97 if logger != nil { 98 l = logger 99 } else { 100 l = u.logger_utils 101 } 102 return l 103} 104 105func (u *Utilities) ValidateSettings(defs base.SettingDefinitions, 106 settings metadata.ReplicationSettingsMap, 107 logger *log.CommonLogger) error { 108 var l *log.CommonLogger = u.loggerForFunc(logger) 109 110 if l.GetLogLevel() >= log.LogLevelDebug { 111 l.Debugf("Start validate setting=%v, defs=%v", settings.CloneAndRedact(), defs) 112 } 113 var err *base.SettingsError = nil 114 for key, def := range defs { 115 val, ok := settings[key] 116 if !ok && def.Required { 117 if err == nil { 118 err = base.NewSettingsError() 119 } 120 err.Add(key, errors.New("required, but not supplied")) 121 } else { 122 if val != nil && def.Data_type != reflect.PtrTo(reflect.TypeOf(val)) { 123 if err == nil { 124 err = base.NewSettingsError() 125 } 126 err.Add(key, errors.New(fmt.Sprintf("expected type is %v, supplied type is %v", 127 def.Data_type, reflect.TypeOf(val)))) 128 } 129 } 130 } 131 if err != nil { 132 l.Infof("setting validation result = %v", *err) 133 return *err 134 } 135 return nil 136} 137 138func (u *Utilities) RecoverPanic(err *error) { 139 if r := recover(); r != nil { 140 *err = errors.New(fmt.Sprint(r)) 141 } 142} 143 144func (u *Utilities) LocalPool(localConnectStr string) (couchbase.Pool, error) { 145 localURL := fmt.Sprintf("http://%s", localConnectStr) 146 client, err := couchbase.ConnectWithAuth(localURL, cbauth.NewAuthHandler(nil)) 147 if err != nil { 148 return couchbase.Pool{}, u.NewEnhancedError(fmt.Sprintf("Error connecting to couchbase. url=%v", u.UrlForLog(localURL)), err) 149 } 150 return client.GetPool("default") 151} 152 153// Get bucket in local cluster 154func (u *Utilities) LocalBucket(localConnectStr, bucketName string) (*couchbase.Bucket, error) { 155 u.logger_utils.Debugf("Getting local bucket name=%v\n", bucketName) 156 157 pool, err := u.LocalPool(localConnectStr) 158 if err != nil { 159 return nil, err 160 } 161 162 bucket, err := pool.GetBucket(bucketName) 163 if err != nil { 164 return nil, u.NewEnhancedError(fmt.Sprintf("Error getting bucket, %v, from pool.", bucketName), err) 165 } 166 167 u.logger_utils.Debugf("Got local bucket successfully name=%v\n", bucket.Name) 168 return bucket, err 169} 170 171func (u *Utilities) UnwrapError(infos map[string]interface{}) (err error) { 172 if infos != nil && len(infos) > 0 { 173 err = infos["error"].(error) 174 } 175 return err 176} 177 178// returns an enhanced error with erroe message being "msg + old error message" 179func (u *Utilities) NewEnhancedError(msg string, err error) error { 180 return errors.New(msg + "\n err = " + err.Error()) 181} 182 183func (u *Utilities) GetMapFromExpvarMap(expvarMap *expvar.Map) map[string]interface{} { 184 regMap := make(map[string]interface{}) 185 186 expvarMap.Do(func(keyValue expvar.KeyValue) { 187 valueStr := keyValue.Value.String() 188 // first check if valueStr is an integer 189 valueInt, err := strconv.Atoi(valueStr) 190 if err == nil { 191 regMap[keyValue.Key] = valueInt 192 } else { 193 // then check if valueStr is a float 194 valueFloat, err := strconv.ParseFloat(valueStr, 64) 195 if err == nil { 196 regMap[keyValue.Key] = valueFloat 197 } else { 198 // should never happen 199 u.logger_utils.Errorf("Invalid value in expvarMap. Only float and integer values are supported") 200 } 201 } 202 }) 203 return regMap 204} 205 206//convert the format returned by go-memcached StatMap - map[string]string to map[uint16]uint64 207func (u *Utilities) ParseHighSeqnoStat(vbnos []uint16, stats_map map[string]string, highseqno_map map[uint16]uint64) error { 208 var err error 209 for _, vbno := range vbnos { 210 stats_key := fmt.Sprintf(base.VBUCKET_HIGH_SEQNO_STAT_KEY_FORMAT, vbno) 211 highseqnostr, ok := stats_map[stats_key] 212 if !ok || highseqnostr == "" { 213 err = fmt.Errorf("Can't find high seqno for vbno=%v in stats map. Source topology may have changed.\n", vbno) 214 return err 215 } 216 highseqno, err := strconv.ParseUint(highseqnostr, 10, 64) 217 if err != nil { 218 u.logger_utils.Warnf("high seqno for vbno=%v in stats map is not a valid uint64. high seqno=%v\n", vbno, highseqnostr) 219 err = fmt.Errorf("high seqno for vbno=%v in stats map is not a valid uint64. high seqno=%v\n", vbno, highseqnostr) 220 return err 221 } 222 highseqno_map[vbno] = highseqno 223 } 224 return nil 225} 226 227//convert the format returned by go-memcached StatMap - map[string]string to map[uint16][]uint64 228func (u *Utilities) ParseHighSeqnoAndVBUuidFromStats(vbnos []uint16, stats_map map[string]string, high_seqno_and_vbuuid_map map[uint16][]uint64) { 229 for _, vbno := range vbnos { 230 high_seqno_stats_key := fmt.Sprintf(base.VBUCKET_HIGH_SEQNO_STAT_KEY_FORMAT, vbno) 231 highseqnostr, ok := stats_map[high_seqno_stats_key] 232 if !ok { 233 u.logger_utils.Warnf("Can't find high seqno for vbno=%v in stats map. Source topology may have changed.\n", vbno) 234 continue 235 } 236 high_seqno, err := strconv.ParseUint(highseqnostr, 10, 64) 237 if err != nil { 238 u.logger_utils.Warnf("high seqno for vbno=%v in stats map is not a valid uint64. high seqno=%v\n", vbno, highseqnostr) 239 continue 240 } 241 242 vbuuid_stats_key := fmt.Sprintf(base.VBUCKET_UUID_STAT_KEY_FORMAT, vbno) 243 vbuuidstr, ok := stats_map[vbuuid_stats_key] 244 if !ok { 245 u.logger_utils.Warnf("Can't find vbuuid for vbno=%v in stats map. Source topology may have changed.\n", vbno) 246 continue 247 } 248 vbuuid, err := strconv.ParseUint(vbuuidstr, 10, 64) 249 if err != nil { 250 u.logger_utils.Warnf("vbuuid for vbno=%v in stats map is not a valid uint64. vbuuid=%v\n", vbno, vbuuidstr) 251 continue 252 } 253 254 high_seqno_and_vbuuid_map[vbno] = []uint64{high_seqno, vbuuid} 255 } 256} 257 258// encode data in a map into a byte array, which can then be used as 259// the body part of a http request 260// so far only five types are supported: string, int, bool, LogLevel, []byte 261// which should be sufficient for all cases at hand 262func (u *Utilities) EncodeMapIntoByteArray(data map[string]interface{}) ([]byte, error) { 263 if len(data) == 0 { 264 return nil, nil 265 } 266 267 params := make(url.Values) 268 for key, val := range data { 269 var strVal string 270 switch val.(type) { 271 case string: 272 strVal = val.(string) 273 case int: 274 strVal = strconv.FormatInt(int64(val.(int)), base.ParseIntBase) 275 case bool: 276 strVal = strconv.FormatBool(val.(bool)) 277 case log.LogLevel: 278 strVal = val.(log.LogLevel).String() 279 case []byte: 280 strVal = string(val.([]byte)) 281 default: 282 return nil, base.IncorrectValueTypeInMapError(key, val, "string/int/bool/LogLevel/[]byte") 283 } 284 params.Add(key, strVal) 285 } 286 287 return []byte(params.Encode()), nil 288} 289 290func (u *Utilities) UrlForLog(urlStr string) string { 291 result, err := url.Parse(urlStr) 292 if err == nil { 293 if result.User != nil { 294 result.User = url.UserPassword(result.User.Username(), "xxxx") 295 } 296 return result.String() 297 } else { 298 return urlStr 299 } 300} 301 302func filterExpressionGetXattrHelper(bucket *gocb.Bucket, docId string, docCas gocb.Cas) ([]byte, error) { 303 var xattrMap map[string]interface{} 304 var xtoc interface{} 305 var xattrSlice []byte 306 307 xattrMap = make(map[string]interface{}) 308 frag, err := bucket.LookupIn(docId).GetEx(base.XattributeToc, gocb.SubdocFlagXattr).Execute() 309 if err != nil { 310 return nil, err 311 } 312 313 if frag.Cas() != docCas { 314 return nil, base.ErrorInvalidCAS 315 } 316 317 err = frag.Content(base.XattributeToc, &xtoc) 318 if err != nil { 319 return nil, err 320 } 321 322 tocList := xtoc.([]interface{}) 323 for _, tocEntry := range tocList { 324 if entry, ok := tocEntry.(string); ok { 325 frag, err := bucket.LookupIn(docId).GetEx(entry, gocb.SubdocFlagXattr).Execute() 326 if err != nil { 327 return nil, err 328 } 329 330 if frag.Cas() != docCas { 331 return nil, base.ErrorInvalidCAS 332 } 333 334 var value interface{} 335 frag.Content(entry, &value) 336 xattrMap[entry] = value 337 } 338 } 339 xattrSlice, err = json.Marshal(xattrMap) 340 if err != nil { 341 return nil, err 342 } 343 344 return xattrSlice, nil 345} 346 347func filterExpressionGetDocVal(bucket *gocb.Bucket, docId string) ([]byte, gocb.Cas, error) { 348 var retrievedDocVal interface{} 349 docCas, err := bucket.Get(docId, &retrievedDocVal) 350 if err != nil { 351 return nil, docCas, err 352 } 353 354 valMap, ok := retrievedDocVal.(map[string]interface{}) 355 if !ok { 356 err = fmt.Errorf("Retrieved document (%v) value is not a valid key-value map", docId) 357 return nil, docCas, err 358 } 359 360 bodySlice, err := json.Marshal(valMap) 361 if err != nil { 362 return nil, docCas, err 363 } 364 365 return bodySlice, docCas, err 366} 367 368// Called by UI to run a test on a specific document. This cannot be unit-tested as the authentication will fail 369// Queries ns_server REST endpoint for a document content 370// ns_server returns the document in a special json format, so then massage the data into gojsonsm compatible format 371// and pass it through gojsonsm for testing 372func (u *Utilities) FilterExpressionMatchesDoc(expression, docId, bucketName, addr string, port uint16) (result bool, err error) { 373 nsServerDocContent := make(map[string]interface{}) 374 hostAddr := base.GetHostAddr(addr, port) 375 var statusCode int 376 377 matcher, err := base.ValidateAndGetAdvFilter(expression) 378 if err != nil { 379 return 380 } 381 382 retryOp := func() error { 383 err, statusCode = u.QueryRestApi(hostAddr, base.DefaultPoolBucketsPath+bucketName+base.DocsPath+docId, false /*preservePathEncoding*/, base.MethodGet, "" /*contentType*/, nil, /*body*/ 384 0 /*timeout*/, &nsServerDocContent, u.logger_utils) 385 386 if err != nil { 387 return err 388 } else if statusCode != http.StatusOK { 389 return fmt.Errorf("Err returned: %v along with http status code: %v", err, statusCode) 390 } else { 391 return nil 392 } 393 } 394 395 err = u.ExponentialBackoffExecutor("filterTesterXattrRetriever", base.BucketInfoOpWaitTime, base.BucketInfoOpMaxRetry, base.BucketInfoOpRetryFactor, retryOp) 396 if err != nil { 397 if strings.Contains(err.Error(), base.ErrorResourceDoesNotExist.Error()) { 398 err = fmt.Errorf("Specified document not found") 399 } 400 return 401 } 402 403 return u.processNsServerDocForFiltering(matcher, nsServerDocContent, docId) 404} 405 406func (u *Utilities) processNsServerDocForFiltering(matcher gojsonsm.Matcher, nsServerDocContent map[string]interface{}, docId string) (result bool, err error) { 407 // Take care of the body - it shows up as a string of pre-formatted json 408 // i.e. the whole {"k":"v"} as the value of specified key below 409 processedJson := make(map[string]interface{}) 410 if body, ok := nsServerDocContent[base.BucketDocBodyKey].(string); ok { 411 marshaledBytes := []byte(body) 412 err = json.Unmarshal(marshaledBytes, &processedJson) 413 if err != nil { 414 err = fmt.Errorf("Unable to process document body: %v", err) 415 return 416 } 417 } 418 419 // Add document key in 420 processedJson[base.InternalKeyKey] = docId 421 422 // Add xattribute 423 if xattrs, ok := nsServerDocContent[base.BucketDocXattrKey].(map[string]interface{}); ok { 424 processedJson[base.InternalKeyXattr] = xattrs 425 } 426 427 byteSlice, err := json.Marshal(processedJson) 428 if err != nil { 429 err = fmt.Errorf("Unable to marshal postprocessed data for matching: %v", err) 430 return 431 } 432 433 matched, status, err := base.MatchWrapper(matcher, byteSlice) 434 if u.logger_utils.GetLogLevel() >= log.LogLevelDebug && status&gojsonsm.MatcherCollateUsed > 0 { 435 u.logger_utils.Debugf("Matcher used collate to determine outcome (%v) for document %v%v%v", matched, base.UdTagBegin, docId, base.UdTagEnd) 436 } 437 return matched, err 438} 439 440// given a matches map, convert the indices from byte index to rune index 441func (u *Utilities) convertByteIndexToRuneIndex(key string, matches [][]int) ([][]int, error) { 442 convertedMatches := make([][]int, 0) 443 if len(key) == 0 || len(matches) == 0 { 444 return matches, nil 445 } 446 447 // parse key and build a byte index to rune index map 448 indexMap := make(map[int]int) 449 byteIndex := 0 450 runeIndex := 0 451 keyBytes := []byte(key) 452 keyLen := len(key) 453 for { 454 indexMap[byteIndex] = runeIndex 455 if byteIndex < keyLen { 456 _, runeLen := utf8.DecodeRune(keyBytes[byteIndex:]) 457 byteIndex += runeLen 458 runeIndex++ 459 } else { 460 break 461 } 462 } 463 464 if u.logger_utils.GetLogLevel() >= log.LogLevelDebug { 465 u.logger_utils.Debugf("key=%v, indexMap=%v%v%v\n", base.UdTagBegin, key, base.UdTagEnd, indexMap) 466 } 467 468 var ok bool 469 for _, match := range matches { 470 convertedMatch := make([]int, 2) 471 convertedMatch[0], ok = indexMap[match[0]] 472 if !ok { 473 // should not happen 474 errMsg := u.InvalidRuneIndexErrorMessage(key, match[0]) 475 u.logger_utils.Errorf(errMsg) 476 return nil, errors.New(errMsg) 477 } 478 convertedMatch[1], ok = indexMap[match[1]] 479 if !ok { 480 // should not happen 481 errMsg := u.InvalidRuneIndexErrorMessage(key, match[1]) 482 u.logger_utils.Errorf(errMsg) 483 return nil, errors.New(errMsg) 484 } 485 convertedMatches = append(convertedMatches, convertedMatch) 486 } 487 488 return convertedMatches, nil 489} 490 491func (u *Utilities) InvalidRuneIndexErrorMessage(key string, index int) string { 492 return fmt.Sprintf("byte index, %v, in match for key, %v, is not a starting index for a rune", index, key) 493} 494 495func (u *Utilities) LocalBucketUUID(local_connStr string, bucketName string, logger *log.CommonLogger) (string, error) { 496 return u.BucketUUID(local_connStr, bucketName, "", "", base.HttpAuthMechPlain, nil, false, nil, nil, logger) 497} 498 499func (u *Utilities) LocalBucketPassword(local_connStr string, bucketName string, logger *log.CommonLogger) (string, error) { 500 return u.BucketPassword(local_connStr, bucketName, "", "", base.HttpAuthMechPlain, nil, false, nil, nil, logger) 501} 502 503func (u *Utilities) ReplicationStatusNotFoundError(topic string) error { 504 return fmt.Errorf("Cannot find replication status for topic %v", topic) 505} 506 507func (u *Utilities) BucketNotFoundError(bucketName string) error { 508 return fmt.Errorf("Bucket `%v` not found.", bucketName) 509} 510 511// creates a local memcached connection. 512// always use plain auth 513func (u *Utilities) GetMemcachedConnectionWFeatures(serverAddr, bucketName, userAgent string, 514 keepAlivePeriod time.Duration, features HELOFeatures, logger *log.CommonLogger) (mcc.ClientIface, HELOFeatures, error) { 515 var respondedFeatures HELOFeatures 516 517 logger.Infof("GetMemcachedConnection serverAddr=%v, bucketName=%v\n", serverAddr, bucketName) 518 if serverAddr == "" { 519 err := fmt.Errorf("Failed to get memcached connection because serverAddr is empty. bucketName=%v, userAgent=%v", bucketName, userAgent) 520 logger.Warnf(err.Error()) 521 return nil, respondedFeatures, err 522 } 523 username, password, err := cbauth.GetMemcachedServiceAuth(serverAddr) 524 if u.logger_utils.GetLogLevel() >= log.LogLevelDebug { 525 logger.Debugf("memcached auth: username=%v%v%v, password=%v%v%v, err=%v\n", base.UdTagBegin, username, base.UdTagEnd, base.UdTagBegin, password, base.UdTagEnd, err) 526 } 527 if err != nil { 528 return nil, respondedFeatures, err 529 } 530 531 return u.GetRemoteMemcachedConnectionWFeatures(serverAddr, username, password, bucketName, userAgent, true /*plainAuth*/, keepAlivePeriod, features, logger) 532} 533 534func (u *Utilities) GetMemcachedConnection(serverAddr, bucketName, userAgent string, 535 keepAlivePeriod time.Duration, logger *log.CommonLogger) (mcc.ClientIface, error) { 536 var noFeatures HELOFeatures 537 clientIface, _, err := u.GetMemcachedConnectionWFeatures(serverAddr, bucketName, userAgent, keepAlivePeriod, noFeatures, logger) 538 return clientIface, err 539} 540 541func (u *Utilities) GetMemcachedRawConn(serverAddr, username, password, bucketName string, plainAuth bool, 542 keepAlivePeriod time.Duration, logger *log.CommonLogger) (mcc.ClientIface, error) { 543 conn, err := base.NewConn(serverAddr, username, password, bucketName, plainAuth, keepAlivePeriod, logger) 544 if err != nil { 545 return nil, err 546 } 547 return conn, err 548} 549 550func (u *Utilities) GetRemoteMemcachedConnectionWFeatures(serverAddr, username, password, bucketName, userAgent string, 551 plainAuth bool, keepAlivePeriod time.Duration, features HELOFeatures, logger *log.CommonLogger) (mcc.ClientIface, HELOFeatures, error) { 552 var err error 553 var conn mcc.ClientIface 554 var respondedFeatures HELOFeatures 555 556 getRemoteMcConnOp := func() error { 557 conn, err = u.GetMemcachedRawConn(serverAddr, username, password, bucketName, plainAuth, keepAlivePeriod, logger) 558 if err != nil { 559 logger.Warnf("Failed to construct memcached client for %v, err=%v\n", serverAddr, err) 560 return err 561 } 562 563 respondedFeatures, err = u.SendHELOWithFeatures(conn, userAgent, base.HELOTimeout, base.HELOTimeout, features, logger) 564 565 if err != nil { 566 conn.Close() 567 logger.Warnf("Failed to send HELO for %v, err=%v\n", conn, err) 568 return err 569 } 570 return nil 571 } 572 573 opErr := u.ExponentialBackoffExecutor("GetRemoteMemcachedConnection", base.RemoteMcRetryWaitTime, base.MaxRemoteMcRetry, 574 base.RemoteMcRetryFactor, getRemoteMcConnOp) 575 576 if opErr != nil { 577 logger.Errorf(opErr.Error()) 578 return nil, respondedFeatures, err 579 } 580 581 return conn, respondedFeatures, err 582} 583 584func (u *Utilities) GetRemoteMemcachedConnection(serverAddr, username, password, bucketName, userAgent string, 585 plainAuth bool, keepAlivePeriod time.Duration, logger *log.CommonLogger) (mcc.ClientIface, error) { 586 var noFeatureEnabled HELOFeatures 587 conn, _, err := u.GetRemoteMemcachedConnectionWFeatures(serverAddr, username, password, bucketName, userAgent, plainAuth, keepAlivePeriod, 588 noFeatureEnabled, logger) 589 return conn, err 590} 591 592// send helo with specified user agent string to memcached 593// the helo is purely informational, for the identification of the client 594// unsuccessful response is not treated as errors 595func (u *Utilities) SendHELO(client mcc.ClientIface, userAgent string, readTimeout, writeTimeout time.Duration, 596 logger *log.CommonLogger) (err error) { 597 var allFeaturesDisabled HELOFeatures 598 heloReq := u.ComposeHELORequest(userAgent, allFeaturesDisabled) 599 600 var response *mc.MCResponse 601 response, err = u.sendHELORequest(client, heloReq, userAgent, readTimeout, writeTimeout, logger) 602 if err != nil { 603 logger.Errorf("Received error response from HELO command. userAgent=%v, err=%v.", userAgent, err) 604 } else if response.Status != mc.SUCCESS { 605 logger.Warnf("Received unexpected response from HELO command. userAgent=%v, response status=%v.", userAgent, response.Status) 606 } else { 607 logger.Infof("Successfully sent HELO command with userAgent=%v", userAgent) 608 } 609 return 610} 611 612// send helo to memcached with data type (including xattr) feature enabled 613// used exclusively by xmem nozzle 614// we need to know whether data type is indeed enabled from helo response 615// unsuccessful response is treated as errors 616func (u *Utilities) SendHELOWithFeatures(client mcc.ClientIface, userAgent string, readTimeout, writeTimeout time.Duration, requestedFeatures HELOFeatures, logger *log.CommonLogger) (respondedFeatures HELOFeatures, err error) { 617 // Initially set initial respondedFeatures to None since no compression negotiated should not be invalid 618 respondedFeatures.CompressionType = base.CompressionTypeNone 619 620 heloReq := u.ComposeHELORequest(userAgent, requestedFeatures) 621 622 var response *mc.MCResponse 623 response, err = u.sendHELORequest(client, heloReq, userAgent, readTimeout, writeTimeout, logger) 624 if err != nil { 625 logger.Errorf("Received error response from HELO command. userAgent=%v, err=%v.", userAgent, err) 626 } else if response.Status != mc.SUCCESS { 627 errMsg := fmt.Sprintf("Received unexpected response from HELO command. userAgent=%v, response status=%v.", userAgent, response.Status) 628 logger.Error(errMsg) 629 err = errors.New(errMsg) 630 } else { 631 // helo succeeded. parse response body for features enabled 632 bodyLen := len(response.Body) 633 if (bodyLen & 1) != 0 { 634 // body has to have even number of bytes 635 logger.Errorf("Received response body with odd number of bytes from HELO command. userAgent=%v, (redacted) response body=%v%v%v.", userAgent, base.UdTagBegin, response.Body, base.UdTagEnd) 636 err = errors.New(fmt.Sprintf("Received response body with odd number of bytes from HELO command. userAgent=%v.", userAgent)) 637 return 638 } 639 pos := 0 640 for { 641 if pos >= bodyLen { 642 break 643 } 644 feature := binary.BigEndian.Uint16(response.Body[pos : pos+2]) 645 if feature == base.HELO_FEATURE_XATTR { 646 respondedFeatures.Xattribute = true 647 } 648 if feature == base.HELO_FEATURE_SNAPPY { 649 respondedFeatures.CompressionType = base.CompressionTypeSnappy 650 } 651 if feature == base.HELO_FEATURE_XERROR { 652 respondedFeatures.Xerror = true 653 } 654 pos += 2 655 } 656 logger.Infof("Successfully sent HELO command with userAgent=%v. attributes=%v", userAgent, respondedFeatures) 657 } 658 return 659} 660 661func (u *Utilities) sendHELORequest(client mcc.ClientIface, heloReq *mc.MCRequest, userAgent string, readTimeout, writeTimeout time.Duration, 662 logger *log.CommonLogger) (response *mc.MCResponse, err error) { 663 664 conn := client.Hijack() 665 conn.(net.Conn).SetWriteDeadline(time.Now().Add(writeTimeout)) 666 _, err = conn.Write(heloReq.Bytes()) 667 conn.(net.Conn).SetWriteDeadline(time.Time{}) 668 if err != nil { 669 logger.Warnf("Error sending HELO command. userAgent=%v, err=%v.", userAgent, err) 670 return 671 } 672 673 conn.(net.Conn).SetReadDeadline(time.Now().Add(readTimeout)) 674 response, err = client.Receive() 675 conn.(net.Conn).SetReadDeadline(time.Time{}) 676 return 677} 678 679// compose a HELO command 680func (u *Utilities) ComposeHELORequest(userAgent string, features HELOFeatures) *mc.MCRequest { 681 var value []byte 682 var numOfFeatures = features.NumberOfActivatedFeatures() 683 var sliceIndex int 684 bytesToAllocate := base.HELO_BYTES_PER_FEATURE * (numOfFeatures + 1) // TCP_NO_DELAY is included by default 685 value = make([]byte, bytesToAllocate) 686 687 // tcp no delay - [0:2] 688 binary.BigEndian.PutUint16(value[sliceIndex:sliceIndex+base.HELO_BYTES_PER_FEATURE], base.HELO_FEATURE_TCP_NO_DELAY) 689 sliceIndex += base.HELO_BYTES_PER_FEATURE 690 691 // Xattribute 692 if features.Xattribute { 693 binary.BigEndian.PutUint16(value[sliceIndex:sliceIndex+base.HELO_BYTES_PER_FEATURE], base.HELO_FEATURE_XATTR) 694 sliceIndex += base.HELO_BYTES_PER_FEATURE 695 } 696 697 // Compression 698 if features.CompressionType == base.CompressionTypeSnappy { 699 binary.BigEndian.PutUint16(value[sliceIndex:sliceIndex+base.HELO_BYTES_PER_FEATURE], base.HELO_FEATURE_SNAPPY) 700 sliceIndex += base.HELO_BYTES_PER_FEATURE 701 } 702 703 // Xerror 704 if features.Xerror { 705 binary.BigEndian.PutUint16(value[sliceIndex:sliceIndex+base.HELO_BYTES_PER_FEATURE], base.HELO_FEATURE_XERROR) 706 sliceIndex += base.HELO_BYTES_PER_FEATURE 707 } 708 709 return &mc.MCRequest{ 710 Key: []byte(userAgent), 711 Opcode: mc.HELLO, 712 Body: value, 713 } 714} 715 716func (u *Utilities) GetIntSettingFromSettings(settings metadata.ReplicationSettingsMap, settingName string) (int, error) { 717 settingObj := u.GetSettingFromSettings(settings, settingName) 718 if settingObj == nil { 719 return -1, nil 720 } 721 722 setting, ok := settingObj.(int) 723 if !ok { 724 return -1, fmt.Errorf("Setting %v is of wrong type", settingName) 725 } 726 727 return setting, nil 728} 729 730func (u *Utilities) GetStringSettingFromSettings(settings metadata.ReplicationSettingsMap, settingName string) (string, error) { 731 settingObj := u.GetSettingFromSettings(settings, settingName) 732 if settingObj == nil { 733 return "", nil 734 } 735 736 setting, ok := settingObj.(string) 737 if !ok { 738 return "", fmt.Errorf("Setting %v is of wrong type", settingName) 739 } 740 741 return setting, nil 742} 743 744func (u *Utilities) GetSettingFromSettings(settings metadata.ReplicationSettingsMap, settingName string) interface{} { 745 if settings == nil { 746 return nil 747 } 748 749 setting, ok := settings[settingName] 750 if !ok { 751 return nil 752 } 753 754 return setting 755} 756 757func (u *Utilities) GetMemcachedClient(serverAddr, bucketName string, kv_mem_clients map[string]mcc.ClientIface, 758 userAgent string, keepAlivePeriod time.Duration, logger *log.CommonLogger) (mcc.ClientIface, error) { 759 client, ok := kv_mem_clients[serverAddr] 760 if ok { 761 return client, nil 762 } else { 763 if bucketName == "" { 764 err := fmt.Errorf("Failed to get memcached client because of unexpected empty bucketName. serverAddr=%v, userAgent=%v", serverAddr, userAgent) 765 logger.Warnf(err.Error()) 766 return nil, err 767 } 768 769 var client, err = u.GetMemcachedConnection(serverAddr, bucketName, userAgent, keepAlivePeriod, logger) 770 if err == nil { 771 kv_mem_clients[serverAddr] = client 772 return client, nil 773 } else { 774 return nil, err 775 } 776 } 777} 778 779func (u *Utilities) GetServerVBucketsMap(connStr, bucketName string, bucketInfo map[string]interface{}) (map[string][]uint16, error) { 780 vbucketServerMapObj, ok := bucketInfo[base.VBucketServerMapKey] 781 if !ok { 782 return nil, fmt.Errorf("Error getting vbucket server map from bucket info. connStr=%v, bucketName=%v, bucketInfo=%v\n", connStr, bucketName, bucketInfo) 783 } 784 vbucketServerMap, ok := vbucketServerMapObj.(map[string]interface{}) 785 if !ok { 786 return nil, fmt.Errorf("Vbucket server map is of wrong type. connStr=%v, bucketName=%v, vbucketServerMap=%v\n", connStr, bucketName, vbucketServerMapObj) 787 } 788 789 // get server list 790 serverListObj, ok := vbucketServerMap[base.ServerListKey] 791 if !ok { 792 return nil, fmt.Errorf("Error getting server list from vbucket server map. connStr=%v, bucketName=%v, vbucketServerMap=%v\n", connStr, bucketName, vbucketServerMap) 793 } 794 serverList, ok := serverListObj.([]interface{}) 795 if !ok { 796 return nil, fmt.Errorf("Server list is of wrong type. connStr=%v, bucketName=%v, serverList=%v\n", connStr, bucketName, serverListObj) 797 } 798 799 servers := make([]string, len(serverList)) 800 for index, serverName := range serverList { 801 serverNameStr, ok := serverName.(string) 802 if !ok { 803 return nil, fmt.Errorf("Server name is of wrong type. connStr=%v, bucketName=%v, serverName=%v\n", connStr, bucketName, serverName) 804 } 805 servers[index] = serverNameStr 806 } 807 808 // get vbucket "map" 809 vbucketMapObj, ok := vbucketServerMap[base.VBucketMapKey] 810 if !ok { 811 return nil, fmt.Errorf("Error getting vbucket map from vbucket server map. connStr=%v, bucketName=%v, vbucketServerMap=%v\n", connStr, bucketName, vbucketServerMap) 812 } 813 vbucketMap, ok := vbucketMapObj.([]interface{}) 814 if !ok { 815 return nil, fmt.Errorf("Vbucket map is of wrong type. connStr=%v, bucketName=%v, vbucketMap=%v\n", connStr, bucketName, vbucketMapObj) 816 } 817 818 serverVBMap := make(map[string][]uint16) 819 820 for vbno, indexListObj := range vbucketMap { 821 indexList, ok := indexListObj.([]interface{}) 822 if !ok { 823 return nil, fmt.Errorf("Index list is of wrong type. connStr=%v, bucketName=%v, indexList=%v\n", connStr, bucketName, indexListObj) 824 } 825 if len(indexList) == 0 { 826 return nil, fmt.Errorf("Index list is empty. connStr=%v, bucketName=%v, vbno=%v\n", connStr, bucketName, vbno) 827 } 828 indexFloat, ok := indexList[0].(float64) 829 if !ok { 830 return nil, fmt.Errorf("Master index is of wrong type. connStr=%v, bucketName=%v, index=%v\n", connStr, bucketName, indexList[0]) 831 } 832 indexInt := int(indexFloat) 833 if indexInt >= len(servers) { 834 return nil, fmt.Errorf("Master index is out of range. connStr=%v, bucketName=%v, index=%v\n", connStr, bucketName, indexInt) 835 } else if indexInt < 0 { 836 // During rebalancing or topology changes, it's possible ns_server may return a -1 for index. Callers should treat it as an transient error. 837 return nil, fmt.Errorf(fmt.Sprintf("%v connStr=%v, bucketName=%v, index=%v\n", base.ErrorMasterNegativeIndex, connStr, bucketName, indexInt)) 838 } 839 840 server := servers[indexInt] 841 var vbList []uint16 842 vbList, ok = serverVBMap[server] 843 if !ok { 844 vbList = make([]uint16, 0) 845 } 846 vbList = append(vbList, uint16(vbno)) 847 serverVBMap[server] = vbList 848 } 849 return serverVBMap, nil 850} 851 852func (u *Utilities) GetRemoteServerVBucketsMap(connStr, bucketName string, bucketInfo map[string]interface{}, useExternal bool) (kvVbMap map[string][]uint16, err error) { 853 kvVbMap, err = u.GetServerVBucketsMap(connStr, bucketName, bucketInfo) 854 if err != nil { 855 return 856 } 857 if useExternal { 858 u.TranslateKvVbMap(kvVbMap, bucketInfo) 859 } 860 return 861} 862 863// get bucket type setting from bucket info 864func (u *Utilities) GetBucketTypeFromBucketInfo(bucketName string, bucketInfo map[string]interface{}) (string, error) { 865 bucketType := "" 866 bucketTypeObj, ok := bucketInfo[base.BucketTypeKey] 867 if !ok { 868 return "", fmt.Errorf("Error looking up bucket type of bucket %v", bucketName) 869 } else { 870 bucketType, ok = bucketTypeObj.(string) 871 if !ok { 872 return "", fmt.Errorf("bucketType on bucket %v is of wrong type.", bucketName) 873 } 874 } 875 return bucketType, nil 876} 877 878// check if a bucket belongs to an elastic search (es) cluster by looking for "authType" field in bucket info. 879// if not found, cluster is es 880func (u *Utilities) CheckWhetherClusterIsESBasedOnBucketInfo(bucketInfo map[string]interface{}) bool { 881 _, ok := bucketInfo[base.AuthTypeKey] 882 return !ok 883} 884 885// get conflict resolution type setting from bucket info 886func (u *Utilities) GetConflictResolutionTypeFromBucketInfo(bucketName string, bucketInfo map[string]interface{}) (string, error) { 887 conflictResolutionType := base.ConflictResolutionType_Seqno 888 conflictResolutionTypeObj, ok := bucketInfo[base.ConflictResolutionTypeKey] 889 if ok { 890 conflictResolutionType, ok = conflictResolutionTypeObj.(string) 891 if !ok { 892 return "", fmt.Errorf("ConflictResolutionType on bucket %v is of wrong type.", bucketName) 893 } 894 } 895 return conflictResolutionType, nil 896} 897 898// get EvictionPolicy setting from bucket info 899func (u *Utilities) GetEvictionPolicyFromBucketInfo(bucketName string, bucketInfo map[string]interface{}) (string, error) { 900 evictionPolicy := "" 901 evictionPolicyObj, ok := bucketInfo[base.EvictionPolicyKey] 902 if ok { 903 evictionPolicy, ok = evictionPolicyObj.(string) 904 if !ok { 905 return "", fmt.Errorf("EvictionPolicy on bucket %v is of wrong type.", bucketName) 906 } 907 } 908 return evictionPolicy, nil 909} 910 911/** 912 * The second section is couchbase REST related utility functions 913 */ 914// This method is used to get the SSL port for target nodes - will use alternate fields if requested 915func (u *Utilities) GetMemcachedSSLPortMap(connStr, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, 916 clientCertificate []byte, clientKey []byte, bucket string, logger *log.CommonLogger, useExternal bool) (base.SSLPortMap, error) { 917 ret := make(base.SSLPortMap) 918 919 logger.Infof("GetMemcachedSSLPort, connStr=%v\n", connStr) 920 bucketInfo, err := u.GetClusterInfo(connStr, base.BPath+bucket, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger) 921 if err != nil { 922 return nil, err 923 } 924 925 nodesExt, ok := bucketInfo[base.NodeExtKey] 926 if !ok { 927 return nil, u.BucketInfoParseError(bucketInfo, logger) 928 } 929 930 nodesExtArray, ok := nodesExt.([]interface{}) 931 if !ok { 932 return nil, u.BucketInfoParseError(bucketInfo, logger) 933 } 934 935 var hostName string 936 for _, nodeExt := range nodesExtArray { 937 var portNumberToUse uint16 938 nodeExtMap, ok := nodeExt.(map[string]interface{}) 939 if !ok { 940 return nil, u.BucketInfoParseError(bucketInfo, logger) 941 } 942 943 // note that this is the only place where nodeExtMap contains a hostname without port 944 // instead of a host address with port 945 hostName, err = u.getHostNameWithoutPortFromNodeInfo(connStr, nodeExtMap, logger) 946 if err != nil { 947 return nil, u.BucketInfoParseError(bucketInfo, logger) 948 } 949 950 // Internal key 951 service, ok := nodeExtMap[base.ServicesKey] 952 if !ok { 953 return nil, u.BucketInfoParseError(bucketInfo, logger) 954 } 955 956 services_map, ok := service.(map[string]interface{}) 957 if !ok { 958 return nil, u.BucketInfoParseError(bucketInfo, logger) 959 } 960 961 kv_port, ok := services_map[base.KVPortKey] 962 if !ok { 963 // the node may not have kv services. skip the node 964 continue 965 } 966 kvPortFloat, ok := kv_port.(float64) 967 if !ok { 968 return nil, u.BucketInfoParseError(bucketInfo, logger) 969 } 970 971 hostAddr := base.GetHostAddr(hostName, uint16(kvPortFloat)) 972 973 kv_ssl_port, ok := services_map[base.KVSSLPortKey] 974 if !ok { 975 return nil, u.BucketInfoParseError(bucketInfo, logger) 976 } 977 978 kvSSLPortFloat, ok := kv_ssl_port.(float64) 979 if !ok { 980 return nil, u.BucketInfoParseError(bucketInfo, logger) 981 } 982 portNumberToUse = uint16(kvSSLPortFloat) 983 984 // Since this is a call intended for targets, get the external info if requested 985 if useExternal { 986 externalHostAddr, externalKVPort, externalKVPortErr, externalSSLPort, externalSSLPortErr := u.GetExternalAddressAndKvPortsFromNodeInfo(nodeExtMap) 987 if len(externalHostAddr) == 0 { 988 return nil, base.ErrorTargetNoAltHostName 989 } 990 if externalKVPortErr == nil { 991 // External address and port both exist 992 hostAddr = base.GetHostAddr(externalHostAddr, uint16(externalKVPort)) 993 } else if externalKVPortErr == base.ErrorNoPortNumber { 994 // External address exists, but port does not. Use internal host's port number 995 hostAddr = base.GetHostAddr(externalHostAddr, uint16(kvPortFloat)) 996 } 997 if externalSSLPortErr == nil { 998 portNumberToUse = uint16(externalSSLPort) 999 } 1000 } 1001 1002 ret[hostAddr] = portNumberToUse 1003 } 1004 logger.Infof("memcached ssl port map=%v\n", ret) 1005 1006 return ret, nil 1007} 1008 1009func (u *Utilities) BucketInfoParseError(bucketInfo map[string]interface{}, logger *log.CommonLogger) error { 1010 errMsg := "Error parsing memcached ssl port of remote cluster." 1011 detailedErrMsg := errMsg + fmt.Sprintf("bucketInfo=%v", bucketInfo) 1012 logger.Errorf(detailedErrMsg) 1013 return fmt.Errorf(errMsg) 1014} 1015 1016// The input is a non-https address, potentially with or without a port 1017// Returns 2 pairs of strings: 1018// 1. Hostname:internalSSLPort 1019// 2. Hostname:externalSSLPort (or "" if no external port) 1020func (u *Utilities) HttpsRemoteHostAddr(hostAddr string, logger *log.CommonLogger) (string, string, error) { 1021 // Extract hostname to be combined with SSL port 1022 hostName := base.GetHostName(hostAddr) 1023 1024 internalSSLPort, internalErr, externalSSLPort, externalErr := u.GetRemoteSSLPorts(hostAddr, logger) 1025 if internalErr != nil { 1026 return "", "", internalErr 1027 } 1028 1029 internalHostPort := base.GetHostAddr(hostName, internalSSLPort) 1030 var externalHostPort string 1031 if externalErr == nil { 1032 externalHostPort = base.GetHostAddr(hostName, externalSSLPort) 1033 } 1034 1035 return internalHostPort, externalHostPort, nil 1036} 1037 1038func (u *Utilities) GetRemoteSSLPorts(hostAddr string, logger *log.CommonLogger) (internalSSLPort uint16, internalSSLErr error, externalSSLPort uint16, externalSSLErr error) { 1039 externalSSLErr = base.ErrorNoPortNumber 1040 1041 portInfo := make(map[string]interface{}) 1042 err, statusCode := u.QueryRestApiWithAuth(hostAddr, base.SSLPortsPath, false, "", "", base.HttpAuthMechPlain, nil, false, nil, nil, base.MethodGet, "", nil, 0, &portInfo, nil, false, logger) 1043 if err == nil && statusCode == http.StatusUnauthorized { 1044 // SSLPorts request normally do not require any user credentials 1045 // the only place unauthorized error could be returned is when target is elasticsearch cluster 1046 // treat this case differently so that a more specific error message can be returned to user 1047 internalSSLErr = base.ErrorUnauthorized 1048 return 1049 } 1050 if err != nil || statusCode != http.StatusOK { 1051 internalSSLErr = fmt.Errorf("Failed on calling %v on host %v, err=%v, statusCode=%v", base.SSLPortsPath, hostAddr, err, statusCode) 1052 return 1053 } 1054 1055 sslPort, ok := portInfo[base.SSLPortKey] 1056 if !ok { 1057 errMsg := "Failed to parse port info. ssl port is missing." 1058 logger.Errorf("%v. portInfo=%v", errMsg, portInfo) 1059 internalSSLErr = fmt.Errorf(errMsg) 1060 return 1061 } 1062 1063 sslPortFloat, ok := sslPort.(float64) 1064 if !ok { 1065 internalSSLErr = fmt.Errorf("ssl port is of wrong type. Expected type: float64; Actual type: %s", reflect.TypeOf(sslPort)) 1066 return 1067 } 1068 internalSSLPort = uint16(sslPortFloat) 1069 1070 portNumber, externalSSLErr := u.getExternalSSLMgtPort(portInfo) 1071 if externalSSLErr == nil { 1072 externalSSLPort = (uint16)(portNumber) 1073 } 1074 return 1075} 1076 1077func (u *Utilities) GetClusterInfoWStatusCode(hostAddr, path, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (map[string]interface{}, error, int) { 1078 clusterInfo := make(map[string]interface{}) 1079 err, statusCode := u.QueryRestApiWithAuth(hostAddr, path, false, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, base.MethodGet, "", nil, 0, &clusterInfo, nil, false, logger) 1080 if err != nil || statusCode != http.StatusOK { 1081 return nil, fmt.Errorf("Failed on calling host=%v, path=%v, err=%v, statusCode=%v", hostAddr, path, err, statusCode), statusCode 1082 } 1083 return clusterInfo, nil, statusCode 1084} 1085 1086func (u *Utilities) GetClusterInfo(hostAddr, path, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (map[string]interface{}, error) { 1087 clusterInfo, err, _ := u.GetClusterInfoWStatusCode(hostAddr, path, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger) 1088 return clusterInfo, err 1089} 1090 1091func (u *Utilities) GetClusterUUID(hostAddr, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (string, error) { 1092 clusterInfo, err := u.GetClusterInfo(hostAddr, base.PoolsPath, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger) 1093 if err != nil { 1094 return "", err 1095 } 1096 clusterUUIDObj, ok := clusterInfo[base.UUIDKey] 1097 if !ok { 1098 return "", fmt.Errorf("Cannot find uuid key in cluster info. hostAddr=%v, clusterInfo=%v\n", hostAddr, clusterInfo) 1099 } 1100 clusterUUID, ok := clusterUUIDObj.(string) 1101 if !ok { 1102 // cluster uuid is "[]" for unintialized cluster 1103 _, ok = clusterUUIDObj.([]interface{}) 1104 if ok { 1105 return "", fmt.Errorf("cluster %v is not initialized. clusterUUIDObj=%v\n", hostAddr, clusterUUIDObj) 1106 } else { 1107 return "", fmt.Errorf("uuid key in cluster info is not of string type. hostAddr=%v, clusterUUIDObj=%v\n", hostAddr, clusterUUIDObj) 1108 } 1109 } 1110 return clusterUUID, nil 1111} 1112 1113// get a list of node infos with full info 1114// this api calls xxx/pools/nodes, which returns full node info including clustercompatibility, etc. 1115// the catch is that this xxx/pools/nodes is not supported by elastic search cluster 1116func (u *Utilities) GetNodeListWithFullInfo(hostAddr, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) ([]interface{}, error) { 1117 clusterInfo, err := u.GetClusterInfo(hostAddr, base.NodesPath, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger) 1118 if err != nil { 1119 return nil, err 1120 } 1121 1122 return u.GetNodeListFromInfoMap(clusterInfo, logger) 1123 1124} 1125 1126// get a list of node infos with minimum info 1127// this api calls xxx/pools/default, which returns a subset of node info such as hostname 1128// this api can/needs to be used when connecting to elastic search cluster, which supports xxx/pools/default 1129func (u *Utilities) GetNodeListWithMinInfo(hostAddr, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) ([]interface{}, error) { 1130 clusterInfo, err := u.GetClusterInfo(hostAddr, base.DefaultPoolPath, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger) 1131 if err != nil { 1132 return nil, err 1133 } 1134 1135 return u.GetNodeListFromInfoMap(clusterInfo, logger) 1136 1137} 1138 1139func (u *Utilities) GetClusterUUIDAndNodeListWithMinInfo(hostAddr, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (string, []interface{}, error) { 1140 defaultPoolInfo, err := u.GetClusterInfo(hostAddr, base.DefaultPoolPath, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger) 1141 if err != nil { 1142 return "", nil, err 1143 } 1144 1145 return u.GetClusterUUIDAndNodeListWithMinInfoFromDefaultPoolInfo(defaultPoolInfo, logger) 1146 1147} 1148 1149func (u *Utilities) GetClusterUUIDAndNodeListWithMinInfoFromDefaultPoolInfo(defaultPoolInfo map[string]interface{}, logger *log.CommonLogger) (string, []interface{}, error) { 1150 clusterUUID, err := u.GetClusterUUIDFromDefaultPoolInfo(defaultPoolInfo, logger) 1151 if err != nil { 1152 return "", nil, err 1153 } 1154 1155 nodeList, err := u.GetNodeListFromInfoMap(defaultPoolInfo, logger) 1156 1157 return clusterUUID, nodeList, err 1158 1159} 1160 1161// get bucket info 1162// a specialized case of GetClusterInfo 1163func (u *Utilities) GetBucketInfo(hostAddr, bucketName, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (map[string]interface{}, error) { 1164 bucketInfo := make(map[string]interface{}) 1165 err, statusCode := u.QueryRestApiWithAuth(hostAddr, base.DefaultPoolBucketsPath+bucketName, false, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, base.MethodGet, "", nil, 0, &bucketInfo, nil, false, logger) 1166 if err == nil && statusCode == http.StatusOK { 1167 return bucketInfo, nil 1168 } 1169 if statusCode == http.StatusNotFound { 1170 return nil, u.GetNonExistentBucketError() 1171 } else { 1172 logger.Errorf("Failed to get bucket info for bucket '%v'. host=%v, err=%v, statusCode=%v", bucketName, hostAddr, err, statusCode) 1173 return nil, fmt.Errorf("Failed to get bucket info.") 1174 } 1175} 1176 1177// get bucket uuid 1178func (u *Utilities) BucketUUID(hostAddr, bucketName, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (string, error) { 1179 bucketInfo, err := u.GetBucketInfo(hostAddr, bucketName, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger) 1180 if err != nil { 1181 return "", err 1182 } 1183 1184 return u.GetBucketUuidFromBucketInfo(bucketName, bucketInfo, logger) 1185} 1186 1187// get bucket password 1188func (u *Utilities) BucketPassword(hostAddr, bucketName, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (string, error) { 1189 bucketInfo, err := u.GetBucketInfo(hostAddr, bucketName, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger) 1190 if err != nil { 1191 return "", err 1192 } 1193 1194 return u.GetBucketPasswordFromBucketInfo(bucketName, bucketInfo, logger) 1195} 1196 1197func (u *Utilities) GetLocalBuckets(hostAddr string, logger *log.CommonLogger) (map[string]string, error) { 1198 return u.GetBuckets(hostAddr, "", "", base.HttpAuthMechPlain, nil, false, nil, nil, logger) 1199} 1200 1201// return a map of buckets 1202// key = bucketName, value = bucketUUID 1203func (u *Utilities) GetBuckets(hostAddr, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, logger *log.CommonLogger) (map[string]string, error) { 1204 bucketListInfo := make([]interface{}, 0) 1205 err, statusCode := u.QueryRestApiWithAuth(hostAddr, base.DefaultPoolBucketsPath, false, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, base.MethodGet, "", nil, 0, &bucketListInfo, nil, false, logger) 1206 if err != nil || statusCode != http.StatusOK { 1207 return nil, fmt.Errorf("Failed on calling host=%v, path=%v, err=%v, statusCode=%v", hostAddr, base.DefaultPoolBucketsPath, err, statusCode) 1208 } 1209 1210 return u.GetBucketsFromInfoMap(bucketListInfo, logger) 1211} 1212 1213func (u *Utilities) GetBucketsFromInfoMap(bucketListInfo []interface{}, logger *log.CommonLogger) (map[string]string, error) { 1214 buckets := make(map[string]string) 1215 for _, bucketInfo := range bucketListInfo { 1216 bucketInfoMap, ok := bucketInfo.(map[string]interface{}) 1217 if !ok { 1218 errMsg := fmt.Sprintf("bucket info is not of map type. bucket info=%v", bucketInfo) 1219 logger.Error(errMsg) 1220 return nil, errors.New(errMsg) 1221 } 1222 bucketNameInfo, ok := bucketInfoMap[base.BucketNameKey] 1223 if !ok { 1224 errMsg := fmt.Sprintf("bucket info does not contain bucket name. bucket info=%v", bucketInfoMap) 1225 logger.Error(errMsg) 1226 return nil, errors.New(errMsg) 1227 } 1228 bucketName, ok := bucketNameInfo.(string) 1229 if !ok { 1230 errMsg := fmt.Sprintf("bucket name is not of string type. bucket name=%v", bucketNameInfo) 1231 logger.Error(errMsg) 1232 return nil, errors.New(errMsg) 1233 } 1234 bucketUUIDInfo, ok := bucketInfoMap[base.UUIDKey] 1235 if !ok { 1236 errMsg := fmt.Sprintf("bucket info does not contain bucket uuid. bucket info=%v", bucketInfoMap) 1237 logger.Error(errMsg) 1238 return nil, errors.New(errMsg) 1239 } 1240 bucketUUID, ok := bucketUUIDInfo.(string) 1241 if !ok { 1242 errMsg := fmt.Sprintf("bucket uuid is not of string type. bucket uuid=%v", bucketUUIDInfo) 1243 logger.Error(errMsg) 1244 return nil, errors.New(errMsg) 1245 } 1246 buckets[bucketName] = bucketUUID 1247 } 1248 1249 return buckets, nil 1250} 1251 1252func (u *Utilities) BucketValidationInfo(hostAddr, bucketName, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, 1253 logger *log.CommonLogger) (bucketInfo map[string]interface{}, bucketType string, bucketUUID string, bucketConflictResolutionType string, 1254 bucketEvictionPolicy string, bucketKVVBMap map[string][]uint16, err error) { 1255 1256 return u.bucketValidationInfoInternal(hostAddr, bucketName, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger, false /*external*/) 1257} 1258 1259func (u *Utilities) RemoteBucketValidationInfo(hostAddr, bucketName, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, 1260 logger *log.CommonLogger, useExternal bool) (bucketInfo map[string]interface{}, bucketType string, bucketUUID string, bucketConflictResolutionType string, 1261 bucketEvictionPolicy string, bucketKVVBMap map[string][]uint16, err error) { 1262 1263 return u.bucketValidationInfoInternal(hostAddr, bucketName, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger, useExternal) 1264} 1265 1266// get a number of fields in bucket for validation purpose 1267// 1. bucket type 1268// 2. bucket uuid 1269// 3. bucket conflict resolution type 1270// 4. bucket eviction policy 1271// 5. bucket server vb map 1272func (u *Utilities) bucketValidationInfoInternal(hostAddr, bucketName, username, password string, authMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte, 1273 logger *log.CommonLogger, remote bool) (bucketInfo map[string]interface{}, bucketType string, bucketUUID string, bucketConflictResolutionType string, 1274 bucketEvictionPolicy string, bucketKVVBMap map[string][]uint16, err error) { 1275 1276 bucketValidationInfoOp := func() error { 1277 bucketInfo, err = u.GetBucketInfo(hostAddr, bucketName, username, password, authMech, certificate, sanInCertificate, clientCertificate, clientKey, logger) 1278 if err != nil { 1279 return err 1280 } 1281 1282 bucketType, err = u.GetBucketTypeFromBucketInfo(bucketName, bucketInfo) 1283 if err != nil { 1284 err = fmt.Errorf("Error retrieving BucketType setting on bucket %v. err=%v", bucketName, err) 1285 return err 1286 } 1287 bucketUUID, err = u.GetBucketUuidFromBucketInfo(bucketName, bucketInfo, logger) 1288 if err != nil { 1289 err = fmt.Errorf("Error retrieving UUID setting on bucket %v. err=%v", bucketName, err) 1290 return err 1291 } 1292 bucketConflictResolutionType, err = u.GetConflictResolutionTypeFromBucketInfo(bucketName, bucketInfo) 1293 if err != nil { 1294 err = fmt.Errorf("Error retrieving ConflictResolutionType setting on bucket %v. err=%v", bucketName, err) 1295 return err 1296 } 1297 bucketEvictionPolicy, err = u.GetEvictionPolicyFromBucketInfo(bucketName, bucketInfo) 1298 if err != nil { 1299 err = fmt.Errorf("Error retrieving EvictionPolicy setting on bucket %v. err=%v", bucketName, err) 1300 return err 1301 } 1302 bucketKVVBMap, err = u.GetServerVBucketsMap(hostAddr, bucketName, bucketInfo) 1303 if err != nil { 1304 err = fmt.Errorf("Error retrieving server vb map on bucket %v. err=%v", bucketName, err) 1305 return err 1306 } 1307 1308 if remote { 1309 u.TranslateKvVbMap(bucketKVVBMap, bucketInfo) 1310 } 1311 return nil 1312 } 1313 1314 err = u.ExponentialBackoffExecutor("BucketValidationInfo", base.BucketInfoOpWaitTime, base.BucketInfoOpMaxRetry, base.BucketInfoOpRetryFactor, bucketValidationInfoOp) 1315 return 1316} 1317 1318func (u *Utilities) GetBucketUuidFromBucketInfo(bucketName string, bucketInfo map[string]interface{}, logger *log.CommonLogger) (string, error) { 1319 bucketUUID := "" 1320 bucketUUIDObj, ok := bucketInfo[base.UUIDKey] 1321 if !ok { 1322 return "", fmt.Errorf("Error looking up uuid of bucket %v", bucketName) 1323 } else { 1324 bucketUUID, ok = bucketUUIDObj.(string) 1325 if !ok { 1326 return "", fmt.Errorf("Uuid of bucket %v is of wrong type", bucketName) 1327 } 1328 } 1329 return bucketUUID, nil 1330} 1331 1332func (u *Utilities) GetClusterUUIDFromDefaultPoolInfo(defaultPoolInfo map[string]interface{}, logger *log.CommonLogger) (string, error) { 1333 bucketsObj, ok := defaultPoolInfo[base.BucketsKey] 1334 if !ok { 1335 errMsg := fmt.Sprintf("Cannot find buckets key in default pool info. defaultPoolInfo=%v\n", defaultPoolInfo) 1336 logger.Error(errMsg) 1337 return "", errors.New(errMsg) 1338 } 1339 bucketsInfo, ok := bucketsObj.(map[string]interface{}) 1340 if !ok { 1341 errMsg := fmt.Sprintf("buckets in default pool info is not of map type. buckets=%v\n", bucketsObj) 1342 logger.Error(errMsg) 1343 return "", errors.New(errMsg) 1344 } 1345 uriObj, ok := bucketsInfo[base.URIKey] 1346 if !ok { 1347 errMsg := fmt.Sprintf("Cannot find uri key in buckets info. bucketsInfo=%v\n", bucketsInfo) 1348 logger.Error(errMsg) 1349 return "", errors.New(errMsg) 1350 } 1351 uri, ok := uriObj.(string) 1352 if !ok { 1353 errMsg := fmt.Sprintf("uri in buckets info is not of string type. uri=%v\n", uriObj) 1354 logger.Error(errMsg) 1355 return "", errors.New(errMsg) 1356 } 1357 1358 return u.GetClusterUUIDFromURI(uri) 1359} 1360 1361func (u *Utilities) GetClusterUUIDFromURI(uri string) (string, error) { 1362 // uri is in the form of /pools/default/buckets?uuid=d5dea23aa7ee3771becb3fcdb46ff956 1363 searchKey := base.UUIDKey + "=" 1364 index := strings.LastIndex(uri, searchKey) 1365 if index < 0 { 1366 return "", fmt.Errorf("uri does not contain uuid. uri=%v", uri) 1367 } 1368 return uri[index+len(searchKey):], nil 1369} 1370 1371func (u *Utilities) GetClusterCompatibilityFromBucketInfo(bucketInfo map[string]interface{}, logger *log.CommonLogger) (int, error) { 1372 nodeList, err := u.GetNodeListFromInfoMap(bucketInfo, logger) 1373 if err != nil { 1374 return 0, err 1375 } 1376 1377 clusterCompatibility, err := u.GetClusterCompatibilityFromNodeList(nodeList) 1378 if err != nil { 1379 logger.Error(err.Error()) 1380 return 0, err 1381 } 1382 1383 return clusterCompatibility, nil 1384} 1385 1386func (u *Utilities) GetBucketPasswordFromBucketInfo(bucketName string, bucketInfo map[string]interface{}, logger *log.CommonLogger) (string, error) { 1387 bucketPassword := "" 1388 bucketPasswordObj, ok := bucketInfo[base.SASLPasswordKey] 1389 if !ok { 1390 return "", fmt.Errorf("Error looking up password of bucket %v", bucketName) 1391 } else { 1392 bucketPassword, ok = bucketPasswordObj.(string) 1393 if !ok { 1394 return "", fmt.Errorf("Password of bucket %v is of wrong type", bucketName) 1395 } 1396 } 1397 return bucketPassword, nil 1398} 1399 1400func (u *Utilities) GetNodeListFromInfoMap(infoMap map[string]interface{}, logger *log.CommonLogger) ([]interface{}, error) { 1401 // get node list from the map 1402 nodes, ok := infoMap[base.NodesKey] 1403 if !ok { 1404 errMsg := fmt.Sprintf("info map contains no nodes. info map=%v", infoMap) 1405 logger.Error(errMsg) 1406 return nil, errors.New(errMsg) 1407 } 1408 1409 nodeList, ok := nodes.([]interface{}) 1410 if !ok { 1411 errMsg := fmt.Sprintf("nodes is not of list type. type of nodes=%v", reflect.TypeOf(nodes)) 1412 logger.Error(errMsg) 1413 return nil, errors.New(errMsg) 1414 } 1415 1416 // only return the nodes that are active 1417 activeNodeList := make([]interface{}, 0) 1418 for _, node := range nodeList { 1419 nodeInfoMap, ok := node.(map[string]interface{}) 1420 if !ok { 1421 errMsg := fmt.Sprintf("node info is not of map type. type=%v", reflect.TypeOf(node)) 1422 logger.Error(errMsg) 1423 return nil, errors.New(errMsg) 1424 } 1425 clusterMembershipObj, ok := nodeInfoMap[base.ClusterMembershipKey] 1426 if !ok { 1427 // this could happen when target is elastic search cluster (or maybe very old couchbase cluster?) 1428 // consider the node to be "active" to be safe 1429 errMsg := fmt.Sprintf("node info map does not contain cluster membership. node info map=%v ", nodeInfoMap) 1430 logger.Debug(errMsg) 1431 activeNodeList = append(activeNodeList, node) 1432 continue 1433 } 1434 clusterMembership, ok := clusterMembershipObj.(string) 1435 if !ok { 1436 // play safe and return the node as active 1437 errMsg := fmt.Sprintf("cluster membership is not string type. type=%v ", reflect.TypeOf(clusterMembershipObj)) 1438 logger.Warn(errMsg) 1439 activeNodeList = append(activeNodeList, node) 1440 continue 1441 } 1442 if clusterMembership == "" || clusterMembership == base.ClusterMembership_Active { 1443 activeNodeList = append(activeNodeList, node) 1444 } 1445 } 1446 1447 return activeNodeList, nil 1448} 1449 1450func (u *Utilities) GetClusterCompatibilityFromNodeList(nodeList []interface{}) (int, error) { 1451 if len(nodeList) > 0 { 1452 firstNode, ok := nodeList[0].(map[string]interface{}) 1453 if !ok { 1454 return 0, fmt.Errorf("node info is of wrong type. node info=%v", nodeList[0]) 1455 } 1456 clusterCompatibility, ok := firstNode[base.ClusterCompatibilityKey] 1457 if !ok { 1458 return 0, fmt.Errorf("Can't get cluster compatibility info. node info=%v\n If replicating to ElasticSearch node, use XDCR v1.", nodeList[0]) 1459 } 1460 clusterCompatibilityFloat, ok := clusterCompatibility.(float64) 1461 if !ok { 1462 return 0, fmt.Errorf("cluster compatibility is not of int type. type=%v", reflect.TypeOf(clusterCompatibility)) 1463 } 1464 return int(clusterCompatibilityFloat), nil 1465 } 1466 1467 return 0, fmt.Errorf("node list is empty") 1468} 1469 1470// Used externally only - returns a list of nodes for management access 1471// if needHttps is true, returns both http addresses and https addresses 1472// if needHttps is false, returns http addresses and empty https addresses 1473func (u *Utilities) GetRemoteNodeAddressesListFromNodeList(nodeList []interface{}, connStr string, needHttps bool, logger *log.CommonLogger, useExternal bool) (base.StringPairList, error) { 1474 nodeAddressesList := make(base.StringPairList, len(nodeList)) 1475 var hostAddr string 1476 var hostHttpsAddr string 1477 var err error 1478 index := 0 1479 1480 for _, node := range nodeList { 1481 nodeInfoMap, ok := node.(map[string]interface{}) 1482 if !ok { 1483 errMsg := fmt.Sprintf("node info is not of map type. type of node info=%v", reflect.TypeOf(node)) 1484 logger.Error(errMsg) 1485 return nil, errors.New(errMsg) 1486 } 1487 1488 hostAddr, err = u.GetHostAddrFromNodeInfo(connStr, nodeInfoMap, false /*isHttps*/, logger, useExternal) 1489 if err != nil { 1490 errMsg := fmt.Sprintf("cannot get hostname from node info %v", nodeInfoMap) 1491 logger.Error(errMsg) 1492 return nil, errors.New(errMsg) 1493 } 1494 1495 if needHttps { 1496 hostHttpsAddr, err = u.GetHostAddrFromNodeInfo(connStr, nodeInfoMap, true /*isHttps*/, logger, useExternal) 1497 if err != nil { 1498 errMsg := fmt.Sprintf("cannot get https hostname from node info %v", nodeInfoMap) 1499 logger.Error(errMsg) 1500 return nil, errors.New(errMsg) 1501 } 1502 } else { 1503 hostHttpsAddr = "" 1504 } 1505 1506 nodeAddressesList[index] = base.StringPair{hostAddr, hostHttpsAddr} 1507 index++ 1508 } 1509 return nodeAddressesList, nil 1510} 1511 1512func (u *Utilities) GetHttpsMgtPortFromNodeInfo(nodeInfo map[string]interface{}) (int, error) { 1513 portsObjRaw, portsObjExists := nodeInfo[base.PortsKey] 1514 if !portsObjExists { 1515 return -1, base.ErrorNoPortNumber 1516 } 1517 1518 portsObj, ok := portsObjRaw.(map[string]interface{}) 1519 if !ok { 1520 return -1, base.ErrorNoPortNumber 1521 } 1522 1523 sslPort, ok := portsObj[base.SSLPortKey] 1524 if !ok { 1525 return -1, base.ErrorNoPortNumber 1526 } 1527 1528 sslPortFloat, ok := sslPort.(float64) 1529 if !ok { 1530 return -1, base.ErrorNoPortNumber 1531 } 1532 return int(sslPortFloat), nil 1533} 1534 1535func (u *Utilities) GetHostAddrFromNodeInfo(connStr string, nodeInfo map[string]interface{}, isHttps bool, logger *log.CommonLogger, useExternal bool) (string, error) { 1536 // Internal node information 1537 hostAddr, err := u.getAdminHostAddrFromNodeInfo(connStr, nodeInfo, logger) 1538 if err != nil { 1539 errMsg := fmt.Sprintf("cannot get hostname from node info %v", nodeInfo) 1540 logger.Error(errMsg) 1541 return "", errors.New(errMsg) 1542 } 1543 1544 if isHttps { 1545 sslPort, err := u.GetHttpsMgtPortFromNodeInfo(nodeInfo) 1546 if err != nil { 1547 return "", err 1548 } 1549 hostName := base.GetHostName(hostAddr) 1550 hostAddr = base.GetHostAddr(hostName, uint16(sslPort)) 1551 } 1552 1553 if useExternal { 1554 // If external info exists, replace accordingly - hostAddr is currently pointing to internalNode's info 1555 if externalAddr, externalMgtPort, externalErr := u.GetExternalMgtHostAndPort(nodeInfo, isHttps); externalErr == nil { 1556 hostAddr = base.GetHostAddr(externalAddr, (uint16)(externalMgtPort)) 1557 } else if externalErr == base.ErrorNoPortNumber { 1558 // Extract original internal node management port from above 1559 hostPort, portErr := base.GetPortNumber(hostAddr) 1560 if portErr == nil { 1561 // Combine externalHost:internalPort 1562 hostAddr = base.GetHostAddr(externalAddr, (uint16)(hostPort)) 1563 } else { 1564 // Original internal address did not have port number, so continue to just have externalAddr[:noPort] 1565 hostAddr = externalAddr 1566 } 1567 } 1568 } 1569 1570 return hostAddr, nil 1571} 1572 1573// Returns: 1574// 1. External IP 1575// 2. External kv port (if applicable, -1 if not found) 1576// 3. Returns nil if port exists - ErrorNoPortNumber if kv (direct) port doesn't exist 1577// 4. External KvSSL port (if applicable, -1 if not found) 1578// 5. Returns nil if SSL port exists - ErrorNoPortNumber if SSL port doesn't exist 1579// Any other errors are considered bad op 1580func (u *Utilities) GetExternalAddressAndKvPortsFromNodeInfo(nodeInfo map[string]interface{}) (string, int, error, int, error) { 1581 var hostAddr string 1582 var portNumber int 1583 var sslPortNumber int 1584 var portErr error 1585 var sslPortErr error 1586 1587 alternateObjRaw, alternateExists := nodeInfo[base.AlternateKey] 1588 if !alternateExists { 1589 return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist 1590 } 1591 1592 alternateObj, ok := (alternateObjRaw).(map[string]interface{}) 1593 if !ok { 1594 u.logger_utils.Errorf("GetExternalAddressAndKvPortsFromNodeInfo: Unable to convert alternateObj to map[string]interface{}") 1595 return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist 1596 } 1597 1598 externalObjRaw, externalExists := alternateObj[base.ExternalKey] 1599 if !externalExists { 1600 return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist 1601 } 1602 1603 externalObj, ok := (externalObjRaw).(map[string]interface{}) 1604 if !ok { 1605 u.logger_utils.Errorf("GetExternalAddressAndKvPortsFromNodeInfo: Unable to convert externalObj to map[string]interface{}") 1606 return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist 1607 } 1608 1609 hostAddrObjRaw, hostAddrObjExists := externalObj[base.HostNameKey] 1610 if !hostAddrObjExists { 1611 return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist 1612 } 1613 1614 hostAddr, ok = (hostAddrObjRaw).(string) 1615 if !ok { 1616 u.logger_utils.Errorf("GetExternalAddressAndKvPortsFromNodeInfo: Unable to convert hostAddrObj to string") 1617 return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist 1618 } else if len(hostAddr) == 0 { 1619 u.logger_utils.Errorf("GetExternalAddressAndKvPortsFromNodeInfo: Empty Hostname") 1620 return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist 1621 } 1622 1623 portsObjRaw, portsObjExists := externalObj[base.PortsKey] 1624 if !portsObjExists { 1625 return hostAddr, -1, base.ErrorNoPortNumber, -1, base.ErrorNoPortNumber 1626 } 1627 1628 portErr = base.ErrorNoPortNumber 1629 sslPortErr = base.ErrorNoPortNumber 1630 portNumber = -1 1631 sslPortNumber = -1 1632 portsObj, ok := portsObjRaw.(map[string]interface{}) 1633 if !ok { 1634 u.logger_utils.Warnf("Unable to convert portsObj to map[string]interface{}") 1635 } else { 1636 // Get the External kv port (internally "direct") port if it's there 1637 // KV team wants clients to use nodeServices, which means that "direct" is used as an internal naming convention 1638 // The alternate address fields use "kv" as what "direct" means to traditional XDCR 1639 kvPortFloat, kvPortExists := portsObj[base.KVPortKey] 1640 if kvPortExists { 1641 kvPortIntCheck, ok := kvPortFloat.(float64) 1642 if ok { 1643 portNumber = (int)(kvPortIntCheck) 1644 portErr = nil 1645 } 1646 } 1647 // Get the SSL port if it is there 1648 sslPort, sslPortExists := portsObj[base.KVSSLPortKey] 1649 if sslPortExists { 1650 sslPortIntCheck, ok := sslPort.(float64) 1651 if ok { 1652 sslPortNumber = (int)(sslPortIntCheck) 1653 sslPortErr = nil 1654 } 1655 } 1656 } 1657 return hostAddr, portNumber, portErr, sslPortNumber, sslPortErr 1658} 1659 1660func (u *Utilities) GetExternalMgtHostAndPort(nodeInfo map[string]interface{}, isHttps bool) (string, int, error) { 1661 var hostAddr string 1662 var portErr error = base.ErrorNoPortNumber 1663 var portNumber int = -1 1664 1665 alternateObjRaw, alternateExists := nodeInfo[base.AlternateKey] 1666 if !alternateExists { 1667 return "", -1, base.ErrorResourceDoesNotExist 1668 } 1669 1670 alternateObj, ok := alternateObjRaw.(map[string]interface{}) 1671 if !ok { 1672 u.logger_utils.Errorf("GetExternalMgtHostAndPort: unable to cast alternateObj to map[string]interface{}") 1673 fmt.Printf("GetExternalMgtHostAndPort: unable to cast alternateObj to map[string]interface{}\n") 1674 return "", -1, base.ErrorResourceDoesNotExist 1675 } 1676 1677 externalObjRaw, externalExists := alternateObj[base.ExternalKey] 1678 if !externalExists { 1679 fmt.Printf("externalObjRaw does not exist\n") 1680 return "", -1, base.ErrorResourceDoesNotExist 1681 } 1682 1683 externalObj, ok := externalObjRaw.(map[string]interface{}) 1684 if !ok { 1685 u.logger_utils.Errorf("GetExternalMgtHostAndPort: unable to cast externalObj to map[string]interface{}") 1686 fmt.Printf("GetExternalMgtHostAndPort: unable to cast externalObj to map[string]interface{}\n") 1687 return "", -1, base.ErrorResourceDoesNotExist 1688 } 1689 1690 hostAddrObj, hostAddrObjExists := externalObj[base.HostNameKey] 1691 if !hostAddrObjExists { 1692 return "", -1, base.ErrorResourceDoesNotExist 1693 } 1694 1695 hostAddr, ok = hostAddrObj.(string) 1696 if !ok { 1697 u.logger_utils.Errorf("GetExternalMgtHostAndPort: unable to cast hostAddr to string") 1698 return "", -1, base.ErrorResourceDoesNotExist 1699 } else if len(hostAddr) == 0 { 1700 u.logger_utils.Errorf("GetExternalMgtHostAndPort: empty hostAddr") 1701 return "", -1, base.ErrorResourceDoesNotExist 1702 } 1703 1704 portsObjRaw, portsObjExists := externalObj[base.PortsKey] 1705 if !portsObjExists { 1706 return hostAddr, portNumber, portErr 1707 } 1708 1709 portsObj, ok := portsObjRaw.(map[string]interface{}) 1710 if !ok { 1711 u.logger_utils.Errorf("GetExternalMgtHostAndPort: unable to cast portsObj to map[string]interface{}") 1712 return hostAddr, portNumber, portErr 1713 } 1714 1715 var portKey string 1716 if isHttps { 1717 portKey = base.SSLMgtPortKey 1718 } else { 1719 portKey = base.MgtPortKey 1720 } 1721 1722 mgmtObjRaw, mgmtObjExists := portsObj[portKey] 1723 if !mgmtObjExists { 1724 return hostAddr, portNumber, portErr 1725 } 1726 1727 mgmtObj, ok := mgmtObjRaw.(float64) 1728 if !ok { 1729 u.logger_utils.Errorf("GetExternalMgtHostAndPort: unable to cast mgmtObj to float64") 1730 return hostAddr, portNumber, portErr 1731 } 1732 1733 portNumber = (int)(mgmtObj) 1734 portErr = nil 1735 return hostAddr, portNumber, portErr 1736} 1737 1738// Returns remote node's SSL management port if it exists 1739func (u *Utilities) getExternalSSLMgtPort(nodeInfo map[string]interface{}) (int, error) { 1740 alternateObjRaw, alternateExists := nodeInfo[base.AlternateKey] 1741 if !alternateExists { 1742 return -1, base.ErrorResourceDoesNotExist 1743 } 1744 1745 alternateObj, ok := alternateObjRaw.(map[string]interface{}) 1746 if !ok { 1747 u.logger_utils.Errorf("getExternalSSLMgtPort: unable to cast alternateObj to map[string]interface{}") 1748 return -1, base.ErrorResourceDoesNotExist 1749 } 1750 1751 externalObjRaw, externalExists := alternateObj[base.ExternalKey] 1752 if !externalExists { 1753 return -1, base.ErrorResourceDoesNotExist 1754 } 1755 1756 externalObj, ok := externalObjRaw.(map[string]interface{}) 1757 if !ok { 1758 u.logger_utils.Errorf("getExternalSSLMgtPort: unable to cast externalObj to map[string]interface{}") 1759 return -1, base.ErrorResourceDoesNotExist 1760 } 1761 1762 portsObjRaw, portsObjExists := externalObj[base.PortsKey] 1763 if !portsObjExists { 1764 u.logger_utils.Warnf("Unable to convert portsObj to map[string]interface{}") 1765 return -1, base.ErrorNoPortNumber 1766 } 1767 1768 portsObj, ok := portsObjRaw.(map[string]interface{}) 1769 if !ok { 1770 return -1, base.ErrorNoPortNumber 1771 } 1772 1773 mgmtSSLObjRaw, mgmtSSLExists := portsObj[base.SSLMgtPortKey] 1774 if !mgmtSSLExists { 1775 return -1, base.ErrorNoPortNumber 1776 } 1777 1778 mgmtSSLObj, ok := mgmtSSLObjRaw.(float64) 1779 if !ok { 1780 u.logger_utils.Warnf("Unable to convert portsObj to float64") 1781 return -1, base.ErrorNoPortNumber 1782 } 1783 1784 return (int)(mgmtSSLObj), nil 1785} 1786 1787// Returns: 1788// 1. External Hostname 1789// 2. capi port 1790// 3. capi port error 1791// 4. capi SSL port 1792// 5. capi SSL port error 1793func (u *Utilities) getExternalHostAndCapiPorts(nodeInfo map[string]interface{}) (string, int, error, int, error) { 1794 var hostAddr string 1795 var capiPort int = -1 1796 var capiSSLPort int = -1 1797 var capiPortErr error = base.ErrorNoPortNumber 1798 var capiSSLPortErr error = base.ErrorNoPortNumber 1799 1800 alternateObjRaw, alternateExists := nodeInfo[base.AlternateKey] 1801 if !alternateExists { 1802 return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist 1803 } 1804 1805 alternateObj, ok := (alternateObjRaw).(map[string]interface{}) 1806 if !ok { 1807 u.logger_utils.Errorf("getExternalHostAndCapiPorts: Unable to convert alternateObj to map[string]interface{}") 1808 return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist 1809 } 1810 1811 externalObjRaw, externalExists := alternateObj[base.ExternalKey] 1812 if !externalExists { 1813 return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist 1814 } 1815 1816 externalObj, ok := (externalObjRaw).(map[string]interface{}) 1817 if !ok { 1818 u.logger_utils.Errorf("getExternalHostAndCapiPorts: Unable to convert externalObj to map[string]interface{}") 1819 return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist 1820 } 1821 1822 hostAddrObjRaw, hostAddrObjExists := externalObj[base.HostNameKey] 1823 if !hostAddrObjExists { 1824 return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist 1825 } 1826 1827 hostAddr, ok = (hostAddrObjRaw).(string) 1828 if !ok { 1829 u.logger_utils.Errorf("getExternalHostAndCapiPorts: Unable to convert hostAddrObj to string") 1830 return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist 1831 } else if len(hostAddr) == 0 { 1832 u.logger_utils.Errorf("getExternalHostAndCapiPorts: Empty Hostname") 1833 return "", -1, base.ErrorResourceDoesNotExist, -1, base.ErrorResourceDoesNotExist 1834 } 1835 1836 portsObjRaw, portsObjExists := externalObj[base.PortsKey] 1837 if !portsObjExists { 1838 return "", -1, base.ErrorNoPortNumber, -1, base.ErrorNoPortNumber 1839 } 1840 1841 portsObj, ok := portsObjRaw.(map[string]interface{}) 1842 if !ok { 1843 u.logger_utils.Warnf("Unable to convert portsObj to map[string]interface{}") 1844 } else { 1845 capiPortRaw, capiPortExists := portsObj[base.CapiPortKey] 1846 if capiPortExists { 1847 portNumberFloat, ok := (capiPortRaw).(float64) 1848 if !ok { 1849 u.logger_utils.Warnf("Unable to convert capiPort to float64") 1850 } else { 1851 capiPort = (int)(portNumberFloat) 1852 capiPortErr = nil 1853 } 1854 } 1855 // Get the SSL port if it is there 1856 if sslPort, sslPortExists := portsObj[base.CapiSSLPortKey]; sslPortExists { 1857 sslPortNumberFloat, ok := sslPort.(float64) 1858 if !ok { 1859 u.logger_utils.Warnf("Unable to convert capiSSLPort to float64") 1860 } else { 1861 capiSSLPort = (int)(sslPortNumberFloat) 1862 capiSSLPortErr = nil 1863 } 1864 } 1865 } 1866 return hostAddr, capiPort, capiPortErr, capiSSLPort, capiSSLPortErr 1867} 1868 1869func (u *Utilities) getAdminHostAddrFromNodeInfo(adminHostAddr string, nodeInfo map[string]interface{}, logger *log.CommonLogger) (string, error) { 1870 hostAddr, err := u.getHostAddrFromNodeInfoInternal(adminHostAddr, nodeInfo, logger) 1871 if err == base.ErrorNoHostName { 1872 hostAddr = adminHostAddr 1873 err = nil 1874 } 1875 return hostAddr, err 1876} 1877 1878func (u *Utilities) getHostAddrFromNodeInfoInternal(adminHostAddr string, nodeInfo map[string]interface{}, logger *log.CommonLogger) (string, error) { 1879 var hostAddr string 1880 var ok bool 1881 1882 hostAddrObj, ok := nodeInfo[base.HostNameKey] 1883 if !ok { 1884 logger.Infof("hostname is missing from node info %v. Host name in remote cluster reference, %v, will be used.\n", nodeInfo, adminHostAddr) 1885 return "", base.ErrorNoHostName 1886 } else { 1887 hostAddr, ok = hostAddrObj.(string) 1888 if !ok { 1889 return "", fmt.Errorf("Error getting host address from target cluster %v. host name, %v, is of wrong type\n", adminHostAddr, hostAddrObj) 1890 } 1891 } 1892 1893 return hostAddr, nil 1894} 1895 1896// Note - the translated map should be in the k->v form of: 1897// internalNodeAddress:directPort -> externalNodeAddress:kvPort 1898func (u *Utilities) GetIntExtHostNameKVPortTranslationMap(mapContainingNodesKey map[string]interface{}) (map[string]string, error) { 1899 internalExternalNodesMap := make(map[string]string) 1900 var err error 1901 var directPort int 1902 var nodesList []interface{} 1903 1904 nodesList, err = u.GetNodeListFromInfoMap(mapContainingNodesKey, u.logger_utils) 1905 if err != nil { 1906 return internalExternalNodesMap, err 1907 } 1908 1909 for _, nodeInfoRaw := range nodesList { 1910 nodeInfo, ok := nodeInfoRaw.(map[string]interface{}) 1911 if !ok { 1912 u.logger_utils.Warnf("GetIntExtHostNameKVPortTranslationMap unable to cast nodeInfo as map[string]interface{} from: %v", nodeInfoRaw) 1913 // skip this node 1914 continue 1915 } 1916 1917 internalAddressAndPortRaw, internalAddressOk := nodeInfo[base.HostNameKey] 1918 if !internalAddressOk { 1919 u.logger_utils.Warnf("GetIntExtHostNameKVPortTranslationMap unable to retrieve internal host name from %v", nodeInfo) 1920 // skip this node 1921 continue 1922 } 1923 1924 internalAddressAndPort, ok := (internalAddressAndPortRaw).(string) 1925 if !ok { 1926 u.logger_utils.Warnf("GetIntExtHostNameKVPortTranslationMap unable to cast internalAddressAndPort as string: %v", internalAddressAndPortRaw) 1927 // skip this node 1928 continue 1929 } 1930 1931 internalAddress := base.GetHostName(internalAddressAndPort) 1932 // Internally, we care about "direct" field 1933 portsObjRaw, portsExists := nodeInfo[base.PortsKey] 1934 if !portsExists { 1935 u.logger_utils.Warnf("Unable to get port for %v", internalAddress) 1936 // skip this node 1937 continue 1938 } 1939 portsObj, ok := portsObjRaw.(map[string]interface{}) 1940 if !ok { 1941 u.logger_utils.Warnf("GetIntExtHostNameKVPortTranslationMap unable to cast portsObj as map[string]interface{} from: %v", portsObjRaw) 1942 // skip this node 1943 continue 1944 } 1945 1946 directPortIface, directPortExists := portsObj[base.DirectPortKey] 1947 if !directPortExists { 1948 u.logger_utils.Warnf("Unable to get direct port for %v", internalAddress) 1949 // skip this node 1950 continue 1951 } 1952 directPortFloat, ok := directPortIface.(float64) 1953 if !ok { 1954 u.logger_utils.Warnf("GetIntExtHostNameKVPortTranslationMap unable to cast directPort as float", directPortIface) 1955 // skip this node 1956 continue 1957 } 1958 1959 directPort = (int)(directPortFloat) 1960 internalAddressAndDirectPort := base.GetHostAddr(internalAddress, (uint16)(directPort)) 1961 1962 externalAddress, externalDirectPort, externalErr, _, _ := u.GetExternalAddressAndKvPortsFromNodeInfo(nodeInfo) 1963 if len(externalAddress) > 0 { 1964 if externalErr == nil { 1965 // External address and port both exist 1966 internalExternalNodesMap[internalAddressAndDirectPort] = base.GetHostAddr(externalAddress, (uint16)(externalDirectPort)) 1967 } else if externalErr == base.ErrorNoPortNumber { 1968 // External address exists, but port does not. Use internal host's port number 1969 internalExternalNodesMap[internalAddressAndDirectPort] = base.GetHostAddr(externalAddress, (uint16)(directPort)) 1970 } 1971 } 1972 } 1973 1974 if len(internalExternalNodesMap) == 0 { 1975 err = base.ErrorResourceDoesNotExist 1976 } 1977 return internalExternalNodesMap, err 1978} 1979 1980func (u *Utilities) GetHostNameFromNodeInfo(adminHostAddr string, nodeInfo map[string]interface{}, logger *log.CommonLogger) (string, error) { 1981 hostAddr, err := u.getAdminHostAddrFromNodeInfo(adminHostAddr, nodeInfo, logger) 1982 if err != nil { 1983 return "", err 1984 } 1985 return base.GetHostName(hostAddr), nil 1986} 1987 1988// this method is called when nodeInfo came from the terse bucket call, pools/default/b/[bucketName] 1989// where hostname in nodeInfo is a host name without port rather than a host address with port 1990func (u *Utilities) getHostNameWithoutPortFromNodeInfo(adminHostAddr string, nodeInfo map[string]interface{}, logger *log.CommonLogger) (string, error) { 1991 hostName, err := u.getHostAddrFromNodeInfoInternal(adminHostAddr, nodeInfo, logger) 1992 if err == base.ErrorNoHostName { 1993 hostName = base.GetHostName(adminHostAddr) 1994 err = nil 1995 } 1996 1997 return hostName, err 1998} 1999 2000//convenient api for rest calls to local cluster 2001func (u *Utilities) QueryRestApi(baseURL string, 2002 path string, 2003 preservePathEncoding bool, 2004 httpCommand string, 2005 contentType string, 2006 body []byte, 2007 timeout time.Duration, 2008 out interface{}, 2009 logger *log.CommonLogger) (error, int) { 2010 return u.QueryRestApiWithAuth(baseURL, path, preservePathEncoding, "", "", base.HttpAuthMechPlain, nil, false, nil, nil, httpCommand, contentType, body, timeout, out, nil, false, logger) 2011} 2012 2013func (u *Utilities) EnforcePrefix(prefix string, str string) string { 2014 var ret_str string = str 2015 if !strings.HasPrefix(str, prefix) { 2016 ret_str = prefix + str 2017 } 2018 return ret_str 2019} 2020 2021func (u *Utilities) RemovePrefix(prefix string, str string) string { 2022 ret_str := strings.Replace(str, prefix, "", 1) 2023 return ret_str 2024} 2025 2026//this expect the baseURL doesn't contain username and password 2027func (u *Utilities) QueryRestApiWithAuth( 2028 baseURL string, 2029 path string, 2030 preservePathEncoding bool, 2031 username string, 2032 password string, 2033 authMech base.HttpAuthMech, 2034 certificate []byte, 2035 san_in_certificate bool, 2036 clientCertificate []byte, 2037 clientKey []byte, 2038 httpCommand string, 2039 contentType string, 2040 body []byte, 2041 timeout time.Duration, 2042 out interface{}, 2043 client *http.Client, 2044 keep_client_alive bool, 2045 logger *log.CommonLogger) (err error, statusCode int) { 2046 var http_client *http.Client 2047 if authMech != base.HttpAuthMechScramSha { 2048 var req *http.Request 2049 http_client, req, err = u.prepareForRestCall(baseURL, path, preservePathEncoding, username, password, authMech, certificate, san_in_certificate, clientCertificate, clientKey, httpCommand, contentType, body, client, logger) 2050 if err != nil { 2051 return 2052 } 2053 err, statusCode = u.doRestCall(req, timeout, out, http_client, logger) 2054 } else { 2055 err, statusCode, http_client = u.queryRestApiWithScramShaAuth(baseURL, path, preservePathEncoding, username, password, httpCommand, contentType, body, timeout, out, client, logger) 2056 2057 } 2058 u.cleanupAfterRestCall(keep_client_alive, err, statusCode, http_client, logger) 2059 return 2060} 2061 2062func (u *Utilities) queryRestApiWithScramShaAuth( 2063 baseURL string, 2064 path string, 2065 preservePathEncoding bool, 2066 username string, 2067 password string, 2068 httpCommand string, 2069 contentType string, 2070 body []byte, 2071 timeout time.Duration, 2072 out interface{}, 2073 client *http.Client, 2074 logger *log.CommonLogger) (error, int, *http.Client) { 2075 2076 logger.Debugf("SCRAM-SHA authentication for user %v%v%v, baseURL=%v, path=%v\n", base.UdTagBegin, username, base.UdTagEnd, baseURL, path) 2077 2078 URL, err := u.constructURL(baseURL, path, preservePathEncoding, base.HttpAuthMechScramSha) 2079 if err != nil { 2080 return err, 0, nil 2081 } 2082 2083 req, err := scramsha.NewRequest(httpCommand, 2084 // URL.String() is adequate since scramSha is always called with preservePathEncoding set to false 2085 URL.String(), 2086 strings.NewReader(string(body))) 2087 if err != nil { 2088 return err, 0, nil 2089 } 2090 2091 if timeout == 0 { 2092 timeout = base.DefaultHttpTimeout 2093 } 2094 if client == nil { 2095 client = &http.Client{Timeout: timeout} 2096 } else { 2097 client.Timeout = timeout 2098 } 2099 2100 res, err := scramsha.DoScramSha(req, username, password, client) 2101 statusCode := 0 2102 if res != nil { 2103 statusCode = res.StatusCode 2104 } 2105 if err != nil { 2106 return fmt.Errorf("Received error when making SCRAM-SHA connection. baseURL=%v, path=%v, err=%v", baseURL, path, err), statusCode, client 2107 } 2108 2109 err = u.parseResponseBody(res, out, logger) 2110 return err, statusCode, client 2111 2112} 2113 2114func (u *Utilities) prepareForRestCall(baseURL string, 2115 path string, 2116 preservePathEncoding bool, 2117 username string, 2118 password string, 2119 authMech base.HttpAuthMech, 2120 certificate []byte, 2121 san_in_certificate bool, 2122 clientCertificate []byte, 2123 clientKey []byte, 2124 httpCommand string, 2125 contentType string, 2126 body []byte, 2127 client *http.Client, 2128 logger *log.CommonLogger) (*http.Client, *http.Request, error) { 2129 var l *log.CommonLogger = u.loggerForFunc(logger) 2130 var ret_client *http.Client = client 2131 2132 userAuthMode := base.UserAuthModeNone 2133 2134 if len(username) == 0 && len(clientCertificate) == 0 && path != base.SSLPortsPath { 2135 // username and clientCertificate can be both empty only when 2136 // 1. this is a local http call to the same node 2137 // or 2. this is a call to /nodes/self/xdcrSSLPorts on target to retrieve ssl port for subsequent https calls 2138 // treat case 1 separately, since we will need to set local user auth in http request 2139 userAuthMode = base.UserAuthModeLocal 2140 } else { 2141 // for http calls to remote target, set username and password in http request header if 2142 // 1. username has been provided 2143 // and 2. scram sha authentication is not used 2144 if len(username) != 0 && authMech != base.HttpAuthMechScramSha { 2145 userAuthMode = base.UserAuthModeBasic 2146 } 2147 } 2148 2149 req, host, err := u.ConstructHttpRequest(baseURL, path, preservePathEncoding, username, password, authMech, userAuthMode, httpCommand, contentType, body, l) 2150 if err != nil { 2151 return nil, nil, err 2152 } 2153 2154 if ret_client == nil { 2155 ret_client, err = u.GetHttpClient(username, authMech, certificate, san_in_certificate, clientCertificate, clientKey, host, l) 2156 if err != nil { 2157 l.Errorf("Failed to get client for request, err=%v, req=%v\n", err, req) 2158 return nil, nil, err 2159 } 2160 } 2161 return ret_client, req, nil 2162} 2163 2164func (u *Utilities) cleanupAfterRestCall(keep_client_alive bool, err error, statusCode int, client *http.Client, logger *log.CommonLogger) { 2165 if !keep_client_alive || u.IsSeriousNetError(err) || u.isFatalStatusCode(statusCode) { 2166 if client != nil && client.Transport != nil { 2167 transport, ok := client.Transport.(*http.Transport) 2168 if ok { 2169 if u.IsSeriousNetError(err) { 2170 logger.Debugf("Encountered %v, close all idle connections for this http client.\n", err) 2171 } 2172 transport.CloseIdleConnections() 2173 } 2174 } 2175 } 2176} 2177 2178func (u *Utilities) doRestCall(req *http.Request, 2179 timeout time.Duration, 2180 out interface{}, 2181 client *http.Client, 2182 logger *log.CommonLogger) (error, int) { 2183 if timeout > 0 { 2184 client.Timeout = timeout 2185 } else if client.Timeout != base.DefaultHttpTimeout { 2186 client.Timeout = base.DefaultHttpTimeout 2187 } 2188 2189 res, err := client.Do(req) 2190 if err == nil && res != nil { 2191 err = u.parseResponseBody(res, out, logger) 2192 return err, res.StatusCode 2193 } 2194 2195 return err, 0 2196 2197} 2198 2199func (u *Utilities) parseResponseBody(res *http.Response, out interface{}, logger *log.CommonLogger) (err error) { 2200 var l *log.CommonLogger = u.loggerForFunc(logger) 2201 if res != nil && res.Body != nil { 2202 defer res.Body.Close() 2203 var bod []byte 2204 if res.ContentLength == 0 { 2205 // If res.Body is empty, json.Unmarshal on an empty Body will return the error "unexpected end of JSON input" 2206 // Return a more specific error here so upstream callers can handle it 2207 err = base.ErrorResourceDoesNotExist 2208 return 2209 } 2210 bod, err = ioutil.ReadAll(io.LimitReader(res.Body, res.ContentLength)) 2211 if err != nil { 2212 l.Errorf("Failed to read response body, err=%v\n res=%v\n", err, res) 2213 return 2214 } 2215 if out != nil { 2216 err = json.Unmarshal(bod, out) 2217 if err != nil { 2218 l.Errorf("Failed to unmarshal the response as json, err=%v, bod=%v\n res=%v\n", err, string(bod), res) 2219 out = bod 2220 return 2221 } 2222 } 2223 } 2224 return 2225} 2226 2227//convenient api for rest calls to local cluster 2228func (u *Utilities) InvokeRestWithRetry(baseURL string, 2229 path string, 2230 preservePathEncoding bool, 2231 httpCommand string, 2232 contentType string, 2233 body []byte, 2234 timeout time.Duration, 2235 out interface{}, 2236 client *http.Client, 2237 keep_client_alive bool, 2238 logger *log.CommonLogger, num_retry int) (error, int) { 2239 return u.InvokeRestWithRetryWithAuth(baseURL, path, preservePathEncoding, "", "", base.HttpAuthMechPlain, nil, false, nil, nil, true, httpCommand, contentType, body, timeout, out, client, keep_client_alive, logger, num_retry) 2240} 2241 2242func (u *Utilities) InvokeRestWithRetryWithAuth(baseURL string, 2243 path string, 2244 preservePathEncoding bool, 2245 username string, 2246 password string, 2247 authMech base.HttpAuthMech, 2248 certificate []byte, 2249 san_in_certificate bool, 2250 clientCertificate []byte, 2251 clientKey []byte, 2252 insecureSkipVerify bool, 2253 httpCommand string, 2254 contentType string, 2255 body []byte, 2256 timeout time.Duration, 2257 out interface{}, 2258 client *http.Client, 2259 keep_client_alive bool, 2260 logger *log.CommonLogger, num_retry int) (err error, statusCode int) { 2261 2262 var http_client *http.Client = nil 2263 var req *http.Request = nil 2264 backoff_time := 500 * time.Millisecond 2265 2266 for i := 0; i < num_retry; i++ { 2267 if authMech != base.HttpAuthMechScramSha { 2268 http_client, req, err = u.prepareForRestCall(baseURL, path, preservePathEncoding, username, password, authMech, certificate, san_in_certificate, clientCertificate, clientKey, httpCommand, contentType, body, client, logger) 2269 if err == nil { 2270 err, statusCode = u.doRestCall(req, timeout, out, http_client, logger) 2271 } 2272 2273 if err == nil { 2274 break 2275 } 2276 } else { 2277 err, statusCode, http_client = u.queryRestApiWithScramShaAuth(baseURL, path, preservePathEncoding, username, password, httpCommand, contentType, body, timeout, out, client, logger) 2278 if err == nil { 2279 break 2280 } 2281 } 2282 2283 logger.Errorf("Received error when making rest call or unmarshalling data. baseURL=%v, path=%v, err=%v, statusCode=%v, num_retry=%v\n", baseURL, path, err, statusCode, i) 2284 2285 //cleanup the idle connection if the error is serious network error 2286 u.cleanupAfterRestCall(true /*keep_client_alive*/, err, statusCode, http_client, logger) 2287 2288 //backoff 2289 backoff_time = backoff_time + backoff_time 2290 time.Sleep(backoff_time) 2291 } 2292 2293 return 2294 2295} 2296 2297func (u *Utilities) GetHttpClient(username string, authMech base.HttpAuthMech, certificate []byte, san_in_certificate bool, clientCertificate, clientKey []byte, ssl_con_str string, logger *log.CommonLogger) (*http.Client, error) { 2298 var client *http.Client 2299 if authMech == base.HttpAuthMechHttps { 2300 caPool := x509.NewCertPool() 2301 ok := caPool.AppendCertsFromPEM(certificate) 2302 if !ok { 2303 return nil, base.InvalidCerfiticateError 2304 } 2305 2306 //using a separate tls connection to verify certificate 2307 //it can be changed in 1.4 when DialTLS is avaialbe in http.Transport 2308 conn, tlsConfig, err := base.MakeTLSConn(ssl_con_str, username, certificate, san_in_certificate, clientCertificate, clientKey, logger) 2309 if err != nil { 2310 return nil, err 2311 } 2312 conn.Close() 2313 2314 tr := &http.Transport{TLSClientConfig: tlsConfig, Dial: base.DialTCPWithTimeout} 2315 client = &http.Client{Transport: tr, 2316 Timeout: base.DefaultHttpTimeout} 2317 2318 } else { 2319 client = &http.Client{Timeout: base.DefaultHttpTimeout} 2320 } 2321 return client, nil 2322} 2323 2324//this expect the baseURL doesn't contain username and password 2325func (u *Utilities) ConstructHttpRequest( 2326 baseURL string, 2327 path string, 2328 preservePathEncoding bool, 2329 username string, 2330 password string, 2331 authMech base.HttpAuthMech, 2332 userAuthMode base.UserAuthMode, 2333 httpCommand string, 2334 contentType string, 2335 body []byte, 2336 logger *log.CommonLogger) (*http.Request, string, error) { 2337 url, err := u.constructURL(baseURL, path, preservePathEncoding, authMech) 2338 if err != nil { 2339 return nil, "", err 2340 } 2341 2342 var l *log.CommonLogger = u.loggerForFunc(logger) 2343 2344 req, err := http.NewRequest(httpCommand, url.String(), bytes.NewBuffer(body)) 2345 if err != nil { 2346 return nil, "", err 2347 } 2348 2349 if preservePathEncoding { 2350 // get the original Opaque back 2351 req.URL.Opaque = url.Opaque 2352 } 2353 2354 if contentType == "" { 2355 contentType = base.DefaultContentType 2356 } 2357 req.Header.Set(base.ContentType, contentType) 2358 2359 req.Header.Set(base.UserAgent, base.GoxdcrUserAgent) 2360 2361 switch userAuthMode { 2362 case base.UserAuthModeLocal: 2363 err := cbauth.SetRequestAuth(req) 2364 if err != nil { 2365 l.Errorf("Failed to set authentication to request. err=%v\n req=%v\n", err, req) 2366 return nil, "", err 2367 } 2368 case base.UserAuthModeBasic: 2369 req.SetBasicAuth(username, password) 2370 case base.UserAuthModeNone: 2371 // no op 2372 default: 2373 return nil, "", fmt.Errorf("Invalid userAuthMode %v", userAuthMode) 2374 } 2375 2376 //TODO: log request would log password barely 2377 l.Debugf("http request=%v\n", req) 2378 2379 return req, url.Host, nil 2380} 2381 2382func (u *Utilities) constructURL(baseURL string, 2383 path string, 2384 preservePathEncoding bool, 2385 authMech base.HttpAuthMech) (*url.URL, error) { 2386 2387 var baseURL_new string 2388 if authMech == base.HttpAuthMechHttps { 2389 baseURL_new = u.EnforcePrefix("https://", baseURL) 2390 } else { 2391 baseURL_new = u.EnforcePrefix("http://", baseURL) 2392 } 2393 url, err := couchbase.ParseURL(baseURL_new) 2394 if err != nil { 2395 return nil, err 2396 } 2397 2398 if !preservePathEncoding { 2399 if q := strings.Index(path, "?"); q > 0 { 2400 url.Path = path[:q] 2401 url.RawQuery = path[q+1:] 2402 } else { 2403 url.Path = path 2404 } 2405 } else { 2406 // use url.Opaque to preserve encoding 2407 url.Opaque = "//" 2408 2409 index := strings.Index(baseURL_new, "//") 2410 if index < len(baseURL_new)-2 { 2411 url.Opaque += baseURL_new[index+2:] 2412 } 2413 url.Opaque += path 2414 } 2415 return url, nil 2416} 2417 2418// encode http request into wire format 2419// it differs from HttpRequest.Write() in that it preserves the Content-Length in the header, 2420// and ignores Body in request 2421func (u *Utilities) EncodeHttpRequest(req *http.Request) ([]byte, error) { 2422 reqBytes := make([]byte, 0) 2423 reqBytes = append(reqBytes, []byte(req.Method)...) 2424 reqBytes = append(reqBytes, []byte(" ")...) 2425 reqBytes = append(reqBytes, []byte(req.URL.String())...) 2426 reqBytes = append(reqBytes, []byte(" HTTP/1.1\r\n")...) 2427 2428 hasHost := false 2429 for key, value := range req.Header { 2430 if key == "Host" { 2431 hasHost = true 2432 } 2433 if value != nil && len(value) > 0 { 2434 reqBytes = u.EncodeHttpRequestHeader(reqBytes, key, value[0]) 2435 } else { 2436 reqBytes = u.EncodeHttpRequestHeader(reqBytes, key, "") 2437 } 2438 } 2439 if !hasHost { 2440 // ensure that host name is in header 2441 reqBytes = u.EncodeHttpRequestHeader(reqBytes, "Host", req.Host) 2442 } 2443 2444 // add extra "\r\n" as separator for Body 2445 reqBytes = append(reqBytes, []byte("\r\n")...) 2446 2447 if req.Body != nil { 2448 defer req.Body.Close() 2449 2450 bodyBytes, err := ioutil.ReadAll(req.Body) 2451 if err != nil { 2452 return nil, err 2453 } 2454 reqBytes = append(reqBytes, bodyBytes...) 2455 } 2456 return reqBytes, nil 2457} 2458 2459func (u *Utilities) EncodeHttpRequestHeader(reqBytes []byte, key, value string) []byte { 2460 reqBytes = append(reqBytes, []byte(key)...) 2461 reqBytes = append(reqBytes, []byte(": ")...) 2462 reqBytes = append(reqBytes, []byte(value)...) 2463 reqBytes = append(reqBytes, []byte("\r\n")...) 2464 return reqBytes 2465} 2466 2467func (u *Utilities) IsSeriousNetError(err error) bool { 2468 if err == nil { 2469 return false 2470 } 2471 2472 errStr := err.Error() 2473 netError, ok := err.(*net.OpError) 2474 return err == syscall.EPIPE || 2475 err == io.EOF || 2476 strings.Contains(errStr, "EOF") || 2477 strings.Contains(errStr, "use of closed network connection") || 2478 strings.Contains(errStr, "connection reset by peer") || 2479 strings.Contains(errStr, "http: can't write HTTP request on broken connection") || 2480 (ok && (!netError.Temporary() && !netError.Timeout())) 2481} 2482 2483// statusCode that requires connections to be dropped and recreated 2484// basically all statusCodes that are larger than 400 are fatal 2485func (u *Utilities) isFatalStatusCode(statusCode int) bool { 2486 return statusCode > http.StatusBadRequest 2487} 2488 2489func (u *Utilities) NewTCPConn(hostName string) (*net.TCPConn, error) { 2490 conn, err := base.DialTCPWithTimeout(base.NetTCP, hostName) 2491 if err != nil { 2492 return nil, err 2493 } 2494 if conn == nil { 2495 return nil, fmt.Errorf("Failed to set up connection to %v", hostName) 2496 } 2497 tcpConn, ok := conn.(*net.TCPConn) 2498 if !ok { 2499 // should never get here 2500 conn.Close() 2501 return nil, fmt.Errorf("The connection to %v returned is not TCP type", hostName) 2502 } 2503 2504 // same settings as erlang xdcr 2505 err = tcpConn.SetKeepAlive(true) 2506 if err == nil { 2507 err = tcpConn.SetKeepAlivePeriod(base.KeepAlivePeriod) 2508 } 2509 if err == nil { 2510 err = tcpConn.SetNoDelay(false) 2511 } 2512 2513 if err != nil { 2514 tcpConn.Close() 2515 return nil, fmt.Errorf("Error setting options on the connection to %v. err=%v", hostName, err) 2516 } 2517 2518 return tcpConn, nil 2519} 2520 2521/** 2522 * Executes a anonymous function that returns an error. If the error is non nil, retry with exponential backoff. 2523 * Returns base.ErrorFailedAfterRetry + the last recorded error if operation times out, nil otherwise. 2524 * Max retries == the times to retry in additional to the initial try, should the initial try fail 2525 * initialWait == Initial time with which to start 2526 * Factor == exponential backoff factor based off of initialWait 2527 */ 2528func (u *Utilities) ExponentialBackoffExecutor(name string, initialWait time.Duration, maxRetries int, factor int, op ExponentialOpFunc) error { 2529 waitTime := initialWait 2530 var opErr error 2531 for i := 0; i <= maxRetries; i++ { 2532 opErr = op() 2533 if opErr == nil { 2534 return nil 2535 } else if i != maxRetries { 2536 u.logger_utils.Warnf("ExponentialBackoffExecutor for %v encountered error (%v). Sleeping %v\n", 2537 name, opErr.Error(), waitTime) 2538 time.Sleep(waitTime) 2539 waitTime *= time.Duration(factor) 2540 } 2541 } 2542 opErr = fmt.Errorf("%v %v Last error: %v", name, base.ErrorFailedAfterRetry.Error(), opErr.Error()) 2543 return opErr 2544} 2545 2546/* 2547 * This method has an additional parameter, finCh, than ExponentialBackoffExecutor. When finCh is closed, 2548 * this method can abort earlier. 2549 */ 2550func (u *Utilities) ExponentialBackoffExecutorWithFinishSignal(name string, initialWait time.Duration, maxRetries int, factor int, op ExponentialOpFunc2, param interface{}, finCh chan bool) (interface{}, error) { 2551 waitTime := initialWait 2552 var result interface{} 2553 var err error 2554 for i := 0; i <= maxRetries; i++ { 2555 select { 2556 case <-finCh: 2557 err = fmt.Errorf("ExponentialBackoffExecutorWithFinishSignal for %v aborting because of finch closure\n", name) 2558 u.logger_utils.Warnf(err.Error()) 2559 return nil, err 2560 default: 2561 result, err = op(param) 2562 if err == nil { 2563 return result, nil 2564 } else if i != maxRetries { 2565 u.logger_utils.Warnf("ExponentialBackoffExecutorWithFinishSignal for %v encountered error (%v). Sleeping %v\n", 2566 name, err.Error(), waitTime) 2567 base.WaitForTimeoutOrFinishSignal(waitTime, finCh) 2568 waitTime *= time.Duration(factor) 2569 } 2570 } 2571 } 2572 err = fmt.Errorf("%v %v Last error: %v", name, base.ErrorFailedAfterRetry.Error(), err.Error()) 2573 return nil, err 2574} 2575 2576// Get security related settings from target 2577// 1. whether target supports SAN in certificate 2578// 2. authentication mechanism to use when making http[s] connections to target ns_server 2579// This method used ShortHttpTimeout because it is either called from remote cluster rest API, 2580// where a prompt response is required to keep the rest request from timing out, 2581// or called from remote cluster reference refresh code, where a pre-mature timeout can be tolerated 2582// This method also returns defaultPoolInfo of target for more flexibility 2583// CALLER BEWARE: defaultPoolInfo is returned ONLY when either scram sha or ssl is enabled, so as to avoid unnecessary work 2584func (u *Utilities) GetSecuritySettingsAndDefaultPoolInfo(hostAddr, hostHttpsAddr, username, password string, 2585 certificate []byte, clientCertificate, clientKey []byte, scramShaEnabled bool, logger *log.CommonLogger) (sanInCertificate bool, 2586 httpAuthMech base.HttpAuthMech, defaultPoolInfo map[string]interface{}, err error) { 2587 if !scramShaEnabled && len(certificate) == 0 { 2588 // security settings are irrelevant if we are not using scram sha or ssl 2589 // note that a nil defaultPoolInfo is returned in this case 2590 return false, base.HttpAuthMechPlain, nil, nil 2591 } 2592 2593 if scramShaEnabled { 2594 // if scram sha is enabled, we will first try to connect to target ns_server using scram sha authentication 2595 // even if certificate/clientCert have been provided, we will not use them here because they are not needed by scram sha authentication 2596 defaultPoolInfo, err = u.GetDefaultPoolInfoUsingScramSha(hostAddr, username, password, logger) 2597 if err == nil { 2598 httpAuthMech = base.HttpAuthMechScramSha 2599 } else if err != TargetMayNotSupportScramShaError { 2600 return false, base.HttpAuthMechPlain, nil, err 2601 } else { 2602 if len(certificate) == 0 { 2603 // certificate not provided, cannot fall back to https. return error right away 2604 return false, base.HttpAuthMechPlain, nil, fmt.Errorf("Cannot connect to target %v using \"half\" secure mode. Received unauthorized error when using Scram-Sha authentication. Cannot use https because server certificate has not been provided.", hostAddr) 2605 } else { 2606 // proceed to fall back to https 2607 } 2608 } 2609 } 2610 2611 if defaultPoolInfo == nil { 2612 // if we get here, either scram sha is not enabled, or scram sha is enabled and target ns_server returned 401 error on our scram sha attempt 2613 // either way, it is implied that certificate has been provided. use https to connect to target 2614 defaultPoolInfo, err = u.GetDefaultPoolInfoUsingHttps(hostHttpsAddr, username, password, 2615 certificate, clientCertificate, clientKey, logger) 2616 if err == nil { 2617 httpAuthMech = base.HttpAuthMechHttps 2618 } else { 2619 return false, base.HttpAuthMechPlain, nil, err 2620 } 2621 } 2622 2623 // at this point, we have a valid defaultPoolInfo, an httpAuthMech that worked for the host 2624 // derive httpAuthMech and certificate related settings from defaultPoolInfo 2625 2626 nodeList, err := u.GetNodeListFromInfoMap(defaultPoolInfo, logger) 2627 if err != nil || len(nodeList) == 0 { 2628 err = fmt.Errorf("Can't get nodes information for cluster %v, err=%v", hostAddr, err) 2629 return false, base.HttpAuthMechPlain, nil, err 2630 } 2631 2632 clusterCompatibility, err := u.GetClusterCompatibilityFromNodeList(nodeList) 2633 if err != nil { 2634 return false, base.HttpAuthMechPlain, nil, err 2635 } 2636 2637 targetHasScramShaSupport := base.IsClusterCompatible(clusterCompatibility, base.VersionForHttpScramShaSupport) 2638 if scramShaEnabled && targetHasScramShaSupport && httpAuthMech != base.HttpAuthMechScramSha { 2639 // do not fall back to https if target is vulcan and up 2640 return false, base.HttpAuthMechPlain, nil, fmt.Errorf("Failed to retrieve secruity settings from host=%v using SCRAM-SHA authentication. Please check whether SCRAM-SHA is enabled on target.", hostAddr) 2641 } 2642 2643 if scramShaEnabled && !targetHasScramShaSupport && httpAuthMech == base.HttpAuthMechScramSha { 2644 // Cluster is not ScramSha compatible. We need to fallback to https. 2645 // Before doing so, we will get default pool using https again to make sure it works 2646 defaultPoolInfo, err = u.GetDefaultPoolInfoUsingHttps(hostHttpsAddr, username, password, 2647 certificate, clientCertificate, clientKey, logger) 2648 if err == nil { 2649 httpAuthMech = base.HttpAuthMechHttps 2650 } else { 2651 return false, base.HttpAuthMechPlain, nil, err 2652 } 2653 } 2654 sanInCertificate = base.IsClusterCompatible(clusterCompatibility, base.VersionForSANInCertificateSupport) 2655 return sanInCertificate, httpAuthMech, defaultPoolInfo, nil 2656} 2657 2658func (u *Utilities) GetDefaultPoolInfoUsingScramSha(hostAddr, username, password string, logger *log.CommonLogger) (map[string]interface{}, error) { 2659 defaultPoolInfo := make(map[string]interface{}) 2660 err, statusCode := u.QueryRestApiWithAuth(hostAddr, base.DefaultPoolPath, false, username, password, base.HttpAuthMechScramSha, nil /*certificate*/, false /*sanInCertificate*/, nil /*clientCertificate*/, nil /*clientKey*/, base.MethodGet, "", nil, base.ShortHttpTimeout, &defaultPoolInfo, nil, false, logger) 2661 if err == nil && statusCode == http.StatusOK { 2662 // target supports scram sha 2663 return defaultPoolInfo, nil 2664 } else if statusCode == http.StatusUnauthorized { 2665 // unauthorized error could be returned when target ns_server is pre-vulcan and does not support scram sha. 2666 // return a specific error to allow caller to fall back to https 2667 return nil, TargetMayNotSupportScramShaError 2668 } else { 2669 return nil, fmt.Errorf("Failed to retrieve secruity settings from host=%v using scram sha, err=%v, statusCode=%v", hostAddr, err, statusCode) 2670 } 2671} 2672 2673func (u *Utilities) GetDefaultPoolInfoUsingHttps(hostHttpsAddr, username, password string, 2674 certificate []byte, clientCertificate, clientKey []byte, logger *log.CommonLogger) (map[string]interface{}, error) { 2675 defaultPoolInfo := make(map[string]interface{}) 2676 2677 // we do not know the correct values of sanInCertificate. set sanInCertificate set to true for better security 2678 err, statusCode := u.QueryRestApiWithAuth(hostHttpsAddr, base.DefaultPoolPath, false, username, password, base.HttpAuthMechHttps, certificate, true /*sanInCertificate*/, clientCertificate, clientKey, base.MethodGet, "", nil, base.ShortHttpTimeout, &defaultPoolInfo, nil, false, logger) 2679 if err == nil && statusCode == http.StatusOK { 2680 return defaultPoolInfo, nil 2681 } else { 2682 if err != nil && strings.Contains(err.Error(), base.NoIpSANErrMsg) { 2683 // if the error is about certificate not containing IP SANs, it could be that the target cluster is of an old version 2684 // make a second try with sanInCertificate set to false 2685 // after we retrieve target cluster version, we will then re-set sanInCertificate to the appropriate value 2686 logger.Debugf("Received certificate validation error from %v. Target may be an old version that does not support SAN in certificates. Retrying connection to target using sanInCertificate = false.", hostHttpsAddr) 2687 err, statusCode = u.QueryRestApiWithAuth(hostHttpsAddr, base.DefaultPoolPath, false, username, password, base.HttpAuthMechHttps, certificate, false /*sanInCertificate*/, clientCertificate, clientKey, base.MethodGet, "", nil, base.ShortHttpTimeout, &defaultPoolInfo, nil, false, logger) 2688 if err == nil && statusCode == http.StatusOK { 2689 return defaultPoolInfo, nil 2690 } else if statusCode == http.StatusUnauthorized { 2691 return nil, u.getUnauthorizedError(username) 2692 } else { 2693 // if the second try still fails, return error 2694 return nil, fmt.Errorf("Failed to retrieve secruity settings from host=%v, err=%v, statusCode=%v", hostHttpsAddr, err, statusCode) 2695 } 2696 } else if statusCode == http.StatusUnauthorized { 2697 return nil, u.getUnauthorizedError(username) 2698 } else { 2699 return nil, fmt.Errorf("Failed to retrieve secruity settings from host=%v, err=%v, statusCode=%v", hostHttpsAddr, err, statusCode) 2700 } 2701 } 2702} 2703 2704// Given the KVVBMap, translate the map so that the server keys are replaced with external server keys, if applicable 2705func (u *Utilities) TranslateKvVbMap(kvVBMap base.BucketKVVbMap, targetBucketInfo map[string]interface{}) { 2706 translationMap, translationErr := u.GetIntExtHostNameKVPortTranslationMap(targetBucketInfo) 2707 if translationErr != nil && translationErr != base.ErrorResourceDoesNotExist { 2708 u.logger_utils.Warnf("Error constructing internal -> external address translation table. err=%v", translationErr) 2709 } else if translationErr == nil { 2710 (base.BucketKVVbMap)(kvVBMap).ReplaceInternalWithExternalHosts(translationMap) 2711 } 2712} 2713 2714func (u *Utilities) ReplaceCouchApiBaseObjWithExternals(couchApiBase string, nodeInfo map[string]interface{}) string { 2715 if len(couchApiBase) == 0 { 2716 return couchApiBase 2717 } 2718 2719 extHost, extCapi, extCapiErr, extCapiSSL, extCapiSSLErr := u.getExternalHostAndCapiPorts(nodeInfo) 2720 if len(extHost) > 0 { 2721 // "couchApiBaseHTTPS": "https://127.0.0.1:19502/b2%2B746a570d364cf609ac11572f8c8c2608", 2722 url, err := url.Parse(couchApiBase) 2723 if err != nil || !url.IsAbs() { 2724 u.logger_utils.Errorf("Unable to parse URL string for CouchApiBase: %v", err) 2725 return couchApiBase 2726 } 2727 var isHttps bool = strings.HasPrefix(couchApiBase, "https") 2728 var leadingHttpString string 2729 if isHttps { 2730 leadingHttpString = "https://" 2731 } else { 2732 leadingHttpString = "http://" 2733 } 2734 2735 // Now strip out the http(s)://host:port/ 2736 var leadingPrefix string 2737 leadingHostName := url.Hostname() 2738 leadingPort := url.Port() 2739 if len(leadingPort) > 0 { 2740 leadingPrefix = fmt.Sprintf("%s%s:%s/", leadingHttpString, leadingHostName, leadingPort) 2741 } else { 2742 leadingPrefix = fmt.Sprintf("%s%s/", leadingHttpString, leadingHostName) 2743 } 2744 strippedCouchApiBase := strings.TrimPrefix(couchApiBase, leadingPrefix) 2745 2746 // Now recompile 2747 var recompiledUrl string 2748 var recompiledHostToUse string = extHost 2749 var recompiledPortToUse string 2750 if isHttps && extCapiSSLErr == nil { 2751 recompiledPortToUse = fmt.Sprintf("%v", extCapiSSL) 2752 } else if !isHttps && extCapiErr == nil { 2753 recompiledPortToUse = fmt.Sprintf("%v", extCapi) 2754 } else { 2755 recompiledPortToUse = leadingPort 2756 } 2757 if len(recompiledPortToUse) == 0 { 2758 recompiledUrl = fmt.Sprintf("%s%s/%s", leadingHttpString, recompiledHostToUse, strippedCouchApiBase) 2759 } else { 2760 recompiledUrl = fmt.Sprintf("%s%s:%s/%s", leadingHttpString, recompiledHostToUse, recompiledPortToUse, strippedCouchApiBase) 2761 } 2762 return recompiledUrl 2763 } 2764 return couchApiBase 2765} 2766 2767func (u *Utilities) getUnauthorizedError(username string) error { 2768 errMsg := "Received unauthorized error from target. Please double check user credentials." 2769 // if username has not been specified [implying that client certificate has been provided and is being used] 2770 // unauthorized error could also be returned if target has client cert auth setting set to disable 2771 if len(username) == 0 { 2772 errMsg += " Since client certificate is being used, please ensure that target is version 5.5 and up and has client certificate authentication setting set to \"enable\" or \"mandatory\"." 2773 } 2774 2775 return errors.New(errMsg) 2776} 2777 2778func decompressSnappyBody(incomingBody, key []byte, dp DataPoolIface, slicesToBeReleased *[][]byte, needExtraBytesInBody bool) ([]byte, error, string, int64, int) { 2779 var dpFailedCnt int64 2780 lenOfDecodedData, err := snappy.DecodedLen(incomingBody) 2781 lastBodyPos := lenOfDecodedData - 1 2782 if err != nil { 2783 return nil, base.ErrorCompressionUnableToInflate, fmt.Sprintf("XDCR for key %v%v%v is unable to decode snappy uncompressed size: %v", base.UdTagBegin, string(key), base.UdTagEnd, err), dpFailedCnt, lastBodyPos 2784 } 2785 2786 uncompressedBodySize := uint64(lenOfDecodedData) 2787 if needExtraBytesInBody { 2788 uncompressedBodySize += uint64(len(key) + base.AddFilterKeyExtraBytes) 2789 } 2790 body, err := dp.GetByteSlice(uncompressedBodySize) 2791 if err != nil { 2792 body = make([]byte, 0, uncompressedBodySize) 2793 dpFailedCnt = int64(uncompressedBodySize) 2794 } else { 2795 *slicesToBeReleased = append(*slicesToBeReleased, body) 2796 } 2797 2798 body, err = snappy.Decode(body, incomingBody) 2799 if err != nil { 2800 return nil, base.ErrorCompressionUnableToInflate, fmt.Sprintf("XDCR for key %v%v%v is unable to snappy decompress body value: %v", base.UdTagBegin, string(key), base.UdTagEnd, err), dpFailedCnt, lastBodyPos 2801 } 2802 2803 // Check to make sure the last bracket position is correct 2804 if body[lastBodyPos] != '}' { 2805 return nil, base.ErrorInvalidInput, fmt.Sprintf("XDCR for key %v%v%v after decompression seems to be an invalid JSON", base.UdTagBegin, string(key), base.UdTagEnd), dpFailedCnt, lastBodyPos 2806 } 2807 2808 return body, nil, "", dpFailedCnt, lastBodyPos 2809} 2810 2811func getBodySlice(incomingBody, key []byte, dp DataPoolIface, slicesToBeReleased *[][]byte) ([]byte, error, string, int64, int) { 2812 var dpFailedCnt int64 2813 var incomingBodyLen int = len(incomingBody) 2814 lastBodyPos := incomingBodyLen - 1 2815 bodySize := uint64(incomingBodyLen + len(key) + base.AddFilterKeyExtraBytes) 2816 2817 if incomingBody[lastBodyPos] != '}' { 2818 return nil, base.ErrorInvalidInput, fmt.Sprintf("Document %v%v%v body is not a valid JSON", base.UdTagBegin, string(key), base.UdTagEnd), dpFailedCnt, lastBodyPos 2819 } 2820 2821 body, err := dp.GetByteSlice(bodySize) 2822 if err != nil { 2823 body = make([]byte, 0, bodySize) 2824 dpFailedCnt = int64(bodySize) 2825 } else { 2826 *slicesToBeReleased = append(*slicesToBeReleased, body) 2827 } 2828 copy(body, incomingBody) 2829 return body, nil, "", dpFailedCnt, lastBodyPos 2830} 2831 2832func stripAndPrependXattribute(body []byte, xattrSize uint32, dp DataPoolIface, slicesToBeReleased *[][]byte, endBodyPos int, xattrOnly bool) ([]byte, error, int64, int) { 2833 var dpFailedCnt int64 2834 var actualBodySize int 2835 actualBody := body[xattrSize:] 2836 endBodyPos = endBodyPos - int(xattrSize) 2837 if !xattrOnly { 2838 actualBodySize = len(actualBody) 2839 // Prereq check 2840 if actualBody[0] != '{' { 2841 return nil, base.ErrorInvalidInput, dpFailedCnt, endBodyPos 2842 } 2843 } 2844 2845 // xattrSize is the size of the xAttribute section 2846 // The xattribute section is consisted of uint32 + key + NUL + value + NUL (repeat) 2847 // Functions calling this is converting DCP xattribute pairs encoding to the following: 2848 // { <- could be absorbed 2849 // " key " : value , 2850 // " key2 " : value2 } 2851 // Original DCP stream has 6 extra bytes per KV pair 2852 // Converted has 4 extra bytes per KV pair, so using xattrSize is sufficient 2853 2854 // Get a size of body size + xattr value size + internal Xattr KEY and JSON symbol sizes 2855 bodySize := uint64(int(xattrSize) + actualBodySize + base.AddFilterXattrExtraBytes) 2856 combinedBody, err := dp.GetByteSlice(bodySize) 2857 if err != nil { 2858 combinedBody = make([]byte, 0, bodySize) 2859 dpFailedCnt += int64(bodySize) 2860 } else { 2861 *slicesToBeReleased = append(*slicesToBeReleased, combinedBody) 2862 } 2863 2864 // Non-xattrOnly case: 2865 // ------------------ 2866 // Current body looks like (spaces added for readability): 2867 // { key : val } 2868 // Want to insert xattr at the beginning so "combinedBody" looks like: 2869 // { "XdcrInternalXattrKey" : { xattrKey : xattrVal } , key : val } 2870 2871 // xattrOnly case: (essentially just returning) 2872 // -------------------------------------------- 2873 // { "XDCRInternalXattrKey" : <ConvertedXattrSection> } 2874 2875 // { XdcrInternalXattrKey : 2876 combinedBodyPos := 0 2877 combinedBody, combinedBodyPos = base.WriteJsonRawMsg(combinedBody, base.CachedInternalKeyXattrByteSlice, combinedBodyPos, base.WriteJsonKey, base.CachedInternalKeyXattrByteSize, combinedBodyPos == 0 /*firstKey*/) 2878 2879 // { XdcrInternalXattrKey : { xattrKey : xattrVal } 2880 // Followed by a uint32, then -> key -> NUL -> value -> NUL (repeat) 2881 xattrIter, err := base.NewXattrIterator(body) 2882 if err != nil { 2883 return nil, base.ErrorInvalidInput, dpFailedCnt, endBodyPos 2884 } 2885 firstKey := true 2886 for xattrIter.HasNext() == true { 2887 key, value, err := xattrIter.Next() 2888 if err != nil { 2889 return nil, err, dpFailedCnt, endBodyPos 2890 } 2891 combinedBody, combinedBodyPos = base.WriteJsonRawMsg(combinedBody, key, combinedBodyPos, base.WriteJsonKey, len(key), firstKey) 2892 combinedBody, combinedBodyPos = base.WriteJsonRawMsg(combinedBody, value, combinedBodyPos, base.WriteJsonValueNoQuotes, len(value), false) 2893 if firstKey { 2894 firstKey = false 2895 } 2896 } 2897 2898 // Currently: v - combinedBodyPos 2899 // { XdcrInternalXattrKey : { xattrKey : xattrVal } 2900 2901 if xattrOnly { 2902 // Targeted: v - combinedBodyPos 2903 // { XdcrInternalXattrKey : { xattrKey : xattrVal }} 2904 combinedBodyPos++ 2905 combinedBody[combinedBodyPos] = '}' 2906 2907 endBodyPos = combinedBodyPos 2908 } else { 2909 // Targeted: v - combinedBodyPos 2910 // { XdcrInternalXattrKey : { xattrKey : xattrVal }, 2911 combinedBodyPos++ 2912 combinedBody[combinedBodyPos] = ',' 2913 // endBodyPos is added instead of combinedBodyPos+1 is because below we are copying actualBody[1:] to skip the first { 2914 endBodyPos += combinedBodyPos 2915 combinedBodyPos++ 2916 2917 // ActualBody: 2918 // { key : val } 2919 // targeted combinedBody: 2920 // { XdcrInternalXattrKey : { xattrKey : xattrVal }, key : val } 2921 copy(combinedBody[combinedBodyPos:], actualBody[1:]) 2922 } 2923 2924 return combinedBody, nil, dpFailedCnt, endBodyPos 2925} 2926 2927type processXattributeType int 2928 2929const ( 2930 // Should Process the body but not xattribute 2931 processSkipXattr processXattributeType = iota 2932 // Should Process both body and xattribute 2933 processXattrAndBody processXattributeType = iota 2934 // Should Process just the xattribute, no body 2935 processXattrOnly processXattributeType = iota 2936) 2937 2938func processXattribute(body, key []byte, processType processXattributeType, dp DataPoolIface, slicesToBeReleased *[][]byte, endBodyPos int) ([]byte, error, int64, int) { 2939 var pos uint32 2940 // var separator uint32 2941 var dpFailedCnt int64 2942 var err error 2943 2944 // first uint32 in the body contains the size of the entire XATTR section 2945 totalXattrSize := binary.BigEndian.Uint32(body[pos : pos+4]) 2946 // Couchbase doc size is max of 20MB. Xattribute count against this limit. 2947 // So if total xattr size is greater than this limit, then something is wrong 2948 if totalXattrSize > base.MaxDocSizeByte { 2949 return nil, fmt.Errorf("For document %v%v%v, unable to correctly parse xattribute from DCP packet. Xattr size determined to be %v bytes, which is invalid", base.UdTagBegin, string(key), base.UdTagEnd, totalXattrSize), dpFailedCnt, endBodyPos 2950 } 2951 // Add 4 bytes here to skip the uint32 that was just parsed 2952 totalXattrSize += 4 2953 2954 switch processType { 2955 case processSkipXattr: 2956 newBody := body[totalXattrSize:] 2957 body = newBody 2958 endBodyPos = endBodyPos - int(totalXattrSize) 2959 case processXattrAndBody: 2960 body, err, dpFailedCnt, endBodyPos = stripAndPrependXattribute(body, totalXattrSize, dp, slicesToBeReleased, endBodyPos, false /*xattrOnly*/) 2961 case processXattrOnly: 2962 body, err, dpFailedCnt, endBodyPos = stripAndPrependXattribute(body, totalXattrSize, dp, slicesToBeReleased, endBodyPos, true /*xattrOnly*/) 2963 default: 2964 return nil, base.ErrorInvalidInput, dpFailedCnt, -1 2965 } 2966 return body, err, dpFailedCnt, endBodyPos 2967} 2968 2969func processKeyOnlyForFiltering(key []byte, dp DataPoolIface, slicesToBeReleased *[][]byte) ([]byte, int64) { 2970 var body []byte 2971 var err error 2972 keyLen := len(key) 2973 bodySize := +uint64(keyLen + 2 /*quote surrounding key*/ + 5 /*cruft around the special key*/ + len(base.ReservedWordsMap[base.ExternalKeyKey])) 2974 body, err = dp.GetByteSlice(bodySize) 2975 if err != nil { 2976 // If there is any problem using datapool, just use json.RawMessage directly to allocate new byte slice 2977 body = json.RawMessage(fmt.Sprintf("{\"%v\":\"%v\"}", base.ReservedWordsMap[base.ExternalKeyKey], string(key))) 2978 return body, int64(len(body)) 2979 } else { 2980 *slicesToBeReleased = append(*slicesToBeReleased, body) 2981 } 2982 var bodyPos int 2983 body, bodyPos = base.WriteJsonRawMsg(body, base.CachedInternalKeyKeyByteSlice, bodyPos, base.WriteJsonKey, base.CachedInternalKeyKeyByteSize, bodyPos == 0) 2984 body, bodyPos = base.WriteJsonRawMsg(body, key, bodyPos, base.WriteJsonValue /*uprEvent key as value*/,