1// Protobuf encoding scheme for payload
2
3package protobuf
4
5import "errors"
6
7import c "github.com/couchbase/indexing/secondary/common"
8import "github.com/golang/protobuf/proto"
9
10// ErrorTransportVersion
11var ErrorTransportVersion = errors.New("dataport.transportVersion")
12
13// ErrorMissingPayload
14var ErrorMissingPayload = errors.New("dataport.missingPlayload")
15
16// ProtobufEncode encode payload message into protobuf array of bytes. Return
17// `data` can be transported to the other end and decoded back to Payload
18// message.
19func ProtobufEncode(payload interface{}) (data []byte, err error) {
20	return ProtobufEncodeInBuf(payload, nil)
21}
22
23func ProtobufEncodeInBuf(payload interface{}, buf []byte) (data []byte, err error) {
24	pl := &QueryPayload{Version: proto.Uint32(uint32(ProtobufVersion()))}
25	switch val := payload.(type) {
26	// request
27	case *StatisticsRequest:
28		pl.StatisticsRequest = val
29
30	case *CountRequest:
31		pl.CountRequest = val
32
33	case *ScanRequest:
34		pl.ScanRequest = val
35
36	case *ScanAllRequest:
37		pl.ScanAllRequest = val
38
39	case *EndStreamRequest:
40		pl.EndStream = val
41
42	// response
43	case *StatisticsResponse:
44		pl.Statistics = val
45
46	case *CountResponse:
47		pl.CountResponse = val
48
49	case *ResponseStream:
50		pl.Stream = val
51
52	case *StreamEndResponse:
53		pl.StreamEnd = val
54
55	case *HeloRequest:
56		pl.HeloRequest = val
57
58	case *HeloResponse:
59		pl.HeloResponse = val
60
61	default:
62		return nil, ErrorMissingPayload
63	}
64
65	p := proto.NewBuffer(buf)
66	err = p.Marshal(pl)
67	data = p.Bytes()
68	return
69}
70
71// ProtobufDecode complements ProtobufEncode() API. `data` returned by encode
72// is converted back to protobuf message structure.
73func ProtobufDecode(data []byte) (value interface{}, err error) {
74	pl := &QueryPayload{}
75	if err = proto.Unmarshal(data, pl); err != nil {
76		return nil, err
77	}
78	currVer := ProtobufVersion()
79	if ver := byte(pl.GetVersion()); ver == currVer {
80		// do nothing
81	} else if ver > currVer {
82		return nil, ErrorTransportVersion
83	} else {
84		pl = protoMsgConvertor[ver](pl)
85	}
86
87	// request
88	if val := pl.GetStatisticsRequest(); val != nil {
89		return val, nil
90	} else if val := pl.GetCountRequest(); val != nil {
91		return val, nil
92	} else if val := pl.GetScanRequest(); val != nil {
93		return val, nil
94	} else if val := pl.GetScanAllRequest(); val != nil {
95		return val, nil
96	} else if val := pl.GetEndStream(); val != nil {
97		return val, nil
98		// response
99	} else if val := pl.GetStatistics(); val != nil {
100		return val, nil
101	} else if val := pl.GetStream(); val != nil {
102		return val, nil
103	} else if val := pl.GetCountResponse(); val != nil {
104		return val, nil
105	} else if val := pl.GetEndStream(); val != nil {
106		return val, nil
107	} else if val := pl.GetStreamEnd(); val != nil {
108		return val, nil
109	} else if val := pl.GetHeloRequest(); val != nil {
110		return val, nil
111	} else if val := pl.GetHeloResponse(); val != nil {
112		return val, nil
113	}
114	return nil, ErrorMissingPayload
115}
116
117// ProtobufVersion return version of protobuf schema used in packet transport.
118func ProtobufVersion() byte {
119	return (c.ProtobufDataPathMajorNum << 4) | c.ProtobufDataPathMinorNum
120}
121
122var protoMsgConvertor = map[byte]func(*QueryPayload) *QueryPayload{}
123