1// - Transport independent library for mutation streaming.
2// - Provide APIs to create KeyVersions.
3//
4// TODO: use slab allocated or memory pool to manage KeyVersions
5// TODO: change KeyVersions command to a specific type.
6
7package common
8
9import (
10	"bytes"
11	"encoding/binary"
12	"fmt"
13)
14
15// types of payload
16const (
17	PayloadKeyVersions byte = iota + 1
18	PayloadVbmap
19)
20
21// List of possible mutation commands. Mutation messages are broadly divided
22// into data and control messages. The division is based on the command field.
23const (
24	Upsert         byte = iota + 1 // data command
25	Deletion                       // data command
26	UpsertDeletion                 // data command
27	Sync                           // control command
28	DropData                       // control command
29	StreamBegin                    // control command
30	StreamEnd                      // control command
31	Snapshot                       // control command
32)
33
34type ProjectorVersion byte
35
36//Projector Version
37const (
38	ProjVer_5_1_0 ProjectorVersion = iota + 1
39	ProjVer_5_1_1
40	ProjVer_5_5_0
41)
42
43// Payload either carries `vbmap` or `vbs`.
44type Payload struct {
45	Payltyp byte
46	Vbmap   *VbConnectionMap
47	Vbs     []*VbKeyVersions // for N number of vbuckets
48}
49
50// StreamID is unique id for a vbucket across buckets.
51func StreamID(bucket string, vbno uint16) string {
52	return bucket + fmt.Sprintf("#%v", vbno)
53}
54
55// NewStreamPayload returns a reference to payload, `nVb` provides the maximum
56// number of vbuckets that can be carried by a payload.
57func NewStreamPayload(payltyp byte, nVb int) *Payload {
58	p := &Payload{
59		Payltyp: payltyp,
60		Vbs:     make([]*VbKeyVersions, 0, nVb),
61	}
62	return p
63}
64
65// Reset the payload structure for next transport.
66func (p *Payload) Reset(payltyp byte) {
67	p.Payltyp = payltyp
68	p.Vbmap = nil
69	p.Vbs = p.Vbs[:0]
70}
71
72// AddVbKeyVersions add a VbKeyVersions as payload, one or more VbKeyVersions
73// can be added before transport.
74func (p *Payload) AddVbKeyVersions(vb *VbKeyVersions) (err error) {
75	if vb == nil || p.Payltyp != PayloadKeyVersions {
76		return ErrorUnexpectedPayload
77	}
78	p.Vbs = append(p.Vbs, vb)
79	return nil
80}
81
82// SetVbmap set vbmap as payload.
83func (p *Payload) SetVbmap(bucket string, vbnos []uint16, vbuuids []uint64) error {
84	if p.Payltyp != PayloadVbmap {
85		return ErrorUnexpectedPayload
86	}
87	p.Vbmap = &VbConnectionMap{
88		Bucket:   bucket,
89		Vbuckets: vbnos,
90		Vbuuids:  vbuuids,
91	}
92	return nil
93}
94
95// VbConnectionMap specifies list of vbuckets and current vbuuids for each
96// vbucket.
97type VbConnectionMap struct {
98	Bucket   string
99	Vbuckets []uint16
100	Vbuuids  []uint64
101}
102
103// Equal compares to VbConnectionMap objects.
104func (vbmap *VbConnectionMap) Equal(other *VbConnectionMap) bool {
105	if vbmap.Bucket != other.Bucket {
106		return false
107	}
108	if len(vbmap.Vbuckets) != len(other.Vbuckets) ||
109		len(vbmap.Vbuuids) != len(other.Vbuuids) {
110		return false
111	}
112	for i, vbno := range vbmap.Vbuckets {
113		if vbno != other.Vbuckets[i] || vbmap.Vbuuids[i] != other.Vbuuids[i] {
114			return false
115		}
116	}
117	return true
118}
119
120// GetVbuuid returns vbuuid for specified vbucket-number from VbConnectionMap
121// object.
122func (vbmap *VbConnectionMap) GetVbuuid(vbno uint16) (uint64, error) {
123	for i, num := range vbmap.Vbuckets {
124		if num == vbno {
125			return vbmap.Vbuuids[i], nil
126		}
127	}
128	return 0, ErrorNotMyVbucket
129}
130
131// VbKeyVersions carries per vbucket key-versions for one or more mutations.
132type VbKeyVersions struct {
133	Bucket  string
134	Vbucket uint16         // vbucket number
135	Vbuuid  uint64         // unique id to detect branch history
136	Kvs     []*KeyVersions // N number of mutations
137	Uuid    string
138	ProjVer ProjectorVersion
139}
140
141// NewVbKeyVersions return a reference to a single vbucket payload
142func NewVbKeyVersions(bucket string, vbno uint16, vbuuid uint64, maxMutations int) *VbKeyVersions {
143	vb := &VbKeyVersions{Bucket: bucket, Vbucket: vbno, Vbuuid: vbuuid, ProjVer: ProjVer_5_5_0}
144	vb.Kvs = make([]*KeyVersions, 0, maxMutations)
145	vb.Uuid = StreamID(bucket, vbno)
146	return vb
147}
148
149// AddKeyVersions will add KeyVersions for a single mutation.
150func (vb *VbKeyVersions) AddKeyVersions(kv *KeyVersions) error {
151	vb.Kvs = append(vb.Kvs, kv)
152	return nil
153}
154
155// Equal compare equality of two VbKeyVersions object.
156func (vb *VbKeyVersions) Equal(other *VbKeyVersions) bool {
157	if vb.Vbucket != other.Vbucket ||
158		vb.Vbuuid != other.Vbuuid {
159		return false
160	}
161	if len(vb.Kvs) != len(other.Kvs) {
162		return false
163	}
164	for i, kv := range vb.Kvs {
165		if kv.Equal(other.Kvs[i]) == false {
166			return false
167		}
168	}
169	return true
170}
171
172// Free this object.
173func (vb *VbKeyVersions) Free() {
174	for _, kv := range vb.Kvs {
175		kv.Free()
176	}
177	vb.Kvs = vb.Kvs[:0]
178	// TODO: give `vb` back to pool
179}
180
181// FreeKeyVersions free mutations contained by this object.
182func (vb *VbKeyVersions) FreeKeyVersions() {
183	for _, kv := range vb.Kvs {
184		kv.Free()
185	}
186	vb.Kvs = vb.Kvs[:0]
187}
188
189// KeyVersions for a single mutation from KV for a subset of index.
190type KeyVersions struct {
191	Seqno     uint64   // vbucket sequence number for this mutation
192	Docid     []byte   // primary document id
193	Uuids     []uint64 // list of unique ids, like index-ids
194	Commands  []byte   // list of commands for each index
195	Keys      [][]byte // list of key-versions for each index
196	Oldkeys   [][]byte // previous key-versions, if available
197	Partnkeys [][]byte // partition key for each key-version
198	Ctime     int64
199}
200
201// NewKeyVersions return a reference KeyVersions for a single mutation.
202func NewKeyVersions(seqno uint64, docid []byte, maxCount, ctime int64) *KeyVersions {
203	kv := &KeyVersions{Seqno: seqno}
204	if docid != nil {
205		kv.Docid = make([]byte, len(docid))
206		copy(kv.Docid, docid)
207	}
208
209	kv.Uuids = make([]uint64, 0, maxCount)
210	kv.Commands = make([]byte, 0, maxCount)
211	kv.Keys = make([][]byte, 0, maxCount)
212	kv.Oldkeys = make([][]byte, 0, maxCount)
213	kv.Partnkeys = make([][]byte, 0, maxCount)
214	kv.Ctime = ctime
215	return kv
216}
217
218// addKey will add key-version for a single index.
219func (kv *KeyVersions) addKey(uuid uint64, command byte, key, oldkey, pkey []byte) {
220	kv.Uuids = append(kv.Uuids, uuid)
221	kv.Commands = append(kv.Commands, command)
222	kv.Keys = append(kv.Keys, key)
223	kv.Oldkeys = append(kv.Oldkeys, oldkey)
224	kv.Partnkeys = append(kv.Partnkeys, pkey)
225}
226
227// Equal compares for equality of two KeyVersions object.
228func (kv *KeyVersions) Equal(other *KeyVersions) bool {
229	if kv.Seqno != other.Seqno || bytes.Compare(kv.Docid, other.Docid) != 0 {
230		return false
231	}
232	if len(kv.Uuids) != len(other.Uuids) {
233		return false
234	}
235	for i, uuid := range kv.Uuids {
236		if uuid != other.Uuids[i] ||
237			kv.Commands[i] != other.Commands[i] ||
238			bytes.Compare(kv.Keys[i], other.Keys[i]) != 0 ||
239			bytes.Compare(kv.Oldkeys[i], other.Oldkeys[i]) != 0 ||
240			bytes.Compare(kv.Partnkeys[i], other.Partnkeys[i]) != 0 {
241			return false
242		}
243	}
244	return true
245}
246
247// Free this object.
248func (kv *KeyVersions) Free() {
249	// TODO: give `kv` back to pool
250}
251
252// Length number of key-versions are stored.
253func (kv *KeyVersions) Length() int {
254	return len(kv.Uuids)
255}
256
257// AddUpsert add a new keyversion for same OpMutation.
258func (kv *KeyVersions) AddUpsert(uuid uint64, key, oldkey, pkey []byte) {
259	kv.addKey(uuid, Upsert, key, oldkey, pkey)
260}
261
262// AddDeletion add a new keyversion for same OpDeletion.
263func (kv *KeyVersions) AddDeletion(uuid uint64, oldkey, pkey []byte) {
264	kv.addKey(uuid, Deletion, nil, oldkey, pkey)
265}
266
267// AddUpsertDeletion add a keyversion command to delete old entry.
268func (kv *KeyVersions) AddUpsertDeletion(uuid uint64, oldkey, pkey []byte) {
269	kv.addKey(uuid, UpsertDeletion, nil, oldkey, pkey)
270}
271
272// AddSync add Sync command for vbucket heartbeat.
273func (kv *KeyVersions) AddSync() {
274	kv.addKey(0, Sync, nil, nil, nil)
275}
276
277// AddDropData add DropData command for trigger downstream catchup.
278func (kv *KeyVersions) AddDropData() {
279	kv.addKey(0, DropData, nil, nil, nil)
280}
281
282// AddStreamBegin add StreamBegin command for a new vbucket.
283func (kv *KeyVersions) AddStreamBegin() {
284	kv.addKey(0, StreamBegin, nil, nil, nil)
285}
286
287// AddStreamEnd add StreamEnd command for a vbucket shutdown.
288func (kv *KeyVersions) AddStreamEnd() {
289	kv.addKey(0, StreamEnd, nil, nil, nil)
290}
291
292// AddSnapshot add Snapshot command for a vbucket shutdown.
293// * type is sent via uuid field
294// * start and end values are big-ending encoded to as key and old-key
295func (kv *KeyVersions) AddSnapshot(typ uint32, start, end uint64) {
296	var key, okey [8]byte
297	binary.BigEndian.PutUint64(key[:8], start)
298	binary.BigEndian.PutUint64(okey[:8], end)
299	kv.addKey(uint64(typ), Snapshot, key[:8], okey[:8], nil)
300}
301
302func (kv *KeyVersions) String() string {
303	s := fmt.Sprintf("`%s` - Seqno:%v\n", string(kv.Docid), kv.Seqno)
304	for i, uuid := range kv.Uuids {
305		s += fmt.Sprintf("    %v Cmd(%v) `%s`",
306			uuid, kv.Commands[i], string(kv.Keys[i]))
307	}
308	return s
309}
310
311// DataportKeyVersions accepted by this endpoint.
312type DataportKeyVersions struct {
313	Bucket string
314	Vbno   uint16
315	Vbuuid uint64
316	Kv     *KeyVersions
317}
318