1// go implementation of dcp client.
2// See https://github.com/couchbaselabs/cbupr/blob/master/transport-spec.md
3
4package memcached
5
6import (
7	"bytes"
8	"encoding/binary"
9	"errors"
10	"fmt"
11	"io"
12	"strconv"
13	"time"
14
15	"github.com/couchbase/indexing/secondary/dcp/transport"
16	"github.com/couchbase/indexing/secondary/logging"
17)
18
// Protocol constants used by the DCP client.
const (
	// dcpMutationExtraLen is the minimum extras length that carries a seqno.
	dcpMutationExtraLen = 16
	// bufferAckThreshold is the fraction of the connection buffer size after
	// which consumed bytes are acknowledged (see doDcpOpen / sendBufferAck).
	bufferAckThreshold = 0.2

	// Fixed opaque values for connection-level requests.
	opaqueOpen     = 0xBEAF0001
	opaqueFailover = 0xDEADBEEF
	opaqueGetseqno = 0xDEADBEEF

	// Flags OR-ed into every DCP_OPEN request.
	openConnFlag = uint32(0x1)
	includeXATTR = uint32(0x4)

	// Datatype bits of a DCP packet.
	dcpJSON  = uint8(0x1)
	dcpXATTR = uint8(0x4)
)

// Errors returned by the DCP feed.
var (
	// ErrorInvalidLog is returned when a failover log is not available.
	ErrorInvalidLog = errors.New("couchbase.errorInvalidLog")

	// ErrorConnection is returned when the feed's connection or its
	// gen-server is no longer usable.
	ErrorConnection = errors.New("dcp.connection")

	// ErrorInvalidFeed is returned when the producer sends an unexpected
	// reply or the feed state does not allow the request.
	ErrorInvalidFeed = errors.New("dcp.invalidFeed")
)
37
38// DcpFeed represents an DCP feed. A feed contains a connection to a single
39// host and multiple vBuckets
40type DcpFeed struct {
41	conn      *Client // connection to DCP producer
42	name      string
43	outch     chan<- *DcpEvent      // Exported channel for receiving DCP events
44	vbstreams map[uint16]*DcpStream // vb->stream mapping
45	// genserver
46	reqch     chan []interface{}
47	finch     chan bool
48	logPrefix string
49	// stats
50	toAckBytes  uint32   // bytes client has read
51	maxAckBytes uint32   // Max buffer control ack bytes
52	stats       DcpStats // Stats for dcp client
53	dcplatency  *Average
54}
55
56// NewDcpFeed creates a new DCP Feed.
57func NewDcpFeed(
58	mc *Client, name string, outch chan<- *DcpEvent,
59	opaque uint16, config map[string]interface{}) (*DcpFeed, error) {
60
61	genChanSize := config["genChanSize"].(int)
62	dataChanSize := config["dataChanSize"].(int)
63	feed := &DcpFeed{
64		name:      name,
65		outch:     outch,
66		vbstreams: make(map[uint16]*DcpStream),
67		reqch:     make(chan []interface{}, genChanSize),
68		finch:     make(chan bool),
69		// TODO: would be nice to add host-addr as part of prefix.
70		logPrefix:  fmt.Sprintf("DCPT[%s]", name),
71		dcplatency: &Average{},
72	}
73
74	mc.Hijack()
75	feed.conn = mc
76	rcvch := make(chan []interface{}, dataChanSize)
77	go feed.genServer(opaque, feed.reqch, feed.finch, rcvch, config)
78	go feed.doReceive(rcvch, feed.finch, mc)
79	logging.Infof("%v ##%x feed started ...", feed.logPrefix, opaque)
80	return feed, nil
81}
82
83func (feed *DcpFeed) Name() string {
84	return feed.name
85}
86
87// DcpOpen to connect with a DCP producer.
88// Name: name of te DCP connection
89// sequence: sequence number for the connection
90// bufsize: max size of the application
91func (feed *DcpFeed) DcpOpen(
92	name string, sequence, flags, bufsize uint32, opaque uint16) error {
93
94	respch := make(chan []interface{}, 1)
95	cmd := []interface{}{
96		dfCmdOpen, name, sequence, flags, bufsize, opaque, respch,
97	}
98	resp, err := failsafeOp(feed.reqch, respch, cmd, feed.finch)
99	return opError(err, resp, 0)
100}
101
102// DcpGetFailoverLog for given list of vbuckets.
103func (feed *DcpFeed) DcpGetFailoverLog(
104	opaque uint16, vblist []uint16) (map[uint16]*FailoverLog, error) {
105
106	respch := make(chan []interface{}, 1)
107	cmd := []interface{}{dfCmdGetFailoverlog, opaque, vblist, respch}
108	resp, err := failsafeOp(feed.reqch, respch, cmd, feed.finch)
109	if err = opError(err, resp, 1); err != nil {
110		return nil, err
111	}
112	return resp[0].(map[uint16]*FailoverLog), err
113}
114
115// DcpGetSeqnos for vbuckets hosted by this node.
116func (feed *DcpFeed) DcpGetSeqnos() (map[uint16]uint64, error) {
117	respch := make(chan []interface{}, 1)
118	cmd := []interface{}{dfCmdGetSeqnos, respch}
119	resp, err := failsafeOp(feed.reqch, respch, cmd, feed.finch)
120	if err = opError(err, resp, 1); err != nil {
121		return nil, err
122	}
123	return resp[0].(map[uint16]uint64), nil
124}
125
126// DcpRequestStream for a single vbucket.
127func (feed *DcpFeed) DcpRequestStream(vbno, opaqueMSB uint16, flags uint32,
128	vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
129
130	respch := make(chan []interface{}, 1)
131	cmd := []interface{}{
132		dfCmdRequestStream, vbno, opaqueMSB, flags, vuuid,
133		startSequence, endSequence, snapStart, snapEnd, respch}
134	resp, err := failsafeOp(feed.reqch, respch, cmd, feed.finch)
135	return opError(err, resp, 0)
136}
137
138// CloseStream for specified vbucket.
139func (feed *DcpFeed) CloseStream(vbno, opaqueMSB uint16) error {
140	respch := make(chan []interface{}, 1)
141	cmd := []interface{}{dfCmdCloseStream, vbno, opaqueMSB, respch}
142	resp, err := failsafeOp(feed.reqch, respch, cmd, feed.finch)
143	return opError(err, resp, 0)
144}
145
146// Close this DcpFeed.
147func (feed *DcpFeed) Close() error {
148	respch := make(chan []interface{}, 1)
149	cmd := []interface{}{dfCmdClose, respch}
150	_, err := failsafeOp(feed.reqch, respch, cmd, feed.finch)
151	return err
152}
153
// Gen-server command codes, carried as the first element of every request
// sent on DcpFeed.reqch.
const (
	dfCmdOpen           byte = 1
	dfCmdGetFailoverlog byte = 2
	dfCmdGetSeqnos      byte = 3
	dfCmdRequestStream  byte = 4
	dfCmdCloseStream    byte = 5
	dfCmdClose          byte = 6
)
162
163func (feed *DcpFeed) genServer(
164	opaque uint16, reqch chan []interface{}, finch chan bool,
165	rcvch chan []interface{},
166	config map[string]interface{}) {
167
168	defer func() { // panic safe
169		if r := recover(); r != nil {
170			logging.Errorf("%v ##%x crashed: %v\n", feed.logPrefix, opaque, r)
171			logging.Errorf("%s", logging.StackTrace())
172			feed.sendStreamEnd(feed.outch)
173		}
174		close(feed.finch)
175		feed.conn.Close()
176		feed.conn = nil
177		logging.Infof("%v ##%x ... stopped\n", feed.logPrefix, opaque)
178	}()
179
180	prefix := feed.logPrefix
181	latencyTick := int64(60 * 1000) // in milli-seconds
182	if val, ok := config["latencyTick"]; ok && val != nil {
183		latencyTick = int64(val.(int)) // in milli-seconds
184	}
185	latencyTm := time.NewTicker(time.Duration(latencyTick) * time.Millisecond)
186	defer func() {
187		latencyTm.Stop()
188	}()
189
190loop:
191	for {
192		select {
193		case <-latencyTm.C:
194			fmsg := "%v dcp latency stats %v\n"
195			logging.Infof(fmsg, feed.logPrefix, feed.dcplatency)
196			fmsg = "%v dcp stats %v\n"
197			logging.Infof(fmsg, feed.logPrefix, feed.stats.String(feed))
198			if feed.stats.TotalSpurious > 0 {
199				fmsg = "%v detected spurious messages for inactive streams\n"
200				logging.Fatalf(fmsg, feed.logPrefix)
201			}
202
203		case msg := <-reqch:
204			cmd := msg[0].(byte)
205			switch cmd {
206			case dfCmdOpen:
207				name, sequence := msg[1].(string), msg[2].(uint32)
208				flags := msg[3].(uint32)
209				bufsize, opaque := msg[4].(uint32), msg[5].(uint16)
210				respch := msg[6].(chan []interface{})
211				err := feed.doDcpOpen(
212					name, sequence, flags, bufsize, opaque, rcvch)
213				respch <- []interface{}{err}
214
215			case dfCmdGetFailoverlog:
216				opaque := msg[1].(uint16)
217				vblist, respch := msg[2].([]uint16), msg[3].(chan []interface{})
218				if len(feed.vbstreams) > 0 {
219					fmsg := "%v %##x active streams in doDcpGetFailoverLog"
220					logging.Errorf(fmsg, prefix, opaque)
221					respch <- []interface{}{nil, ErrorInvalidFeed}
222				}
223				flog, err := feed.doDcpGetFailoverLog(opaque, vblist, rcvch)
224				respch <- []interface{}{flog, err}
225
226			case dfCmdGetSeqnos:
227				respch := msg[1].(chan []interface{})
228				seqnos, err := feed.doDcpGetSeqnos(rcvch)
229				respch <- []interface{}{seqnos, err}
230
231			case dfCmdRequestStream:
232				vbno, opaqueMSB := msg[1].(uint16), msg[2].(uint16)
233				flags, vuuid := msg[3].(uint32), msg[4].(uint64)
234				startSequence, endSequence := msg[5].(uint64), msg[6].(uint64)
235				snapStart, snapEnd := msg[7].(uint64), msg[8].(uint64)
236				respch := msg[9].(chan []interface{})
237				err := feed.doDcpRequestStream(
238					vbno, opaqueMSB, flags, vuuid,
239					startSequence, endSequence, snapStart, snapEnd)
240				respch <- []interface{}{err}
241
242			case dfCmdCloseStream:
243				vbno, opaqueMSB := msg[1].(uint16), msg[2].(uint16)
244				respch := msg[3].(chan []interface{})
245				err := feed.doDcpCloseStream(vbno, opaqueMSB)
246				respch <- []interface{}{err}
247
248			case dfCmdClose:
249				feed.sendStreamEnd(feed.outch)
250				respch := msg[1].(chan []interface{})
251				respch <- []interface{}{nil}
252				break loop
253			}
254
255		case resp, ok := <-rcvch:
256			if !ok {
257				feed.sendStreamEnd(feed.outch)
258				break loop
259			}
260			pkt, bytes := resp[0].(*transport.MCRequest), resp[1].(int)
261			switch feed.handlePacket(pkt, bytes) {
262			case "exit":
263				feed.sendStreamEnd(feed.outch)
264				break loop
265			}
266		}
267	}
268}
269
270func (feed *DcpFeed) isClosed() bool {
271	select {
272	case <-feed.finch:
273		return true
274	default:
275	}
276	return false
277}
278
// handlePacket processes one packet taken off rcvch by genServer, together
// with the byte count that accompanied it (added to TotalBytes). It always
// returns "ok"; the "exit" result genServer checks for is never produced
// here.
//
// Packets whose vbucket (low 16 bits of the opaque) has no active stream are
// counted in TotalSpurious, logged at a reduced rate, and dropped. Otherwise
// the stream's bookkeeping and feed stats are updated, and a DcpEvent is sent
// on feed.outch for STREAMREQ, MUTATION/DELETION/EXPIRATION, STREAMEND,
// SNAPSHOT, FLUSH and CLOSESTREAM packets. That send blocks while the
// application is not draining outch. For the data-carrying opcodes (those
// setting sendAck) the byte count also feeds buffer-ack accounting.
func (feed *DcpFeed) handlePacket(
	pkt *transport.MCRequest, bytes int) string {

	var event *DcpEvent
	feed.stats.TotalBytes += uint64(bytes)
	// Replies arrive as MCRequest; view them as an MCResponse whose status
	// is carried in the request's VBucket field.
	res := &transport.MCResponse{
		Opcode: pkt.Opcode,
		Cas:    pkt.Cas,
		Opaque: pkt.Opaque,
		Status: transport.Status(pkt.VBucket),
		Extras: pkt.Extras,
		Key:    pkt.Key,
		Body:   pkt.Body,
	}
	// The low 16 bits of the opaque identify the vbucket (see composeOpaque).
	vb := vbOpaque(pkt.Opaque)

	sendAck := false
	prefix := feed.logPrefix
	stream := feed.vbstreams[vb]
	if stream == nil {
		feed.stats.TotalSpurious++
		// log the first 9999 spurious messages (strict < 10000) ...
		logok := feed.stats.TotalSpurious < 10000
		// ... then 1 spurious message for every 1000.
		logok = logok || (feed.stats.TotalSpurious%1000) == 1
		if logok {
			fmsg := "%v spurious %v for %d: %#v\n"
			arg1 := logging.TagUD(pkt)
			logging.Fatalf(fmsg, prefix, pkt.Opcode, vb, arg1)
		}
		return "ok" // dropped: no active stream for this vbucket
	}

	// On return, add this stream's latency sample (computeLatency is defined
	// elsewhere) to the feed's latency average.
	defer func() { feed.dcplatency.Add(computeLatency(stream)) }()

	stream.LastSeen = time.Now().UnixNano()
	switch pkt.Opcode {
	case transport.DCP_STREAMREQ:
		// Reply to our stream request. handleStreamRequest fills in the
		// event's status/seqno and removes the stream unless it succeeded.
		event = newDcpEvent(pkt, stream)
		feed.handleStreamRequest(res, vb, stream, event)
		feed.stats.TotalStreamReq++

	case transport.DCP_MUTATION, transport.DCP_DELETION,
		transport.DCP_EXPIRATION:
		event = newDcpEvent(pkt, stream)
		stream.Seqno = event.Seqno
		feed.stats.TotalMutation++
		sendAck = true

	case transport.DCP_STREAMEND:
		// The producer ended the stream; it is no longer active.
		event = newDcpEvent(pkt, stream)
		sendAck = true
		delete(feed.vbstreams, vb)
		fmsg := "%v ##%x DCP_STREAMEND for vb %d\n"
		logging.Debugf(fmsg, prefix, stream.AppOpaque, vb)
		feed.stats.TotalStreamEnd++

	case transport.DCP_SNAPSHOT:
		// Snapshot marker extras: start seqno [0:8], end seqno [8:16],
		// type [16:20].
		event = newDcpEvent(pkt, stream)
		event.SnapstartSeq = binary.BigEndian.Uint64(pkt.Extras[0:8])
		event.SnapendSeq = binary.BigEndian.Uint64(pkt.Extras[8:16])
		event.SnapshotType = binary.BigEndian.Uint32(pkt.Extras[16:20])
		stream.Snapstart = event.SnapstartSeq
		stream.Snapend = event.SnapendSeq
		feed.stats.TotalSnapShot++
		sendAck = true
		// Log unusually large snapshots.
		// NOTE(review): the unsigned subtraction wraps if Snapend < Snapstart,
		// which would also trigger this log line.
		if (stream.Snapend - stream.Snapstart) > 50000 {
			fmsg := "%v ##%x DCP_SNAPSHOT for vb %d snapshot {%v,%v}\n"
			logging.Infof(fmsg, prefix, stream.AppOpaque, vb, stream.Snapstart, stream.Snapend)
		}
		fmsg := "%v ##%x DCP_SNAPSHOT for vb %d\n"
		logging.Debugf(fmsg, prefix, stream.AppOpaque, vb)

	case transport.DCP_FLUSH:
		event = newDcpEvent(pkt, stream) // special processing ?

	case transport.DCP_CLOSESTREAM:
		// Reply to our close-stream request. It is surfaced to the
		// application as a DCP_STREAMEND carrying the stream's AppOpaque.
		event = newDcpEvent(pkt, stream)
		if event.Opaque != stream.CloseOpaque {
			fmsg := "%v ##%x DCP_CLOSESTREAM mismatch in opaque %v != %v\n"
			logging.Fatalf(
				fmsg, prefix, stream.AppOpaque, event.Opaque, stream.CloseOpaque)
		}
		event.Opcode = transport.DCP_STREAMEND // opcode re-write !!
		event.Opaque = stream.AppOpaque        // opaque re-write !!
		sendAck = true
		delete(feed.vbstreams, vb)
		fmsg := "%v ##%x DCP_CLOSESTREAM for vb %d\n"
		logging.Debugf(fmsg, prefix, stream.AppOpaque, vb)
		feed.stats.TotalCloseStream++

	case transport.DCP_CONTROL, transport.DCP_BUFFERACK:
		// Only failures are reported; no event is generated.
		// NOTE(review): these replies are matched to a stream by the vbucket
		// bits of their opaque like any other packet. sendBufferAck leaves
		// Opaque unset, so a reply echoing it would presumably be counted as
		// spurious unless vb 0 is active — verify.
		if res.Status != transport.SUCCESS {
			fmsg := "%v ##%x opcode %v received status %v\n"
			logging.Errorf(fmsg, prefix, stream.AppOpaque, pkt.Opcode, res.Status)
		}

	case transport.DCP_ADDSTREAM:
		fmsg := "%v ##%x opcode DCP_ADDSTREAM not implemented\n"
		logging.Fatalf(fmsg, prefix, stream.AppOpaque)

	default:
		fmsg := "%v opcode %v not known for vbucket %d\n"
		logging.Warnf(fmsg, prefix, pkt.Opcode, vb)
	}

	if event != nil {
		feed.outch <- event
	}
	// Only packets that set sendAck count toward the buffer-ack threshold.
	feed.sendBufferAck(sendAck, uint32(bytes))
	return "ok"
}
391
392func (feed *DcpFeed) doDcpGetFailoverLog(
393	opaque uint16,
394	vblist []uint16,
395	rcvch chan []interface{}) (map[uint16]*FailoverLog, error) {
396
397	rq := &transport.MCRequest{
398		Opcode: transport.DCP_FAILOVERLOG,
399		Opaque: opaqueFailover,
400	}
401	failoverLogs := make(map[uint16]*FailoverLog)
402	for _, vBucket := range vblist {
403		rq.VBucket = vBucket
404		if err := feed.conn.Transmit(rq); err != nil {
405			fmsg := "%v ##%x doDcpGetFailoverLog.Transmit(): %v"
406			logging.Errorf(fmsg, feed.logPrefix, opaque, err)
407			return nil, err
408		}
409		msg, ok := <-rcvch
410		if !ok {
411			fmsg := "%v ##%x doDcpGetFailoverLog.rcvch closed"
412			logging.Errorf(fmsg, feed.logPrefix, opaque)
413			return nil, ErrorConnection
414		}
415		pkt := msg[0].(*transport.MCRequest)
416		req := &transport.MCResponse{
417			Opcode: pkt.Opcode,
418			Cas:    pkt.Cas,
419			Opaque: pkt.Opaque,
420			Status: transport.Status(pkt.VBucket),
421			Extras: pkt.Extras,
422			Key:    pkt.Key,
423			Body:   pkt.Body,
424		}
425		if req.Opcode != transport.DCP_FAILOVERLOG {
426			fmsg := "%v ##%x for failover log request unexpected #opcode %v"
427			logging.Errorf(fmsg, feed.logPrefix, opaque, req.Opcode)
428			return nil, ErrorInvalidFeed
429
430		} else if req.Status != transport.SUCCESS {
431			fmsg := "%v ##%x for failover log request unexpected #status %v"
432			logging.Errorf(fmsg, feed.logPrefix, opaque, req.Status)
433			return nil, ErrorInvalidFeed
434		}
435		flog, err := parseFailoverLog(req.Body)
436		if err != nil {
437			fmsg := "%v ##%x parse failover logs for vb %d"
438			logging.Errorf(fmsg, feed.logPrefix, opaque, vBucket)
439			return nil, ErrorInvalidFeed
440		}
441		failoverLogs[vBucket] = flog
442	}
443	return failoverLogs, nil
444}
445
446func (feed *DcpFeed) doDcpGetSeqnos(
447	rcvch chan []interface{}) (map[uint16]uint64, error) {
448
449	rq := &transport.MCRequest{
450		Opcode: transport.DCP_GET_SEQNO,
451		Opaque: opaqueGetseqno,
452	}
453
454	rq.Extras = make([]byte, 4)
455	binary.BigEndian.PutUint32(rq.Extras, 1) // Only active vbuckets
456
457	if err := feed.conn.Transmit(rq); err != nil {
458		fmsg := "%v ##%x doDcpGetSeqnos.Transmit(): %v"
459		logging.Errorf(fmsg, feed.logPrefix, rq.Opaque, err)
460		return nil, err
461	}
462	msg, ok := <-rcvch
463	if !ok {
464		fmsg := "%v ##%x doDcpGetSeqnos.rcvch closed"
465		logging.Errorf(fmsg, feed.logPrefix, rq.Opaque)
466		return nil, ErrorConnection
467	}
468	pkt := msg[0].(*transport.MCRequest)
469	req := &transport.MCResponse{
470		Opcode: pkt.Opcode,
471		Cas:    pkt.Cas,
472		Opaque: pkt.Opaque,
473		Status: transport.Status(pkt.VBucket),
474		Extras: pkt.Extras,
475		Key:    pkt.Key,
476		Body:   pkt.Body,
477	}
478	if req.Opcode != transport.DCP_GET_SEQNO {
479		fmsg := "%v ##%x for get-seqno request unexpected #opcode %v"
480		logging.Errorf(fmsg, feed.logPrefix, req.Opaque, req.Opcode)
481		return nil, ErrorInvalidFeed
482
483	} else if req.Status != transport.SUCCESS {
484		fmsg := "%v ##%x for get-seqno request unexpected #status %v"
485		logging.Errorf(fmsg, feed.logPrefix, req.Opaque, req.Status)
486		return nil, ErrorInvalidFeed
487	}
488	seqnos, err := parseGetSeqnos(req.Body)
489	if err != nil {
490		fmsg := "%v ##%x parsing get-seqnos: %v"
491		logging.Errorf(fmsg, feed.logPrefix, req.Opaque, err)
492		return nil, ErrorInvalidFeed
493	}
494	return seqnos, nil
495}
496
497func (feed *DcpFeed) doDcpOpen(
498	name string, sequence, flags, bufsize uint32,
499	opaque uint16,
500	rcvch chan []interface{}) error {
501
502	rq := &transport.MCRequest{
503		Opcode: transport.DCP_OPEN,
504		Key:    []byte(name),
505		Opaque: opaqueOpen,
506	}
507	rq.Extras = make([]byte, 8)
508	flags = flags | openConnFlag | includeXATTR
509	binary.BigEndian.PutUint32(rq.Extras[:4], sequence)
510	binary.BigEndian.PutUint32(rq.Extras[4:], flags) // we are consumer
511
512	prefix := feed.logPrefix
513	if err := feed.conn.Transmit(rq); err != nil {
514		return err
515	}
516	msg, ok := <-rcvch
517	if !ok {
518		logging.Errorf("%v ##%x doDcpOpen.rcvch closed", prefix, opaque)
519		return ErrorConnection
520	}
521	pkt := msg[0].(*transport.MCRequest)
522	req := &transport.MCResponse{
523		Opcode: pkt.Opcode,
524		Cas:    pkt.Cas,
525		Opaque: pkt.Opaque,
526		Status: transport.Status(pkt.VBucket),
527		Extras: pkt.Extras,
528		Key:    pkt.Key,
529		Body:   pkt.Body,
530	}
531	if req.Opcode != transport.DCP_OPEN {
532		logging.Errorf("%v ##%x unexpected #%v", prefix, opaque, req.Opcode)
533		return ErrorConnection
534	} else if rq.Opaque != req.Opaque {
535		fmsg := "%v ##%x opaque mismatch, %v != %v"
536		logging.Errorf(fmsg, prefix, opaque, req.Opaque, req.Opaque)
537		return ErrorConnection
538	} else if req.Status != transport.SUCCESS {
539		fmsg := "%v ##%x doDcpOpen response status %v"
540		logging.Errorf(fmsg, prefix, opaque, req.Status)
541		return ErrorConnection
542	}
543
544	// send a DCP control message to set the window size for
545	// this connection
546	if bufsize > 0 {
547		rq := &transport.MCRequest{
548			Opcode: transport.DCP_CONTROL,
549			Key:    []byte("connection_buffer_size"),
550			Body:   []byte(strconv.Itoa(int(bufsize))),
551		}
552		if err := feed.conn.Transmit(rq); err != nil {
553			fmsg := "%v ##%x doDcpOpen.DCP_CONTROL.Transmit(connection_buffer_size): %v"
554			logging.Errorf(fmsg, prefix, opaque, err)
555			return err
556		}
557		msg, ok := <-rcvch
558		if !ok {
559			fmsg := "%v ##%x doDcpOpen.DCP_CONTROL.rcvch (connection_buffer_size) closed"
560			logging.Errorf(fmsg, prefix, opaque)
561			return ErrorConnection
562		}
563		pkt := msg[0].(*transport.MCRequest)
564		req := &transport.MCResponse{
565			Opcode: pkt.Opcode,
566			Cas:    pkt.Cas,
567			Opaque: pkt.Opaque,
568			Status: transport.Status(pkt.VBucket),
569			Extras: pkt.Extras,
570			Key:    pkt.Key,
571			Body:   pkt.Body,
572		}
573		if req.Opcode != transport.DCP_CONTROL {
574			fmsg := "%v ##%x DCP_CONTROL (connection_buffer_size) != #%v"
575			logging.Errorf(fmsg, prefix, opaque, req.Opcode)
576			return ErrorConnection
577		} else if req.Status != transport.SUCCESS {
578			fmsg := "%v ##%x doDcpOpen (connection_buffer_size) response status %v"
579			logging.Errorf(fmsg, prefix, opaque, req.Status)
580			return ErrorConnection
581		}
582		feed.maxAckBytes = uint32(bufferAckThreshold * float32(bufsize))
583	}
584
585	// send a DCP control message to enable_noop
586	if true /*enable_noop*/ {
587		rq := &transport.MCRequest{
588			Opcode: transport.DCP_CONTROL,
589			Key:    []byte("enable_noop"),
590			Body:   []byte("true"),
591		}
592		if err := feed.conn.Transmit(rq); err != nil {
593			fmsg := "%v ##%x doDcpOpen.DCP_CONTROL.Transmit(enable_noop): %v"
594			logging.Errorf(fmsg, prefix, opaque, err)
595			return err
596		}
597		logging.Infof("%v ##%x sending enable_noop", prefix, opaque)
598		msg, ok := <-rcvch
599		if !ok {
600			fmsg := "%v ##%x doDcpOpen.DCP_CONTROL.rcvch (enable_noop) closed"
601			logging.Errorf(fmsg, prefix, opaque)
602			return ErrorConnection
603		}
604		pkt := msg[0].(*transport.MCRequest)
605		opcode, status := pkt.Opcode, transport.Status(pkt.VBucket)
606		if opcode != transport.DCP_CONTROL {
607			fmsg := "%v ##%x DCP_CONTROL (enable_noop) != #%v"
608			logging.Errorf(fmsg, prefix, opaque, opcode)
609			return ErrorConnection
610		} else if status != transport.SUCCESS {
611			fmsg := "%v ##%x doDcpOpen (enable_noop) response status %v"
612			logging.Errorf(fmsg, prefix, opaque, status)
613			return ErrorConnection
614		}
615		logging.Infof("%v ##%x received enable_noop response", prefix, opaque)
616	}
617
618	// send a DCP control message to set_noop_interval
619	if true /*set_noop_interval*/ {
620		rq := &transport.MCRequest{
621			Opcode: transport.DCP_CONTROL,
622			Key:    []byte("set_noop_interval"),
623			Body:   []byte("120"),
624		}
625		if err := feed.conn.Transmit(rq); err != nil {
626			fmsg := "%v ##%x doDcpOpen.Transmit(set_noop_interval): %v"
627			logging.Errorf(fmsg, prefix, opaque, err)
628			return err
629		}
630		logging.Infof("%v ##%x sending set_noop_interval", prefix, opaque)
631		msg, ok := <-rcvch
632		if !ok {
633			fmsg := "%v ##%x doDcpOpen.rcvch (set_noop_interval) closed"
634			logging.Errorf(fmsg, prefix, opaque)
635			return ErrorConnection
636		}
637		pkt := msg[0].(*transport.MCRequest)
638		opcode, status := pkt.Opcode, transport.Status(pkt.VBucket)
639		if opcode != transport.DCP_CONTROL {
640			fmsg := "%v ##%x DCP_CONTROL (set_noop_interval) != #%v"
641			logging.Errorf(fmsg, prefix, opaque, opcode)
642			return ErrorConnection
643		} else if status != transport.SUCCESS {
644			fmsg := "%v ##%x doDcpOpen (set_noop_interval) response status %v"
645			logging.Errorf(fmsg, prefix, opaque, status)
646			return ErrorConnection
647		}
648		fmsg := "%v ##%x received response for set_noop_interval"
649		logging.Infof(fmsg, prefix, opaque)
650	}
651	return nil
652}
653
654func (feed *DcpFeed) doDcpRequestStream(
655	vbno, opaqueMSB uint16, flags uint32,
656	vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
657
658	rq := &transport.MCRequest{
659		Opcode:  transport.DCP_STREAMREQ,
660		VBucket: vbno,
661		Opaque:  composeOpaque(vbno, opaqueMSB),
662	}
663	rq.Extras = make([]byte, 48) // #Extras
664	binary.BigEndian.PutUint32(rq.Extras[:4], flags)
665	binary.BigEndian.PutUint32(rq.Extras[4:8], uint32(0))
666	binary.BigEndian.PutUint64(rq.Extras[8:16], startSequence)
667	binary.BigEndian.PutUint64(rq.Extras[16:24], endSequence)
668	binary.BigEndian.PutUint64(rq.Extras[24:32], vuuid)
669	binary.BigEndian.PutUint64(rq.Extras[32:40], snapStart)
670	binary.BigEndian.PutUint64(rq.Extras[40:48], snapEnd)
671
672	prefix := feed.logPrefix
673	if err := feed.conn.Transmit(rq); err != nil {
674		fmsg := "%v ##%x doDcpRequestStream.Transmit(): %v"
675		logging.Errorf(fmsg, prefix, opaqueMSB, err)
676		return err
677	}
678	stream := &DcpStream{
679		AppOpaque: opaqueMSB,
680		Vbucket:   vbno,
681		Vbuuid:    vuuid,
682		StartSeq:  startSequence,
683		EndSeq:    endSequence,
684	}
685	feed.vbstreams[vbno] = stream
686	return nil
687}
688
689func (feed *DcpFeed) doDcpCloseStream(vbno, opaqueMSB uint16) error {
690	prefix := feed.logPrefix
691	stream, ok := feed.vbstreams[vbno]
692	if !ok || stream == nil {
693		fmsg := "%v ##%x stream for vb %d is not active"
694		logging.Warnf(fmsg, prefix, opaqueMSB, vbno)
695		return nil // TODO: should we return error here ?
696	}
697	stream.CloseOpaque = opaqueMSB
698	rq := &transport.MCRequest{
699		Opcode:  transport.DCP_CLOSESTREAM,
700		VBucket: vbno,
701		Opaque:  composeOpaque(vbno, opaqueMSB),
702	}
703	if err := feed.conn.Transmit(rq); err != nil {
704		fmsg := "%v ##%x (##%x) doDcpCloseStream.Transmit(): %v"
705		logging.Errorf(fmsg, prefix, opaqueMSB, stream.AppOpaque, err)
706		return err
707	}
708	return nil
709}
710
711// generate stream end responses for all active vb streams
712func (feed *DcpFeed) sendStreamEnd(outch chan<- *DcpEvent) {
713	if feed.vbstreams != nil {
714		for vb, stream := range feed.vbstreams {
715			dcpEvent := &DcpEvent{
716				VBucket: vb,
717				VBuuid:  stream.Vbuuid,
718				Opcode:  transport.DCP_STREAMEND,
719				Opaque:  stream.AppOpaque,
720				Ctime:   time.Now().UnixNano(),
721			}
722			outch <- dcpEvent
723		}
724		feed.vbstreams = nil
725	}
726}
727
728func (feed *DcpFeed) handleStreamRequest(
729	res *transport.MCResponse, vb uint16, stream *DcpStream, event *DcpEvent) {
730
731	prefix := feed.logPrefix
732	switch {
733	case res.Status == transport.ROLLBACK && len(res.Body) != 8:
734		event.Status, event.Seqno = res.Status, 0
735		fmsg := "%v ##%x STREAMREQ(%v) invalid rollback: %v\n"
736		arg1 := logging.TagUD(res.Body)
737		logging.Errorf(fmsg, prefix, stream.AppOpaque, vb, arg1)
738		delete(feed.vbstreams, vb)
739
740	case res.Status == transport.ROLLBACK:
741		rollback := binary.BigEndian.Uint64(res.Body)
742		event.Status, event.Seqno = res.Status, rollback
743		fmsg := "%v ##%x STREAMREQ(%v) with rollback %d\n"
744		logging.Warnf(fmsg, prefix, stream.AppOpaque, vb, rollback)
745		delete(feed.vbstreams, vb)
746
747	case res.Status == transport.SUCCESS:
748		event.Status, event.Seqno = res.Status, stream.StartSeq
749		flog, err := parseFailoverLog(res.Body[:])
750		if err != nil {
751			fmsg := "%v ##%x STREAMREQ(%v) parseFailoverLog: %v\n"
752			logging.Errorf(fmsg, prefix, stream.AppOpaque, vb, err)
753		}
754		event.FailoverLog = flog
755		stream.connected = true
756		fmsg := "%v ##%x STREAMREQ(%d) successful\n"
757		logging.Debugf(fmsg, prefix, stream.AppOpaque, vb)
758
759	default:
760		event.Status = res.Status
761		event.VBucket = vb
762		fmsg := "%v ##%x STREAMREQ(%v) unexpected status: %v\n"
763		logging.Errorf(fmsg, prefix, stream.AppOpaque, vb, res.Status)
764		delete(feed.vbstreams, vb)
765	}
766	return
767}
768
769// Send buffer ack
770func (feed *DcpFeed) sendBufferAck(sendAck bool, bytes uint32) {
771	prefix := feed.logPrefix
772	if sendAck {
773		totalBytes := feed.toAckBytes + bytes
774		if totalBytes > feed.maxAckBytes {
775			feed.toAckBytes = 0
776			bufferAck := &transport.MCRequest{
777				Opcode: transport.DCP_BUFFERACK,
778			}
779			bufferAck.Extras = make([]byte, 4)
780			binary.BigEndian.PutUint32(bufferAck.Extras[:4], uint32(totalBytes))
781			feed.stats.TotalBufferAckSent++
782			if err := feed.conn.Transmit(bufferAck); err != nil {
783				logging.Errorf("%v NOOP.Transmit(): %v", prefix, err)
784
785			} else {
786				logging.Tracef("%v buffer-ack %v\n", prefix, totalBytes)
787			}
788		} else {
789			feed.toAckBytes += bytes
790		}
791	}
792}
793
// composeOpaque packs an application opaque (upper 16 bits) and a vbucket
// number (lower 16 bits) into a 32-bit request opaque.
func composeOpaque(vbno, opaqueMSB uint16) uint32 {
	high := uint32(opaqueMSB) << 16
	low := uint32(vbno)
	return high | low
}
797
// appOpaque extracts the application opaque from the upper 16 bits of a
// 32-bit opaque.
func appOpaque(opq32 uint32) uint16 {
	return uint16(opq32 >> 16)
}
801
// vbOpaque extracts the vbucket number from the lower 16 bits of a 32-bit
// opaque.
func vbOpaque(opq32 uint32) uint16 {
	return uint16(opq32) // conversion keeps only the low 16 bits
}
805
// DcpStream is the per-vbucket bookkeeping for one stream on a DCP
// connection.
type DcpStream struct {
	// AppOpaque tags this stream's events; CloseOpaque is the opaque given
	// with the pending close-stream request.
	AppOpaque, CloseOpaque uint16
	Vbucket                uint16 // vbucket id
	Vbuuid, Seqno          uint64 // vbucket uuid; seqno of the last mutation seen
	StartSeq, EndSeq       uint64 // requested start/end sequence numbers
	Snapstart, Snapend     uint64 // bounds of the current snapshot
	LastSeen               int64  // UnixNano time the last packet was seen
	connected              bool   // set once the stream request succeeded
}
820
821// DcpEvent memcached events for DCP streams.
822type DcpEvent struct {
823	Opcode     transport.CommandCode // Type of event
824	Status     transport.Status      // Response status
825	Datatype   uint8                 // Datatype per binary protocol
826	VBucket    uint16                // VBucket this event applies to
827	Opaque     uint16                // 16 MSB of opaque
828	VBuuid     uint64                // This field is set by downstream
829	Key, Value []byte                // Item key/value
830	OldValue   []byte                // TODO: TBD: old document value
831	Cas        uint64                // CAS value of the item
832	// meta fields
833	Seqno uint64 // seqno. of the mutation, doubles as rollback-seqno
834	// https://issues.couchbase.com/browse/MB-15333,
835	RevSeqno uint64
836	Flags    uint32 // Item flags
837	Expiry   uint32 // Item expiration time
838	LockTime uint32
839	Nru      byte
840	// snapshots
841	SnapstartSeq uint64 // start sequence number of this snapshot
842	SnapendSeq   uint64 // End sequence number of the snapshot
843	SnapshotType uint32 // 0: disk 1: memory
844	// failoverlog
845	FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number
846	Error       error        // Error value in case of a failure
847	// stats
848	Ctime int64
849	// extended attributes
850	RawXATTR    map[string][]byte
851	ParsedXATTR map[string]interface{}
852}
853
854func newDcpEvent(rq *transport.MCRequest, stream *DcpStream) (event *DcpEvent) {
855	defer func() {
856		if r := recover(); r != nil {
857			// Error parsing XATTR, Request body might be malformed
858			arg1 := logging.TagStrUD(rq.Key)
859			logging.Errorf("Panic: Error parsing RawXATTR for %s: %v", arg1, r)
860			event.Value = make([]byte, 0)
861			event.Datatype &= ^(dcpXATTR | dcpJSON)
862		}
863	}()
864	event = &DcpEvent{
865		Cas:      rq.Cas,
866		Datatype: rq.Datatype,
867		Opcode:   rq.Opcode,
868		VBucket:  stream.Vbucket,
869		VBuuid:   stream.Vbuuid,
870		Ctime:    time.Now().UnixNano(),
871	}
872	event.Key = make([]byte, len(rq.Key))
873	copy(event.Key, rq.Key)
874
875	// 16 LSBits are used by client library to encode vbucket number.
876	// 16 MSBits are left for application to multiplex on opaque value.
877	event.Opaque = appOpaque(rq.Opaque)
878
879	if len(rq.Extras) >= tapMutationExtraLen {
880		event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
881		switch event.Opcode {
882		case transport.DCP_MUTATION:
883			event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:])
884			event.Flags = binary.BigEndian.Uint32(rq.Extras[16:])
885			event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:])
886			event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:])
887			event.Nru = rq.Extras[30]
888
889		case transport.DCP_DELETION:
890			event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:])
891
892		case transport.DCP_EXPIRATION:
893			event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:])
894		}
895
896	} else if len(rq.Extras) >= tapMutationExtraLen &&
897		event.Opcode == transport.DCP_SNAPSHOT {
898
899		event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8])
900		event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16])
901		event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20])
902	}
903
904	if (event.Opcode == transport.DCP_MUTATION ||
905		event.Opcode == transport.DCP_DELETION) && event.HasXATTR() {
906		xattrLen := int(binary.BigEndian.Uint32(rq.Body))
907		xattrData := rq.Body[4 : 4+xattrLen]
908		event.RawXATTR = make(map[string][]byte, xattrLen)
909		for len(xattrData) > 0 {
910			pairLen := binary.BigEndian.Uint32(xattrData[0:])
911			xattrData = xattrData[4:]
912			binaryPair := xattrData[:pairLen-1]
913			xattrData = xattrData[pairLen:]
914			kvPair := bytes.Split(binaryPair, []byte{0x00})
915			event.RawXATTR[string(kvPair[0])] = kvPair[1]
916		}
917		event.Value = make([]byte, len(rq.Body)-(4+xattrLen))
918		copy(event.Value, rq.Body[4+xattrLen:])
919	} else {
920		event.Value = make([]byte, len(rq.Body))
921		copy(event.Value, rq.Body)
922	}
923
924	return event
925}
926
927func (event *DcpEvent) IsJSON() bool {
928	return (event.Datatype & dcpJSON) != 0
929}
930
931func (event *DcpEvent) TreatAsJSON() {
932	event.Datatype |= dcpJSON
933}
934
935func (event *DcpEvent) HasXATTR() bool {
936	return (event.Datatype & dcpXATTR) != 0
937}
938
939func (event *DcpEvent) String() string {
940	name := transport.CommandNames[event.Opcode]
941	if name == "" {
942		name = fmt.Sprintf("#%d", event.Opcode)
943	}
944	return name
945}
946
// DcpStats holds per-feed counters for DCP traffic: bytes, buffer-acks,
// mutations, snapshots, stream requests/closes/ends and spurious packets.
type DcpStats struct {
	TotalBufferAckSent, TotalBytes   uint64
	TotalMutation, TotalSnapShot     uint64
	TotalStreamReq, TotalCloseStream uint64
	TotalStreamEnd, TotalSpurious    uint64
}
958
959func (stats *DcpStats) String(feed *DcpFeed) string {
960	return fmt.Sprintf(
961		"bytes: %v buffacks: %v toAckBytes: %v streamreqs: %v "+
962			"snapshots: %v mutations: %v streamends: %v closestreams: %v",
963		stats.TotalBytes, stats.TotalBufferAckSent, feed.toAckBytes,
964		stats.TotalStreamReq, stats.TotalSnapShot, stats.TotalMutation,
965		stats.TotalStreamEnd, stats.TotalCloseStream,
966	)
967}
968
969// FailoverLog containing vvuid and sequnce number
970type FailoverLog [][2]uint64
971
972// Latest will return the recent vbuuid and its high-seqno.
973func (flogp *FailoverLog) Latest() (vbuuid, seqno uint64, err error) {
974	if flogp != nil {
975		flog := *flogp
976		latest := flog[0]
977		return latest[0], latest[1], nil
978	}
979	return vbuuid, seqno, ErrorInvalidLog
980}
981
982// failsafeOp can be used by gen-server implementors to avoid infinitely
983// blocked API calls.
984func failsafeOp(
985	reqch, respch chan []interface{},
986	cmd []interface{},
987	finch chan bool) ([]interface{}, error) {
988
989	select {
990	case reqch <- cmd:
991		if respch != nil {
992			select {
993			case resp := <-respch:
994				return resp, nil
995			case <-finch:
996				return nil, ErrorConnection
997			}
998		}
999	case <-finch:
1000		return nil, ErrorConnection
1001	}
1002	return nil, nil
1003}
1004
// opError supplements failsafeOp for gen-servers. It extracts the error
// outcome of a call:
//   - err, the error returned by failsafeOp itself, takes precedence;
//   - otherwise vals[idx] is the error slot of the response, where nil
//     means success.
//
// failsafeOp returns a nil response for commands sent without a response
// channel. A response too short to contain idx is therefore treated as
// success rather than causing an index-out-of-range panic.
func opError(err error, vals []interface{}, idx int) error {
	if err != nil {
		return err
	}
	if idx >= len(vals) || vals[idx] == nil {
		return nil
	}
	return vals[idx].(error)
}
1014
1015// parse failover log fields from response body.
1016func parseFailoverLog(body []byte) (*FailoverLog, error) {
1017	if len(body)%16 != 0 {
1018		fmsg := "invalid body length %v, in failover-log\n"
1019		err := fmt.Errorf(fmsg, len(body))
1020		return nil, err
1021	}
1022	log := make(FailoverLog, len(body)/16)
1023	for i, j := 0, 0; i < len(body); i += 16 {
1024		vuuid := binary.BigEndian.Uint64(body[i : i+8])
1025		seqno := binary.BigEndian.Uint64(body[i+8 : i+16])
1026		log[j] = [2]uint64{vuuid, seqno}
1027		j++
1028	}
1029	return &log, nil
1030}
1031
// parseGetSeqnos decodes a get-seqnos response body into a vbno -> seqno
// map. The body is a sequence of 10-byte records: a big-endian uint16 vbno
// followed by a big-endian uint64 seqno. If a vbno repeats, its last
// occurrence wins. A body whose length is not a multiple of 10 is rejected.
func parseGetSeqnos(body []byte) (map[uint16]uint64, error) {
	const recLen = 10
	if len(body)%recLen != 0 {
		fmsg := "invalid body length %v, in get-seqnos\n"
		return nil, fmt.Errorf(fmsg, len(body))
	}
	seqnos := make(map[uint16]uint64, len(body)/recLen)
	for off := 0; off+recLen <= len(body); off += recLen {
		rec := body[off : off+recLen]
		seqnos[binary.BigEndian.Uint16(rec[:2])] = binary.BigEndian.Uint64(rec[2:])
	}
	return seqnos, nil
}
1047
1048func computeLatency(stream *DcpStream) int64 {
1049	now := time.Now().UnixNano()
1050	strm_seqno := stream.Seqno
1051	if stream.Snapend == 0 || strm_seqno == stream.Snapend {
1052		return 0
1053	}
1054	delta := now - stream.LastSeen
1055	stream.LastSeen = now
1056	return delta
1057}
1058
1059// receive loop
1060func (feed *DcpFeed) doReceive(
1061	rcvch chan []interface{}, finch chan bool, conn *Client) {
1062	defer close(rcvch)
1063
1064	var headerBuf [transport.HDR_LEN]byte
1065	var duration time.Duration
1066	var start time.Time
1067	var blocked bool
1068
1069	epoc := time.Now()
1070	tick := time.NewTicker(time.Second * 5) // log every 5 second, if blocked.
1071	defer func() {
1072		tick.Stop()
1073	}()
1074
1075loop:
1076	for {
1077		pkt := transport.MCRequest{} // always a new instance.
1078		bytes, err := pkt.Receive(conn.conn, headerBuf[:])
1079		if err != nil && err == io.EOF {
1080			logging.Infof("%v EOF received\n", feed.logPrefix)
1081			break loop
1082
1083		} else if feed.isClosed() {
1084			logging.Infof("%v doReceive(): connection closed\n", feed.logPrefix)
1085			break loop
1086
1087		} else if err != nil {
1088			logging.Errorf("%v doReceive(): %v\n", feed.logPrefix, err)
1089			break loop
1090		}
1091
1092		// Immediately respond to NOOP and listen for next message.
1093		// NOOPs are not accounted for buffer-ack.
1094		if pkt.Opcode == transport.DCP_NOOP {
1095			noop := &transport.MCResponse{
1096				Opcode: transport.DCP_NOOP, Opaque: pkt.Opaque,
1097			}
1098			if err := feed.conn.TransmitResponse(noop); err != nil {
1099				logging.Errorf("%v NOOP.Transmit(): %v", feed.logPrefix, err)
1100			} else {
1101				fmsg := "%v responded to NOOP ok ...\n"
1102				logging.Tracef(fmsg, feed.logPrefix)
1103			}
1104			continue loop
1105		}
1106
1107		logging.LazyTrace(func() string {
1108			return fmt.Sprintf("%v packet received %#v", feed.logPrefix, logging.TagUD(pkt))
1109		})
1110
1111		if len(rcvch) == cap(rcvch) {
1112			start, blocked = time.Now(), true
1113		}
1114		select {
1115		case rcvch <- []interface{}{&pkt, bytes}:
1116		case <-finch:
1117			break loop
1118		}
1119		if blocked {
1120			blockedTs := time.Since(start)
1121			duration += blockedTs
1122			blocked = false
1123			select {
1124			case <-tick.C:
1125				percent := float64(duration) / float64(time.Since(epoc))
1126				fmsg := "%v DCP-socket -> projector blocked %v (%f%%)"
1127				logging.Infof(fmsg, feed.logPrefix, blockedTs, percent)
1128			default:
1129			}
1130		}
1131	}
1132}
1133