//  Copyright (c) 2014 Couchbase, Inc.
//  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
//  except in compliance with the License. You may obtain a copy of the License at
//    http://www.apache.org/licenses/LICENSE-2.0
//  Unless required by applicable law or agreed to in writing, software distributed under the
//  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
//  either express or implied. See the License for the specific language governing permissions
//  and limitations under the License.

/*

Package file provides a file-based implementation of the datastore
package.

*/
package file

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"sync"

	"github.com/couchbase/query/auth"
	"github.com/couchbase/query/datastore"
	"github.com/couchbase/query/errors"
	"github.com/couchbase/query/expression"
	"github.com/couchbase/query/logging"
	"github.com/couchbase/query/timestamp"
	"github.com/couchbase/query/util"
	"github.com/couchbase/query/value"
)

// store is the root for the file-based Datastore.
type store struct {
	path           string
	namespaces     map[string]*namespace
	namespaceNames []string

	users map[string]*datastore.User
}

func (s *store) Id() string {
	return s.path
}

func (s *store) URL() string {
	return "file://" + s.path
}

func (s *store) Info() datastore.Info {
	return &infoImpl{}
}

type infoImpl struct {
}

func (i *infoImpl) Version() string {
	return util.VERSION
}

func (info *infoImpl) Topology() ([]string, []errors.Error) {
	return []string{}, nil
}

func (info *infoImpl) Services(node string) (map[string]interface{}, []errors.Error) {
	return map[string]interface{}{}, nil
}

func (s *store) NamespaceIds() ([]string, errors.Error) {
	return s.NamespaceNames()
}

func (s *store) NamespaceNames() ([]string, errors.Error) {
	return s.namespaceNames, nil
}

func (s *store) NamespaceById(id string) (p datastore.Namespace, e errors.Error) {
	return s.NamespaceByName(id)
}

func (s *store) NamespaceByName(name string) (p datastore.Namespace, e errors.Error) {
	p, ok := s.namespaces[strings.ToUpper(name)]
	if !ok {
		e = errors.NewFileNamespaceNotFoundError(nil, name)
	}

	return
}

func (s *store) Authorize(*auth.Privileges, auth.Credentials, *http.Request) (auth.AuthenticatedUsers, errors.Error) {
	return nil, nil
}

func (s *store) CredsString(req *http.Request) string {
	return ""
}

func (s *store) SetLogLevel(level logging.Level) {
	// No-op. Uses query engine logger.
}

func (s *store) Inferencer(name datastore.InferenceType) (datastore.Inferencer, errors.Error) {
	return nil, errors.NewOtherNotImplementedError(nil, "INFER")
}

func (s *store) Inferencers() ([]datastore.Inferencer, errors.Error) {
	return nil, errors.NewOtherNotImplementedError(nil, "INFER")
}

func (s *store) AuditInfo() (*datastore.AuditInfo, errors.Error) {
	return nil, errors.NewOtherNotImplementedError(nil, "AuditInfo")
}

func (s *store) ProcessAuditUpdateStream(callb func(uid string) error) errors.Error {
	return errors.NewOtherNotImplementedError(nil, "ProcessAuditUpdateStream")
}

func (s *store) UserInfo() (value.Value, errors.Error) {
	// Return an array of no users.
	jsonData := make([]interface{}, 0)
	v := value.NewValue(jsonData)
	return v, nil
}

func (s *store) GetUserInfoAll() ([]datastore.User, errors.Error) {
	ret := make([]datastore.User, 0, len(s.users))
	for _, v := range s.users {
		ret = append(ret, *v)
	}
	return ret, nil
}

func (s *store) PutUserInfo(u *datastore.User) errors.Error {
	s.users[u.Id] = u
	return nil
}

func (s *store) GetRolesAll() ([]datastore.Role, errors.Error) {
	return []datastore.Role{
		datastore.Role{Name: "cluster_admin"},
		datastore.Role{Name: "replication_admin"},
		datastore.Role{Name: "bucket_admin", Bucket: "*"},
	}, nil
}

// NewDatastore creates a new file-based datastore for the given filepath.
func NewDatastore(path string) (s datastore.Datastore, e errors.Error) {
	path, er := filepath.Abs(path)
	if er != nil {
		return nil, errors.NewFileDatastoreError(er, "")
	}

	fs := &store{path: path, users: make(map[string]*datastore.User, 4)}

	e = fs.loadNamespaces()
	if e != nil {
		return
	}

	s = fs
	return
}
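
// A minimal usage sketch: the datastore is rooted at a directory, each
// immediate subdirectory becomes a namespace, and each subdirectory of a
// namespace becomes a keyspace. The directory path below is hypothetical.
//
//	ds, err := file.NewDatastore("/tmp/sample-data")
//	if err != nil {
//		// handle the error
//	}
//	namespaces, _ := ds.NamespaceNames() // one name per subdirectory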

func (s *store) loadNamespaces() (e errors.Error) {
	dirEntries, er := ioutil.ReadDir(s.path)
	if er != nil {
		return errors.NewFileDatastoreError(er, "")
	}

	s.namespaces = make(map[string]*namespace, len(dirEntries))
	s.namespaceNames = make([]string, 0, len(dirEntries))

	var p *namespace
	for _, dirEntry := range dirEntries {
		if dirEntry.IsDir() {
			s.namespaceNames = append(s.namespaceNames, dirEntry.Name())
			diru := strings.ToUpper(dirEntry.Name())
			if _, ok := s.namespaces[diru]; ok {
				return errors.NewFileDuplicateNamespaceError(nil, dirEntry.Name())
			}

			p, e = newNamespace(s, dirEntry.Name())
			if e != nil {
				return
			}

			s.namespaces[diru] = p
		}
	}

	return
}

// namespace represents a file-based Namespace.
type namespace struct {
	store         *store
	name          string
	keyspaces     map[string]*keyspace
	keyspaceNames []string
}

func (p *namespace) DatastoreId() string {
	return p.store.Id()
}

func (p *namespace) Id() string {
	return p.Name()
}

func (p *namespace) Name() string {
	return p.name
}

func (p *namespace) KeyspaceIds() ([]string, errors.Error) {
	return p.KeyspaceNames()
}

func (p *namespace) KeyspaceNames() ([]string, errors.Error) {
	return p.keyspaceNames, nil
}

func (p *namespace) KeyspaceById(id string) (b datastore.Keyspace, e errors.Error) {
	return p.KeyspaceByName(id)
}

func (p *namespace) KeyspaceByName(name string) (b datastore.Keyspace, e errors.Error) {
	b, ok := p.keyspaces[strings.ToUpper(name)]
	if !ok {
		e = errors.NewFileKeyspaceNotFoundError(nil, name)
	}

	return
}

func (p *namespace) MetadataVersion() uint64 {
	return 0
}

func (p *namespace) path() string {
	return filepath.Join(p.store.path, p.name)
}

// newNamespace creates a new namespace.
func newNamespace(s *store, dir string) (p *namespace, e errors.Error) {
	p = new(namespace)
	p.store = s
	p.name = dir

	e = p.loadKeyspaces()
	return
}

func (p *namespace) loadKeyspaces() (e errors.Error) {
	dirEntries, er := ioutil.ReadDir(p.path())
	if er != nil {
		return errors.NewFileDatastoreError(er, "")
	}

	p.keyspaces = make(map[string]*keyspace, len(dirEntries))
	p.keyspaceNames = make([]string, 0, len(dirEntries))

	var b *keyspace
	for _, dirEntry := range dirEntries {
		if dirEntry.IsDir() {
			diru := strings.ToUpper(dirEntry.Name())
			if _, ok := p.keyspaces[diru]; ok {
				return errors.NewFileDuplicateKeyspaceError(nil, dirEntry.Name())
			}

			b, e = newKeyspace(p, dirEntry.Name())
			if e != nil {
				return
			}

			p.keyspaces[diru] = b
			p.keyspaceNames = append(p.keyspaceNames, b.Name())
		}
	}

	return
}
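
// Sketch of the on-disk layout expected by loadNamespaces and loadKeyspaces
// above (directory and file names are hypothetical): directories one level
// under the store path are namespaces, directories one level under a
// namespace are keyspaces, and each document is a <key>.json file inside its
// keyspace.
//
//	<store path>/
//		default/            namespace
//			contacts/       keyspace
//				dave.json   document with key "dave"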

// keyspace is a file-based keyspace.
type keyspace struct {
	namespace *namespace
	name      string
	fi        datastore.Indexer
	fileLock  sync.Mutex
}

func (b *keyspace) NamespaceId() string {
	return b.namespace.Id()
}

func (b *keyspace) Namespace() datastore.Namespace {
	return b.namespace
}

func (b *keyspace) Id() string {
	return b.Name()
}

func (b *keyspace) Name() string {
	return b.name
}

func (b *keyspace) Count(context datastore.QueryContext) (int64, errors.Error) {
	dirEntries, er := ioutil.ReadDir(b.path())
	if er != nil {
		return 0, errors.NewFileDatastoreError(er, "")
	}
	return int64(len(dirEntries)), nil
}

func (b *keyspace) Indexer(name datastore.IndexType) (datastore.Indexer, errors.Error) {
	return b.fi, nil
}

func (b *keyspace) Indexers() ([]datastore.Indexer, errors.Error) {
	return []datastore.Indexer{b.fi}, nil
}

func (b *keyspace) Fetch(keys []string, keysMap map[string]value.AnnotatedValue,
	context datastore.QueryContext, subPaths []string) []errors.Error {
	var errs []errors.Error

	for _, k := range keys {
		item, e := b.fetchOne(k)

		if e != nil {
			if os.IsNotExist(e.Cause()) {
				// file doesn't exist => key denotes non-existent doc => ignore it
				continue
			}
			if errs == nil {
				errs = make([]errors.Error, 0, 1)
			}
			errs = append(errs, e)
			continue
		}

		if item != nil {
			item.SetAttachment("meta", map[string]interface{}{
				"id": k,
			})
		}

		keysMap[k] = item
	}

	return errs
}

func (b *keyspace) fetchOne(key string) (value.AnnotatedValue, errors.Error) {
	path := filepath.Join(b.path(), key+".json")
	item, e := fetch(path)
	if e != nil {
		item = nil
	}

	return item, e
}

const (
	INSERT = 0x01
	UPDATE = 0x02
	UPSERT = 0x04
)

func opToString(op int) string {

	switch op {
	case INSERT:
		return "insert"
	case UPDATE:
		return "update"
	case UPSERT:
		return "upsert"
	}

	return "unknown operation"
}

func (b *keyspace) performOp(op int, kvPairs []value.Pair) ([]value.Pair, errors.Error) {

	if len(kvPairs) == 0 {
		return nil, errors.NewFileNoKeysInsertError(nil, "keyspace "+b.Name())
	}

	insertedKeys := make([]value.Pair, 0)
	var returnErr errors.Error

	// this lock could be made more granular FIXME
	b.fileLock.Lock()
	defer b.fileLock.Unlock()

	for _, kv := range kvPairs {
		var file *os.File
		var err error

		key := kv.Name
		// marshal the value to JSON; record a DML error and skip the pair if it cannot be encoded
		data, err := json.Marshal(kv.Value.Actual())
		if err != nil {
			returnErr = errors.NewFileDMLError(returnErr, opToString(op)+" Failed "+err.Error())
			continue
		}
		filename := filepath.Join(b.path(), key+".json")

		switch op {

		case INSERT:
			// add the key only if it doesn't exist
			if _, err = os.Stat(filename); err == nil {
				err = errors.NewFileKeyExists(nil, "Key (File) "+filename)
			} else {
				// create and write the file
				if file, err = os.Create(filename); err == nil {
					_, err = file.Write(data)
					file.Close()
				}
			}
		case UPDATE:
			// update the key only if it already exists
			if _, err = os.Stat(filename); err == nil {
				// open and write the file
				if file, err = os.OpenFile(filename, os.O_TRUNC|os.O_RDWR, 0666); err == nil {
					_, err = file.Write(data)
					file.Close()
				}
			}

		case UPSERT:
			// open the file for writing, creating it if it doesn't exist
			if file, err = os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0666); err == nil {
				_, err = file.Write(data)
				file.Close()
			}
		}

		if err != nil {
			returnErr = errors.NewFileDMLError(returnErr, opToString(op)+" Failed "+err.Error())
		} else {
			insertedKeys = append(insertedKeys, kv)
		}
	}

	return insertedKeys, returnErr

}

func (b *keyspace) Insert(inserts []value.Pair) ([]value.Pair, errors.Error) {
	return b.performOp(INSERT, inserts)
}

func (b *keyspace) Update(updates []value.Pair) ([]value.Pair, errors.Error) {
	return b.performOp(UPDATE, updates)
}

func (b *keyspace) Upsert(upserts []value.Pair) ([]value.Pair, errors.Error) {
	return b.performOp(UPSERT, upserts)
}

func (b *keyspace) Delete(deletes []string, context datastore.QueryContext) ([]string, errors.Error) {

	var fileError []string
	var deleted []string
	for _, key := range deletes {
		filename := filepath.Join(b.path(), key+".json")
		if err := os.Remove(filename); err != nil {
			if !os.IsNotExist(err) {
				fileError = append(fileError, err.Error())
			}
		} else {
			deleted = append(deleted, key)
		}
	}

	if len(fileError) > 0 {
		errLine := fmt.Sprintf("Delete failed on some keys %v", fileError)
		return deleted, errors.NewFileDatastoreError(nil, errLine)
	}

	return deleted, nil
}

func (b *keyspace) Release() {
}

func (b *keyspace) path() string {
	return filepath.Join(b.namespace.path(), b.name)
}

// newKeyspace creates a new keyspace.
func newKeyspace(p *namespace, dir string) (b *keyspace, e errors.Error) {
	b = new(keyspace)
	b.namespace = p
	b.name = dir

	fi, er := os.Stat(b.path())
	if er != nil {
		return nil, errors.NewFileDatastoreError(er, "")
	}

	if !fi.IsDir() {
		return nil, errors.NewFileKeyspaceNotDirError(nil, "Keyspace path "+dir)
	}

	b.fi = newFileIndexer(b)
	b.fi.CreatePrimaryIndex("", "#primary", nil)

	return
}

type fileIndexer struct {
	keyspace *keyspace
	indexes  map[string]datastore.Index
	primary  datastore.PrimaryIndex
}

func newFileIndexer(keyspace *keyspace) datastore.Indexer {

	return &fileIndexer{
		keyspace: keyspace,
		indexes:  make(map[string]datastore.Index),
	}
}

func (fi *fileIndexer) KeyspaceId() string {
	return fi.keyspace.Id()
}

func (fi *fileIndexer) Name() datastore.IndexType {
	return datastore.DEFAULT
}

func (fi *fileIndexer) IndexIds() ([]string, errors.Error) {
	rv := make([]string, 0, len(fi.indexes))
	for name := range fi.indexes {
		rv = append(rv, name)
	}
	return rv, nil
}

func (fi *fileIndexer) IndexNames() ([]string, errors.Error) {
	rv := make([]string, 0, len(fi.indexes))
	for name := range fi.indexes {
		rv = append(rv, name)
	}
	return rv, nil
}

func (fi *fileIndexer) IndexById(id string) (datastore.Index, errors.Error) {
	return fi.IndexByName(id)
}

func (fi *fileIndexer) IndexByName(name string) (datastore.Index, errors.Error) {
	index, ok := fi.indexes[name]
	if !ok {
		return nil, errors.NewFileIdxNotFound(nil, name)
	}
	return index, nil
}

func (fi *fileIndexer) PrimaryIndexes() ([]datastore.PrimaryIndex, errors.Error) {
	return []datastore.PrimaryIndex{fi.primary}, nil
}

func (fi *fileIndexer) Indexes() ([]datastore.Index, errors.Error) {
	return []datastore.Index{fi.primary}, nil
}

func (fi *fileIndexer) CreatePrimaryIndex(requestId, name string, with value.Value) (
	datastore.PrimaryIndex, errors.Error) {
	if fi.primary == nil {
		pi := new(primaryIndex)
		fi.primary = pi
		pi.keyspace = fi.keyspace
		pi.name = name
		pi.indexer = fi
		fi.indexes[pi.name] = pi
	}

	return fi.primary, nil
}

func (b *fileIndexer) CreateIndex(requestId, name string, seekKey, rangeKey expression.Expressions,
	where expression.Expression, with value.Value) (datastore.Index, errors.Error) {
	return nil, errors.NewFileNotSupported(nil, "CREATE INDEX is not supported for file-based datastore.")
}

func (b *fileIndexer) BuildIndexes(requestId string, names ...string) errors.Error {
	return errors.NewFileNotSupported(nil, "BUILD INDEXES is not supported for file-based datastore.")
}

func (b *fileIndexer) Refresh() errors.Error {
	return nil
}

func (b *fileIndexer) MetadataVersion() uint64 {
	return 0
}

func (b *fileIndexer) SetLogLevel(level logging.Level) {
	// No-op, uses query engine logger
}

// primaryIndex performs full keyspace scans.
type primaryIndex struct {
	name     string
	keyspace *keyspace
	indexer  *fileIndexer
}

func (pi *primaryIndex) KeyspaceId() string {
	return pi.keyspace.Id()
}

func (pi *primaryIndex) Id() string {
	return pi.Name()
}

func (pi *primaryIndex) Name() string {
	return pi.name
}

func (pi *primaryIndex) Type() datastore.IndexType {
	return datastore.DEFAULT
}

func (pi *primaryIndex) Indexer() datastore.Indexer {
	return pi.indexer
}

func (pi *primaryIndex) SeekKey() expression.Expressions {
	return nil
}

func (pi *primaryIndex) RangeKey() expression.Expressions {
	// FIXME
	return nil
}

func (pi *primaryIndex) Condition() expression.Expression {
	return nil
}

func (pi *primaryIndex) IsPrimary() bool {
	return true
}

func (pi *primaryIndex) State() (state datastore.IndexState, msg string, err errors.Error) {
	return datastore.ONLINE, "", nil
}

func (pi *primaryIndex) Statistics(requestId string, span *datastore.Span) (
	datastore.Statistics, errors.Error) {
	return nil, nil
}

func (pi *primaryIndex) Drop(requestId string) errors.Error {
	return errors.NewFilePrimaryIdxNoDropError(nil, pi.Name())
}

func (pi *primaryIndex) Scan(requestId string, span *datastore.Span, distinct bool, limit int64,
	cons datastore.ScanConsistency, vector timestamp.Vector, conn *datastore.IndexConnection) {
	defer close(conn.EntryChannel())

	// For primary indexes, bounds must always be strings, so we
	// can just enforce that directly
	low, high := "", ""

	// Ensure that lower bound is a string, if any
	if len(span.Range.Low) > 0 {
		a := span.Range.Low[0].Actual()
		switch a := a.(type) {
		case string:
			low = a
		default:
			conn.Error(errors.NewFileDatastoreError(nil, fmt.Sprintf("Invalid lower bound %v of type %T.", a, a)))
			return
		}
	}

	// Ensure that upper bound is a string, if any
	if len(span.Range.High) > 0 {
		a := span.Range.High[0].Actual()
		switch a := a.(type) {
		case string:
			high = a
		default:
			conn.Error(errors.NewFileDatastoreError(nil, fmt.Sprintf("Invalid upper bound %v of type %T.", a, a)))
			return
		}
	}

	dirEntries, er := ioutil.ReadDir(pi.keyspace.path())
	if er != nil {
		conn.Error(errors.NewFileDatastoreError(er, ""))
		return
	}

	var n int64 = 0
	for _, dirEntry := range dirEntries {

		logging.Debugf("Dir entry being scanned <ud>%v</ud> \n", dirEntry.Name())
		// stop once the requested number of entries has been emitted
		if limit > 0 && n >= limit {
			break
		}

		id := documentPathToId(dirEntry.Name())

		if low != "" &&
			(id < low ||
				(id == low && (span.Range.Inclusion&datastore.LOW == 0))) {
			continue
		}

		low = ""

		if high != "" &&
			(id > high ||
				(id == high && (span.Range.Inclusion&datastore.HIGH == 0))) {
			break
		}

		if !dirEntry.IsDir() {
			entry := datastore.IndexEntry{PrimaryKey: id}
			conn.EntryChannel() <- &entry
			n++
		}
	}
}

func (pi *primaryIndex) ScanEntries(requestId string, limit int64, cons datastore.ScanConsistency,
	vector timestamp.Vector, conn *datastore.IndexConnection) {
	defer close(conn.EntryChannel())

	dirEntries, er := ioutil.ReadDir(pi.keyspace.path())
	if er != nil {
		conn.Error(errors.NewFileDatastoreError(er, ""))
		return
	}

	// count only emitted entries against the limit, not directories
	var n int64 = 0
	for _, dirEntry := range dirEntries {
		if limit > 0 && n >= limit {
			break
		}
		if !dirEntry.IsDir() {
			entry := datastore.IndexEntry{PrimaryKey: documentPathToId(dirEntry.Name())}
			conn.EntryChannel() <- &entry
			n++
		}
	}
}

func fetch(path string) (item value.AnnotatedValue, e errors.Error) {
	bytes, er := ioutil.ReadFile(path)
	if er != nil {
		return nil, errors.NewFileDatastoreError(er, "")
	}

	doc := value.NewAnnotatedValue(value.NewValue(bytes))
	doc.SetAttachment("meta", map[string]interface{}{"id": documentPathToId(path)})
	item = doc

	return
}

func documentPathToId(p string) string {
	_, file := filepath.Split(p)
	ext := filepath.Ext(file)
	return file[0 : len(file)-len(ext)]
}
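
// For example (hypothetical path), documentPathToId maps a document's file
// path back to its key by dropping the directory and the ".json" extension:
//
//	documentPathToId("/tmp/sample-data/default/contacts/dave.json") // "dave"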