//  Copyright (c) 2016 Couchbase, Inc.
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the
//  License. You may obtain a copy of the License at
//    http://www.apache.org/licenses/LICENSE-2.0
//  Unless required by applicable law or agreed to in writing,
//  software distributed under the License is distributed on an "AS
//  IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
//  express or implied. See the License for the specific language
//  governing permissions and limitations under the License.

package moss

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"os"
	"sort"
	"strings"
	"sync"
	"testing"
	"time"
)

// Implementation of a mock lower-level iterator, using a map that's
// cloned and sorted on creation.
type TestPersisterIterator struct {
	pos     int
	kvpairs map[string][]byte // immutable.
	keys    []string          // immutable, sorted.
	endkey  string
}

// NewTestPersisterIterator returns an iterator over the provided
// kvpairs map, which is treated as immutable; its keys are copied
// and sorted.
func NewTestPersisterIterator(kvpairs map[string][]byte,
	startkey, endkey string) *TestPersisterIterator {
	rv := &TestPersisterIterator{
		kvpairs: kvpairs,
		endkey:  endkey,
	}
	for k := range rv.kvpairs {
		rv.keys = append(rv.keys, k)
	}
	sort.Strings(rv.keys)
	rv.pos = sort.SearchStrings(rv.keys, string(startkey))
	return rv
}

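// Close releases the iterator's references to its keys and kvpairs.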
func (i *TestPersisterIterator) Close() error {
	i.kvpairs = nil
	i.keys = nil
	return nil
}

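// Next advances the iterator, returning ErrIteratorDone once the keys
// are exhausted.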
func (i *TestPersisterIterator) Next() error {
	i.pos++
	if i.pos >= len(i.keys) {
		return ErrIteratorDone
	}
	return nil
}

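// SeekTo repositions the iterator by delegating to the package's
// naiveSeekTo helper.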
func (i *TestPersisterIterator) SeekTo(seekToKey []byte) error {
	return naiveSeekTo(i, seekToKey, 0)
}

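// Current returns the key and value at the current position, or
// ErrIteratorDone when past the last key or at/after the exclusive
// endkey.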
func (i *TestPersisterIterator) Current() ([]byte, []byte, error) {
	if i.pos >= len(i.keys) {
		return nil, nil, ErrIteratorDone
	}
	k := i.keys[i.pos]
	if i.endkey != "" && strings.Compare(k, i.endkey) >= 0 {
		return nil, nil, ErrIteratorDone
	}
	return []byte(k), i.kvpairs[k], nil
}

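// CurrentEx is like Current, but also returns the entry's operation,
// which is always OperationSet for this mock.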
func (i *TestPersisterIterator) CurrentEx() (entryEx EntryEx,
	key, val []byte, err error) {
	k, v, err := i.Current()
	if err != nil {
		return EntryEx{OperationSet}, nil, nil, err
	}
	return EntryEx{OperationSet}, k, v, err
}

// Implementation of a mock lower-level test persister, using a map
// that's cloned on updates and with key sorting whenever an iterator
// is needed.
type TestPersister struct {
	// mutex keeps snapshots stable by having writes block reads.
	mutex sync.RWMutex

	kvpairs map[string][]byte
}

// NewTestPersister returns a TestPersister instance that can be used
// to test lower-level persistence features.
func NewTestPersister() *TestPersister {
	return &TestPersister{
		kvpairs: map[string][]byte{},
	}
}

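// cloneLOCKED copies the persister's kvpairs into a new TestPersister;
// callers must hold the mutex.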
func (p *TestPersister) cloneLOCKED() *TestPersister {
	c := NewTestPersister()
	for k, v := range p.kvpairs {
		c.kvpairs[k] = v
	}
	return c
}

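// Close waits for any in-progress writes to finish and then drops the
// persister's data.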
func (p *TestPersister) Close() error {
	// ensure any writes in progress finish
	p.mutex.Lock()
	defer p.mutex.Unlock()
	p.kvpairs = nil
	return nil
}

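// Get returns the value stored for key, or nil if the key is absent.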
func (p *TestPersister) Get(key []byte,
	readOptions ReadOptions) ([]byte, error) {
	p.mutex.RLock()
	defer p.mutex.RUnlock()
	return p.kvpairs[string(key)], nil
}

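// StartIterator returns an iterator over a clone of the persister's
// kvpairs, bounded by startKeyInclusive and endKeyExclusive.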
func (p *TestPersister) StartIterator(
	startKeyInclusive, endKeyExclusive []byte,
	iteratorOptions IteratorOptions) (Iterator, error) {
	p.mutex.RLock() // the iterator works on a clone, so unlock via defer
	defer p.mutex.RUnlock()
	return NewTestPersisterIterator(p.cloneLOCKED().kvpairs,
		string(startKeyInclusive), string(endKeyExclusive)), nil
}

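// Update applies the sets, deletes and merges from the higher snapshot
// to a clone of the persister and returns the clone.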
func (p *TestPersister) Update(higher Snapshot) (*TestPersister, error) {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	c := p.cloneLOCKED()

	if higher != nil {
		iter, err := higher.StartIterator(nil, nil, IteratorOptions{
			IncludeDeletions: true,
			SkipLowerLevel:   true,
		})
		if err != nil {
			return nil, err
		}

		defer iter.Close()

		var readOptions ReadOptions

		for {
			ex, key, val, err := iter.CurrentEx()
			if err == ErrIteratorDone {
				break
			}
			if err != nil {
				return nil, err
			}

			switch ex.Operation {
			case OperationSet:
				c.kvpairs[string(key)] = val

			case OperationDel:
				delete(c.kvpairs, string(key))

			case OperationMerge:
				val, err = higher.Get(key, readOptions)
				if err != nil {
					return nil, err
				}

				if val != nil {
					c.kvpairs[string(key)] = val
				} else {
					delete(c.kvpairs, string(key))
				}

			default:
				return nil, fmt.Errorf("moss TestPersister, update,"+
					" unexpected operation, ex: %v", ex)
			}

			err = iter.Next()
			if err == ErrIteratorDone {
				break
			}
			if err != nil {
				return nil, err
			}
		}
	}

	return c, nil
}

// ----------------------------------------------------

// Test1Persister, Test10Persister, and Test1000Persister verify that
// the persister is invoked as expected for increasing item counts.
func Test1Persister(t *testing.T) {
	runTestPersister(t, 1)
}

func Test10Persister(t *testing.T) {
	runTestPersister(t, 10)
}

func Test1000Persister(t *testing.T) {
	runTestPersister(t, 1000)
}

func runTestPersister(t *testing.T, numItems int) {
	// create a new instance of our mock lower-level persister
	lowerLevelPersister := newTestPersister()
	lowerLevelUpdater := func(higher Snapshot) (Snapshot, error) {
		p, err := lowerLevelPersister.Update(higher)
		if err != nil {
			return nil, err
		}
		lowerLevelPersister = p
		return p, nil
	}

	persisterCh := make(chan string)

	onEvent := func(event Event) {
		if event.Kind == EventKindPersisterProgress {
			persisterCh <- "persisterProgress"
		}
	}

	// create new collection configured to use lower level persister
	m, err := NewCollection(
		CollectionOptions{
			LowerLevelInit:   lowerLevelPersister,
			LowerLevelUpdate: lowerLevelUpdater,
			OnEvent:          onEvent,
		})
	if err != nil || m == nil {
		t.Fatalf("expected moss")
	}

	// FIXME possibly replace start with manual persister invocations?
	// this would require some refactoring
	err = m.Start()
	if err != nil {
		t.Fatalf("error starting moss: %v", err)
	}

	// create new batch to set some keys
	b, err := m.NewBatch(0, 0)
	if err != nil {
		t.Fatalf("error creating new batch: %v", err)
	}

	// also create a child batch
	childB, err := b.NewChildCollectionBatch("child1", BatchOptions{0, 0})
	if err != nil {
		t.Fatalf("error creating new child batch: %v", err)
	}

	itemLoader := func(b Batch, numItems int) {
		// put numItems items in
		for i := 0; i < numItems; i++ {
			k := fmt.Sprintf("%d", i)
			b.Set([]byte(k), []byte(k))
		}
	}
	itemLoader(b, numItems)
	itemLoader(childB, numItems)

	err = m.ExecuteBatch(b, WriteOptions{})
	if err != nil {
		t.Fatalf("error executing batch: %v", err)
	}

	ss0, err := m.Snapshot()
	if err != nil || ss0 == nil {
		t.Fatalf("error snapshotting: %v", err)
	}

	childNames, err := ss0.ChildCollectionNames()
	if err != nil {
		t.Fatalf("error getting child collection names: %v", err)
	}
	if len(childNames) != 1 {
		t.Fatalf("expected 1 child collection name, got: %d", len(childNames))
	}
	childSnap, err := ss0.ChildCollectionSnapshot("child1")
	if err != nil || childSnap == nil {
		t.Fatalf("error getting child snapshot: %v", err)
	}

	// cleanup that batch
	err = b.Close()
	if err != nil {
		t.Fatalf("error closing batch: %v", err)
	}

	ss1, err := m.Snapshot()
	if err != nil || ss1 == nil {
		t.Fatalf("error snapshotting: %v", err)
	}

	// wait for persister to run
	<-persisterCh

	ss2, err := m.Snapshot()
	if err != nil || ss2 == nil {
		t.Fatalf("error snapshotting: %v", err)
	}

	checkSnapshot := func(msg string, ss Snapshot, expectedNum int) {
		for i := 0; i < numItems; i++ {
			k := fmt.Sprintf("%d", i)
			var v []byte
			v, err = ss.Get([]byte(k), ReadOptions{})
			if err != nil {
				t.Fatalf("error %s getting key: %s, %v", msg, k, err)
			}
			if string(v) != k {
				t.Errorf("expected %s value for key: %s to be %s, got %s", msg, k, k, v)
			}
		}

		var iter Iterator
		iter, err = ss.StartIterator(nil, nil, IteratorOptions{})
		if err != nil {
			t.Fatalf("error %s checkSnapshot iter, err: %v", msg, err)
		}

		n := 0
		var lastKey []byte
		for {
			var ex EntryEx
			var key, val []byte
			ex, key, val, err = iter.CurrentEx()
			if err == ErrIteratorDone {
				break
			}
			if err != nil {
				t.Fatalf("error %s iter currentEx, err: %v", msg, err)
			}

			n++

			if ex.Operation != OperationSet {
				t.Fatalf("error %s iter op, ex: %v, err: %v", msg, ex, err)
			}

			cmp := bytes.Compare(lastKey, key)
			if cmp >= 0 {
				t.Fatalf("error %s iter cmp: %v, err: %v", msg, cmp, err)
			}

			if !bytes.Equal(key, val) {
				t.Fatalf("error %s iter key != val: %v, %v", msg, key, val)
			}

			lastKey = key

			err = iter.Next()
			if err == ErrIteratorDone {
				break
			}
			if err != nil {
				t.Fatalf("error %s iter next, err: %v", msg, err)
			}
		}

		if n != expectedNum {
			t.Fatalf("error %s iter expectedNum: %d, got: %d", msg, expectedNum, n)
		}

		iter.Close()
	}

	checkSnapshot("lowerLevelPersister", lowerLevelPersister, numItems)
	checkSnapshot("ss0", ss0, numItems)
	checkSnapshot("ss1", ss1, numItems)
	checkSnapshot("ss2", ss2, numItems)

	checkSnapshot("ss0:child1", childSnap, numItems)

	// cleanup that batch
	err = b.Close()
	if err != nil {
		t.Fatalf("error closing batch: %v", err)
	}

	// open new batch
	b, err = m.NewBatch(0, 0)
	if err != nil {
		t.Fatalf("error creating new batch: %v", err)
	}

	// delete the values we just set
	for i := 0; i < numItems; i++ {
		k := fmt.Sprintf("%d", i)
		b.Del([]byte(k))
	}

	err = b.DelChildCollection("child1")
	if err != nil {
		t.Fatalf("error deleting child collection: %v", err)
	}

	err = m.ExecuteBatch(b, WriteOptions{})
	if err != nil {
		t.Fatalf("error executing batch: %v", err)
	}

	ssd0, err := m.Snapshot()
	if err != nil || ssd0 == nil {
		t.Fatalf("error snapshotting: %v", err)
	}
	childNames, err = ssd0.ChildCollectionNames()
	if err != nil {
		t.Fatalf("error getting child collection names: %v", err)
	}
	if len(childNames) > 0 {
		t.Fatalf("expected child collection to be deleted, got: %v", childNames)
	}

	// cleanup that batch
	err = b.Close()
	if err != nil {
		t.Fatalf("error closing batch: %v", err)
	}

	ssd1, err := m.Snapshot()
	if err != nil || ssd1 == nil {
		t.Fatalf("error snapshotting: %v", err)
	}

	<-persisterCh
	go func() {
		for range persisterCh { /* EAT */
		}
	}()

	ssd2, err := m.Snapshot()
	if err != nil || ssd2 == nil {
		t.Fatalf("error snapshotting: %v", err)
	}

	// check that values are now gone
	checkGetsGone := func(ss Snapshot) {
		for i := 0; i < numItems; i++ {
			k := fmt.Sprintf("%d", i)
			var v []byte
			v, err = ss.Get([]byte(k), ReadOptions{})
			if err != nil {
				t.Fatalf("error getting key: %s, %v", k, err)
			}
			if v != nil {
				t.Errorf("expected no value for key: %s, got %s", k, v)
			}
		}
	}

	checkGetsGone(lowerLevelPersister)
	checkGetsGone(ssd0)
	checkGetsGone(ssd1)
	checkGetsGone(ssd2)

	// Check that our old snapshots are still stable.
	checkSnapshot("ss0", ss0, numItems)
	checkSnapshot("ss1", ss1, numItems)
	checkSnapshot("ss2", ss2, numItems)

	// cleanup moss
	err = m.Close()
	if err != nil {
		t.Fatalf("error closing moss: %v", err)
	}
}

// TestPersisterError ensures that if the provided LowerLevelUpdate
// method returns an error, the configured OnError callback is
// invoked.
func TestPersisterError(t *testing.T) {
	onErrorCh := make(chan string)
	customOnError := func(err error) {
		onErrorCh <- "error expected!"
	}

	// create a new instance of our mock lower-level persister
	lowerLevelPersister := newTestPersister()
	lowerLevelUpdater := func(higher Snapshot) (Snapshot, error) {
		return nil, fmt.Errorf("test error")
	}

	gotPersistence := false
	onEvent := func(event Event) {
		if event.Kind == EventKindPersisterProgress {
			gotPersistence = true
		}
	}

	// create new collection configured to use lower level persister
	m, err := NewCollection(
		CollectionOptions{
			LowerLevelInit:   lowerLevelPersister,
			LowerLevelUpdate: lowerLevelUpdater,
			OnError:          customOnError,
			OnEvent:          onEvent,
		})
	if err != nil || m == nil {
		t.Fatalf("expected moss")
	}

	// FIXME possibly replace start with manual persister invocations?
	// this would require some refactoring
	err = m.Start()
	if err != nil {
		t.Fatalf("error starting moss: %v", err)
	}

	// create new batch to set some keys
	b, err := m.NewBatch(0, 0)
	if err != nil {
		t.Fatalf("error creating new batch: %v", err)
	}

	// put 1000 values in
	for i := 0; i < 1000; i++ {
		k := fmt.Sprintf("%d", i)
		b.Set([]byte(k), []byte(k))
	}

	err = m.ExecuteBatch(b, WriteOptions{})
	if err != nil {
		t.Fatalf("error executing batch: %v", err)
	}

	// wait for persister to run
	msg := <-onErrorCh
	if msg != "error expected!" {
		t.Errorf("expected error callback")
	}

	if gotPersistence {
		t.Errorf("expected no persistence due to error")
	}
}

// -----------------------------------------------------------------------------
// Implementation of a mock lower-level test persister and iterator,
// with COW, using a map that's cloned on updates and with key sorting
// whenever an iterator is needed.

type testPersisterIterator struct {
	pos     int
	kvpairs map[string][]byte // immutable.
	keys    []string          // immutable, sorted.
	endkey  string
}

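// newTestPersisterIterator returns an iterator over the provided
// kvpairs map, which is treated as immutable; its keys are copied and
// sorted, and the starting position honors startkey.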
func newTestPersisterIterator(kvpairs map[string][]byte,
	startkey, endkey string) *testPersisterIterator {
	rv := &testPersisterIterator{
		kvpairs: kvpairs,
		endkey:  endkey,
	}
	for k := range rv.kvpairs {
		rv.keys = append(rv.keys, k)
	}
	sort.Strings(rv.keys)
	rv.pos = sort.SearchStrings(rv.keys, string(startkey))
	return rv
}

func (i *testPersisterIterator) Close() error {
	i.kvpairs = nil
	i.keys = nil
	return nil
}

func (i *testPersisterIterator) Next() error {
	i.pos++
	if i.pos >= len(i.keys) {
		return ErrIteratorDone
	}
	return nil
}

func (i *testPersisterIterator) SeekTo(seekToKey []byte) error {
	return naiveSeekTo(i, seekToKey, 0)
}

func (i *testPersisterIterator) Current() ([]byte, []byte, error) {
	if i.pos >= len(i.keys) {
		return nil, nil, ErrIteratorDone
	}
	k := i.keys[i.pos]
	if i.endkey != "" && strings.Compare(k, i.endkey) >= 0 {
		return nil, nil, ErrIteratorDone
	}
	return []byte(k), i.kvpairs[k], nil
}

func (i *testPersisterIterator) CurrentEx() (entryEx EntryEx,
	key, val []byte, err error) {
	k, v, err := i.Current()
	if err != nil {
		return EntryEx{OperationSet}, nil, nil, err
	}
	return EntryEx{OperationSet}, k, v, err
}

// testPersister implements the moss Snapshot interface.
type testPersister struct {
	// mutex keeps snapshots stable by having writes block reads.
	mutex sync.RWMutex

	kvpairs map[string][]byte

	childSnapshots map[string]*testPersister
}

func newTestPersister() *testPersister {
	return &testPersister{
		kvpairs:        map[string][]byte{},
		childSnapshots: make(map[string]*testPersister),
	}
}

func (p *testPersister) cloneLOCKED() *testPersister {
	c := newTestPersister()
	for k, v := range p.kvpairs {
		c.kvpairs[k] = v
	}
	return c
}

// ChildCollectionNames returns the names of the child collections.
func (p *testPersister) ChildCollectionNames() ([]string, error) {
	var childCollections = make([]string, len(p.childSnapshots))
	idx := 0
	for name := range p.childSnapshots {
		childCollections[idx] = name
		idx++
	}
	return childCollections, nil
}

// ChildCollectionSnapshot returns a Snapshot on a given child
// collection by its name.
func (p *testPersister) ChildCollectionSnapshot(childCollectionName string) (
	Snapshot, error) {
	childSnapshot, exists := p.childSnapshots[childCollectionName]
	if !exists {
		return nil, ErrNoSuchCollection
	}
	return childSnapshot, nil
}

func (p *testPersister) Close() error {
	// ensure any writes in progress finish
	p.mutex.Lock()
	defer p.mutex.Unlock()
	p.kvpairs = nil
	return nil
}

func (p *testPersister) Get(key []byte,
	readOptions ReadOptions) ([]byte, error) {
	p.mutex.RLock()
	defer p.mutex.RUnlock()
	return p.kvpairs[string(key)], nil
}

func (p *testPersister) StartIterator(
	startKeyInclusive, endKeyExclusive []byte,
	iteratorOptions IteratorOptions) (Iterator, error) {
	p.mutex.RLock() // the iterator works on a clone, so unlock via defer
	defer p.mutex.RUnlock()
	return newTestPersisterIterator(p.cloneLOCKED().kvpairs,
		string(startKeyInclusive), string(endKeyExclusive)), nil
}

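// Update applies the sets, deletes and merges from the higher snapshot
// to a clone of this persister and returns the clone.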
func (p *testPersister) Update(higher Snapshot) (*testPersister, error) {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	c := p.cloneLOCKED()

	if higher != nil {
		iter, err := higher.StartIterator(nil, nil, IteratorOptions{
			IncludeDeletions: true,
			SkipLowerLevel:   true,
		})
		if err != nil {
			return nil, err
		}

		defer iter.Close()

		var readOptions ReadOptions

		for {
			ex, key, val, err := iter.CurrentEx()
			if err == ErrIteratorDone {
				break
			}
			if err != nil {
				return nil, err
			}

			switch ex.Operation {
			case OperationSet:
				c.kvpairs[string(key)] = val

			case OperationDel:
				delete(c.kvpairs, string(key))

			case OperationMerge:
				val, err = higher.Get(key, readOptions)
				if err != nil {
					return nil, err
				}

				if val != nil {
					c.kvpairs[string(key)] = val
				} else {
					delete(c.kvpairs, string(key))
				}

			default:
				return nil, fmt.Errorf("moss testPersister, update,"+
					" unexpected operation, ex: %v", ex)
			}

			err = iter.Next()
			if err == ErrIteratorDone {
				break
			}
			if err != nil {
				return nil, err
			}
		}
	}

	return c, nil
}

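// TestPersistMergeOps_MB19667 reproduces MB-19667: a stackDirtyMid that
// still points at an older lower-level snapshot could cause merge ops
// already persisted from stackDirtyBase to be lost, yielding X + Z
// instead of the expected X + Y + Z.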
func TestPersistMergeOps_MB19667(t *testing.T) {
	// Need to arrange that...
	// - stack dirty top = empty
	// - stack dirty mid = [ various merge ops Z ]
	// - stack dirty base = [ more merge ops Y ]
	// - lower-level has stuff (X)
	//
	// Then persister runs and...
	// - stack dirty base = [ (empty) ]
	// - lower-level has more stuff (X + Y)
	//
	// But, stack dirty mid was (incorrectly) pointing at the old
	// lower-level snapshot (X), which doesn't have anything from Y.
	// Then, when the persister runs again, you'd end up incorrectly
	// with X + Z when you wanted X + Y + Z.
	//
	var mlock sync.Mutex

	events := map[EventKind]int{}

	var eventCh chan EventKind
	var onPersistCh chan bool

	mo := &MergeOperatorStringAppend{Sep: ":"}

	lowerLevelPersister := newTestPersister()
	lowerLevelUpdater := func(higher Snapshot) (Snapshot, error) {
		if onPersistCh != nil {
			<-onPersistCh
		}
		p, err := lowerLevelPersister.Update(higher)
		if err != nil {
			return nil, err
		}
		lowerLevelPersister = p
		p.mutex.RLock()
		defer p.mutex.RUnlock()
		return p.cloneLOCKED(), nil
	}

	m, err := NewCollection(CollectionOptions{
		MergeOperator:    mo,
		LowerLevelInit:   lowerLevelPersister,
		LowerLevelUpdate: lowerLevelUpdater,
		OnEvent: func(e Event) {
			mlock.Lock()
			events[e.Kind]++
			eventCh2 := eventCh
			mlock.Unlock()

			if eventCh2 != nil {
				eventCh2 <- e.Kind
			}
		},
	})
	if err != nil || m == nil {
		t.Errorf("expected moss")
	}
	mc := m.(*collection)

	// Note that we haven't Start()'ed the collection, so it doesn't
	// have the merger background goroutines running.  But we do
	// kick off the background persister goroutine...
	go mc.runPersister()

	mergeVal := func(v string) {
		var b Batch
		b, err = m.NewBatch(0, 0)
		if err != nil || b == nil {
			t.Errorf("expected b ok")
		}
		b.Merge([]byte("k"), []byte(v))
		err = m.ExecuteBatch(b, WriteOptions{})
		if err != nil {
			t.Errorf("expected execute batch ok")
		}
		b.Close()
	}

	mergeVal("X")

	// Pretend to be the merger, moving stack dirty top into base,
	// then notify and wait for the persister.
	mc.m.Lock()
	mc.stackDirtyBase = mc.stackDirtyTop
	mc.stackDirtyTop = nil

	waitDirtyOutgoingCh := make(chan struct{})
	mc.waitDirtyOutgoingCh = waitDirtyOutgoingCh

	mc.stackDirtyBaseCond.Broadcast()
	mc.m.Unlock()

	<-waitDirtyOutgoingCh

	// At this point...
	// - stackDirtyTop  : empty
	// - stackDirtyMid  : empty
	// - stackDirtyBase : empty
	// - lowerLevel     : X

	mc.m.Lock()
	if mc.stackDirtyTop != nil || mc.stackDirtyMid != nil || mc.stackDirtyBase != nil {
		t.Errorf("expected X state")
	}
	if mc.lowerLevelSnapshot == nil {
		t.Errorf("unexpected llss X state")
	}
	v, err := mc.lowerLevelSnapshot.Get([]byte("k"), ReadOptions{})
	if err != nil {
		t.Errorf("expected get ok")
	}
	if string(v) != ":X" {
		t.Errorf("expected :X, got: %s", v)
	}
	mc.m.Unlock()

	// --------------------------------------------

	mergeVal("Y")

	// Pretend to be the merger, moving stack dirty top into base,
	// but don't notify the persister.
	stackDirtyMid, _, _, _, _ :=
		mc.snapshot(snapshotSkipClean|snapshotSkipDirtyBase, nil, false)

	mc.m.Lock()
	mc.stackDirtyBase = stackDirtyMid
	mc.stackDirtyTop = nil
	mc.m.Unlock()

	// At this point...
	// - stackDirtyTop  : empty
	// - stackDirtyMid  : empty
	// - stackDirtyBase : Y (and points to lowerLevel X)
	// - lowerLevel     : X

	mc.m.Lock()
	if mc.stackDirtyTop != nil || mc.stackDirtyMid != nil || mc.stackDirtyBase == nil {
		t.Errorf("expected X/Y state")
	}
	if mc.lowerLevelSnapshot == nil {
		t.Errorf("unexpected llss X/Y state")
	}
	v, err = mc.lowerLevelSnapshot.Get([]byte("k"), ReadOptions{})
	if err != nil {
		t.Errorf("expected get ok")
	}
	if string(v) != ":X" {
		t.Errorf("expected :X, got: %s", v)
	}
	mc.m.Unlock()

	// --------------------------------------------

	mergeVal("Z")

	// Pretend to be the merger, moving stack dirty top into mid,
	// but don't notify the persister.
	stackDirtyMid, _, _, _, _ =
		mc.snapshot(snapshotSkipClean|snapshotSkipDirtyBase, nil, false)

	mc.m.Lock()
	mc.stackDirtyMid = stackDirtyMid
	mc.stackDirtyTop = nil
	mc.m.Unlock()

	// At this point...
	// - stackDirtyTop  : empty
	// - stackDirtyMid  : Z (and points to lowerLevel X)
	// - stackDirtyBase : Y (and points to lowerLevel X)
	// - lowerLevel     : X

	mc.m.Lock()
	if mc.stackDirtyTop != nil || mc.stackDirtyMid == nil || mc.stackDirtyBase == nil {
		t.Errorf("expected X/Y/Z state")
	}
	if mc.lowerLevelSnapshot == nil {
		t.Errorf("unexpected llss X/Y/Z state")
	}
	v, err = mc.lowerLevelSnapshot.Get([]byte("k"), ReadOptions{})
	if err != nil {
		t.Errorf("expected get ok")
	}
	if string(v) != ":X" {
		t.Errorf("expected :X, got: %s", v)
	}
	if len(mc.stackDirtyMid.a) != 1 {
		t.Errorf("expected stackDirtyMid len of 1")
	}
	if mc.stackDirtyMid.lowerLevelSnapshot == nil {
		t.Errorf("expected stackDirtyMid.lowerLevelSnapshot")
	}
	v, err = mc.stackDirtyMid.lowerLevelSnapshot.Get([]byte("k"), ReadOptions{})
	if err != nil {
		t.Errorf("expected get ok")
	}
	if string(v) != ":X" {
		t.Errorf("expected :X, got: %s", v)
	}
	if mc.stackDirtyBase.lowerLevelSnapshot == nil {
		t.Errorf("expected stackDirtyBase.lowerLevelSnapshot")
	}
	v, err = mc.stackDirtyBase.lowerLevelSnapshot.Get([]byte("k"), ReadOptions{})
	if err != nil {
		t.Errorf("expected get ok")
	}
	if string(v) != ":X" {
		t.Errorf("expected :X, got: %s", v)
	}
	if mc.stackDirtyBase.lowerLevelSnapshot != mc.stackDirtyMid.lowerLevelSnapshot {
		t.Errorf("expected same snapshots")
	}
	if mc.stackDirtyBase.lowerLevelSnapshot != mc.lowerLevelSnapshot {
		t.Errorf("expected same snapshots")
	}
	mc.m.Unlock()

	// --------------------------------------------

	checkVal := func(msg, expected string) {
		var ss Snapshot
		ss, err = m.Snapshot()
		if err != nil {
			t.Errorf("%s - expected ss ok", msg)
		}
		var getv []byte
		getv, err = ss.Get([]byte("k"), ReadOptions{})
		if err != nil || string(getv) != expected {
			t.Errorf("%s - expected Get %s, got: %s, err: %v", msg, expected, getv, err)
		}
		var iter Iterator
		iter, err = ss.StartIterator(nil, nil, IteratorOptions{})
		if err != nil || iter == nil {
			t.Errorf("%s - expected iter", msg)
		}
		var k []byte
		k, v, err = iter.Current()
		if err != nil {
			t.Errorf("%s - expected iter current no err", msg)
		}
		if string(k) != "k" {
			t.Errorf("%s - expected iter current key k", msg)
		}
		if string(v) != expected {
			t.Errorf("%s - expected iter current val expected: %v, got: %s", msg, expected, v)
		}
		if iter.Next() != ErrIteratorDone {
			t.Errorf("%s - expected only 1 value in iterator", msg)
		}
		ss.Close()
	}

	checkVal("before", ":X:Y:Z")

	// -------------------------------------------

	// Register a fake log func to hold up the merger.
	logCh := make(chan string)
	logBlockCh := make(chan string)
	mc.options.Debug = 1
	mc.options.Log = func(format string, a ...interface{}) {
		if logCh != nil {
			logCh <- format
		}
		if logBlockCh != nil {
			<-logBlockCh
		}
	}

	go mc.runMerger() // Finally start the real merger goroutine.

	notifyDoneCh := make(chan error)
	go func() {
		notifyDoneCh <- mc.NotifyMerger("wake up merger", true)
	}()

	fmtStr := <-logCh
	if !strings.HasPrefix(fmtStr, "collection: mergerMain,") {
		t.Errorf("expected a fmt str, got: %s", fmtStr)
	}

	// At this point the merger is now blocked in a Log() callback.

	// Next, kick the persister goroutine to force it to run concurrently once.
	mc.m.Lock()
	waitDirtyOutgoingCh = make(chan struct{})
	mc.waitDirtyOutgoingCh = waitDirtyOutgoingCh
	mc.stackDirtyBaseCond.Broadcast()
	mc.m.Unlock()

	<-waitDirtyOutgoingCh

	// At this point...
	// - stackDirtyTop  : empty
	// - stackDirtyMid  : Z (and points to lowerLevel X)
	// - stackDirtyBase : empty
	// - lowerLevel     : X+Y

	mc.m.Lock()
	if mc.stackDirtyTop != nil || mc.stackDirtyMid == nil || mc.stackDirtyBase != nil {
		t.Errorf("expected X+Y/Z middle state")
	}
	if mc.lowerLevelSnapshot == nil {
		t.Errorf("unexpected llss X+Y/Z middle state")
	}
	v, err = mc.lowerLevelSnapshot.Get([]byte("k"), ReadOptions{})
	if err != nil {
		t.Errorf("expected get ok")
	}
	if string(v) != ":X:Y" {
		t.Errorf("expected :X:Y, got: %s", v)
	}
	if mc.stackDirtyMid.lowerLevelSnapshot == nil {
		t.Errorf("expected stackDirtyMid.lowerLevelSnapshot")
	}
	v, err = mc.stackDirtyMid.lowerLevelSnapshot.Get([]byte("k"), ReadOptions{})
	if err != nil {
		t.Errorf("expected get ok")
	}
	if string(v) != ":X" {
		t.Errorf("expected :X, got: %s", v)
	}
	if mc.lowerLevelSnapshot == mc.stackDirtyMid.lowerLevelSnapshot {
		t.Errorf("expected different snapshots")
	}
	v, err = mc.stackDirtyMid.Get([]byte("k"), ReadOptions{})
	if err != nil {
		t.Errorf("expected get ok")
	}
	if string(v) != ":X:Z" {
		t.Errorf("expected :X:Z, got: %s", v)
	}
	if len(mc.stackDirtyMid.a) != 1 {
		t.Errorf("expected stackDirtyMid to have len 1")
	}
	if mc.options.LowerLevelUpdate == nil {
		t.Errorf("expected options.LowerLevelUpdate")
	}
	mc.m.Unlock()

	checkVal("mid", ":X:Y:Z")

	// -------------------------------------------

	// Next let the merger proceed.

	mc.options.Debug = 0
	mc.options.Log = nil

	mlock.Lock()
	eventCh = make(chan EventKind)
	mlock.Unlock()

	logBlockCh <- "let the merger proceed"

	<-waitDirtyOutgoingCh

	kind := <-eventCh
	if kind != EventKindMergerProgress {
		t.Errorf("expected EventKindMergerProgress")
	}
	mlock.Lock()
	eventCh = nil
	mlock.Unlock()

	<-notifyDoneCh

	mc.m.Lock()
	if mc.waitDirtyOutgoingCh != nil {
		waitDirtyOutgoingCh = mc.waitDirtyOutgoingCh
		mc.m.Unlock()
		<-waitDirtyOutgoingCh
		mc.m.Lock()
	}
	waitDirtyOutgoingCh = make(chan struct{})
	mc.waitDirtyOutgoingCh = waitDirtyOutgoingCh
	mc.stackDirtyBaseCond.Broadcast()
	mc.m.Unlock()

	// At this point...
	// - stackDirtyTop  : empty
	// - stackDirtyMid  : empty
	// - stackDirtyBase : empty
	// - lowerLevel     : X+Y+Z (before the fix, this was incorrectly X+Z)

	mc.m.Lock()
	if mc.stackDirtyTop != nil {
		t.Errorf("expected X/Y/Z last state top nil")
	}
	if mc.stackDirtyMid != nil && len(mc.stackDirtyMid.a) > 0 {
		t.Errorf("expected X/Y/Z last state mid nil, got: %#v", mc.stackDirtyMid)
	}
	if mc.stackDirtyBase != nil && len(mc.stackDirtyBase.a) > 0 {
		t.Errorf("expected X/Y/Z last state base nil")
	}
	if mc.lowerLevelSnapshot == nil {
		t.Errorf("unexpected llss X/Y/Z last state")
	}
	v, err = mc.lowerLevelSnapshot.Get([]byte("k"), ReadOptions{})
	if err != nil {
		t.Errorf("expected get ok")
	}
	if string(v) != ":X:Y:Z" { // Before the fix, this incorrectly returned :X:Z.
		t.Errorf("expected :X:Y:Z, got: %s", v)
	}
	mc.m.Unlock()

	// Before the fix, we used to incorrectly get :X:Z.
	checkVal("after", ":X:Y:Z")
}

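// Test_JustLoad1Mitems loads items in descending batches via loadItems
// and, after each batch, verifies that a written key remains readable.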
func Test_JustLoad1Mitems(t *testing.T) {
	numItems := 100000
	batchSize := 100
	tmpDir, _ := ioutil.TempDir("", "mossStore")
	defer os.RemoveAll(tmpDir)

	so := DefaultStoreOptions
	so.CollectionOptions.MinMergePercentage = 0.0
	so.CompactionPercentage = 0.0
	so.CompactionSync = true
	spo := StorePersistOptions{CompactionConcern: CompactionAllow}

	store, coll, err := OpenStoreCollection(tmpDir, so, spo)
	if err != nil || store == nil || coll == nil {
		t.Fatalf("error opening store collection: %v, err: %v", tmpDir, err)
	}

	for i := numItems; i >= 0; i = i - batchSize {
		// create new batch to set some keys
		ba, err := coll.NewBatch(0, 0)
		if err != nil {
			t.Fatalf("error creating new batch: %v", err)
			return
		}

		loadItems(ba, i, i-batchSize)
		err = coll.ExecuteBatch(ba, WriteOptions{})
		if err != nil {
			t.Fatalf("error executing batch: %v", err)
			return
		}

		// cleanup that batch
		err = ba.Close()
		if err != nil {
			t.Fatalf("error closing batch: %v", err)
			return
		}
		val, erro := coll.Get([]byte(fmt.Sprintf("%04d", numItems)), ReadOptions{})
		if erro != nil || val == nil {
			t.Fatalf("Unable to fetch the key written: %v", erro)
		}
	}

	if store.Close() != nil {
		t.Fatalf("expected store close to work")
	}

	if coll.Close() != nil {
		t.Fatalf("error closing collection")
	}
}

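// Test_LevelCompactDeletes first loads keys, then reopens the store in
// level-compaction mode and executes delete-only batches, checking that
// level compaction handles segments consisting solely of deletions.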
func Test_LevelCompactDeletes(t *testing.T) {
	numItems := 100000
	batchSize := 100
	tmpDir, _ := ioutil.TempDir("", "mossStore")
	defer os.RemoveAll(tmpDir)

	so := DefaultStoreOptions
	so.CollectionOptions.MinMergePercentage = 0.0
	so.CompactionPercentage = 0.0
	so.CompactionSync = true
	spo := StorePersistOptions{CompactionConcern: CompactionAllow}

	store, coll, err := OpenStoreCollection(tmpDir, so, spo)
	if err != nil || store == nil || coll == nil {
		t.Fatalf("error opening store collection: %v, err: %v", tmpDir, err)
	}

	for i := numItems; i > 0; i = i - batchSize {
		// create new batch to set some keys
		ba, erro := coll.NewBatch(0, 0)
		if erro != nil {
			t.Fatalf("error creating new batch: %v", erro)
			return
		}
		for j := i - 1; j >= i-batchSize; j-- {
			k := fmt.Sprintf("%08d", j)
			ba.Set([]byte(k), []byte(k))
		}
		err = coll.ExecuteBatch(ba, WriteOptions{})
		if err != nil {
			t.Fatalf("error executing batch: %v", err)
			return
		}

		// cleanup that batch
		err = ba.Close()
		if err != nil {
			t.Fatalf("error closing batch: %v", err)
			return
		}
		val, er := coll.Get([]byte(fmt.Sprintf("%08d", i-1)), ReadOptions{})
		if er != nil || val == nil {
			t.Fatalf("Unable to fetch the key written: %v", er)
		}
	}

	waitForPersistence(coll)

	if store.Close() != nil {
		t.Fatalf("expected store close to work")
	}

	if coll.Close() != nil {
		t.Fatalf("error closing collection")
	}
	// Now reopen the store in level compaction mode and only do deletes.
	so.CompactionPercentage = 100.0
	so.CompactionLevelMaxSegments = 2
	so.CompactionLevelMultiplier = 3
	store, coll, err = OpenStoreCollection(tmpDir, so, spo)
	if err != nil || store == nil || coll == nil {
		t.Fatalf("error opening store collection: %v, err: %v", tmpDir, err)
	}

	// Now load data such that a new batch comprising only deleted items
	// is appended to the end of the file.
	// On the last attempt to append, level compaction should kick in
	// and only partially compact those last segments comprising deletes.
	level0segments := 3 // level compact on the third level0 segment.
	for i := 0; i < numItems && level0segments > 0; i = i + batchSize {
		ba, erro := coll.NewBatch(0, 0)
		if erro != nil {
			t.Fatalf("error creating new batch: %v", erro)
			return
		}
		key := fmt.Sprintf("%08d", i)
		erro = ba.Del([]byte(key))
		if erro != nil {
			t.Fatalf("unable to delete key")
		}
		erro = coll.ExecuteBatch(ba, WriteOptions{})
		if erro != nil {
			t.Fatalf("error executing batch: %v", erro)
			return
		}

		// cleanup that batch
		erro = ba.Close()
		if erro != nil {
			t.Fatalf("error closing batch: %v", erro)
			return
		}

		waitForPersistence(coll)
		level0segments--
	}
	val, erro := coll.Get([]byte(fmt.Sprintf("%08d", 0)), ReadOptions{})
	if erro == nil && val != nil {
		t.Fatalf("Should have deleted key 0, got: %s", val)
	}

	coll.Close()
	store.Close()
}

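// Test_IdleCompactionThrottle verifies that idle merger runs (counted
// in TotMergerIdleRuns) continue to occur while the collection is idle
// between batches.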
func Test_IdleCompactionThrottle(t *testing.T) {
	numItems := 1000
	batchSize := 100
	tmpDir, _ := ioutil.TempDir("", "mossStore")
	defer os.RemoveAll(tmpDir)

	so := DefaultStoreOptions
	so.CollectionOptions.MergerIdleRunTimeoutMS = 10
	so.CompactionSync = true
	spo := StorePersistOptions{CompactionConcern: CompactionAllow}

	store, coll, err := OpenStoreCollection(tmpDir, so, spo)
	if err != nil || store == nil || coll == nil {
		t.Fatalf("error opening store collection: %v, err: %v", tmpDir, err)
	}

	for i := numItems; i > 0; i = i - batchSize {
		// Get the number of idle merger runs before insertion.
		collStats, _ := coll.Stats()
		idleRunsBefore := collStats.TotMergerIdleRuns

		// create new batch to set some keys
		ba, erro := coll.NewBatch(0, 0)
		if erro != nil {
			t.Fatalf("error creating new batch: %v", erro)
			return
		}
		for j := i - 1; j >= i-batchSize; j-- {
			k := fmt.Sprintf("%08d", j)
			ba.Set([]byte(k), []byte(k))
		}
		err = coll.ExecuteBatch(ba, WriteOptions{})
		if err != nil {
			t.Fatalf("error executing batch: %v", err)
			return
		}

		// cleanup that batch
		err = ba.Close()
		if err != nil {
			t.Fatalf("error closing batch: %v", err)
			return
		}
		waitForPersistence(coll)

		// wait longer than 3X the idle run timeout so the throttled
		// idle merger gets a chance to run.
		time.Sleep(40 * time.Millisecond)

		collStats, _ = coll.Stats()
		idleRunsAfter := collStats.TotMergerIdleRuns
		if idleRunsAfter <= idleRunsBefore {
			t.Errorf("expected idle merger runs to increase, got: %v", idleRunsAfter)
		}
	}

	waitForPersistence(coll)

	if store.Close() != nil {
		t.Fatalf("expected store close to work")
	}

	if coll.Close() != nil {
		t.Fatalf("error closing collection")
	}
}