1// Copyright (c) 2014 Couchbase, Inc. 2// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file 3// except in compliance with the License. You may obtain a copy of the License at 4// http://www.apache.org/licenses/LICENSE-2.0 5// Unless required by applicable law or agreed to in writing, software distributed under the 6// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 7// either express or implied. See the License for the specific language governing permissions 8// and limitations under the License. 9 10/* 11 12Package couchbase provides a couchbase-server implementation of the datastore 13package. 14 15*/ 16 17package couchbase 18 19import ( 20 "encoding/binary" 21 "fmt" 22 "net/http" 23 "net/url" 24 "os" 25 "strconv" 26 "strings" 27 "sync" 28 "time" 29 30 "github.com/couchbase/cbauth" 31 cb "github.com/couchbase/go-couchbase" 32 "github.com/couchbase/gomemcached" 33 gsi "github.com/couchbase/indexing/secondary/queryport/n1ql" 34 "github.com/couchbase/query/auth" 35 "github.com/couchbase/query/datastore" 36 "github.com/couchbase/query/errors" 37 "github.com/couchbase/query/expression" 38 "github.com/couchbase/query/logging" 39 "github.com/couchbase/query/timestamp" 40 "github.com/couchbase/query/value" 41 42 "github.com/couchbase/query/server" 43) 44 45var REQUIRE_CBAUTH bool // Connection to authorization system must succeed. 46 47// cbPoolMap and cbPoolServices implement a local cache of the datastore's topology 48type cbPoolMap struct { 49 sync.RWMutex 50 poolServices map[string]cbPoolServices 51} 52 53type cbPoolServices struct { 54 name string 55 rev int 56 nodeServices map[string]interface{} 57} 58 59var _POOLMAP cbPoolMap 60 61func init() { 62 63 // MB-27415 have a larger overflow pool and close overflow connections asynchronously 64 cb.SetConnectionPoolParams(64, 64) 65 cb.EnableAsynchronousCloser(true) 66 67 val, err := strconv.ParseBool(os.Getenv("REQUIRE_CBAUTH")) 68 if err != nil { 69 REQUIRE_CBAUTH = val 70 } else { 71 REQUIRE_CBAUTH = true // default 72 } 73 74 // enable data type response 75 cb.EnableDataType = true 76 77 // enable xattrs 78 cb.EnableXattr = true 79 80 // start the fetch workers for servicing the BulkGet operations 81 cb.InitBulkGet() 82 _POOLMAP.poolServices = make(map[string]cbPoolServices, 1) 83} 84 85const ( 86 PRIMARY_INDEX = "#primary" 87) 88 89// store is the root for the couchbase datastore 90type store struct { 91 client cb.Client // instance of go-couchbase client 92 namespaceCache map[string]*namespace // map of pool-names and IDs 93 CbAuthInit bool // whether cbAuth is initialized 94 inferencer datastore.Inferencer // what we use to infer schemas 95 connectionUrl string // where to contact ns_server 96} 97 98func (s *store) Id() string { 99 return s.URL() 100} 101 102func (s *store) URL() string { 103 return s.client.BaseURL.String() 104} 105 106func (s *store) Info() datastore.Info { 107 info := &infoImpl{client: &s.client} 108 return info 109} 110 111type infoImpl struct { 112 client *cb.Client 113} 114 115func (info *infoImpl) Version() string { 116 return info.client.Info.ImplementationVersion 117} 118 119func fullhostName(n string) string { 120 hostName, portVal := server.HostNameandPort(n) 121 if hostName != "" { 122 return n 123 } 124 return server.GetIP(true) + ":" + portVal 125} 126 127func (info *infoImpl) Topology() ([]string, []errors.Error) { 128 var nodes []string 129 var errs []errors.Error 130 131 for _, p := range info.client.Info.Pools { 132 pool, err := info.client.GetPool(p.Name) 133 134 if err == nil { 135 for _, node := range pool.Nodes { 136 nodes = append(nodes, fullhostName(node.Hostname)) 137 } 138 } else { 139 errs = append(errs, errors.NewDatastoreClusterError(err, p.Name)) 140 } 141 } 142 return nodes, errs 143} 144 145func (info *infoImpl) Services(node string) (map[string]interface{}, []errors.Error) { 146 var errs []errors.Error 147 148 isReadLock := true 149 _POOLMAP.RLock() 150 defer func() { 151 if isReadLock { 152 _POOLMAP.RUnlock() 153 } else { 154 _POOLMAP.Unlock() 155 } 156 }() 157 158 // scan the pools 159 for _, p := range info.client.Info.Pools { 160 pool, err := info.client.GetPool(p.Name) 161 poolServices, pErr := info.client.GetPoolServices(p.Name) 162 163 if err == nil && pErr == nil { 164 var found bool = false 165 var services cbPoolServices 166 167 services, ok := _POOLMAP.poolServices[p.Name] 168 found = ok && (services.rev == poolServices.Rev) 169 170 // missing the information, rebuild 171 if !found { 172 173 // promote the lock 174 if isReadLock { 175 _POOLMAP.RUnlock() 176 _POOLMAP.Lock() 177 isReadLock = false 178 179 // now that we have promoted the lock, did we get beaten by somebody else to it? 180 services, ok = _POOLMAP.poolServices[p.Name] 181 found = ok && (services.rev == poolServices.Rev) 182 if found { 183 continue 184 } 185 } 186 187 newPoolServices := cbPoolServices{name: p.Name, rev: poolServices.Rev} 188 nodeServices := make(map[string]interface{}, len(pool.Nodes)) 189 190 // go through all the nodes in the pool 191 for _, n := range pool.Nodes { 192 var servicesCopy []interface{} 193 194 newServices := make(map[string]interface{}, 3) 195 newServices["name"] = fullhostName(n.Hostname) 196 for _, s := range n.Services { 197 servicesCopy = append(servicesCopy, s) 198 } 199 newServices["services"] = servicesCopy 200 201 // go through all bucket independet services in the pool 202 for _, ns := range poolServices.NodesExt { 203 204 mgmtPort := ns.Services["mgmt"] 205 if mgmtPort == 0 { 206 207 // shouldn't happen, there should always be a mgmt port on each node 208 // we should return an error 209 msg := fmt.Sprintf("NodeServices does not report mgmt endpoint for "+ 210 "this node: %v", node) 211 errs = append(errs, errors.NewAdminGetNodeError(nil, msg)) 212 continue 213 } 214 215 nsHostname := "" 216 if ns.Hostname != "" { 217 nsHostname = ns.Hostname + ":" + strconv.Itoa(mgmtPort) 218 } 219 // if we can positively match nodeServices and node, add ports 220 if n.Hostname == nsHostname || 221 (nsHostname == "" && ns.ThisNode && n.ThisNode) { 222 ports := make(map[string]interface{}, len(ns.Services)) 223 224 // only add the ports for those services that are advertised 225 for _, s := range n.Services { 226 for pn, p := range ns.Services { 227 if strings.Index(pn, s) == 0 { 228 ports[pn] = p 229 } 230 } 231 } 232 newServices["ports"] = ports 233 break 234 } 235 } 236 nodeServices[fullhostName(n.Hostname)] = newServices 237 } 238 newPoolServices.nodeServices = nodeServices 239 _POOLMAP.poolServices[p.Name] = newPoolServices 240 services = newPoolServices 241 } 242 nodeServices, ok := services.nodeServices[node] 243 if ok { 244 return nodeServices.(map[string]interface{}), errs 245 } 246 } else { 247 if err != nil { 248 errs = append(errs, errors.NewDatastoreClusterError(err, p.Name)) 249 } 250 if pErr != nil { 251 errs = append(errs, errors.NewDatastoreClusterError(pErr, p.Name)) 252 } 253 } 254 } 255 return map[string]interface{}{}, errs 256} 257 258func (s *store) NamespaceIds() ([]string, errors.Error) { 259 return s.NamespaceNames() 260} 261 262func (s *store) NamespaceNames() ([]string, errors.Error) { 263 return []string{"default"}, nil 264} 265 266func (s *store) NamespaceById(id string) (p datastore.Namespace, e errors.Error) { 267 return s.NamespaceByName(id) 268} 269 270func (s *store) NamespaceByName(name string) (p datastore.Namespace, e errors.Error) { 271 p, ok := s.namespaceCache[name] 272 if !ok { 273 var err errors.Error 274 p, err = loadNamespace(s, name) 275 if err != nil { 276 return nil, err 277 } 278 s.namespaceCache[name] = p.(*namespace) 279 } 280 return p, nil 281} 282 283// The ns_server admin API is open iff we can access the /pools API without a password. 284func (s *store) adminIsOpen() bool { 285 url := s.connectionUrl + "/pools" 286 resp, err := http.Get(url) 287 if err != nil { 288 return false 289 } 290 defer resp.Body.Close() 291 if resp.StatusCode != 200 { 292 return false 293 } 294 return true 295} 296 297func (s *store) auth(user, pwd string) (cbauth.Creds, error) { 298 return cbauth.Auth(user, pwd) 299} 300 301func (s *store) authWebCreds(req *http.Request) (cbauth.Creds, error) { 302 return cbauth.AuthWebCreds(req) 303} 304 305func (s *store) Authorize(privileges *auth.Privileges, credentials auth.Credentials, req *http.Request) (auth.AuthenticatedUsers, errors.Error) { 306 if s.CbAuthInit == false { 307 // cbauth is not initialized. Access to SASL protected buckets will be 308 // denied by the couchbase server 309 logging.Warnf("CbAuth not intialized") 310 return nil, nil 311 } 312 313 return cbAuthorize(s, privileges, credentials, req) 314} 315 316func (s *store) CredsString(req *http.Request) string { 317 if req != nil { 318 creds, err := cbauth.AuthWebCreds(req) 319 if err == nil { 320 return creds.Name() 321 } 322 } 323 return "" 324} 325 326func (s *store) SetLogLevel(level logging.Level) { 327 for _, n := range s.namespaceCache { 328 defer n.lock.Unlock() 329 n.lock.Lock() 330 for _, k := range n.keyspaceCache { 331 if k.cbKeyspace == nil { 332 continue 333 } 334 indexers, _ := k.cbKeyspace.Indexers() 335 if len(indexers) > 0 { 336 for _, idxr := range indexers { 337 idxr.SetLogLevel(level) 338 } 339 340 return 341 } 342 } 343 } 344} 345 346// Ignore the name parameter for now 347func (s *store) Inferencer(name datastore.InferenceType) (datastore.Inferencer, errors.Error) { 348 return s.inferencer, nil 349} 350 351func (s *store) Inferencers() ([]datastore.Inferencer, errors.Error) { 352 return []datastore.Inferencer{s.inferencer}, nil 353} 354 355func (s *store) AuditInfo() (*datastore.AuditInfo, errors.Error) { 356 auditSpec, err := s.client.GetAuditSpec() 357 if err != nil { 358 return nil, errors.NewSystemUnableToRetrieveError(err) 359 } 360 361 users := make(map[datastore.UserInfo]bool, len(auditSpec.DisabledUsers)) 362 for _, u := range auditSpec.DisabledUsers { 363 ui := datastore.UserInfo{Name: u.Name, Domain: u.Domain} 364 users[ui] = true 365 } 366 367 events := make(map[uint32]bool, len(auditSpec.Disabled)) 368 for _, eid := range auditSpec.Disabled { 369 events[eid] = true 370 } 371 372 ret := &datastore.AuditInfo{ 373 EventDisabled: events, 374 UserWhitelisted: users, 375 AuditEnabled: auditSpec.AuditdEnabled, 376 Uid: auditSpec.Uid, 377 } 378 return ret, nil 379} 380 381func (s *store) ProcessAuditUpdateStream(callb func(uid string) error) errors.Error { 382 f := func(data interface{}) error { 383 d, ok := data.(*DefaultObject) 384 if !ok { 385 return fmt.Errorf("Unable to convert received object to proper type: %T", data) 386 } 387 return callb(d.Uid) 388 } 389 do := DefaultObject{} 390 err := s.client.ProcessStream("/poolsStreaming/default", f, &do) 391 if err != nil { 392 return errors.NewAuditStreamHandlerFailed(err) 393 } 394 return nil 395} 396 397type DefaultObject struct { 398 Uid string `json:"auditUid"` 399} 400 401func (s *store) UserInfo() (value.Value, errors.Error) { 402 data, err := s.client.GetUserRoles() 403 if err != nil { 404 return nil, errors.NewSystemUnableToRetrieveError(err) 405 } 406 return value.NewValue(data), nil 407} 408 409func (s *store) GetUserInfoAll() ([]datastore.User, errors.Error) { 410 sourceUsers, err := s.client.GetUserInfoAll() 411 if err != nil { 412 return nil, errors.NewSystemUnableToRetrieveError(err) 413 } 414 resultUsers := make([]datastore.User, len(sourceUsers)) 415 for i, u := range sourceUsers { 416 resultUsers[i].Name = u.Name 417 resultUsers[i].Id = u.Id 418 resultUsers[i].Domain = u.Domain 419 roles := make([]datastore.Role, len(u.Roles)) 420 for j, r := range u.Roles { 421 roles[j].Name = r.Role 422 roles[j].Bucket = r.BucketName 423 } 424 resultUsers[i].Roles = roles 425 } 426 return resultUsers, nil 427} 428 429func (s *store) PutUserInfo(u *datastore.User) errors.Error { 430 var outputUser cb.User 431 outputUser.Name = u.Name 432 outputUser.Id = u.Id 433 outputUser.Roles = make([]cb.Role, len(u.Roles)) 434 outputUser.Domain = u.Domain 435 for i, r := range u.Roles { 436 outputUser.Roles[i].Role = r.Name 437 outputUser.Roles[i].BucketName = r.Bucket 438 } 439 err := s.client.PutUserInfo(&outputUser) 440 if err != nil { 441 return errors.NewSystemUnableToUpdateError(err) 442 } 443 return nil 444} 445 446func (s *store) GetRolesAll() ([]datastore.Role, errors.Error) { 447 roleDescList, err := s.client.GetRolesAll() 448 if err != nil { 449 return nil, errors.NewDatastoreUnableToRetrieveRoles(err) 450 } 451 roles := make([]datastore.Role, len(roleDescList)) 452 for i, rd := range roleDescList { 453 roles[i].Name = rd.Role 454 roles[i].Bucket = rd.BucketName 455 } 456 return roles, nil 457} 458 459func initCbAuth(url string) (*cb.Client, error) { 460 461 transport := cbauth.WrapHTTPTransport(cb.HTTPTransport, nil) 462 cb.HTTPClient.Transport = transport 463 464 client, err := cb.ConnectWithAuth(url, cbauth.NewAuthHandler(nil)) 465 if err != nil { 466 return nil, err 467 } 468 469 logging.Infof(" Initialization of cbauth succeeded ") 470 471 return &client, nil 472} 473 474func parseUrl(u string) (host string, username string, password string, err error) { 475 url, err := url.Parse(u) 476 if err != nil { 477 return "", "", "", err 478 } 479 if url.User == nil { 480 return "", "", "", fmt.Errorf("Unusable url %s. No user information.", u) 481 } 482 password, _ = url.User.Password() 483 if password == "" { 484 logging.Warnf("No password found in url <ud>%s</ud>.", u) 485 } 486 if url.User.Username() == "" { 487 logging.Warnf("No username found in url <ud>%s</ud>.", u) 488 } 489 return url.Host, url.User.Username(), password, nil 490} 491 492// NewStore creates a new Couchbase store for the given url. 493// In the main server, and error return here will cause the server to shut down. 494func NewDatastore(u string) (s datastore.Datastore, e errors.Error) { 495 var client cb.Client 496 var cbAuthInit bool 497 var err error 498 var connectionUrl string 499 500 // initialize cbauth 501 c, err := initCbAuth(u) 502 if err != nil { 503 logging.Errorf("Unable to initialize cbauth. Error %v", err) 504 505 // intialize cb_auth variables manually 506 host, username, password, err := parseUrl(u) 507 if err != nil { 508 logging.Warnf("Unable to parse url <ud>%s</ud>: %v", u, err) 509 } else { 510 set, err := cbauth.InternalRetryDefaultInit(host, username, password) 511 if set == false || err != nil { 512 logging.Errorf("Unable to initialize cbauth variables. Error %v", err) 513 } else { 514 c, err = initCbAuth("http://" + host) 515 if err != nil { 516 logging.Errorf("Unable to initialize cbauth. Error %v", err) 517 } else { 518 client = *c 519 cbAuthInit = true 520 connectionUrl = "http://" + host 521 } 522 } 523 } 524 } else { 525 client = *c 526 cbAuthInit = true 527 connectionUrl = u 528 } 529 530 if !cbAuthInit { 531 if REQUIRE_CBAUTH { 532 return nil, errors.NewUnableToInitCbAuthError(err) 533 } 534 // connect without auth 535 logging.Warnf("Unable to initialize cbAuth, access to couchbase buckets may be restricted") 536 cb.HTTPClient = &http.Client{} 537 client, err = cb.Connect(u) 538 if err != nil { 539 return nil, errors.NewCbConnectionError(err, "url "+u) 540 } 541 } 542 543 store := &store{ 544 client: client, 545 namespaceCache: make(map[string]*namespace), 546 CbAuthInit: cbAuthInit, 547 connectionUrl: connectionUrl, 548 } 549 550 // get the schema inferencer 551 var er errors.Error 552 store.inferencer, er = GetDefaultInferencer(store) 553 if er != nil { 554 return nil, er 555 } 556 557 // initialize the default pool. 558 // TODO can couchbase server contain more than one pool ? 559 560 defaultPool, er := loadNamespace(store, "default") 561 if er != nil { 562 logging.Errorf("Cannot connect to default pool") 563 return nil, er 564 } 565 566 store.namespaceCache["default"] = defaultPool 567 logging.Infof("New store created with url %s", u) 568 569 return store, nil 570} 571 572func loadNamespace(s *store, name string) (*namespace, errors.Error) { 573 574 cbpool, err := s.client.GetPool(name) 575 if err != nil { 576 if name == "default" { 577 // if default pool is not available, try reconnecting to the server 578 url := s.URL() 579 580 var client cb.Client 581 582 if s.CbAuthInit == true { 583 client, err = cb.ConnectWithAuth(url, cbauth.NewAuthHandler(nil)) 584 } else { 585 client, err = cb.Connect(url) 586 } 587 if err != nil { 588 return nil, errors.NewCbNamespaceNotFoundError(err, "Namespace "+name) 589 } 590 // check if the default pool exists 591 cbpool, err = client.GetPool(name) 592 if err != nil { 593 return nil, errors.NewCbNamespaceNotFoundError(err, "Namespace "+name) 594 } 595 s.client = client 596 } 597 } 598 599 rv := namespace{ 600 store: s, 601 name: name, 602 cbNamespace: &cbpool, 603 keyspaceCache: make(map[string]*keyspaceEntry), 604 } 605 606 return &rv, nil 607} 608 609// a namespace represents a couchbase pool 610type namespace struct { 611 store *store 612 name string 613 cbNamespace *cb.Pool 614 keyspaceCache map[string]*keyspaceEntry 615 version uint64 616 lock sync.RWMutex // lock to guard the keyspaceCache 617 nslock sync.RWMutex // lock for this structure 618} 619 620type keyspaceEntry struct { 621 sync.Mutex 622 cbKeyspace datastore.Keyspace 623 errCount int 624 errTime time.Time 625 lastUse time.Time 626} 627 628const ( 629 _MIN_ERR_INTERVAL time.Duration = 5 * time.Second 630 _THROTTLING_TIMEOUT time.Duration = 10 * time.Millisecond 631 _CLEANUP_INTERVAL time.Duration = time.Hour 632) 633 634func (p *namespace) DatastoreId() string { 635 return p.store.Id() 636} 637 638func (p *namespace) Id() string { 639 return p.Name() 640} 641 642func (p *namespace) Name() string { 643 return p.name 644} 645 646func (p *namespace) KeyspaceIds() ([]string, errors.Error) { 647 return p.KeyspaceNames() 648} 649 650func (p *namespace) KeyspaceNames() ([]string, errors.Error) { 651 p.refresh() 652 rv := make([]string, 0, len(p.cbNamespace.BucketMap)) 653 for name, _ := range p.cbNamespace.BucketMap { 654 rv = append(rv, name) 655 } 656 return rv, nil 657} 658 659func (p *namespace) KeyspaceByName(name string) (datastore.Keyspace, errors.Error) { 660 var err errors.Error 661 662 // make sure that no one is deleting the keyspace as we check 663 p.lock.RLock() 664 entry, ok := p.keyspaceCache[name] 665 p.lock.RUnlock() 666 if ok && entry.cbKeyspace != nil { 667 return entry.cbKeyspace, nil 668 } 669 670 // MB-19601 we haven't found the keyspace, so we have to load it, 671 // however, there might be a flood of other requests coming in, all 672 // wanting to do use the same keyspace and all needing to load it. 673 // In the previous implementation all requests would first create 674 // and refresh the keyspace, refreshing the indexes, etc 675 // In YCSB enviroments this resulted in thousends of requests 676 // flooding ns_server with buckets and ddocs load at the same time. 677 // What we want instead is for one request to do the work, and all the 678 // others waiting and benefiting from that work. 679 // This is the exact scenario for using Shared Optimistic Locks, but, 680 // sadly, they are patented by IBM, so clearly it's no go for us. 681 // What we do is create the keyspace entry, and record that we are priming 682 // it by locking that entry. 683 // Everyone else will have to wait on the lock, and once they get it, 684 // they can check on the keyspace again - if all is fine, just continue 685 // if not try and load again. 686 // Shared Optimistic Locks by stealth, although not as efficient (there 687 // might be sequencing of would be loaders on the keyspace lock after 688 // the initial keyspace loading has been done). 689 // If we fail, again! then there's something wrong with the keyspace, 690 // which means that retrying over and over again, we'll be loading ns_server 691 // so what we do is throttle the reloads and log errors, so that the 692 // powers that be are alerted that there's some resource issue. 693 // Finally, since we are having to use two locks rather than one, make sure 694 // that the locking sequence is predictable. 695 // keyspace lock is always locked outside of the keyspace cache lock. 696 697 // 1) create the entry if necessary, record time of loading attempt 698 p.lock.Lock() 699 entry, ok = p.keyspaceCache[name] 700 if !ok { 701 entry = &keyspaceEntry{} 702 p.keyspaceCache[name] = entry 703 } 704 entry.lastUse = time.Now() 705 p.lock.Unlock() 706 707 // 2) serialize the loading by locking the entry 708 entry.Lock() 709 defer entry.Unlock() 710 711 // 3) check if somebody has done the job for us in the interim 712 if entry.cbKeyspace != nil { 713 return entry.cbKeyspace, nil 714 } 715 716 // 4) if previous loads resulted in errors, throttle requests 717 if entry.errCount > 0 && time.Since(entry.lastUse) < _THROTTLING_TIMEOUT { 718 time.Sleep(_THROTTLING_TIMEOUT) 719 } 720 721 // 5) try the loading 722 k, err := newKeyspace(p, name) 723 if err != nil { 724 725 // We try not to flood the log with errors 726 if entry.errCount == 0 { 727 entry.errTime = time.Now() 728 } else if time.Since(entry.errTime) > _MIN_ERR_INTERVAL { 729 entry.errTime = time.Now() 730 } 731 entry.errCount++ 732 return nil, err 733 } 734 entry.errCount = 0 735 736 // this is the only place where entry.cbKeyspace is set 737 // it is never unset - so it's safe to test cbKeyspace != nil 738 entry.cbKeyspace = k 739 return k, nil 740} 741 742// compare the list of node addresses 743// Assumption: the list of node addresses in each list are sorted 744func compareNodeAddress(a, b []string) bool { 745 746 if len(a) != len(b) { 747 return false 748 } 749 750 for i := 0; i < len(a); i++ { 751 if a[i] != b[i] { 752 return false 753 } 754 } 755 return true 756} 757 758func (p *namespace) KeyspaceById(id string) (datastore.Keyspace, errors.Error) { 759 return p.KeyspaceByName(id) 760} 761 762func (p *namespace) MetadataVersion() uint64 { 763 return p.version 764} 765 766func (p *namespace) setPool(cbpool *cb.Pool) { 767 p.nslock.Lock() 768 oldPool := p.cbNamespace 769 p.cbNamespace = cbpool 770 p.nslock.Unlock() 771 772 // MB-33185 let go of old pool 773 oldPool.Close() 774} 775 776func (p *namespace) getPool() *cb.Pool { 777 p.nslock.RLock() 778 defer p.nslock.RUnlock() 779 return p.cbNamespace 780} 781 782func (p *namespace) refresh() { 783 // trigger refresh of this pool 784 logging.Debugf("Refreshing pool %s", p.name) 785 786 newpool, err := p.store.client.GetPool(p.name) 787 if err != nil { 788 newpool, err = p.reload1(err) 789 if err == nil { 790 p.reload2(&newpool) 791 } 792 return 793 } 794 oldpool := p.getPool() 795 changed := len(oldpool.BucketMap) != len(newpool.BucketMap) 796 if !changed { 797 for on, ob := range oldpool.BucketMap { 798 nb := newpool.BucketMap[on] 799 if nb != nil && nb.UUID == ob.UUID { 800 continue 801 } 802 changed = true 803 break 804 } 805 } 806 if changed { 807 p.reload2(&newpool) 808 return 809 } 810 newpool.Close() 811 812 p.lock.Lock() 813 for _, ks := range p.keyspaceCache { 814 815 // in case a change has kicked in in between checking bucketMaps and cquiring the lock 816 if ks.cbKeyspace == nil { 817 continue 818 } 819 820 // Not deleted. Check if GSI indexer is available 821 if ks.cbKeyspace.(*keyspace).gsiIndexer == nil { 822 ks.cbKeyspace.(*keyspace).refreshIndexer(p.store.URL(), p.Name()) 823 } 824 } 825 p.lock.Unlock() 826} 827 828func (p *namespace) reload() { 829 logging.Debugf("Reload %s", p.name) 830 831 newpool, err := p.store.client.GetPool(p.name) 832 if err != nil { 833 newpool, err = p.reload1(err) 834 if err != nil { 835 return 836 } 837 } 838 p.reload2(&newpool) 839} 840 841func (p *namespace) reload1(err error) (cb.Pool, error) { 842 var client cb.Client 843 844 logging.Errorf("Error updating pool name %s: Error %v", p.name, err) 845 url := p.store.URL() 846 847 /* 848 transport := cbauth.WrapHTTPTransport(cb.HTTPTransport, nil) 849 cb.HTTPClient.Transport = transport 850 */ 851 852 if p.store.CbAuthInit == true { 853 client, err = cb.ConnectWithAuth(url, cbauth.NewAuthHandler(nil)) 854 } else { 855 client, err = cb.Connect(url) 856 } 857 if err != nil { 858 logging.Errorf("Error connecting to URL %s - %v", url, err) 859 return cb.Pool{}, err 860 } 861 // check if the default pool exists 862 newpool, err := client.GetPool(p.name) 863 if err != nil { 864 logging.Errorf("Retry Failed Error updating pool name <ud>%s</ud>: Error %v", p.name, err) 865 return newpool, err 866 } 867 p.store.client = client 868 869 return newpool, nil 870} 871 872func (p *namespace) reload2(newpool *cb.Pool) { 873 p.lock.Lock() 874 for name, ks := range p.keyspaceCache { 875 logging.Debugf(" Checking keyspace %s", name) 876 if ks.cbKeyspace == nil { 877 if time.Since(ks.lastUse) > _CLEANUP_INTERVAL { 878 delete(p.keyspaceCache, name) 879 } 880 continue 881 } 882 newbucket, err := newpool.GetBucket(name) 883 if err != nil { 884 ks.cbKeyspace.(*keyspace).deleted = true 885 logging.Errorf(" Error retrieving bucket %s", name) 886 ks.cbKeyspace.(*keyspace).cbbucket.Close() 887 logging.Errorf(" Error retrieving bucket %s - %v", name, err) 888 delete(p.keyspaceCache, name) 889 890 } else if ks.cbKeyspace.(*keyspace).cbbucket.UUID != newbucket.UUID { 891 logging.Debugf(" UUid of keyspace %v uuid now %v", ks.cbKeyspace.(*keyspace).cbbucket.UUID, newbucket.UUID) 892 // UUID has changed. Update the keyspace struct with the newbucket 893 // and release old one 894 ks.cbKeyspace.(*keyspace).cbbucket.Close() 895 ks.cbKeyspace.(*keyspace).cbbucket = newbucket 896 } else { 897 898 // we are reloading, so close old and set new bucket 899 ks.cbKeyspace.(*keyspace).cbbucket.Close() 900 ks.cbKeyspace.(*keyspace).cbbucket = newbucket 901 } 902 903 // Not deleted. Check if GSI indexer is available 904 if ks.cbKeyspace.(*keyspace).gsiIndexer == nil { 905 ks.cbKeyspace.(*keyspace).refreshIndexer(p.store.URL(), p.Name()) 906 } 907 } 908 p.lock.Unlock() 909 910 p.getPool().Close() 911 p.setPool(newpool) 912 913 // keyspaces have been reloaded, force full auto reprepare check 914 p.version++ 915} 916 917type keyspace struct { 918 namespace *namespace 919 name string 920 cbbucket *cb.Bucket 921 deleted bool 922 viewIndexer datastore.Indexer // View index provider 923 gsiIndexer datastore.Indexer // GSI index provider 924} 925 926// 927// Inferring schemas sometimes requires getting a sample of random documents 928// from a keyspace. Ideally this should come through a random traversal of the 929// primary index, but until that is available, we need to use the Bucket's 930// connection pool of memcached.Clients to request random documents from 931// the KV store. 932// 933 934func (k *keyspace) GetRandomEntry() (string, value.Value, errors.Error) { 935 resp, err := k.cbbucket.GetRandomDoc() 936 937 if err != nil { 938 return "", nil, errors.NewCbGetRandomEntryError(err) 939 } 940 941 return fmt.Sprintf("%s", resp.Key), value.NewValue(resp.Body), nil 942} 943 944func newKeyspace(p *namespace, name string) (datastore.Keyspace, errors.Error) { 945 946 cbNamespace := p.getPool() 947 cbbucket, err := cbNamespace.GetBucket(name) 948 949 if err != nil { 950 logging.Infof(" keyspace %s not found %v", name, err) 951 // go-couchbase caches the buckets 952 // to be sure no such bucket exists right now 953 // we trigger a refresh 954 p.reload() 955 cbNamespace = p.getPool() 956 957 // and then check one more time 958 logging.Infof(" Retrying bucket %s", name) 959 cbbucket, err = cbNamespace.GetBucket(name) 960 if err != nil { 961 // really no such bucket exists 962 return nil, errors.NewCbKeyspaceNotFoundError(err, "keyspace "+name) 963 } 964 } 965 966 if strings.EqualFold(cbbucket.Type, "memcached") { 967 return nil, errors.NewCbBucketTypeNotSupportedError(nil, cbbucket.Type) 968 } 969 970 rv := &keyspace{ 971 namespace: p, 972 name: name, 973 cbbucket: cbbucket, 974 } 975 976 // Initialize index providers 977 rv.viewIndexer = newViewIndexer(rv) 978 979 logging.Infof("Created New Bucket %s", name) 980 981 //discover existing indexes 982 if ierr := rv.loadIndexes(); ierr != nil { 983 logging.Warnf("Error loading indexes for keyspace %s, Error %v", name, ierr) 984 } 985 986 var qerr errors.Error 987 rv.gsiIndexer, qerr = gsi.NewGSIIndexer(p.store.URL(), p.Name(), name) 988 if qerr != nil { 989 logging.Warnf("Error loading GSI indexes for keyspace %s. Error %v", name, qerr) 990 } 991 992 // Create a bucket updater that will keep the couchbase bucket fresh. 993 cbbucket.RunBucketUpdater(p.KeyspaceDeleteCallback) 994 995 return rv, nil 996} 997 998// Called by go-couchbase if a configured keyspace is deleted 999func (p *namespace) KeyspaceDeleteCallback(name string, err error) { 1000 1001 p.lock.Lock() 1002 defer p.lock.Unlock() 1003 1004 ks, ok := p.keyspaceCache[name] 1005 if ok && ks.cbKeyspace != nil { 1006 logging.Infof("Keyspace %v being deleted", name) 1007 ks.cbKeyspace.(*keyspace).deleted = true 1008 delete(p.keyspaceCache, name) 1009 1010 } else { 1011 logging.Warnf("Keyspace %v not configured on this server", name) 1012 } 1013} 1014 1015func (b *keyspace) NamespaceId() string { 1016 return b.namespace.Id() 1017} 1018 1019func (b *keyspace) Namespace() datastore.Namespace { 1020 return b.namespace 1021} 1022 1023func (b *keyspace) Id() string { 1024 return b.Name() 1025} 1026 1027func (b *keyspace) Name() string { 1028 return b.name 1029} 1030 1031func (b *keyspace) Count(context datastore.QueryContext) (int64, errors.Error) { 1032 totalCount, err := b.cbbucket.GetCount(true) 1033 if err != nil { 1034 return 0, errors.NewCbKeyspaceCountError(nil, "keyspace "+b.Name()+"Error "+err.Error()) 1035 } 1036 return totalCount, nil 1037} 1038 1039func (b *keyspace) Indexer(name datastore.IndexType) (datastore.Indexer, errors.Error) { 1040 switch name { 1041 case datastore.GSI, datastore.DEFAULT: 1042 if b.gsiIndexer != nil { 1043 return b.gsiIndexer, nil 1044 } 1045 return nil, errors.NewCbIndexerNotImplementedError(nil, fmt.Sprintf("GSI may not be enabled")) 1046 case datastore.VIEW: 1047 return b.viewIndexer, nil 1048 default: 1049 return nil, errors.NewCbIndexerNotImplementedError(nil, fmt.Sprintf("Type %s", name)) 1050 } 1051} 1052 1053func (b *keyspace) Indexers() ([]datastore.Indexer, errors.Error) { 1054 indexers := make([]datastore.Indexer, 0, 2) 1055 if b.gsiIndexer != nil { 1056 indexers = append(indexers, b.gsiIndexer) 1057 } 1058 1059 indexers = append(indexers, b.viewIndexer) 1060 return indexers, nil 1061} 1062 1063func (b *keyspace) Fetch(keys []string, fetchMap map[string]value.AnnotatedValue, 1064 context datastore.QueryContext, subPaths []string) []errors.Error { 1065 var bulkResponse map[string]*gomemcached.MCResponse 1066 var mcr *gomemcached.MCResponse 1067 var err error 1068 1069 _subPaths := subPaths 1070 noVirtualDocAttr := false 1071 1072 if len(_subPaths) > 0 && _subPaths[0] != "$document" { 1073 _subPaths = append([]string{"$document"}, _subPaths...) 1074 noVirtualDocAttr = true 1075 } 1076 1077 l := len(keys) 1078 if l == 0 { 1079 return nil 1080 } 1081 1082 if l == 1 { 1083 mcr, err = b.cbbucket.GetsMC(keys[0], context.GetReqDeadline(), _subPaths) 1084 } else { 1085 bulkResponse, err = b.cbbucket.GetBulk(keys, context.GetReqDeadline(), _subPaths) 1086 defer b.cbbucket.ReleaseGetBulkPools(bulkResponse) 1087 } 1088 1089 if err != nil { 1090 // Ignore "Not found" keys 1091 if !isNotFoundError(err) { 1092 if cb.IsReadTimeOutError(err) { 1093 logging.Errorf(err.Error()) 1094 } 1095 return []errors.Error{errors.NewCbBulkGetError(err, "")} 1096 } 1097 } 1098 1099 i := 0 1100 if l == 1 { 1101 if mcr != nil && err == nil { 1102 if len(_subPaths) > 0 { 1103 fetchMap[keys[0]] = getSubDocFetchResults(keys[0], mcr, _subPaths, noVirtualDocAttr) 1104 } else { 1105 fetchMap[keys[0]] = doFetch(keys[0], mcr) 1106 } 1107 i++ 1108 } 1109 } else { 1110 if len(_subPaths) > 0 { 1111 for k, v := range bulkResponse { 1112 fetchMap[k] = getSubDocFetchResults(k, v, _subPaths, noVirtualDocAttr) 1113 i++ 1114 } 1115 } else { 1116 for k, v := range bulkResponse { 1117 fetchMap[k] = doFetch(k, v) 1118 i++ 1119 } 1120 } 1121 1122 } 1123 1124 logging.Debugf("Fetched %d keys ", i) 1125 1126 return nil 1127} 1128 1129func doFetch(k string, v *gomemcached.MCResponse) value.AnnotatedValue { 1130 val := value.NewAnnotatedValue(value.NewParsedValue(v.Body, false)) 1131 1132 var flags, expiration uint32 1133 1134 if len(v.Extras) >= 4 { 1135 flags = binary.BigEndian.Uint32(v.Extras[0:4]) 1136 } 1137 1138 if len(v.Extras) >= 8 { 1139 expiration = binary.BigEndian.Uint32(v.Extras[4:8]) 1140 } 1141 1142 meta_type := "json" 1143 if val.Type() == value.BINARY { 1144 meta_type = "base64" 1145 } 1146 1147 val.SetAttachment("meta", map[string]interface{}{ 1148 "id": k, 1149 "cas": v.Cas, 1150 "type": meta_type, 1151 "flags": flags, 1152 "expiration": expiration, 1153 }) 1154 1155 // Uncomment when needed 1156 //logging.Debugf("CAS Value for key %v is %v flags %v", k, uint64(v.Cas), meta_flags) 1157 1158 return val 1159} 1160 1161func getSubDocFetchResults(k string, v *gomemcached.MCResponse, subPaths []string, noVirtualDocAttr bool) value.AnnotatedValue { 1162 responseIter := 0 1163 i := 0 1164 xVal := map[string]interface{}{} 1165 1166 for i < len(subPaths) { 1167 // For the xattr contents - $document 1168 xattrError := gomemcached.Status(binary.BigEndian.Uint16(v.Body[responseIter+0:])) 1169 xattrValueLen := int(binary.BigEndian.Uint32(v.Body[responseIter+2:])) 1170 1171 xattrValue := v.Body[responseIter+6 : responseIter+6+xattrValueLen] 1172 1173 // When xattr value not defined for a doc, set missing 1174 tmpVal := value.NewValue(xattrValue) 1175 1176 if xattrError != gomemcached.SUBDOC_PATH_NOT_FOUND { 1177 xVal[subPaths[i]] = tmpVal.Actual() 1178 } 1179 1180 // Calculate actual doc value 1181 responseIter = responseIter + 6 + xattrValueLen 1182 i = i + 1 1183 } 1184 1185 // For the actual document contents - 1186 respError := gomemcached.Status(binary.BigEndian.Uint16(v.Body[responseIter+0:])) 1187 respValueLen := int(binary.BigEndian.Uint32(v.Body[responseIter+2:])) 1188 1189 respValue := v.Body[responseIter+6 : responseIter+6+respValueLen] 1190 1191 // For deleted documents with respError path not found set to null 1192 var val value.AnnotatedValue 1193 1194 // For non deleted documents 1195 if respError == gomemcached.SUBDOC_PATH_NOT_FOUND { 1196 // Final Doc value 1197 val = value.NewAnnotatedValue(nil) 1198 } else { 1199 val = value.NewAnnotatedValue(value.NewParsedValue(respValue, false)) 1200 } 1201 1202 // type 1203 meta_type := "json" 1204 if val.Type() == value.BINARY { 1205 meta_type = "base64" 1206 } 1207 1208 // Get flags and expiration from the $document virtual xattrs 1209 docMeta := xVal["$document"].(map[string]interface{}) 1210 1211 // Convert unmarshalled int64 values to uint32 1212 flags := uint32(value.NewValue(docMeta["flags"]).(value.NumberValue).Int64()) 1213 exptime := uint32(value.NewValue(docMeta["exptime"]).(value.NumberValue).Int64()) 1214 1215 if noVirtualDocAttr { 1216 delete(xVal, "$document") 1217 } 1218 1219 a := map[string]interface{}{ 1220 "id": k, 1221 "cas": v.Cas, 1222 "type": meta_type, 1223 "flags": flags, 1224 "expiration": exptime, 1225 } 1226 1227 if len(xVal) > 0 { 1228 a["xattrs"] = xVal 1229 } 1230 1231 val.SetAttachment("meta", a) 1232 1233 return val 1234} 1235 1236const ( 1237 INSERT = 0x01 1238 UPDATE = 0x02 1239 UPSERT = 0x04 1240) 1241 1242func opToString(op int) string { 1243 1244 switch op { 1245 case INSERT: 1246 return "insert" 1247 case UPDATE: 1248 return "update" 1249 case UPSERT: 1250 return "upsert" 1251 } 1252 1253 return "unknown operation" 1254} 1255 1256func isNotFoundError(err error) bool { 1257 return cb.IsKeyNoEntError(err) 1258} 1259 1260func isEExistError(err error) bool { 1261 return cb.IsKeyEExistsError(err) 1262} 1263 1264func getMeta(key string, meta map[string]interface{}) (cas uint64, flags uint32, err error) { 1265 1266 defer func() { 1267 if r := recover(); r != nil { 1268 err = fmt.Errorf("Recovered in f %v", r) 1269 } 1270 }() 1271 1272 if _, ok := meta["cas"]; ok { 1273 cas = meta["cas"].(uint64) 1274 } else { 1275 return 0, 0, fmt.Errorf("Cas value not found for key %v", key) 1276 } 1277 1278 if _, ok := meta["flags"]; ok { 1279 flags = meta["flags"].(uint32) 1280 } else { 1281 return 0, 0, fmt.Errorf("Flags value not found for key %v", key) 1282 } 1283 1284 return cas, flags, nil 1285 1286} 1287 1288func (b *keyspace) performOp(op int, inserts []value.Pair) ([]value.Pair, errors.Error) { 1289 1290 if len(inserts) == 0 { 1291 return nil, nil 1292 } 1293 1294 insertedKeys := make([]value.Pair, 0, len(inserts)) 1295 var err error 1296 1297 for _, kv := range inserts { 1298 key := kv.Name 1299 val := kv.Value.ActualForIndex() 1300 1301 //mv := kv.Value.GetAttachment("meta") 1302 1303 // TODO Need to also set meta 1304 switch op { 1305 1306 case INSERT: 1307 var added bool 1308 // add the key to the backend 1309 added, err = b.cbbucket.Add(key, 0, val) 1310 if added == false { 1311 // false & err == nil => given key aready exists in the bucket 1312 if err != nil { 1313 err = errors.NewError(err, "Key "+key) 1314 } else { 1315 err = errors.NewError(nil, "Duplicate Key "+key) 1316 } 1317 } 1318 case UPDATE: 1319 // check if the key exists and if so then use the cas value 1320 // to update the key 1321 var meta map[string]interface{} 1322 var cas uint64 1323 var flags uint32 1324 1325 an := kv.Value.(value.AnnotatedValue) 1326 meta = an.GetAttachment("meta").(map[string]interface{}) 1327 1328 cas, flags, err = getMeta(key, meta) 1329 if err != nil { 1330 // Don't perform the update if the meta values are not found 1331 logging.Errorf("Failed to get meta values for key <ud>%v</ud>, error %v", key, err) 1332 } else { 1333 1334 logging.Debugf("CAS Value (Update) for key <ud>%v</ud> is %v flags <ud>%v</ud> value <ud>%v</ud>", key, uint64(cas), flags, val) 1335 _, _, err = b.cbbucket.CasWithMeta(key, int(flags), 0, uint64(cas), val) 1336 } 1337 1338 case UPSERT: 1339 err = b.cbbucket.Set(key, 0, val) 1340 } 1341 1342 if err != nil { 1343 if isEExistError(err) { 1344 logging.Errorf("Failed to perform update on key <ud>%s</ud>. CAS mismatch due to concurrent modifications", key) 1345 } else { 1346 logging.Errorf("Failed to perform <ud>%s</ud> on key <ud>%s</ud> for Keyspace %s.", opToString(op), key, b.Name()) 1347 } 1348 } else { 1349 insertedKeys = append(insertedKeys, kv) 1350 } 1351 } 1352 1353 if len(insertedKeys) == 0 { 1354 return nil, errors.NewCbDMLError(err, "Failed to perform "+opToString(op)) 1355 } 1356 1357 return insertedKeys, nil 1358 1359} 1360 1361func (b *keyspace) Insert(inserts []value.Pair) ([]value.Pair, errors.Error) { 1362 return b.performOp(INSERT, inserts) 1363 1364} 1365 1366func (b *keyspace) Update(updates []value.Pair) ([]value.Pair, errors.Error) { 1367 return b.performOp(UPDATE, updates) 1368} 1369 1370func (b *keyspace) Upsert(upserts []value.Pair) ([]value.Pair, errors.Error) { 1371 return b.performOp(UPSERT, upserts) 1372} 1373 1374func (b *keyspace) Delete(deletes []string, context datastore.QueryContext) ([]string, errors.Error) { 1375 1376 failedDeletes := make([]string, 0) 1377 actualDeletes := make([]string, 0) 1378 var err error 1379 for _, key := range deletes { 1380 if err = b.cbbucket.Delete(key); err != nil { 1381 if !isNotFoundError(err) { 1382 logging.Infof("Failed to delete key <ud>%s</ud> Error %s", key, err) 1383 failedDeletes = append(failedDeletes, key) 1384 } 1385 } else { 1386 actualDeletes = append(actualDeletes, key) 1387 } 1388 } 1389 1390 if len(failedDeletes) > 0 { 1391 return actualDeletes, errors.NewCbDeleteFailedError(err, "Some keys were not deleted "+fmt.Sprintf("%v", failedDeletes)) 1392 } 1393 1394 return actualDeletes, nil 1395} 1396 1397func (b *keyspace) Release() { 1398 b.deleted = true 1399 b.cbbucket.Close() 1400} 1401 1402func (b *keyspace) refreshIndexer(url string, poolName string) { 1403 var err error 1404 b.gsiIndexer, err = gsi.NewGSIIndexer(url, poolName, b.Name()) 1405 if err == nil { 1406 logging.Infof(" GSI Indexer loaded ") 1407 } 1408} 1409 1410func (b *keyspace) loadIndexes() (err errors.Error) { 1411 viewIndexer := b.viewIndexer.(*viewIndexer) 1412 if err1 := viewIndexer.loadViewIndexes(); err1 != nil { 1413 err = err1 1414 } 1415 return 1416} 1417 1418// primaryIndex performs full keyspace scans. 1419type primaryIndex struct { 1420 viewIndex 1421} 1422 1423func (pi *primaryIndex) KeyspaceId() string { 1424 return pi.keyspace.Id() 1425} 1426 1427func (pi *primaryIndex) Id() string { 1428 return pi.Name() 1429} 1430 1431func (pi *primaryIndex) Name() string { 1432 return pi.name 1433} 1434 1435func (pi *primaryIndex) Type() datastore.IndexType { 1436 return pi.viewIndex.Type() 1437} 1438 1439func (pi *primaryIndex) SeekKey() expression.Expressions { 1440 return pi.viewIndex.SeekKey() 1441} 1442 1443func (pi *primaryIndex) RangeKey() expression.Expressions { 1444 return pi.viewIndex.RangeKey() 1445} 1446 1447func (pi *primaryIndex) Condition() expression.Expression { 1448 return pi.viewIndex.Condition() 1449} 1450 1451func (pi *primaryIndex) State() (state datastore.IndexState, msg string, err errors.Error) { 1452 return pi.viewIndex.State() 1453} 1454 1455func (pi *primaryIndex) Statistics(requestId string, span *datastore.Span) ( 1456 datastore.Statistics, errors.Error) { 1457 return pi.viewIndex.Statistics(requestId, span) 1458} 1459 1460func (pi *primaryIndex) Drop(requestId string) errors.Error { 1461 return pi.viewIndex.Drop(requestId) 1462} 1463 1464func (pi *primaryIndex) Scan(requestId string, span *datastore.Span, distinct bool, limit int64, 1465 cons datastore.ScanConsistency, vector timestamp.Vector, conn *datastore.IndexConnection) { 1466 pi.viewIndex.Scan(requestId, span, distinct, limit, cons, vector, conn) 1467} 1468 1469func (pi *primaryIndex) ScanEntries(requestId string, limit int64, cons datastore.ScanConsistency, 1470 vector timestamp.Vector, conn *datastore.IndexConnection) { 1471 pi.viewIndex.ScanEntries(requestId, limit, cons, vector, conn) 1472} 1473