1//  Copyright (c) 2018 Couchbase, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// 		http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package zap
16
17import (
18	"bytes"
19
20	"github.com/couchbase/vellum"
21)
22
23// enumerator provides an ordered traversal of multiple vellum
24// iterators.  Like JOIN of iterators, the enumerator produces a
25// sequence of (key, iteratorIndex, value) tuples, sorted by key ASC,
26// then iteratorIndex ASC, where the same key might be seen or
27// repeated across multiple child iterators.
28type enumerator struct {
29	itrs   []vellum.Iterator
30	currKs [][]byte
31	currVs []uint64
32
33	lowK    []byte
34	lowIdxs []int
35	lowCurr int
36}
37
38// newEnumerator returns a new enumerator over the vellum Iterators
39func newEnumerator(itrs []vellum.Iterator) (*enumerator, error) {
40	rv := &enumerator{
41		itrs:    itrs,
42		currKs:  make([][]byte, len(itrs)),
43		currVs:  make([]uint64, len(itrs)),
44		lowIdxs: make([]int, 0, len(itrs)),
45	}
46	for i, itr := range rv.itrs {
47		rv.currKs[i], rv.currVs[i] = itr.Current()
48	}
49	rv.updateMatches(false)
50	if rv.lowK == nil && len(rv.lowIdxs) == 0 {
51		return rv, vellum.ErrIteratorDone
52	}
53	return rv, nil
54}
55
56// updateMatches maintains the low key matches based on the currKs
57func (m *enumerator) updateMatches(skipEmptyKey bool) {
58	m.lowK = nil
59	m.lowIdxs = m.lowIdxs[:0]
60	m.lowCurr = 0
61
62	for i, key := range m.currKs {
63		if (key == nil && m.currVs[i] == 0) || // in case of empty iterator
64			(len(key) == 0 && skipEmptyKey) { // skip empty keys
65			continue
66		}
67
68		cmp := bytes.Compare(key, m.lowK)
69		if cmp < 0 || len(m.lowIdxs) == 0 {
70			// reached a new low
71			m.lowK = key
72			m.lowIdxs = m.lowIdxs[:0]
73			m.lowIdxs = append(m.lowIdxs, i)
74		} else if cmp == 0 {
75			m.lowIdxs = append(m.lowIdxs, i)
76		}
77	}
78}
79
80// Current returns the enumerator's current key, iterator-index, and
81// value.  If the enumerator is not pointing at a valid value (because
82// Next returned an error previously), Current will return nil,0,0.
83func (m *enumerator) Current() ([]byte, int, uint64) {
84	var i int
85	var v uint64
86	if m.lowCurr < len(m.lowIdxs) {
87		i = m.lowIdxs[m.lowCurr]
88		v = m.currVs[i]
89	}
90	return m.lowK, i, v
91}
92
93// Next advances the enumerator to the next key/iterator/value result,
94// else vellum.ErrIteratorDone is returned.
95func (m *enumerator) Next() error {
96	m.lowCurr += 1
97	if m.lowCurr >= len(m.lowIdxs) {
98		// move all the current low iterators forwards
99		for _, vi := range m.lowIdxs {
100			err := m.itrs[vi].Next()
101			if err != nil && err != vellum.ErrIteratorDone {
102				return err
103			}
104			m.currKs[vi], m.currVs[vi] = m.itrs[vi].Current()
105		}
106		// can skip any empty keys encountered at this point
107		m.updateMatches(true)
108	}
109	if m.lowK == nil && len(m.lowIdxs) == 0 {
110		return vellum.ErrIteratorDone
111	}
112	return nil
113}
114
115// Close all the underlying Iterators.  The first error, if any, will
116// be returned.
117func (m *enumerator) Close() error {
118	var rv error
119	for _, itr := range m.itrs {
120		err := itr.Close()
121		if rv == nil {
122			rv = err
123		}
124	}
125	return rv
126}
127