1// Copyright (c) 2014 Couchbase, Inc. 2// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file 3// except in compliance with the License. You may obtain a copy of the License at 4// http://www.apache.org/licenses/LICENSE-2.0 5// Unless required by applicable law or agreed to in writing, software distributed under the 6// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 7// either express or implied. See the License for the specific language governing permissions 8// and limitations under the License. 9 10/* 11 12Package file provides a file-based implementation of the datastore 13package. 14 15*/ 16package file 17 18import ( 19 "encoding/json" 20 "fmt" 21 "io/ioutil" 22 "net/http" 23 "os" 24 "path/filepath" 25 "strings" 26 "sync" 27 28 "github.com/couchbase/query/auth" 29 "github.com/couchbase/query/datastore" 30 "github.com/couchbase/query/errors" 31 "github.com/couchbase/query/expression" 32 "github.com/couchbase/query/logging" 33 "github.com/couchbase/query/timestamp" 34 "github.com/couchbase/query/util" 35 "github.com/couchbase/query/value" 36) 37 38// datastore is the root for the file-based Datastore. 39type store struct { 40 path string 41 namespaces map[string]*namespace 42 namespaceNames []string 43 44 users map[string]*datastore.User 45} 46 47func (s *store) Id() string { 48 return s.path 49} 50 51func (s *store) URL() string { 52 return "file://" + s.path 53} 54 55func (s *store) Info() datastore.Info { 56 return &infoImpl{} 57} 58 59type infoImpl struct { 60} 61 62func (i *infoImpl) Version() string { 63 return util.VERSION 64} 65 66func (info *infoImpl) Topology() ([]string, []errors.Error) { 67 return []string{}, nil 68} 69 70func (info *infoImpl) Services(node string) (map[string]interface{}, []errors.Error) { 71 return map[string]interface{}{}, nil 72} 73 74func (s *store) NamespaceIds() ([]string, errors.Error) { 75 return s.NamespaceNames() 76} 77 78func (s *store) NamespaceNames() ([]string, errors.Error) { 79 return s.namespaceNames, nil 80} 81 82func (s *store) NamespaceById(id string) (p datastore.Namespace, e errors.Error) { 83 return s.NamespaceByName(id) 84} 85 86func (s *store) NamespaceByName(name string) (p datastore.Namespace, e errors.Error) { 87 p, ok := s.namespaces[strings.ToUpper(name)] 88 if !ok { 89 e = errors.NewFileNamespaceNotFoundError(nil, name) 90 } 91 92 return 93} 94 95func (s *store) Authorize(*auth.Privileges, auth.Credentials, *http.Request) (auth.AuthenticatedUsers, errors.Error) { 96 return nil, nil 97} 98 99func (s *store) CredsString(req *http.Request) string { 100 return "" 101} 102 103func (s *store) SetLogLevel(level logging.Level) { 104 // No-op. Uses query engine logger. 105} 106 107func (s *store) Inferencer(name datastore.InferenceType) (datastore.Inferencer, errors.Error) { 108 return nil, errors.NewOtherNotImplementedError(nil, "INFER") 109} 110 111func (s *store) Inferencers() ([]datastore.Inferencer, errors.Error) { 112 return nil, errors.NewOtherNotImplementedError(nil, "INFER") 113} 114 115func (s *store) AuditInfo() (*datastore.AuditInfo, errors.Error) { 116 return nil, errors.NewOtherNotImplementedError(nil, "AuditInfo") 117} 118 119func (s *store) ProcessAuditUpdateStream(callb func(uid string) error) errors.Error { 120 return errors.NewOtherNotImplementedError(nil, "ProcessAuditUpdateStream") 121} 122 123func (s *store) UserInfo() (value.Value, errors.Error) { 124 // Return an array of no users. 125 jsonData := make([]interface{}, 0) 126 v := value.NewValue(jsonData) 127 return v, nil 128} 129 130func (s *store) GetUserInfoAll() ([]datastore.User, errors.Error) { 131 ret := make([]datastore.User, 0, len(s.users)) 132 for _, v := range s.users { 133 ret = append(ret, *v) 134 } 135 return ret, nil 136} 137 138func (s *store) PutUserInfo(u *datastore.User) errors.Error { 139 s.users[u.Id] = u 140 return nil 141} 142 143func (s *store) GetRolesAll() ([]datastore.Role, errors.Error) { 144 return []datastore.Role{ 145 datastore.Role{Name: "cluster_admin"}, 146 datastore.Role{Name: "replication_admin"}, 147 datastore.Role{Name: "bucket_admin", Bucket: "*"}, 148 }, nil 149} 150 151// NewStore creates a new file-based store for the given filepath. 152func NewDatastore(path string) (s datastore.Datastore, e errors.Error) { 153 path, er := filepath.Abs(path) 154 if er != nil { 155 return nil, errors.NewFileDatastoreError(er, "") 156 } 157 158 fs := &store{path: path, users: make(map[string]*datastore.User, 4)} 159 160 e = fs.loadNamespaces() 161 if e != nil { 162 return 163 } 164 165 s = fs 166 return 167} 168 169func (s *store) loadNamespaces() (e errors.Error) { 170 dirEntries, er := ioutil.ReadDir(s.path) 171 if er != nil { 172 return errors.NewFileDatastoreError(er, "") 173 } 174 175 s.namespaces = make(map[string]*namespace, len(dirEntries)) 176 s.namespaceNames = make([]string, 0, len(dirEntries)) 177 178 var p *namespace 179 for _, dirEntry := range dirEntries { 180 if dirEntry.IsDir() { 181 s.namespaceNames = append(s.namespaceNames, dirEntry.Name()) 182 diru := strings.ToUpper(dirEntry.Name()) 183 if _, ok := s.namespaces[diru]; ok { 184 return errors.NewFileDuplicateNamespaceError(nil, dirEntry.Name()) 185 } 186 187 p, e = newNamespace(s, dirEntry.Name()) 188 if e != nil { 189 return 190 } 191 192 s.namespaces[diru] = p 193 } 194 } 195 196 return 197} 198 199// namespace represents a file-based Namespace. 200type namespace struct { 201 store *store 202 name string 203 keyspaces map[string]*keyspace 204 keyspaceNames []string 205} 206 207func (p *namespace) DatastoreId() string { 208 return p.store.Id() 209} 210 211func (p *namespace) Id() string { 212 return p.Name() 213} 214 215func (p *namespace) Name() string { 216 return p.name 217} 218 219func (p *namespace) KeyspaceIds() ([]string, errors.Error) { 220 return p.KeyspaceNames() 221} 222 223func (p *namespace) KeyspaceNames() ([]string, errors.Error) { 224 return p.keyspaceNames, nil 225} 226 227func (p *namespace) KeyspaceById(id string) (b datastore.Keyspace, e errors.Error) { 228 return p.KeyspaceByName(id) 229} 230 231func (p *namespace) KeyspaceByName(name string) (b datastore.Keyspace, e errors.Error) { 232 b, ok := p.keyspaces[strings.ToUpper(name)] 233 if !ok { 234 e = errors.NewFileKeyspaceNotFoundError(nil, name) 235 } 236 237 return 238} 239 240func (p *namespace) MetadataVersion() uint64 { 241 return 0 242} 243 244func (p *namespace) path() string { 245 return filepath.Join(p.store.path, p.name) 246} 247 248// newNamespace creates a new namespace. 249func newNamespace(s *store, dir string) (p *namespace, e errors.Error) { 250 p = new(namespace) 251 p.store = s 252 p.name = dir 253 254 e = p.loadKeyspaces() 255 return 256} 257 258func (p *namespace) loadKeyspaces() (e errors.Error) { 259 dirEntries, er := ioutil.ReadDir(p.path()) 260 if er != nil { 261 return errors.NewFileDatastoreError(er, "") 262 } 263 264 p.keyspaces = make(map[string]*keyspace, len(dirEntries)) 265 p.keyspaceNames = make([]string, 0, len(dirEntries)) 266 267 var b *keyspace 268 for _, dirEntry := range dirEntries { 269 if dirEntry.IsDir() { 270 diru := strings.ToUpper(dirEntry.Name()) 271 if _, ok := p.keyspaces[diru]; ok { 272 return errors.NewFileDuplicateKeyspaceError(nil, dirEntry.Name()) 273 } 274 275 b, e = newKeyspace(p, dirEntry.Name()) 276 if e != nil { 277 return 278 } 279 280 p.keyspaces[diru] = b 281 p.keyspaceNames = append(p.keyspaceNames, b.Name()) 282 } 283 } 284 285 return 286} 287 288// keyspace is a file-based keyspace. 289type keyspace struct { 290 namespace *namespace 291 name string 292 fi datastore.Indexer 293 fileLock sync.Mutex 294} 295 296func (b *keyspace) NamespaceId() string { 297 return b.namespace.Id() 298} 299 300func (b *keyspace) Namespace() datastore.Namespace { 301 return b.namespace 302} 303 304func (b *keyspace) Id() string { 305 return b.Name() 306} 307 308func (b *keyspace) Name() string { 309 return b.name 310} 311 312func (b *keyspace) Count(context datastore.QueryContext) (int64, errors.Error) { 313 dirEntries, er := ioutil.ReadDir(b.path()) 314 if er != nil { 315 return 0, errors.NewFileDatastoreError(er, "") 316 } 317 return int64(len(dirEntries)), nil 318} 319 320func (b *keyspace) Indexer(name datastore.IndexType) (datastore.Indexer, errors.Error) { 321 return b.fi, nil 322} 323 324func (b *keyspace) Indexers() ([]datastore.Indexer, errors.Error) { 325 return []datastore.Indexer{b.fi}, nil 326} 327 328func (b *keyspace) Fetch(keys []string, keysMap map[string]value.AnnotatedValue, 329 context datastore.QueryContext, subPaths []string) []errors.Error { 330 var errs []errors.Error 331 332 for _, k := range keys { 333 item, e := b.fetchOne(k) 334 335 if e != nil { 336 if os.IsNotExist(e.Cause()) { 337 // file doesn't exist => key denotes non-existent doc => ignore it 338 continue 339 } 340 if errs == nil { 341 errs = make([]errors.Error, 0, 1) 342 } 343 errs = append(errs, e) 344 continue 345 } 346 347 if item != nil { 348 item.SetAttachment("meta", map[string]interface{}{ 349 "id": k, 350 }) 351 } 352 353 keysMap[k] = item 354 } 355 356 return errs 357} 358 359func (b *keyspace) fetchOne(key string) (value.AnnotatedValue, errors.Error) { 360 path := filepath.Join(b.path(), key+".json") 361 item, e := fetch(path) 362 if e != nil { 363 item = nil 364 } 365 366 return item, e 367} 368 369const ( 370 INSERT = 0x01 371 UPDATE = 0x02 372 UPSERT = 0x04 373) 374 375func opToString(op int) string { 376 377 switch op { 378 case INSERT: 379 return "insert" 380 case UPDATE: 381 return "update" 382 case UPSERT: 383 return "upsert" 384 } 385 386 return "unknown operation" 387} 388 389func (b *keyspace) performOp(op int, kvPairs []value.Pair) ([]value.Pair, errors.Error) { 390 391 if len(kvPairs) == 0 { 392 return nil, errors.NewFileNoKeysInsertError(nil, "keyspace "+b.Name()) 393 } 394 395 insertedKeys := make([]value.Pair, 0) 396 var returnErr errors.Error 397 398 // this lock can be mode more granular FIXME 399 b.fileLock.Lock() 400 defer b.fileLock.Unlock() 401 402 for _, kv := range kvPairs { 403 var file *os.File 404 var err error 405 406 key := kv.Name 407 value, _ := json.Marshal(kv.Value.Actual()) 408 filename := filepath.Join(b.path(), key+".json") 409 410 switch op { 411 412 case INSERT: 413 // add the key only if it doesn't exist 414 if _, err = os.Stat(filename); err == nil { 415 err = errors.NewFileKeyExists(nil, "Key (File) "+filename) 416 } else { 417 // create and write the file 418 if file, err = os.Create(filename); err == nil { 419 _, err = file.Write(value) 420 file.Close() 421 } 422 } 423 case UPDATE: 424 // add the key only if it doesn't exist 425 if _, err = os.Stat(filename); err == nil { 426 // open and write the file 427 if file, err = os.OpenFile(filename, os.O_TRUNC|os.O_RDWR, 0666); err == nil { 428 _, err = file.Write(value) 429 file.Close() 430 } 431 } 432 433 case UPSERT: 434 // open the file for writing, if doesn't exist then create 435 if file, err = os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0666); err == nil { 436 _, err = file.Write(value) 437 file.Close() 438 } 439 } 440 441 if err != nil { 442 returnErr = errors.NewFileDMLError(returnErr, opToString(op)+" Failed "+err.Error()) 443 } else { 444 insertedKeys = append(insertedKeys, kv) 445 } 446 } 447 448 return insertedKeys, returnErr 449 450} 451 452func (b *keyspace) Insert(inserts []value.Pair) ([]value.Pair, errors.Error) { 453 return b.performOp(INSERT, inserts) 454} 455 456func (b *keyspace) Update(updates []value.Pair) ([]value.Pair, errors.Error) { 457 return b.performOp(UPDATE, updates) 458} 459 460func (b *keyspace) Upsert(upserts []value.Pair) ([]value.Pair, errors.Error) { 461 return b.performOp(UPSERT, upserts) 462} 463 464func (b *keyspace) Delete(deletes []string, context datastore.QueryContext) ([]string, errors.Error) { 465 466 var fileError []string 467 var deleted []string 468 for _, key := range deletes { 469 filename := filepath.Join(b.path(), key+".json") 470 if err := os.Remove(filename); err != nil { 471 if !os.IsNotExist(err) { 472 fileError = append(fileError, err.Error()) 473 } 474 } else { 475 deleted = append(deleted, key) 476 } 477 } 478 479 if len(fileError) > 0 { 480 errLine := fmt.Sprintf("Delete failed on some keys %v", fileError) 481 return deleted, errors.NewFileDatastoreError(nil, errLine) 482 } 483 484 return deleted, nil 485} 486 487func (b *keyspace) Release() { 488} 489 490func (b *keyspace) path() string { 491 return filepath.Join(b.namespace.path(), b.name) 492} 493 494// newKeyspace creates a new keyspace. 495func newKeyspace(p *namespace, dir string) (b *keyspace, e errors.Error) { 496 b = new(keyspace) 497 b.namespace = p 498 b.name = dir 499 500 fi, er := os.Stat(b.path()) 501 if er != nil { 502 return nil, errors.NewFileDatastoreError(er, "") 503 } 504 505 if !fi.IsDir() { 506 return nil, errors.NewFileKeyspaceNotDirError(nil, "Keyspace path "+dir) 507 } 508 509 b.fi = newFileIndexer(b) 510 b.fi.CreatePrimaryIndex("", "#primary", nil) 511 512 return 513} 514 515type fileIndexer struct { 516 keyspace *keyspace 517 indexes map[string]datastore.Index 518 primary datastore.PrimaryIndex 519} 520 521func newFileIndexer(keyspace *keyspace) datastore.Indexer { 522 523 return &fileIndexer{ 524 keyspace: keyspace, 525 indexes: make(map[string]datastore.Index), 526 } 527} 528 529func (fi *fileIndexer) KeyspaceId() string { 530 return fi.keyspace.Id() 531} 532 533func (fi *fileIndexer) Name() datastore.IndexType { 534 return datastore.DEFAULT 535} 536 537func (fi *fileIndexer) IndexIds() ([]string, errors.Error) { 538 rv := make([]string, 0, len(fi.indexes)) 539 for name, _ := range fi.indexes { 540 rv = append(rv, name) 541 } 542 return rv, nil 543} 544 545func (fi *fileIndexer) IndexNames() ([]string, errors.Error) { 546 rv := make([]string, 0, len(fi.indexes)) 547 for name, _ := range fi.indexes { 548 rv = append(rv, name) 549 } 550 return rv, nil 551} 552 553func (fi *fileIndexer) IndexById(id string) (datastore.Index, errors.Error) { 554 return fi.IndexByName(id) 555} 556 557func (fi *fileIndexer) IndexByName(name string) (datastore.Index, errors.Error) { 558 index, ok := fi.indexes[name] 559 if !ok { 560 return nil, errors.NewFileIdxNotFound(nil, name) 561 } 562 return index, nil 563} 564 565func (fi *fileIndexer) PrimaryIndexes() ([]datastore.PrimaryIndex, errors.Error) { 566 return []datastore.PrimaryIndex{fi.primary}, nil 567} 568 569func (fi *fileIndexer) Indexes() ([]datastore.Index, errors.Error) { 570 return []datastore.Index{fi.primary}, nil 571} 572 573func (fi *fileIndexer) CreatePrimaryIndex(requestId, name string, with value.Value) ( 574 datastore.PrimaryIndex, errors.Error) { 575 if fi.primary == nil { 576 pi := new(primaryIndex) 577 fi.primary = pi 578 pi.keyspace = fi.keyspace 579 pi.name = name 580 pi.indexer = fi 581 fi.indexes[pi.name] = pi 582 } 583 584 return fi.primary, nil 585} 586 587func (b *fileIndexer) CreateIndex(requestId, name string, seekKey, rangeKey expression.Expressions, 588 where expression.Expression, with value.Value) (datastore.Index, errors.Error) { 589 return nil, errors.NewFileNotSupported(nil, "CREATE INDEX is not supported for file-based datastore.") 590} 591 592func (b *fileIndexer) BuildIndexes(requestId string, names ...string) errors.Error { 593 return errors.NewFileNotSupported(nil, "BUILD INDEXES is not supported for file-based datastore.") 594} 595 596func (b *fileIndexer) Refresh() errors.Error { 597 return nil 598} 599 600func (b *fileIndexer) MetadataVersion() uint64 { 601 return 0 602} 603 604func (b *fileIndexer) SetLogLevel(level logging.Level) { 605 // No-op, uses query engine logger 606} 607 608// primaryIndex performs full keyspace scans. 609type primaryIndex struct { 610 name string 611 keyspace *keyspace 612 indexer *fileIndexer 613} 614 615func (pi *primaryIndex) KeyspaceId() string { 616 return pi.keyspace.Id() 617} 618 619func (pi *primaryIndex) Id() string { 620 return pi.Name() 621} 622 623func (pi *primaryIndex) Name() string { 624 return pi.name 625} 626 627func (pi *primaryIndex) Type() datastore.IndexType { 628 return datastore.DEFAULT 629} 630 631func (pi *primaryIndex) Indexer() datastore.Indexer { 632 return pi.indexer 633} 634 635func (pi *primaryIndex) SeekKey() expression.Expressions { 636 return nil 637} 638 639func (pi *primaryIndex) RangeKey() expression.Expressions { 640 // FIXME 641 return nil 642} 643 644func (pi *primaryIndex) Condition() expression.Expression { 645 return nil 646} 647 648func (pi *primaryIndex) IsPrimary() bool { 649 return true 650} 651 652func (pi *primaryIndex) State() (state datastore.IndexState, msg string, err errors.Error) { 653 return datastore.ONLINE, "", nil 654} 655 656func (pi *primaryIndex) Statistics(requestId string, span *datastore.Span) ( 657 datastore.Statistics, errors.Error) { 658 return nil, nil 659} 660 661func (pi *primaryIndex) Drop(requestId string) errors.Error { 662 return errors.NewFilePrimaryIdxNoDropError(nil, pi.Name()) 663} 664 665func (pi *primaryIndex) Scan(requestId string, span *datastore.Span, distinct bool, limit int64, 666 cons datastore.ScanConsistency, vector timestamp.Vector, conn *datastore.IndexConnection) { 667 defer close(conn.EntryChannel()) 668 669 // For primary indexes, bounds must always be strings, so we 670 // can just enforce that directly 671 low, high := "", "" 672 673 // Ensure that lower bound is a string, if any 674 if len(span.Range.Low) > 0 { 675 a := span.Range.Low[0].Actual() 676 switch a := a.(type) { 677 case string: 678 low = a 679 default: 680 conn.Error(errors.NewFileDatastoreError(nil, fmt.Sprintf("Invalid lower bound %v of type %T.", a, a))) 681 return 682 } 683 } 684 685 // Ensure that upper bound is a string, if any 686 if len(span.Range.High) > 0 { 687 a := span.Range.High[0].Actual() 688 switch a := a.(type) { 689 case string: 690 high = a 691 default: 692 conn.Error(errors.NewFileDatastoreError(nil, fmt.Sprintf("Invalid upper bound %v of type %T.", a, a))) 693 return 694 } 695 } 696 697 dirEntries, er := ioutil.ReadDir(pi.keyspace.path()) 698 if er != nil { 699 conn.Error(errors.NewFileDatastoreError(er, "")) 700 return 701 } 702 703 var n int64 = 0 704 for _, dirEntry := range dirEntries { 705 706 logging.Debugf("Dir entry being scanned <ud>%v</ud> \n", dirEntry.Name()) 707 if limit > 0 && n > limit { 708 break 709 } 710 711 id := documentPathToId(dirEntry.Name()) 712 713 if low != "" && 714 (id < low || 715 (id == low && (span.Range.Inclusion&datastore.LOW == 0))) { 716 continue 717 } 718 719 low = "" 720 721 if high != "" && 722 (id > high || 723 (id == high && (span.Range.Inclusion&datastore.HIGH == 0))) { 724 break 725 } 726 727 if !dirEntry.IsDir() { 728 entry := datastore.IndexEntry{PrimaryKey: id} 729 conn.EntryChannel() <- &entry 730 n++ 731 } 732 } 733} 734 735func (pi *primaryIndex) ScanEntries(requestId string, limit int64, cons datastore.ScanConsistency, 736 vector timestamp.Vector, conn *datastore.IndexConnection) { 737 defer close(conn.EntryChannel()) 738 739 dirEntries, er := ioutil.ReadDir(pi.keyspace.path()) 740 if er != nil { 741 conn.Error(errors.NewFileDatastoreError(er, "")) 742 return 743 } 744 745 for i, dirEntry := range dirEntries { 746 if limit > 0 && int64(i) > limit { 747 break 748 } 749 if !dirEntry.IsDir() { 750 entry := datastore.IndexEntry{PrimaryKey: documentPathToId(dirEntry.Name())} 751 conn.EntryChannel() <- &entry 752 } 753 } 754} 755 756func fetch(path string) (item value.AnnotatedValue, e errors.Error) { 757 bytes, er := ioutil.ReadFile(path) 758 if er != nil { 759 return nil, errors.NewFileDatastoreError(er, "") 760 } 761 762 doc := value.NewAnnotatedValue(value.NewValue(bytes)) 763 doc.SetAttachment("meta", map[string]interface{}{"id": documentPathToId(path)}) 764 item = doc 765 766 return 767} 768 769func documentPathToId(p string) string { 770 _, file := filepath.Split(p) 771 ext := filepath.Ext(file) 772 return file[0 : len(file)-len(ext)] 773} 774