//  Copyright (c) 2014 Couchbase, Inc.
//  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
//  except in compliance with the License. You may obtain a copy of the License at
//    http://www.apache.org/licenses/LICENSE-2.0
//  Unless required by applicable law or agreed to in writing, software distributed under the
//  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
//  either express or implied. See the License for the specific language governing permissions
//  and limitations under the License.

/*

Package couchbase provides a couchbase-server implementation of the datastore
package.

*/

package couchbase

import (
	"encoding/binary"
	"fmt"
	"net/http"
	"net/url"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/couchbase/cbauth"
	cb "github.com/couchbase/go-couchbase"
	"github.com/couchbase/gomemcached"
	gsi "github.com/couchbase/indexing/secondary/queryport/n1ql"
	"github.com/couchbase/query/auth"
	"github.com/couchbase/query/datastore"
	"github.com/couchbase/query/errors"
	"github.com/couchbase/query/expression"
	"github.com/couchbase/query/logging"
	"github.com/couchbase/query/timestamp"
	"github.com/couchbase/query/value"

	"github.com/couchbase/query/server"
)

var REQUIRE_CBAUTH bool // Connection to authorization system must succeed.

// cbPoolMap and cbPoolServices implement a local cache of the datastore's topology
type cbPoolMap struct {
	sync.RWMutex
	poolServices map[string]cbPoolServices
}

type cbPoolServices struct {
	name         string
	rev          int
	nodeServices map[string]interface{}
}

var _POOLMAP cbPoolMap

func init() {

	// MB-27415 have a larger overflow pool and close overflow connections asynchronously
	cb.SetConnectionPoolParams(64, 64)
	cb.EnableAsynchronousCloser(true)

	val, err := strconv.ParseBool(os.Getenv("REQUIRE_CBAUTH"))
	if err != nil {
		REQUIRE_CBAUTH = true // default
	} else {
		REQUIRE_CBAUTH = val
	}

	// enable data type response
	cb.EnableDataType = true

	// enable xattrs
	cb.EnableXattr = true

	// start the fetch workers for servicing the BulkGet operations
	cb.InitBulkGet()
	_POOLMAP.poolServices = make(map[string]cbPoolServices, 1)
}

const (
	PRIMARY_INDEX = "#primary"
)

// store is the root for the couchbase datastore
type store struct {
	client         cb.Client             // instance of go-couchbase client
	namespaceCache map[string]*namespace // map of pool-names and IDs
	CbAuthInit     bool                  // whether cbAuth is initialized
	inferencer     datastore.Inferencer  // what we use to infer schemas
	connectionUrl  string                // where to contact ns_server
}

func (s *store) Id() string {
	return s.URL()
}

func (s *store) URL() string {
	return s.client.BaseURL.String()
}

func (s *store) Info() datastore.Info {
	info := &infoImpl{client: &s.client}
	return info
}

type infoImpl struct {
	client *cb.Client
}

func (info *infoImpl) Version() string {
	return info.client.Info.ImplementationVersion
}

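// fullhostName returns the node name unchanged if it already includes a
// host, otherwise it prefixes the local IP to the bare port.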
func fullhostName(n string) string {
	hostName, portVal := server.HostNameandPort(n)
	if hostName != "" {
		return n
	}
	return server.GetIP(true) + ":" + portVal
}

func (info *infoImpl) Topology() ([]string, []errors.Error) {
	var nodes []string
	var errs []errors.Error

	for _, p := range info.client.Info.Pools {
		pool, err := info.client.GetPool(p.Name)

		if err == nil {
			for _, node := range pool.Nodes {
				nodes = append(nodes, fullhostName(node.Hostname))
			}
		} else {
			errs = append(errs, errors.NewDatastoreClusterError(err, p.Name))
		}
	}
	return nodes, errs
}

func (info *infoImpl) Services(node string) (map[string]interface{}, []errors.Error) {
	var errs []errors.Error

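	// start with a read lock on the services cache; if the cache needs
	// rebuilding, the lock is promoted to a write lock below, and the
	// deferred closure releases whichever lock is currently held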
	isReadLock := true
	_POOLMAP.RLock()
	defer func() {
		if isReadLock {
			_POOLMAP.RUnlock()
		} else {
			_POOLMAP.Unlock()
		}
	}()

	// scan the pools
	for _, p := range info.client.Info.Pools {
		pool, err := info.client.GetPool(p.Name)
		poolServices, pErr := info.client.GetPoolServices(p.Name)

		if err == nil && pErr == nil {
			var found bool
			var services cbPoolServices

			services, ok := _POOLMAP.poolServices[p.Name]
			found = ok && (services.rev == poolServices.Rev)

			// missing the information, rebuild
			if !found {

				// promote the lock
				if isReadLock {
					_POOLMAP.RUnlock()
					_POOLMAP.Lock()
					isReadLock = false

					// now that we have promoted the lock, did we get beaten by somebody else to it?
					services, ok = _POOLMAP.poolServices[p.Name]
					found = ok && (services.rev == poolServices.Rev)
					if found {
						continue
					}
				}

				newPoolServices := cbPoolServices{name: p.Name, rev: poolServices.Rev}
				nodeServices := make(map[string]interface{}, len(pool.Nodes))

				// go through all the nodes in the pool
				for _, n := range pool.Nodes {
					var servicesCopy []interface{}

					newServices := make(map[string]interface{}, 3)
					newServices["name"] = fullhostName(n.Hostname)
					for _, s := range n.Services {
						servicesCopy = append(servicesCopy, s)
					}
					newServices["services"] = servicesCopy

					// go through all bucket independent services in the pool
					for _, ns := range poolServices.NodesExt {

						mgmtPort := ns.Services["mgmt"]
						if mgmtPort == 0 {

							// shouldn't happen: there should always be a mgmt port on each node,
							// so report an error
							msg := fmt.Sprintf("NodeServices does not report mgmt endpoint for "+
								"this node: %v", node)
							errs = append(errs, errors.NewAdminGetNodeError(nil, msg))
							continue
						}

						nsHostname := ""
						if ns.Hostname != "" {
							nsHostname = ns.Hostname + ":" + strconv.Itoa(mgmtPort)
						}
						// if we can positively match nodeServices and node, add ports
						if n.Hostname == nsHostname ||
							(nsHostname == "" && ns.ThisNode && n.ThisNode) {
							ports := make(map[string]interface{}, len(ns.Services))

							// only add the ports for those services that are advertised
							for _, s := range n.Services {
								for pn, p := range ns.Services {
									if strings.HasPrefix(pn, s) {
										ports[pn] = p
									}
								}
							}
							newServices["ports"] = ports
							break
						}
					}
					nodeServices[fullhostName(n.Hostname)] = newServices
				}
				newPoolServices.nodeServices = nodeServices
				_POOLMAP.poolServices[p.Name] = newPoolServices
				services = newPoolServices
			}
			nodeServices, ok := services.nodeServices[node]
			if ok {
				return nodeServices.(map[string]interface{}), errs
			}
		} else {
			if err != nil {
				errs = append(errs, errors.NewDatastoreClusterError(err, p.Name))
			}
			if pErr != nil {
				errs = append(errs, errors.NewDatastoreClusterError(pErr, p.Name))
			}
		}
	}
	return map[string]interface{}{}, errs
}

func (s *store) NamespaceIds() ([]string, errors.Error) {
	return s.NamespaceNames()
}

func (s *store) NamespaceNames() ([]string, errors.Error) {
	return []string{"default"}, nil
}

func (s *store) NamespaceById(id string) (p datastore.Namespace, e errors.Error) {
	return s.NamespaceByName(id)
}

func (s *store) NamespaceByName(name string) (p datastore.Namespace, e errors.Error) {
	p, ok := s.namespaceCache[name]
	if !ok {
		var err errors.Error
		p, err = loadNamespace(s, name)
		if err != nil {
			return nil, err
		}
		s.namespaceCache[name] = p.(*namespace)
	}
	return p, nil
}

// The ns_server admin API is open iff we can access the /pools API without a password.
func (s *store) adminIsOpen() bool {
	url := s.connectionUrl + "/pools"
	resp, err := http.Get(url)
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode == 200
}

func (s *store) auth(user, pwd string) (cbauth.Creds, error) {
	return cbauth.Auth(user, pwd)
}

func (s *store) authWebCreds(req *http.Request) (cbauth.Creds, error) {
	return cbauth.AuthWebCreds(req)
}

func (s *store) Authorize(privileges *auth.Privileges, credentials auth.Credentials, req *http.Request) (auth.AuthenticatedUsers, errors.Error) {
	if !s.CbAuthInit {
		// cbauth is not initialized. Access to SASL protected buckets will be
		// denied by the couchbase server
		logging.Warnf("CbAuth not initialized")
		return nil, nil
	}

	return cbAuthorize(s, privileges, credentials, req)
}

func (s *store) CredsString(req *http.Request) string {
	if req != nil {
		creds, err := cbauth.AuthWebCreds(req)
		if err == nil {
			return creds.Name()
		}
	}
	return ""
}

func (s *store) SetLogLevel(level logging.Level) {
	for _, n := range s.namespaceCache {
		n.lock.Lock()
		defer n.lock.Unlock()
		for _, k := range n.keyspaceCache {
			if k.cbKeyspace == nil {
				continue
			}
			indexers, _ := k.cbKeyspace.Indexers()
			if len(indexers) > 0 {
				for _, idxr := range indexers {
					idxr.SetLogLevel(level)
				}

				return
			}
		}
	}
}

// Ignore the name parameter for now
func (s *store) Inferencer(name datastore.InferenceType) (datastore.Inferencer, errors.Error) {
	return s.inferencer, nil
}

func (s *store) Inferencers() ([]datastore.Inferencer, errors.Error) {
	return []datastore.Inferencer{s.inferencer}, nil
}

func (s *store) AuditInfo() (*datastore.AuditInfo, errors.Error) {
	auditSpec, err := s.client.GetAuditSpec()
	if err != nil {
		return nil, errors.NewSystemUnableToRetrieveError(err)
	}

	users := make(map[datastore.UserInfo]bool, len(auditSpec.DisabledUsers))
	for _, u := range auditSpec.DisabledUsers {
		ui := datastore.UserInfo{Name: u.Name, Domain: u.Domain}
		users[ui] = true
	}

	events := make(map[uint32]bool, len(auditSpec.Disabled))
	for _, eid := range auditSpec.Disabled {
		events[eid] = true
	}

	ret := &datastore.AuditInfo{
		EventDisabled:   events,
		UserWhitelisted: users,
		AuditEnabled:    auditSpec.AuditdEnabled,
		Uid:             auditSpec.Uid,
	}
	return ret, nil
}

func (s *store) ProcessAuditUpdateStream(callb func(uid string) error) errors.Error {
	f := func(data interface{}) error {
		d, ok := data.(*DefaultObject)
		if !ok {
			return fmt.Errorf("Unable to convert received object to proper type: %T", data)
		}
		return callb(d.Uid)
	}
	do := DefaultObject{}
	err := s.client.ProcessStream("/poolsStreaming/default", f, &do)
	if err != nil {
		return errors.NewAuditStreamHandlerFailed(err)
	}
	return nil
}

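// DefaultObject captures the audit UID field of the /poolsStreaming/default feed.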
type DefaultObject struct {
	Uid string `json:"auditUid"`
}

func (s *store) UserInfo() (value.Value, errors.Error) {
	data, err := s.client.GetUserRoles()
	if err != nil {
		return nil, errors.NewSystemUnableToRetrieveError(err)
	}
	return value.NewValue(data), nil
}

func (s *store) GetUserInfoAll() ([]datastore.User, errors.Error) {
	sourceUsers, err := s.client.GetUserInfoAll()
	if err != nil {
		return nil, errors.NewSystemUnableToRetrieveError(err)
	}
	resultUsers := make([]datastore.User, len(sourceUsers))
	for i, u := range sourceUsers {
		resultUsers[i].Name = u.Name
		resultUsers[i].Id = u.Id
		resultUsers[i].Domain = u.Domain
		roles := make([]datastore.Role, len(u.Roles))
		for j, r := range u.Roles {
			roles[j].Name = r.Role
			roles[j].Bucket = r.BucketName
		}
		resultUsers[i].Roles = roles
	}
	return resultUsers, nil
}

func (s *store) PutUserInfo(u *datastore.User) errors.Error {
	var outputUser cb.User
	outputUser.Name = u.Name
	outputUser.Id = u.Id
	outputUser.Roles = make([]cb.Role, len(u.Roles))
	outputUser.Domain = u.Domain
	for i, r := range u.Roles {
		outputUser.Roles[i].Role = r.Name
		outputUser.Roles[i].BucketName = r.Bucket
	}
	err := s.client.PutUserInfo(&outputUser)
	if err != nil {
		return errors.NewSystemUnableToUpdateError(err)
	}
	return nil
}

func (s *store) GetRolesAll() ([]datastore.Role, errors.Error) {
	roleDescList, err := s.client.GetRolesAll()
	if err != nil {
		return nil, errors.NewDatastoreUnableToRetrieveRoles(err)
	}
	roles := make([]datastore.Role, len(roleDescList))
	for i, rd := range roleDescList {
		roles[i].Name = rd.Role
		roles[i].Bucket = rd.BucketName
	}
	return roles, nil
}

func initCbAuth(url string) (*cb.Client, error) {

	transport := cbauth.WrapHTTPTransport(cb.HTTPTransport, nil)
	cb.HTTPClient.Transport = transport

	client, err := cb.ConnectWithAuth(url, cbauth.NewAuthHandler(nil))
	if err != nil {
		return nil, err
	}

	logging.Infof(" Initialization of cbauth succeeded ")

	return &client, nil
}

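// parseUrl extracts the host and any embedded credentials from a connection
// URL of the form http://user:password@host:port.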
func parseUrl(u string) (host string, username string, password string, err error) {
	url, err := url.Parse(u)
	if err != nil {
		return "", "", "", err
	}
	if url.User == nil {
		return "", "", "", fmt.Errorf("Unusable url %s. No user information.", u)
	}
	password, _ = url.User.Password()
	if password == "" {
		logging.Warnf("No password found in url <ud>%s</ud>.", u)
	}
	if url.User.Username() == "" {
		logging.Warnf("No username found in url <ud>%s</ud>.", u)
	}
	return url.Host, url.User.Username(), password, nil
}

// NewDatastore creates a new Couchbase store for the given url.
// In the main server, an error return here will cause the server to shut down.
func NewDatastore(u string) (s datastore.Datastore, e errors.Error) {
	var client cb.Client
	var cbAuthInit bool
	var err error
	var connectionUrl string

	// initialize cbauth
	c, err := initCbAuth(u)
	if err != nil {
		logging.Errorf("Unable to initialize cbauth. Error %v", err)

		// initialize cb_auth variables manually
		host, username, password, err := parseUrl(u)
		if err != nil {
			logging.Warnf("Unable to parse url <ud>%s</ud>: %v", u, err)
		} else {
			set, err := cbauth.InternalRetryDefaultInit(host, username, password)
			if !set || err != nil {
				logging.Errorf("Unable to initialize cbauth variables. Error %v", err)
			} else {
				c, err = initCbAuth("http://" + host)
				if err != nil {
					logging.Errorf("Unable to initialize cbauth. Error %v", err)
				} else {
					client = *c
					cbAuthInit = true
					connectionUrl = "http://" + host
				}
			}
		}
	} else {
		client = *c
		cbAuthInit = true
		connectionUrl = u
	}

	if !cbAuthInit {
		if REQUIRE_CBAUTH {
			return nil, errors.NewUnableToInitCbAuthError(err)
		}
		// connect without auth
		logging.Warnf("Unable to initialize cbAuth, access to couchbase buckets may be restricted")
		cb.HTTPClient = &http.Client{}
		client, err = cb.Connect(u)
		if err != nil {
			return nil, errors.NewCbConnectionError(err, "url "+u)
		}
	}

	store := &store{
		client:         client,
		namespaceCache: make(map[string]*namespace),
		CbAuthInit:     cbAuthInit,
		connectionUrl:  connectionUrl,
	}

	// get the schema inferencer
	var er errors.Error
	store.inferencer, er = GetDefaultInferencer(store)
	if er != nil {
		return nil, er
	}

	// initialize the default pool.
	// TODO can couchbase server contain more than one pool ?

	defaultPool, er := loadNamespace(store, "default")
	if er != nil {
		logging.Errorf("Cannot connect to default pool")
		return nil, er
	}

	store.namespaceCache["default"] = defaultPool
	logging.Infof("New store created with url %s", u)

	return store, nil
}

func loadNamespace(s *store, name string) (*namespace, errors.Error) {

	cbpool, err := s.client.GetPool(name)
	if err != nil {
		if name == "default" {
			// if default pool is not available, try reconnecting to the server
			url := s.URL()

			var client cb.Client

			if s.CbAuthInit {
				client, err = cb.ConnectWithAuth(url, cbauth.NewAuthHandler(nil))
			} else {
				client, err = cb.Connect(url)
			}
			if err != nil {
				return nil, errors.NewCbNamespaceNotFoundError(err, "Namespace "+name)
			}
			// check if the default pool exists
			cbpool, err = client.GetPool(name)
			if err != nil {
				return nil, errors.NewCbNamespaceNotFoundError(err, "Namespace "+name)
			}
			s.client = client
		}
	}

	rv := namespace{
		store:         s,
		name:          name,
		cbNamespace:   &cbpool,
		keyspaceCache: make(map[string]*keyspaceEntry),
	}

	return &rv, nil
}

// a namespace represents a couchbase pool
type namespace struct {
	store         *store
	name          string
	cbNamespace   *cb.Pool
	keyspaceCache map[string]*keyspaceEntry
	version       uint64
	lock          sync.RWMutex // lock to guard the keyspaceCache
	nslock        sync.RWMutex // lock for this structure
}

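// keyspaceEntry serializes the loading of a keyspace (see the commentary in
// KeyspaceByName) and tracks error counts so that failing loads are throttled.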
type keyspaceEntry struct {
	sync.Mutex
	cbKeyspace datastore.Keyspace
	errCount   int
	errTime    time.Time
	lastUse    time.Time
}

const (
	_MIN_ERR_INTERVAL   time.Duration = 5 * time.Second
	_THROTTLING_TIMEOUT time.Duration = 10 * time.Millisecond
	_CLEANUP_INTERVAL   time.Duration = time.Hour
)

func (p *namespace) DatastoreId() string {
	return p.store.Id()
}

func (p *namespace) Id() string {
	return p.Name()
}

func (p *namespace) Name() string {
	return p.name
}

func (p *namespace) KeyspaceIds() ([]string, errors.Error) {
	return p.KeyspaceNames()
}

func (p *namespace) KeyspaceNames() ([]string, errors.Error) {
	p.refresh()
	rv := make([]string, 0, len(p.cbNamespace.BucketMap))
	for name := range p.cbNamespace.BucketMap {
		rv = append(rv, name)
	}
	return rv, nil
}

func (p *namespace) KeyspaceByName(name string) (datastore.Keyspace, errors.Error) {
	var err errors.Error

	// make sure that no one is deleting the keyspace as we check
	p.lock.RLock()
	entry, ok := p.keyspaceCache[name]
	p.lock.RUnlock()
	if ok && entry.cbKeyspace != nil {
		return entry.cbKeyspace, nil
	}

	// MB-19601 we haven't found the keyspace, so we have to load it,
	// however, there might be a flood of other requests coming in, all
	// wanting to use the same keyspace and all needing to load it.
	// In the previous implementation all requests would first create
	// and refresh the keyspace, refreshing the indexes, etc.
	// In YCSB environments this resulted in thousands of requests
	// flooding ns_server with bucket and ddoc loads at the same time.
	// What we want instead is for one request to do the work, and all the
	// others waiting and benefiting from that work.
	// This is the exact scenario for using Shared Optimistic Locks, but,
	// sadly, they are patented by IBM, so clearly it's no go for us.
	// What we do is create the keyspace entry, and record that we are priming
	// it by locking that entry.
	// Everyone else will have to wait on the lock, and once they get it,
	// they can check on the keyspace again - if all is fine, just continue,
	// if not, try to load again.
	// Shared Optimistic Locks by stealth, although not as efficient (there
	// might be sequencing of would-be loaders on the keyspace lock after
	// the initial keyspace loading has been done).
	// If we fail again, then there's something wrong with the keyspace,
	// which means that by retrying over and over again we'd be loading
	// ns_server, so what we do is throttle the reloads and log errors, so
	// that the powers that be are alerted that there's some resource issue.
	// Finally, since we are having to use two locks rather than one, make sure
	// that the locking sequence is predictable.
	// keyspace lock is always locked outside of the keyspace cache lock.

	// 1) create the entry if necessary, record time of loading attempt
	p.lock.Lock()
	entry, ok = p.keyspaceCache[name]
	if !ok {
		entry = &keyspaceEntry{}
		p.keyspaceCache[name] = entry
	}
	entry.lastUse = time.Now()
	p.lock.Unlock()

	// 2) serialize the loading by locking the entry
	entry.Lock()
	defer entry.Unlock()

	// 3) check if somebody has done the job for us in the interim
	if entry.cbKeyspace != nil {
		return entry.cbKeyspace, nil
	}

	// 4) if previous loads resulted in errors, throttle requests
	if entry.errCount > 0 && time.Since(entry.lastUse) < _THROTTLING_TIMEOUT {
		time.Sleep(_THROTTLING_TIMEOUT)
	}

	// 5) try the loading
	k, err := newKeyspace(p, name)
	if err != nil {

		// We try not to flood the log with errors
		if entry.errCount == 0 {
			entry.errTime = time.Now()
		} else if time.Since(entry.errTime) > _MIN_ERR_INTERVAL {
			entry.errTime = time.Now()
		}
		entry.errCount++
		return nil, err
	}
	entry.errCount = 0

	// this is the only place where entry.cbKeyspace is set
	// it is never unset - so it's safe to test cbKeyspace != nil
	entry.cbKeyspace = k
	return k, nil
}

// compare the list of node addresses
// Assumption: the list of node addresses in each list are sorted
func compareNodeAddress(a, b []string) bool {

	if len(a) != len(b) {
		return false
	}

	for i := 0; i < len(a); i++ {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func (p *namespace) KeyspaceById(id string) (datastore.Keyspace, errors.Error) {
	return p.KeyspaceByName(id)
}

func (p *namespace) MetadataVersion() uint64 {
	return p.version
}

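// setPool swaps in a new pool under the namespace lock and closes the old
// one, so callers must not close it themselves.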
func (p *namespace) setPool(cbpool *cb.Pool) {
	p.nslock.Lock()
	oldPool := p.cbNamespace
	p.cbNamespace = cbpool
	p.nslock.Unlock()

	// MB-33185 let go of old pool
	oldPool.Close()
}

func (p *namespace) getPool() *cb.Pool {
	p.nslock.RLock()
	defer p.nslock.RUnlock()
	return p.cbNamespace
}

func (p *namespace) refresh() {
	// trigger refresh of this pool
	logging.Debugf("Refreshing pool %s", p.name)

	newpool, err := p.store.client.GetPool(p.name)
	if err != nil {
		newpool, err = p.reload1(err)
		if err == nil {
			p.reload2(&newpool)
		}
		return
	}
	oldpool := p.getPool()
	changed := len(oldpool.BucketMap) != len(newpool.BucketMap)
	if !changed {
		for on, ob := range oldpool.BucketMap {
			nb := newpool.BucketMap[on]
			if nb != nil && nb.UUID == ob.UUID {
				continue
			}
			changed = true
			break
		}
	}
	if changed {
		p.reload2(&newpool)
		return
	}
	newpool.Close()

	p.lock.Lock()
	for _, ks := range p.keyspaceCache {

		// in case a change has kicked in between checking the bucket maps
		// and acquiring the lock
		if ks.cbKeyspace == nil {
			continue
		}

		// Not deleted. Check if GSI indexer is available
		if ks.cbKeyspace.(*keyspace).gsiIndexer == nil {
			ks.cbKeyspace.(*keyspace).refreshIndexer(p.store.URL(), p.Name())
		}
	}
	p.lock.Unlock()
}

func (p *namespace) reload() {
	logging.Debugf("Reload %s", p.name)

	newpool, err := p.store.client.GetPool(p.name)
	if err != nil {
		newpool, err = p.reload1(err)
		if err != nil {
			return
		}
	}
	p.reload2(&newpool)
}

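// reload1 re-establishes the client connection after a pool fetch failure
// and retries fetching the pool.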
func (p *namespace) reload1(err error) (cb.Pool, error) {
	var client cb.Client

	logging.Errorf("Error updating pool name %s: Error %v", p.name, err)
	url := p.store.URL()

	/*
	   transport := cbauth.WrapHTTPTransport(cb.HTTPTransport, nil)
	   cb.HTTPClient.Transport = transport
	*/

	if p.store.CbAuthInit {
		client, err = cb.ConnectWithAuth(url, cbauth.NewAuthHandler(nil))
	} else {
		client, err = cb.Connect(url)
	}
	if err != nil {
		logging.Errorf("Error connecting to URL %s - %v", url, err)
		return cb.Pool{}, err
	}
	// check if the default pool exists
	newpool, err := client.GetPool(p.name)
	if err != nil {
		logging.Errorf("Retry Failed Error updating pool name <ud>%s</ud>: Error %v", p.name, err)
		return newpool, err
	}
	p.store.client = client

	return newpool, nil
}

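// reload2 reconciles the keyspace cache against a freshly fetched pool:
// vanished buckets are dropped, changed or surviving buckets are swapped
// for their new incarnation, and the pool itself is replaced.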
func (p *namespace) reload2(newpool *cb.Pool) {
	p.lock.Lock()
	for name, ks := range p.keyspaceCache {
		logging.Debugf(" Checking keyspace %s", name)
		if ks.cbKeyspace == nil {
			if time.Since(ks.lastUse) > _CLEANUP_INTERVAL {
				delete(p.keyspaceCache, name)
			}
			continue
		}
		newbucket, err := newpool.GetBucket(name)
		if err != nil {
			ks.cbKeyspace.(*keyspace).deleted = true
			logging.Errorf(" Error retrieving bucket %s - %v", name, err)
			ks.cbKeyspace.(*keyspace).cbbucket.Close()
			delete(p.keyspaceCache, name)

		} else if ks.cbKeyspace.(*keyspace).cbbucket.UUID != newbucket.UUID {
			logging.Debugf(" UUID of keyspace %v changed to %v", ks.cbKeyspace.(*keyspace).cbbucket.UUID, newbucket.UUID)
			// UUID has changed. Update the keyspace struct with the new bucket
			// and release the old one
			ks.cbKeyspace.(*keyspace).cbbucket.Close()
			ks.cbKeyspace.(*keyspace).cbbucket = newbucket
		} else {

			// we are reloading, so close the old bucket and set the new one
			ks.cbKeyspace.(*keyspace).cbbucket.Close()
			ks.cbKeyspace.(*keyspace).cbbucket = newbucket
		}

		// Not deleted. Check if GSI indexer is available
		if ks.cbKeyspace.(*keyspace).gsiIndexer == nil {
			ks.cbKeyspace.(*keyspace).refreshIndexer(p.store.URL(), p.Name())
		}
	}
	p.lock.Unlock()

	// setPool releases the old pool (MB-33185), so no explicit Close is needed here
	p.setPool(newpool)

	// keyspaces have been reloaded, force full auto reprepare check
	p.version++
}

type keyspace struct {
	namespace   *namespace
	name        string
	cbbucket    *cb.Bucket
	deleted     bool
	viewIndexer datastore.Indexer // View index provider
	gsiIndexer  datastore.Indexer // GSI index provider
}

//
// Inferring schemas sometimes requires getting a sample of random documents
// from a keyspace. Ideally this should come through a random traversal of the
// primary index, but until that is available, we need to use the Bucket's
// connection pool of memcached.Clients to request random documents from
// the KV store.
//

func (k *keyspace) GetRandomEntry() (string, value.Value, errors.Error) {
	resp, err := k.cbbucket.GetRandomDoc()

	if err != nil {
		return "", nil, errors.NewCbGetRandomEntryError(err)
	}

	return string(resp.Key), value.NewValue(resp.Body), nil
}

func newKeyspace(p *namespace, name string) (datastore.Keyspace, errors.Error) {

	cbNamespace := p.getPool()
	cbbucket, err := cbNamespace.GetBucket(name)

	if err != nil {
		logging.Infof(" keyspace %s not found %v", name, err)
		// go-couchbase caches the buckets
		// to be sure no such bucket exists right now
		// we trigger a refresh
		p.reload()
		cbNamespace = p.getPool()

		// and then check one more time
		logging.Infof(" Retrying bucket %s", name)
		cbbucket, err = cbNamespace.GetBucket(name)
		if err != nil {
			// really no such bucket exists
			return nil, errors.NewCbKeyspaceNotFoundError(err, "keyspace "+name)
		}
	}

	if strings.EqualFold(cbbucket.Type, "memcached") {
		return nil, errors.NewCbBucketTypeNotSupportedError(nil, cbbucket.Type)
	}

	rv := &keyspace{
		namespace: p,
		name:      name,
		cbbucket:  cbbucket,
	}

	// Initialize index providers
	rv.viewIndexer = newViewIndexer(rv)

	logging.Infof("Created New Bucket %s", name)

	//discover existing indexes
	if ierr := rv.loadIndexes(); ierr != nil {
		logging.Warnf("Error loading indexes for keyspace %s, Error %v", name, ierr)
	}

	var qerr errors.Error
	rv.gsiIndexer, qerr = gsi.NewGSIIndexer(p.store.URL(), p.Name(), name)
	if qerr != nil {
		logging.Warnf("Error loading GSI indexes for keyspace %s. Error %v", name, qerr)
	}

	// Create a bucket updater that will keep the couchbase bucket fresh.
	cbbucket.RunBucketUpdater(p.KeyspaceDeleteCallback)

	return rv, nil
}

// Called by go-couchbase if a configured keyspace is deleted
func (p *namespace) KeyspaceDeleteCallback(name string, err error) {

	p.lock.Lock()
	defer p.lock.Unlock()

	ks, ok := p.keyspaceCache[name]
	if ok && ks.cbKeyspace != nil {
		logging.Infof("Keyspace %v being deleted", name)
		ks.cbKeyspace.(*keyspace).deleted = true
		delete(p.keyspaceCache, name)

	} else {
		logging.Warnf("Keyspace %v not configured on this server", name)
	}
}

func (b *keyspace) NamespaceId() string {
	return b.namespace.Id()
}

func (b *keyspace) Namespace() datastore.Namespace {
	return b.namespace
}

func (b *keyspace) Id() string {
	return b.Name()
}

func (b *keyspace) Name() string {
	return b.name
}

func (b *keyspace) Count(context datastore.QueryContext) (int64, errors.Error) {
	totalCount, err := b.cbbucket.GetCount(true)
	if err != nil {
		return 0, errors.NewCbKeyspaceCountError(nil, "keyspace "+b.Name()+". Error: "+err.Error())
	}
	return totalCount, nil
}

func (b *keyspace) Indexer(name datastore.IndexType) (datastore.Indexer, errors.Error) {
	switch name {
	case datastore.GSI, datastore.DEFAULT:
		if b.gsiIndexer != nil {
			return b.gsiIndexer, nil
		}
		return nil, errors.NewCbIndexerNotImplementedError(nil, "GSI may not be enabled")
	case datastore.VIEW:
		return b.viewIndexer, nil
	default:
		return nil, errors.NewCbIndexerNotImplementedError(nil, fmt.Sprintf("Type %s", name))
	}
}

func (b *keyspace) Indexers() ([]datastore.Indexer, errors.Error) {
	indexers := make([]datastore.Indexer, 0, 2)
	if b.gsiIndexer != nil {
		indexers = append(indexers, b.gsiIndexer)
	}

	indexers = append(indexers, b.viewIndexer)
	return indexers, nil
}

func (b *keyspace) Fetch(keys []string, fetchMap map[string]value.AnnotatedValue,
	context datastore.QueryContext, subPaths []string) []errors.Error {
	var bulkResponse map[string]*gomemcached.MCResponse
	var mcr *gomemcached.MCResponse
	var err error

	_subPaths := subPaths
	noVirtualDocAttr := false

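	// if sub-paths are requested, ensure the $document virtual xattr is
	// fetched too, since document metadata (flags, expiration) comes from it;
	// noVirtualDocAttr remembers that the caller did not ask for it, so it
	// is stripped from the results again later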
	if len(_subPaths) > 0 && _subPaths[0] != "$document" {
		_subPaths = append([]string{"$document"}, _subPaths...)
		noVirtualDocAttr = true
	}

	l := len(keys)
	if l == 0 {
		return nil
	}

	if l == 1 {
		mcr, err = b.cbbucket.GetsMC(keys[0], context.GetReqDeadline(), _subPaths)
	} else {
		bulkResponse, err = b.cbbucket.GetBulk(keys, context.GetReqDeadline(), _subPaths)
		defer b.cbbucket.ReleaseGetBulkPools(bulkResponse)
	}

	if err != nil {
		// Ignore "Not found" keys
		if !isNotFoundError(err) {
			if cb.IsReadTimeOutError(err) {
				logging.Errorf(err.Error())
			}
			return []errors.Error{errors.NewCbBulkGetError(err, "")}
		}
	}

	i := 0
	if l == 1 {
		if mcr != nil && err == nil {
			if len(_subPaths) > 0 {
				fetchMap[keys[0]] = getSubDocFetchResults(keys[0], mcr, _subPaths, noVirtualDocAttr)
			} else {
				fetchMap[keys[0]] = doFetch(keys[0], mcr)
			}
			i++
		}
	} else {
		if len(_subPaths) > 0 {
			for k, v := range bulkResponse {
				fetchMap[k] = getSubDocFetchResults(k, v, _subPaths, noVirtualDocAttr)
				i++
			}
		} else {
			for k, v := range bulkResponse {
				fetchMap[k] = doFetch(k, v)
				i++
			}
		}
	}

	logging.Debugf("Fetched %d keys ", i)

	return nil
}

func doFetch(k string, v *gomemcached.MCResponse) value.AnnotatedValue {
	val := value.NewAnnotatedValue(value.NewParsedValue(v.Body, false))

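	// the first four bytes of Extras carry the flags, the next four the
	// expiration, both big-endian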
	var flags, expiration uint32

	if len(v.Extras) >= 4 {
		flags = binary.BigEndian.Uint32(v.Extras[0:4])
	}

	if len(v.Extras) >= 8 {
		expiration = binary.BigEndian.Uint32(v.Extras[4:8])
	}

	meta_type := "json"
	if val.Type() == value.BINARY {
		meta_type = "base64"
	}

	val.SetAttachment("meta", map[string]interface{}{
		"id":         k,
		"cas":        v.Cas,
		"type":       meta_type,
		"flags":      flags,
		"expiration": expiration,
	})

	// Uncomment when needed
	//logging.Debugf("CAS Value for key %v is %v flags %v", k, uint64(v.Cas), meta_flags)

	return val
}

func getSubDocFetchResults(k string, v *gomemcached.MCResponse, subPaths []string, noVirtualDocAttr bool) value.AnnotatedValue {
	responseIter := 0
	i := 0
	xVal := map[string]interface{}{}

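	// each sub-doc lookup result in the body is encoded as a 2-byte status,
	// a 4-byte big-endian value length, and then the value itself; walk one
	// such record per requested sub-path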
	for i < len(subPaths) {
		// For the xattr contents - $document
		xattrError := gomemcached.Status(binary.BigEndian.Uint16(v.Body[responseIter+0:]))
		xattrValueLen := int(binary.BigEndian.Uint32(v.Body[responseIter+2:]))

		xattrValue := v.Body[responseIter+6 : responseIter+6+xattrValueLen]

		// when the xattr value is not defined for a doc, leave it missing
		tmpVal := value.NewValue(xattrValue)

		if xattrError != gomemcached.SUBDOC_PATH_NOT_FOUND {
			xVal[subPaths[i]] = tmpVal.Actual()
		}

		// advance to the next lookup result
		responseIter = responseIter + 6 + xattrValueLen
		i = i + 1
	}

	// For the actual document contents -
	respError := gomemcached.Status(binary.BigEndian.Uint16(v.Body[responseIter+0:]))
	respValueLen := int(binary.BigEndian.Uint32(v.Body[responseIter+2:]))

	respValue := v.Body[responseIter+6 : responseIter+6+respValueLen]

	// For deleted documents the response error is path not found and the
	// value is set to null
	var val value.AnnotatedValue

	if respError == gomemcached.SUBDOC_PATH_NOT_FOUND {
		// deleted document: null final doc value
		val = value.NewAnnotatedValue(nil)
	} else {
		// non-deleted document: parse the body
		val = value.NewAnnotatedValue(value.NewParsedValue(respValue, false))
	}

	// type
	meta_type := "json"
	if val.Type() == value.BINARY {
		meta_type = "base64"
	}

	// Get flags and expiration from the $document virtual xattr
	docMeta := xVal["$document"].(map[string]interface{})

	// Convert unmarshalled int64 values to uint32
	flags := uint32(value.NewValue(docMeta["flags"]).(value.NumberValue).Int64())
	exptime := uint32(value.NewValue(docMeta["exptime"]).(value.NumberValue).Int64())

	if noVirtualDocAttr {
		delete(xVal, "$document")
	}

	a := map[string]interface{}{
		"id":         k,
		"cas":        v.Cas,
		"type":       meta_type,
		"flags":      flags,
		"expiration": exptime,
	}

	if len(xVal) > 0 {
		a["xattrs"] = xVal
	}

	val.SetAttachment("meta", a)

	return val
}

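// mutation operation codes understood by performOp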
const (
	INSERT = 0x01
	UPDATE = 0x02
	UPSERT = 0x04
)

func opToString(op int) string {

	switch op {
	case INSERT:
		return "insert"
	case UPDATE:
		return "update"
	case UPSERT:
		return "upsert"
	}

	return "unknown operation"
}

func isNotFoundError(err error) bool {
	return cb.IsKeyNoEntError(err)
}

func isEExistError(err error) bool {
	return cb.IsKeyEExistsError(err)
}

func getMeta(key string, meta map[string]interface{}) (cas uint64, flags uint32, err error) {

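	// the type assertions below panic if the meta attachment carries
	// unexpected types; recover and report the panic as an error instead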
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("getMeta: recovered from panic: %v", r)
		}
	}()

	if _, ok := meta["cas"]; ok {
		cas = meta["cas"].(uint64)
	} else {
		return 0, 0, fmt.Errorf("Cas value not found for key %v", key)
	}

	if _, ok := meta["flags"]; ok {
		flags = meta["flags"].(uint32)
	} else {
		return 0, 0, fmt.Errorf("Flags value not found for key %v", key)
	}

	return cas, flags, nil
}

func (b *keyspace) performOp(op int, inserts []value.Pair) ([]value.Pair, errors.Error) {

	if len(inserts) == 0 {
		return nil, nil
	}

	insertedKeys := make([]value.Pair, 0, len(inserts))
	var err error

	for _, kv := range inserts {
		key := kv.Name
		val := kv.Value.ActualForIndex()

		//mv := kv.Value.GetAttachment("meta")

		// TODO Need to also set meta
		switch op {

		case INSERT:
			var added bool
			// add the key to the backend
			added, err = b.cbbucket.Add(key, 0, val)
			if !added {
				// false & err == nil => given key already exists in the bucket
				if err != nil {
					err = errors.NewError(err, "Key "+key)
				} else {
					err = errors.NewError(nil, "Duplicate Key "+key)
				}
			}
		case UPDATE:
			// check if the key exists and if so then use the cas value
			// to update the key
			var meta map[string]interface{}
			var cas uint64
			var flags uint32

			an := kv.Value.(value.AnnotatedValue)
			meta = an.GetAttachment("meta").(map[string]interface{})

			cas, flags, err = getMeta(key, meta)
			if err != nil {
				// Don't perform the update if the meta values are not found
				logging.Errorf("Failed to get meta values for key <ud>%v</ud>, error %v", key, err)
			} else {

				logging.Debugf("CAS Value (Update) for key <ud>%v</ud> is %v flags <ud>%v</ud> value <ud>%v</ud>", key, cas, flags, val)
				_, _, err = b.cbbucket.CasWithMeta(key, int(flags), 0, cas, val)
			}

		case UPSERT:
			err = b.cbbucket.Set(key, 0, val)
		}

		if err != nil {
			if isEExistError(err) {
				logging.Errorf("Failed to perform update on key <ud>%s</ud>. CAS mismatch due to concurrent modifications", key)
			} else {
				logging.Errorf("Failed to perform <ud>%s</ud> on key <ud>%s</ud> for Keyspace %s.", opToString(op), key, b.Name())
			}
		} else {
			insertedKeys = append(insertedKeys, kv)
		}
	}

	if len(insertedKeys) == 0 {
		return nil, errors.NewCbDMLError(err, "Failed to perform "+opToString(op))
	}

	return insertedKeys, nil
}

func (b *keyspace) Insert(inserts []value.Pair) ([]value.Pair, errors.Error) {
	return b.performOp(INSERT, inserts)
}

func (b *keyspace) Update(updates []value.Pair) ([]value.Pair, errors.Error) {
	return b.performOp(UPDATE, updates)
}

func (b *keyspace) Upsert(upserts []value.Pair) ([]value.Pair, errors.Error) {
	return b.performOp(UPSERT, upserts)
}

func (b *keyspace) Delete(deletes []string, context datastore.QueryContext) ([]string, errors.Error) {

	failedDeletes := make([]string, 0)
	actualDeletes := make([]string, 0)
	var err error
	for _, key := range deletes {
		if err = b.cbbucket.Delete(key); err != nil {
			if !isNotFoundError(err) {
				logging.Infof("Failed to delete key <ud>%s</ud> Error %s", key, err)
				failedDeletes = append(failedDeletes, key)
			}
		} else {
			actualDeletes = append(actualDeletes, key)
		}
	}

	if len(failedDeletes) > 0 {
		return actualDeletes, errors.NewCbDeleteFailedError(err, "Some keys were not deleted "+fmt.Sprintf("%v", failedDeletes))
	}

	return actualDeletes, nil
}

func (b *keyspace) Release() {
	b.deleted = true
	b.cbbucket.Close()
}

func (b *keyspace) refreshIndexer(url string, poolName string) {
	var err error
	b.gsiIndexer, err = gsi.NewGSIIndexer(url, poolName, b.Name())
	if err == nil {
		logging.Infof(" GSI Indexer loaded ")
	}
}

func (b *keyspace) loadIndexes() (err errors.Error) {
	viewIndexer := b.viewIndexer.(*viewIndexer)
	if err1 := viewIndexer.loadViewIndexes(); err1 != nil {
		err = err1
	}
	return
}

// primaryIndex performs full keyspace scans.
type primaryIndex struct {
	viewIndex
}

func (pi *primaryIndex) KeyspaceId() string {
	return pi.keyspace.Id()
}

func (pi *primaryIndex) Id() string {
	return pi.Name()
}

func (pi *primaryIndex) Name() string {
	return pi.name
}

func (pi *primaryIndex) Type() datastore.IndexType {
	return pi.viewIndex.Type()
}

func (pi *primaryIndex) SeekKey() expression.Expressions {
	return pi.viewIndex.SeekKey()
}

func (pi *primaryIndex) RangeKey() expression.Expressions {
	return pi.viewIndex.RangeKey()
}

func (pi *primaryIndex) Condition() expression.Expression {
	return pi.viewIndex.Condition()
}

func (pi *primaryIndex) State() (state datastore.IndexState, msg string, err errors.Error) {
	return pi.viewIndex.State()
}

func (pi *primaryIndex) Statistics(requestId string, span *datastore.Span) (
	datastore.Statistics, errors.Error) {
	return pi.viewIndex.Statistics(requestId, span)
}

func (pi *primaryIndex) Drop(requestId string) errors.Error {
	return pi.viewIndex.Drop(requestId)
}

func (pi *primaryIndex) Scan(requestId string, span *datastore.Span, distinct bool, limit int64,
	cons datastore.ScanConsistency, vector timestamp.Vector, conn *datastore.IndexConnection) {
	pi.viewIndex.Scan(requestId, span, distinct, limit, cons, vector, conn)
}

func (pi *primaryIndex) ScanEntries(requestId string, limit int64, cons datastore.ScanConsistency,
	vector timestamp.Vector, conn *datastore.IndexConnection) {
	pi.viewIndex.ScanEntries(requestId, limit, cons, vector, conn)
}