1// On the wire transport for custom packets packet.
2//
3//      { uint32(packetlen), uint16(flags), []byte(mutation) }
4//
5//      where, packetlen == len(mutation)
6//
7// `flags` used for specifying encoding format, compression etc.
8
9package transport
10
11import "errors"
12import "net"
13import "github.com/couchbase/indexing/secondary/logging"
14
15// error codes
16
17// ErrorPacketWrite is error writing packet on the wire.
18var ErrorPacketWrite = errors.New("transport.packetWrite")
19
20// ErrorPacketOverflow is input packet overflows maximum configured packet size.
21var ErrorPacketOverflow = errors.New("transport.packetOverflow")
22
23// ErrorEncoderUnknown for unknown encoder.
24var ErrorEncoderUnknown = errors.New("transport.encoderUnknown")
25
26// ErrorDecoderUnknown for unknown decoder.
27var ErrorDecoderUnknown = errors.New("transport.decoderUnknown")
28
29//ErrorChecksumMismatch for mismatch in checksum
30var ErrorChecksumMismatch = errors.New("transport.checksumUnknown")
31
32// packet field offset and size in bytes
33const (
34	pktLenOffset   int = 0
35	pktLenSize     int = 4
36	pktFlagOffset  int = pktLenOffset + pktLenSize
37	pktFlagSize    int = 2
38	pktDataOffset  int = pktFlagOffset + pktFlagSize
39	MaxSendBufSize int = pktLenSize + pktFlagSize
40)
41
42type transporter interface { // facilitates unit testing
43	Read(b []byte) (n int, err error)
44	Write(b []byte) (n int, err error)
45	LocalAddr() net.Addr
46	RemoteAddr() net.Addr
47}
48
49// TransportPacket to send and receive mutation packets between router
50// and downstream client.
51type TransportPacket struct {
52	flags    TransportFlag
53	buf      []byte
54	encoders map[byte]Encoder
55	decoders map[byte]Decoder
56}
57
58// Encoder callback
59type Encoder func(payload interface{}) (data []byte, err error)
60
61// Decoder callback
62type Decoder func(data []byte) (payload interface{}, err error)
63
64// NewTransportPacket creates a new TransportPacket and return its
65// reference. Typically application should call this once and reuse it while
66// sending or receiving a sequence of packets, so that same buffer can be
67// reused.
68//
69// maxlen, maximum size of internal buffer used to marshal and unmarshal
70//         packets.
71// flags,  specifying encoding and compression.
72func NewTransportPacket(maxlen int, flags TransportFlag) *TransportPacket {
73	pkt := &TransportPacket{
74		flags:    flags,
75		buf:      make([]byte, maxlen),
76		encoders: make(map[byte]Encoder),
77		decoders: make(map[byte]Decoder),
78	}
79	pkt.encoders[EncodingNone] = nil
80	pkt.decoders[EncodingNone] = nil
81	return pkt
82}
83
84// SetEncoder callback function for `type`.
85func (pkt *TransportPacket) SetEncoder(typ byte, callb Encoder) *TransportPacket {
86	pkt.encoders[typ] = callb
87	return pkt
88}
89
90// SetDecoder callback function for `type`.
91func (pkt *TransportPacket) SetDecoder(typ byte, callb Decoder) *TransportPacket {
92	pkt.decoders[typ] = callb
93	return pkt
94}
95
96// Send payload to the other end using sufficient encoding and compression.
97func (pkt *TransportPacket) Send(conn transporter, payload interface{}) (err error) {
98	var data []byte
99
100	// encode
101	if data, err = pkt.encode(payload); err != nil {
102		return
103	}
104	// compress
105	if data, err = pkt.compress(data); err != nil {
106		return
107	}
108
109	err = Send(conn, pkt.buf, pkt.flags, data, true)
110	return
111}
112
113// Receive payload from remote, decode, decompress the payload and return the
114// payload.
115func (pkt *TransportPacket) Receive(conn transporter) (payload interface{}, err error) {
116	var data []byte
117	var flags TransportFlag
118
119	flags, data, err = Receive(conn, pkt.buf)
120	if err != nil {
121		return
122	}
123
124	// Special packet to indicate end response
125	if len(data) == 0 && flags == 0 {
126		return nil, nil
127	}
128
129	pkt.flags = flags
130
131	laddr, raddr := conn.LocalAddr(), conn.RemoteAddr()
132	logging.Tracef("read %v bytes on connection %v<-%v", len(data), laddr, raddr)
133
134	// de-compression
135	if data, err = pkt.decompress(data); err != nil {
136		return
137	}
138	// decoding
139	if payload, err = pkt.decode(data); err != nil {
140		return
141	}
142	return
143}
144
145// encode payload to array of bytes, if callback was specified `nil` for a
146// valid type then return `payload` as `data`.
147func (pkt *TransportPacket) encode(payload interface{}) (data []byte, err error) {
148	typ := pkt.flags.GetEncoding()
149	if callb, ok := pkt.encoders[typ]; ok {
150		return callb(payload)
151	} else if callb == nil {
152		return payload.([]byte), nil
153	}
154	return nil, ErrorEncoderUnknown
155}
156
157// decode array of bytes back to payload, if callback was specified `nil` for
158// a valid type then return `data` as `payload`.
159func (pkt *TransportPacket) decode(data []byte) (payload interface{}, err error) {
160	typ := pkt.flags.GetEncoding()
161	if callb, ok := pkt.decoders[typ]; ok && callb != nil {
162		return callb(data)
163	}
164	return nil, ErrorDecoderUnknown
165}
166
167// compress array of bytes.
168func (pkt *TransportPacket) compress(big []byte) (small []byte, err error) {
169	switch pkt.flags.GetCompression() {
170	case CompressionNone:
171		small = big
172	}
173	return
174}
175
176// decompress array of bytes.
177func (pkt *TransportPacket) decompress(small []byte) (big []byte, err error) {
178	switch pkt.flags.GetCompression() {
179	case CompressionNone:
180		big = small
181	}
182	return
183}
184
185// read len(buf) bytes from `conn`.
186func fullRead(conn transporter, buf []byte) error {
187	size, start := 0, 0
188	for size < len(buf) {
189		n, err := conn.Read(buf[start:])
190		if err != nil {
191			return err
192		}
193		size += n
194		start += n
195	}
196	return nil
197}
198