1// defines timestamp types to interface with go-coubhbase and also provides
2// functions for set-operations on time-stamps.
3
4package common
5
6import "github.com/couchbase/indexing/secondary/logging"
7import "bytes"
8import "fmt"
9import "sync"
10
11// TsVb is logical clock for a subset of vbuckets.
12type TsVb struct {
13	Bucket string
14	Vbnos  []uint16
15	Seqnos []uint64
16}
17
18// TsVbFull is logical clock for full set of vbuckets.
19type TsVbFull struct {
20	Bucket string
21	Seqnos []uint64
22}
23
24// TsVbuuid is logical clock for full set of vbuckets along with branch value
25// and last seen snapshot.
26type TsVbuuid struct {
27	Bucket       string
28	Seqnos       []uint64
29	Vbuuids      []uint64
30	Crc64        uint64
31	Snapshots    [][2]uint64
32	SnapType     IndexSnapType
33	LargeSnap    bool
34	SnapAligned  bool
35	DisableAlign bool
36}
37
38// NewTsVbuuid returns reference to new instance of TsVbuuid.
39// `numVbuckets` is same as `maxVbuckets`.
40func NewTsVbuuid(bucket string, numVbuckets int) *TsVbuuid {
41	return &TsVbuuid{
42		Bucket:    bucket,
43		Seqnos:    make([]uint64, numVbuckets),
44		Vbuuids:   make([]uint64, numVbuckets),
45		Snapshots: make([][2]uint64, numVbuckets),
46	}
47}
48
49func newTsVbuuid() interface{} {
50	return &TsVbuuid{
51		Bucket:    "",
52		Seqnos:    make([]uint64, NUM_VBUCKETS),
53		Vbuuids:   make([]uint64, NUM_VBUCKETS),
54		Snapshots: make([][2]uint64, NUM_VBUCKETS),
55	}
56}
57
58var tsVbuuidPool = sync.Pool{New: newTsVbuuid}
59var NUM_VBUCKETS int
60
61func NewTsVbuuidCached(bucket string, numVbuckets int) *TsVbuuid {
62
63	NUM_VBUCKETS = numVbuckets
64
65	ts := tsVbuuidPool.Get().(*TsVbuuid)
66
67	//re-init
68	for i, _ := range ts.Vbuuids {
69		ts.Seqnos[i] = 0
70		ts.Vbuuids[i] = 0
71		ts.Snapshots[i][0] = 0
72		ts.Snapshots[i][1] = 0
73		ts.Crc64 = 0
74	}
75	ts.Bucket = bucket
76	return ts
77}
78
79func (ts *TsVbuuid) Free() {
80	tsVbuuidPool.Put(ts)
81}
82
83// GetVbnos will return the list of all vbnos.
84func (ts *TsVbuuid) GetVbnos() []uint16 {
85	var vbnos []uint16
86	for i := 0; i < len(ts.Vbuuids); i++ {
87		if ts.Vbuuids[i] != 0 { //if vbuuid is valid
88			vbnos = append(vbnos, uint16(i))
89		}
90	}
91	return vbnos
92}
93
94// CompareVbuuids will compare two timestamps for its bucket and vbuuids
95func (ts *TsVbuuid) CompareVbuuids(other *TsVbuuid) bool {
96	if ts == nil || other == nil {
97		return false
98	}
99	if ts.Bucket != other.Bucket || ts.Len() != other.Len() {
100		return false
101	}
102	for i, vbuuid := range ts.Vbuuids {
103		if (vbuuid != other.Vbuuids[i]) ||
104			(ts.Snapshots[i][0] != other.Snapshots[i][0]) ||
105			(ts.Snapshots[i][1] != other.Snapshots[i][1]) {
106			return false
107		}
108	}
109	return true
110}
111
112func (ts *TsVbuuid) IsEpoch() bool {
113	for _, seqno := range ts.Seqnos {
114		if seqno != 0 {
115			return false
116		}
117	}
118	return true
119}
120
121// CheckVbuuids will check whether vbuuids in timestamp `ts` is same
122// as that of `other`.
123func (ts *TsVbuuid) CheckCrc64(other *TsVbuuid) bool {
124	if ts == nil || other == nil {
125		return false
126	}
127
128	if ts.Bucket != other.Bucket {
129		return false
130	}
131	return ts.Crc64 == 0 || other.Crc64 == 0 || ts.Crc64 == other.Crc64
132}
133
134// AsRecent will check whether timestamp `ts` is atleast as recent as
135// timestamp `other`.
136func (ts *TsVbuuid) AsRecent(other *TsVbuuid) bool {
137	if ts == nil || other == nil {
138		return false
139	}
140	if ts.Bucket != other.Bucket {
141		return false
142	}
143	if len(ts.Vbuuids) > len(other.Vbuuids) {
144		return false
145	}
146	for i, vbuuid := range ts.Vbuuids {
147		//skip comparing the vbucket if "other" ts has vbuuid 0
148		if other.Vbuuids[i] == 0 {
149			continue
150		}
151		if vbuuid != other.Vbuuids[i] || ts.Seqnos[i] < other.Seqnos[i] {
152			return false
153		}
154	}
155	return true
156}
157
158// AsRecentTs will check whether timestamp `ts` is atleast as recent as
159// timestamp `other`.
160func (ts *TsVbuuid) AsRecentTs(other *TsVbuuid) bool {
161	if ts == nil || other == nil {
162		return false
163	}
164	if ts.Bucket != other.Bucket {
165		return false
166	}
167	if len(ts.Seqnos) > len(other.Seqnos) {
168		return false
169	}
170	for i, seqno := range ts.Seqnos {
171		if seqno < other.Seqnos[i] {
172			return false
173		}
174	}
175	return true
176}
177
178// Len return number of entries in the timestamp.
179func (ts *TsVbuuid) Len() int {
180	length := 0
181	for i := 0; i < len(ts.Vbuuids); i++ {
182		if ts.Vbuuids[i] != 0 { //if vbuuid is valid
183			length++
184		}
185	}
186	return length
187}
188
189//Persisted returns the value of persisted flag
190func (ts *TsVbuuid) GetSnapType() IndexSnapType {
191	return ts.SnapType
192}
193
194//Persisted sets the persisted flag
195func (ts *TsVbuuid) SetSnapType(typ IndexSnapType) {
196	ts.SnapType = typ
197}
198
199//HasLargeSnapshot returns the value of largeSnap flag
200func (ts *TsVbuuid) HasLargeSnapshot() bool {
201	return ts.LargeSnap
202}
203
204//SetLargeSnapshot sets the largeSnap flag
205func (ts *TsVbuuid) SetLargeSnapshot(largeSnap bool) {
206	ts.LargeSnap = largeSnap
207}
208
209func (ts *TsVbuuid) HasDisableAlign() bool {
210	return ts.DisableAlign
211}
212
213func (ts *TsVbuuid) SetDisableAlign(disable bool) {
214	ts.DisableAlign = disable
215}
216
217func (ts *TsVbuuid) GetCrc64() uint64 {
218
219	if ts == nil {
220		return 0
221	}
222
223	return ts.Crc64
224}
225
226// Copy will return a clone of this timestamp.
227func (ts *TsVbuuid) Copy() *TsVbuuid {
228	newTs := NewTsVbuuid(ts.Bucket, len(ts.Seqnos))
229	copy(newTs.Seqnos, ts.Seqnos)
230	copy(newTs.Vbuuids, ts.Vbuuids)
231	copy(newTs.Snapshots, ts.Snapshots)
232	newTs.SnapType = ts.SnapType
233	newTs.LargeSnap = ts.LargeSnap
234	newTs.SnapAligned = ts.SnapAligned
235	newTs.Crc64 = ts.Crc64
236	return newTs
237}
238
239func (ts *TsVbuuid) CopyFrom(src *TsVbuuid) {
240	copy(ts.Seqnos, src.Seqnos)
241	copy(ts.Vbuuids, src.Vbuuids)
242	copy(ts.Snapshots, src.Snapshots)
243	ts.SnapType = src.SnapType
244	ts.LargeSnap = src.LargeSnap
245	ts.SnapAligned = src.SnapAligned
246	ts.Crc64 = src.Crc64
247}
248
249// Equal returns whether `ts` and `other` compare equal.
250func (ts *TsVbuuid) Equal(other *TsVbuuid) bool {
251	return ts.Equal2(other, true)
252}
253
254// Equal returns whether `ts` and `other` compare equal.
255func (ts *TsVbuuid) Equal2(other *TsVbuuid, compareSnapshot bool) bool {
256	if ts != nil && other == nil ||
257		ts == nil && other != nil {
258		return false
259	}
260
261	if ts == nil && other == nil {
262		return true
263	}
264
265	if len(ts.Seqnos) != len(other.Seqnos) {
266		return false
267	}
268
269	for i, seqno := range ts.Seqnos {
270		if other.Seqnos[i] != seqno {
271			return false
272		}
273	}
274
275	for i, vbuuid := range ts.Vbuuids {
276		if other.Vbuuids[i] != vbuuid {
277			return false
278		}
279	}
280
281	if compareSnapshot {
282		for i, sn := range ts.Snapshots {
283			if other.Snapshots[i][0] != sn[0] {
284				return false
285			}
286
287			if other.Snapshots[i][1] != sn[1] {
288				return false
289			}
290		}
291	}
292
293	return true
294}
295
296// Equal returns whether `ts` is equal or greater than `other`
297func (ts *TsVbuuid) EqualOrGreater(other *TsVbuuid) bool {
298	if ts != nil && other == nil ||
299		ts == nil && other != nil {
300		return false
301	}
302
303	if ts == nil && other == nil {
304		return true
305	}
306
307	if len(ts.Seqnos) != len(other.Seqnos) {
308		return false
309	}
310
311	for i, seqno := range ts.Seqnos {
312		if other.Seqnos[i] > seqno {
313			return false
314		}
315	}
316
317	for i, vbuuid := range ts.Vbuuids {
318		if other.Vbuuids[i] != vbuuid {
319			return false
320		}
321	}
322
323	return true
324}
325
326// Clone of TsVbuuid
327func (ts *TsVbuuid) Clone() *TsVbuuid {
328
329	other := NewTsVbuuid(ts.Bucket, len(ts.Seqnos))
330	for i, seqno := range ts.Seqnos {
331		other.Seqnos[i] = seqno
332	}
333
334	for i, vbuuid := range ts.Vbuuids {
335		other.Vbuuids[i] = vbuuid
336	}
337
338	for i, sn := range ts.Snapshots {
339		other.Snapshots[i][0] = sn[0]
340		other.Snapshots[i][1] = sn[1]
341	}
342	other.Crc64 = ts.Crc64
343
344	return other
345}
346
347// Convert into a human readable format
348func (ts *TsVbuuid) String() string {
349	var buf bytes.Buffer
350	vbnos := ts.GetVbnos()
351	fmsg := "bucket: %v, vbuckets: %v Crc64: %v snapType %v -\n"
352	buf.WriteString(fmt.Sprintf(fmsg, ts.Bucket, len(vbnos), ts.Crc64, ts.SnapType))
353	fmsg = "    {vbno, vbuuid, seqno, snapshot-start, snapshot-end}\n"
354	buf.WriteString(fmt.Sprintf(fmsg))
355	for _, v := range vbnos {
356		start, end := ts.Snapshots[v][0], ts.Snapshots[v][1]
357		buf.WriteString(fmt.Sprintf("    {%5d %16x %10d %10d %10d}\n",
358			v, ts.Vbuuids[v], ts.Seqnos[v], start, end))
359	}
360	return buf.String()
361}
362
363// Convert the difference between two timestamps to human readable format
364func (ts *TsVbuuid) Diff(other *TsVbuuid) string {
365
366	var buf bytes.Buffer
367	if ts.Equal(other) {
368		buf.WriteString("Timestamps are equal\n")
369		return buf.String()
370	}
371
372	if other == nil {
373		buf.WriteString("This timestamp:\n")
374		buf.WriteString(ts.String())
375		buf.WriteString("Other timestamp is nil\n")
376		return buf.String()
377	}
378
379	if len(other.Seqnos) != len(ts.Seqnos) {
380		logging.Debugf("Two timestamps contain different number of vbuckets\n")
381		buf.WriteString("This timestamp:\n")
382		buf.WriteString(ts.String())
383		buf.WriteString("Other timestamp:\n")
384		buf.WriteString(other.String())
385		return buf.String()
386	}
387
388	for i := range ts.Seqnos {
389		if ts.Seqnos[i] != other.Seqnos[i] || ts.Vbuuids[i] != other.Vbuuids[i] ||
390			ts.Snapshots[i][0] != other.Snapshots[i][0] || ts.Snapshots[i][1] != other.Snapshots[i][1] {
391			buf.WriteString(fmt.Sprintf("This timestamp: bucket %s, vb = %d, vbuuid = %d, seqno = %d, snapshot[0] = %d, snapshot[1] = %d\n",
392				ts.Bucket, i, ts.Vbuuids[i], ts.Seqnos[i], ts.Snapshots[0], ts.Snapshots[1]))
393			buf.WriteString(fmt.Sprintf("Other timestamp: bucket %s, vb = %d, vbuuid = %d, seqno = %d, snapshot[0] = %d, snapshot[1] = %d\n",
394				other.Bucket, i, other.Vbuuids[i], other.Seqnos[i], other.Snapshots[0], other.Snapshots[1]))
395		}
396	}
397
398	return buf.String()
399}
400
401//check if seqnum of all vbuckets are aligned with the snapshot end
402func (ts *TsVbuuid) CheckSnapAligned() bool {
403
404	// Nil timestamp can be considered equivalent to all vbs with seqno=0 (empty bucket)
405	if ts == nil {
406		return true
407	}
408
409	for i, s := range ts.Snapshots {
410		if ts.Seqnos[i] != s[1] {
411			return false
412		}
413	}
414	return true
415
416}
417
418//IsSnapAligned returns the value of SnapAligned flag
419func (ts *TsVbuuid) IsSnapAligned() bool {
420
421	if ts == nil {
422		return true
423	}
424
425	return ts.SnapAligned
426}
427
428//SetSnapAligned sets the SnapAligned flag
429func (ts *TsVbuuid) SetSnapAligned(snapAligned bool) {
430	ts.SnapAligned = snapAligned
431}
432