1// Copyright (c) 2013 Couchbase, Inc.
2// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
3// except in compliance with the License. You may obtain a copy of the License at
4//   http://www.apache.org/licenses/LICENSE-2.0
5// Unless required by applicable law or agreed to in writing, software distributed under the
6// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7// either express or implied. See the License for the specific language governing permissions
8// and limitations under the License.
9
10package parts
11
12import (
13	"bufio"
14	"bytes"
15	"encoding/binary"
16	"encoding/hex"
17	"encoding/json"
18	"errors"
19	"fmt"
20	mc "github.com/couchbase/gomemcached"
21	base "github.com/couchbase/goxdcr/base"
22	common "github.com/couchbase/goxdcr/common"
23	gen_server "github.com/couchbase/goxdcr/gen_server"
24	"github.com/couchbase/goxdcr/log"
25	"github.com/couchbase/goxdcr/metadata"
26	utilities "github.com/couchbase/goxdcr/utils"
27	"io"
28	"io/ioutil"
29	"net"
30	"net/http"
31	"reflect"
32	"strconv"
33	"strings"
34	"sync"
35	"sync/atomic"
36	"time"
37)
38
// Names of the replication settings that are specific to the capi nozzle.
// Each of them is registered in capi_setting_defs and read by capiConfig.initializeConfig.
const (
	SETTING_UPLOAD_WINDOW_SIZE = "upload_window_size"
	SETTING_CONNECTION_TIMEOUT = "connection_timeout"
	SETTING_RETRY_INTERVAL     = "retry_interval"
)

// Default values for the capi nozzle configuration.
const (
	default_maxRetryInterval_capi                  = 30 * time.Second
	default_upload_window_size       int           = 3 // erlang xdcr value
	default_selfMonitorInterval_capi time.Duration = 300 * time.Millisecond
	default_maxIdleCount_capi        int           = 30
)
50
51var capi_setting_defs base.SettingDefinitions = base.SettingDefinitions{SETTING_BATCHCOUNT: base.NewSettingDef(reflect.TypeOf((*int)(nil)), true),
52	SETTING_BATCHSIZE:             base.NewSettingDef(reflect.TypeOf((*int)(nil)), true),
53	SETTING_OPTI_REP_THRESHOLD:    base.NewSettingDef(reflect.TypeOf((*int)(nil)), true),
54	SETTING_BATCH_EXPIRATION_TIME: base.NewSettingDef(reflect.TypeOf((*time.Duration)(nil)), false),
55	SETTING_NUMOFRETRY:            base.NewSettingDef(reflect.TypeOf((*int)(nil)), false),
56	SETTING_RETRY_INTERVAL:        base.NewSettingDef(reflect.TypeOf((*time.Duration)(nil)), false),
57	SETTING_WRITE_TIMEOUT:         base.NewSettingDef(reflect.TypeOf((*time.Duration)(nil)), false),
58	SETTING_READ_TIMEOUT:          base.NewSettingDef(reflect.TypeOf((*time.Duration)(nil)), false),
59	SETTING_MAX_RETRY_INTERVAL:    base.NewSettingDef(reflect.TypeOf((*time.Duration)(nil)), false),
60	SETTING_UPLOAD_WINDOW_SIZE:    base.NewSettingDef(reflect.TypeOf((*int)(nil)), false),
61	SETTING_CONNECTION_TIMEOUT:    base.NewSettingDef(reflect.TypeOf((*time.Duration)(nil)), false)}
62
// Key names for the JSON documents exchanged with the target. MetaKey is the key under which
// batchUpdateDocs stores the meta map of each document; the remaining keys are presumably
// consumed by the doc-building helpers outside this section.
var (
	NewEditsKey   = "new_edits"
	DocsKey       = "docs"
	MetaKey       = "meta"
	BodyKey       = "base64"
	IdKey         = "id"
	RevKey        = "rev"
	ExpirationKey = "expiration"
	FlagsKey      = "flags"
	DeletedKey    = "deleted"
	AttReasonKey  = "att_reason"
	InvalidJson   = "invalid_json"
)

// Pieces of the bulk-docs request body: the serialized docs are written between
// BodyPartsPrefix and BodyPartsSuffix, separated from each other by BodyPartsDelimiter.
var (
	BodyPartsPrefix    = []byte("{\"new_edits\":false,\"docs\":[")
	BodyPartsSuffix    = []byte("]}")
	BodyPartsDelimiter = ","
	SizePartDelimiter  = "\r\n"
)

// CouchFullCommitKey is the http header that batchUpdateDocs sets to "false" to enable delayed commit.
var CouchFullCommitKey = "X-Couch-Full-Commit"

var (
	MalformedResponseError = "Received malformed response from tcp connection"
	MaxErrorMessageLength  = 400
)

// BatchHistorySize is the number of last sent batches to remember and log.
var BatchHistorySize = 20
87
88/************************************
89/* struct capiBatch
90 * NOTE: see dataBatch comments for more info
91*************************************/
92type capiBatch struct {
93	dataBatch
94	vbno uint16
95}
96
97/************************************
98/* struct capiConfig
99*************************************/
100type capiConfig struct {
101	baseConfig
102	uploadWindowSize int
103	// timeout of capi rest calls
104	connectionTimeout time.Duration
105	retryInterval     time.Duration
106	certificate       []byte
107	// key = vbno; value = couchApiBase for capi calls, e.g., http://127.0.0.1:9500/target%2Baa3466851d268241d9465826d3d8dd11%2f13
108	// this map serves two purposes: 1. provides a list of vbs that the capi is responsible for
109	// 2. provides the couchApiBase for each of the vbs
110	vbCouchApiBaseMap map[uint16]string
111}
112
113func newCapiConfig(logger *log.CommonLogger) capiConfig {
114	return capiConfig{
115		baseConfig: baseConfig{maxCount: -1,
116			maxSize:             -1,
117			maxRetry:            base.CapiMaxRetryBatchUpdateDocs,
118			writeTimeout:        base.CapiWriteTimeout,
119			readTimeout:         base.CapiReadTimeout,
120			maxRetryInterval:    default_maxRetryInterval_capi,
121			selfMonitorInterval: default_selfMonitorInterval_capi,
122			connectStr:          "",
123			username:            "",
124			password:            "",
125		},
126		uploadWindowSize:  default_upload_window_size,
127		connectionTimeout: base.CapiBatchTimeout,
128		retryInterval:     base.CapiRetryInterval,
129	}
130}
131
132func (config *capiConfig) initializeConfig(settings metadata.ReplicationSettingsMap, utils utilities.UtilsIface) error {
133	err := utils.ValidateSettings(capi_setting_defs, settings, config.logger)
134
135	if err == nil {
136		config.baseConfig.initializeConfig(settings)
137
138		if val, ok := settings[SETTING_UPLOAD_WINDOW_SIZE]; ok {
139			config.uploadWindowSize = val.(int)
140		}
141		if val, ok := settings[SETTING_CONNECTION_TIMEOUT]; ok {
142			config.connectionTimeout = val.(time.Duration)
143		}
144		if val, ok := settings[SETTING_RETRY_INTERVAL]; ok {
145			config.retryInterval = val.(time.Duration)
146		}
147	}
148	return err
149}
150
151/************************************
152/* struct CapiNozzle
153*************************************/
154type CapiNozzle struct {
155
156	//parent inheritance
157	gen_server.GenServer
158	AbstractPart
159
160	bOpen      bool
161	lock_bOpen sync.RWMutex
162
163	//data channels to accept the incoming data, one for each vb
164	vb_dataChan_map map[uint16]chan *base.WrappedMCRequest
165	//the total number of items queued in all data channels
166	items_in_dataChan int32
167	//the total size of data (in bytes) queued in all data channels
168	bytes_in_dataChan int64
169
170	client      *net.TCPConn
171	lock_client sync.RWMutex
172
173	//configurable parameter
174	config capiConfig
175
176	//queue for ready batches
177	batches_ready chan *capiBatch
178
179	batches_nonempty_ch chan bool
180
181	//batches to be accumulated, one for each vb
182	vb_batch_map      map[uint16]*capiBatch
183	vb_batch_map_lock chan bool
184
185	childrenWaitGrp sync.WaitGroup
186
187	finish_ch chan bool
188
189	counter_sent      uint32
190	counter_received  uint32
191	start_time        time.Time
192	handle_error      bool
193	lock_handle_error sync.RWMutex
194	dataObj_recycler  base.DataObjRecycler
195	topic             string
196
197	utils utilities.UtilsIface
198
199	// stores a list of last sent batches in the form of []*BatchInfo
200	last_sent_batches      []*BatchInfo
201	last_sent_batches_lock sync.RWMutex
202}
203
// BatchInfo is one entry of the sent batch history kept in CapiNozzle.last_sent_batches.
type BatchInfo struct {
	// vbucket the batch belonged to
	vbno uint16
	// size recorded for the batch
	size uint32
}
208
209func NewCapiNozzle(id string,
210	topic string,
211	connectString string,
212	username string,
213	password string,
214	certificate []byte,
215	vbCouchApiBaseMap map[uint16]string,
216	dataObj_recycler base.DataObjRecycler,
217	logger_context *log.LoggerContext,
218	utilsIn utilities.UtilsIface) *CapiNozzle {
219
220	//callback functions from GenServer
221	var msg_callback_func gen_server.Msg_Callback_Func
222	var exit_callback_func gen_server.Exit_Callback_Func
223	var error_handler_func gen_server.Error_Handler_Func
224
225	server := gen_server.NewGenServer(&msg_callback_func,
226		&exit_callback_func, &error_handler_func, logger_context, "CapiNozzle", utilsIn)
227	part := NewAbstractPartWithLogger(id, server.Logger())
228
229	capi := &CapiNozzle{GenServer: server, /*gen_server.GenServer*/
230		AbstractPart:        part,                           /*part.AbstractPart*/
231		bOpen:               true,                           /*bOpen	bool*/
232		lock_bOpen:          sync.RWMutex{},                 /*lock_bOpen	sync.RWMutex*/
233		config:              newCapiConfig(server.Logger()), /*config	capiConfig*/
234		batches_ready:       nil,                            /*batches_ready chan *capiBatch*/
235		childrenWaitGrp:     sync.WaitGroup{},               /*childrenWaitGrp sync.WaitGroup*/
236		finish_ch:           make(chan bool, 1),
237		batches_nonempty_ch: make(chan bool, 1),
238		//		send_allow_ch:    make(chan bool, 1), /*send_allow_ch chan bool*/
239		counter_sent:      0,
240		handle_error:      true,
241		lock_handle_error: sync.RWMutex{},
242		counter_received:  0,
243		dataObj_recycler:  dataObj_recycler,
244		topic:             topic,
245		utils:             utilsIn,
246	}
247
248	capi.last_sent_batches = make([]*BatchInfo, BatchHistorySize)
249	for i := 0; i < BatchHistorySize; i++ {
250		capi.last_sent_batches[i] = &BatchInfo{}
251	}
252
253	capi.config.connectStr = connectString
254	capi.config.username = username
255	capi.config.password = password
256	capi.config.certificate = certificate
257	capi.config.vbCouchApiBaseMap = vbCouchApiBaseMap
258
259	msg_callback_func = nil
260	exit_callback_func = capi.onExit
261	error_handler_func = capi.handleGeneralError
262
263	return capi
264
265}
266
267func (capi *CapiNozzle) IsOpen() bool {
268	capi.lock_bOpen.RLock()
269	defer capi.lock_bOpen.RUnlock()
270	return capi.bOpen
271}
272
273func (capi *CapiNozzle) Open() error {
274	capi.lock_bOpen.Lock()
275	defer capi.lock_bOpen.Unlock()
276	if !capi.bOpen {
277		capi.bOpen = true
278
279	}
280	return nil
281}
282
283func (capi *CapiNozzle) Close() error {
284	capi.lock_bOpen.Lock()
285	defer capi.lock_bOpen.Unlock()
286	if capi.bOpen {
287		capi.bOpen = false
288	}
289	return nil
290}
291
292func (capi *CapiNozzle) handleError() bool {
293	capi.lock_handle_error.RLock()
294	defer capi.lock_handle_error.RUnlock()
295	return capi.handle_error
296}
297
298func (capi *CapiNozzle) disableHandleError() {
299	capi.lock_handle_error.Lock()
300	defer capi.lock_handle_error.Unlock()
301	capi.handle_error = false
302}
303
304func (capi *CapiNozzle) getClient() *net.TCPConn {
305	capi.lock_client.RLock()
306	defer capi.lock_client.RUnlock()
307	return capi.client
308}
309
310func (capi *CapiNozzle) setClient(client *net.TCPConn) {
311	capi.lock_client.Lock()
312	defer capi.lock_client.Unlock()
313	if capi.client != nil {
314		capi.client.Close()
315	}
316	capi.client = client
317}
318
319func (capi *CapiNozzle) Start(settings metadata.ReplicationSettingsMap) error {
320	capi.Logger().Infof("%v starting ....\n", capi.Id())
321
322	err := capi.SetState(common.Part_Starting)
323	if err != nil {
324		return err
325	}
326
327	err = capi.initialize(settings)
328	capi.Logger().Infof("%v initialized\n", capi.Id())
329	if err == nil {
330		capi.childrenWaitGrp.Add(1)
331		go capi.selfMonitor(capi.finish_ch, &capi.childrenWaitGrp)
332
333		capi.childrenWaitGrp.Add(1)
334		go capi.processData_batch(capi.finish_ch, &capi.childrenWaitGrp)
335
336		capi.start_time = time.Now()
337		err = capi.Start_server()
338	}
339
340	if err == nil {
341		err = capi.SetState(common.Part_Running)
342		if err == nil {
343			capi.Logger().Infof("%v has been started successfully\n", capi.Id())
344		}
345	}
346	if err != nil {
347		capi.Logger().Errorf("%v failed to start. err=%v\n", capi.Id(), err)
348	}
349	return err
350}
351
352func (capi *CapiNozzle) Stop() error {
353	capi.Logger().Infof("%v stopping \n", capi.Id())
354
355	err := capi.SetState(common.Part_Stopping)
356	if err != nil {
357		return err
358	}
359
360	capi.Logger().Debugf("%v processed %v items\n", capi.Id(), atomic.LoadUint32(&capi.counter_sent))
361
362	//close data channels
363	for _, dataChan := range capi.vb_dataChan_map {
364		close(dataChan)
365	}
366
367	if capi.batches_ready != nil {
368		capi.Logger().Infof("%v closing batches ready\n", capi.Id())
369		close(capi.batches_ready)
370	}
371
372	err = capi.Stop_server()
373
374	err = capi.SetState(common.Part_Stopped)
375	if err == nil {
376		capi.Logger().Infof("%v has been stopped\n", capi.Id())
377	} else {
378		capi.Logger().Errorf("%v failed to stop. err=%v\n", capi.Id(), err)
379	}
380
381	return err
382}
383
384func (capi *CapiNozzle) batchReady(vbno uint16) error {
385	//move the batch to ready batches channel
386	defer func() {
387		if r := recover(); r != nil {
388			if capi.validateRunningState() == nil {
389				// report error only when capi is still in running state
390				capi.handleGeneralError(errors.New(fmt.Sprintf("%v", r)))
391			}
392		}
393
394		capi.Logger().Debugf("%v End moving batch, %v batches ready\n", capi.Id(), len(capi.batches_ready))
395	}()
396
397	batch := capi.vb_batch_map[vbno]
398	if batch.count() > 0 {
399		capi.Logger().Debugf("%v move the batch (count=%d) for vb %v into ready queue\n", capi.Id(), batch.count(), vbno)
400		select {
401		case capi.batches_ready <- batch:
402			capi.Logger().Debugf("%v There are %d batches in ready queue\n", capi.Id(), len(capi.batches_ready))
403
404			capi.initNewBatch(vbno)
405		}
406	}
407	return nil
408
409}
410
411// Coming from Router's Forward
412func (capi *CapiNozzle) Receive(data interface{}) error {
413	// the attempt to write to dataChan may panic if dataChan has been closed
414	defer func() {
415		if r := recover(); r != nil {
416			capi.Logger().Errorf("%v recovered from %v", capi.Id(), r)
417			if capi.validateRunningState() == nil {
418				// report error only when capi is still in running state
419				capi.handleGeneralError(errors.New(fmt.Sprintf("%v", r)))
420			}
421		}
422	}()
423
424	req := data.(*base.WrappedMCRequest)
425
426	vbno := req.Req.VBucket
427
428	dataChan, ok := capi.vb_dataChan_map[vbno]
429	if !ok {
430		capi.Logger().Errorf("%v received a request with unexpected vb %v\n", capi.Id(), vbno)
431		// No need to redact vb_dataChan_map because the contents within the channels are not printed
432		capi.Logger().Errorf("%v datachan map len=%v, map = %v \n", capi.Id(), len(capi.vb_dataChan_map), capi.vb_dataChan_map)
433	}
434
435	err := capi.validateRunningState()
436	if err != nil {
437		capi.Logger().Infof("%v is in %v state, Recieve did no-op", capi.Id(), capi.State())
438		return err
439	}
440
441	atomic.AddUint32(&capi.counter_received, 1)
442	size := req.Req.Size()
443	atomic.AddInt32(&capi.items_in_dataChan, 1)
444	atomic.AddInt64(&capi.bytes_in_dataChan, int64(size))
445
446	dataChan <- req
447
448	//accumulate the batchCount and batchSize
449	capi.accumuBatch(vbno, req)
450
451	return nil
452}
453
454func (capi *CapiNozzle) accumuBatch(vbno uint16, request *base.WrappedMCRequest) {
455	capi.vb_batch_map_lock <- true
456	defer func() { <-capi.vb_batch_map_lock }()
457
458	batch := capi.vb_batch_map[vbno]
459	_, isFirst, isFull := batch.accumuBatch(request, capi.optimisticRep)
460	if isFirst {
461		select {
462		case capi.batches_nonempty_ch <- true:
463		default:
464			// batches_nonempty_ch is already flagged.
465		}
466	}
467
468	if isFull {
469		capi.batchReady(vbno)
470	}
471}
472
473func (capi *CapiNozzle) processData_batch(finch chan bool, waitGrp *sync.WaitGroup) (err error) {
474	capi.Logger().Infof("%v processData starts..........\n", capi.Id())
475	defer waitGrp.Done()
476	for {
477		select {
478		case <-finch:
479			goto done
480		// Take batch and process it
481		case batch, ok := <-capi.batches_ready:
482			if !ok {
483				capi.Logger().Infof("%v batches_ready closed. Exiting processData.", capi.Id())
484				goto done
485			}
486			select {
487			case <-finch:
488				goto done
489			default:
490				if capi.validateRunningState() != nil {
491					capi.Logger().Infof("%v has stopped. Exiting.", capi.Id())
492					goto done
493				}
494				if capi.IsOpen() {
495					capi.Logger().Debugf("%v Batch Send..., %v batches ready, %v items in queue, count_recieved=%v, count_sent=%v\n", capi.Id(), len(capi.batches_ready), atomic.LoadInt32(&capi.items_in_dataChan), atomic.LoadUint32(&capi.counter_received), atomic.LoadUint32(&capi.counter_sent))
496					err = capi.send_internal(batch)
497					if err != nil {
498						capi.handleGeneralError(err)
499						goto done
500					}
501				}
502			}
503		// Get the not full batch and start processing it
504		case <-capi.batches_nonempty_ch:
505			if capi.validateRunningState() != nil {
506				capi.Logger().Infof("%v has stopped. Exiting", capi.Id())
507				goto done
508			}
509
510			if len(capi.batches_ready) == 0 {
511				// There's currently no batch in place, otherwise, piggy back off the batches_ready above
512				max_count, max_batch_vbno := capi.getBatchWithMaxCount()
513				if max_count > 0 {
514					select {
515					case capi.vb_batch_map_lock <- true:
516						capi.batchReady(max_batch_vbno)
517						<-capi.vb_batch_map_lock
518					default:
519					}
520				}
521			}
522
523			// check if a token needs to be put back into batches_nonempty_ch,
524			// i.e., check if there is at least one non-empty batch remaining
525			nonEmptyBatchExist := capi.checkIfNonEmptyBatchExist()
526
527			if nonEmptyBatchExist {
528				select {
529				case capi.batches_nonempty_ch <- true:
530				default:
531					// batches_nonempty_ch is already flagged.
532				}
533			}
534		}
535	}
536
537done:
538	capi.Logger().Infof("%v processData_batch exits\n", capi.Id())
539	return
540}
541
542func (capi *CapiNozzle) getBatchWithMaxCount() (max_count uint32, max_batch_vbno uint16) {
543	max_count = 0
544	max_batch_vbno = 0
545
546	select {
547	case capi.vb_batch_map_lock <- true:
548		for vbno, batch := range capi.vb_batch_map {
549			if batch.count() > max_count {
550				max_count = batch.count()
551				max_batch_vbno = vbno
552			}
553		}
554		<-capi.vb_batch_map_lock
555	default:
556		// if cannot acquire lock on batch_map, return right away
557	}
558
559	return
560}
561
562func (capi *CapiNozzle) checkIfNonEmptyBatchExist() bool {
563	nonEmptyBatchExist := false
564
565	select {
566	case capi.vb_batch_map_lock <- true:
567	outer:
568		for _, batch := range capi.vb_batch_map {
569			select {
570			case <-batch.batch_nonempty_ch:
571				nonEmptyBatchExist = true
572				break outer
573			default:
574				continue
575			}
576		}
577		<-capi.vb_batch_map_lock
578	default:
579		// if cannot acquire lock on batch_map, return right away
580		// return true to ensure that we will be checking for nonempty channels in the next iteration
581		nonEmptyBatchExist = true
582	}
583
584	return nonEmptyBatchExist
585}
586
587func (capi *CapiNozzle) send_internal(batch *capiBatch) error {
588	var err error
589	if batch != nil {
590		count := batch.count()
591
592		new_counter_sent := atomic.AddUint32(&capi.counter_sent, count)
593		capi.Logger().Debugf("So far, capi %v processed %d items", capi.Id(), new_counter_sent)
594
595		// A map of documents that should not be replicated
596		var bigDoc_noRep_map map[string]bool
597		// Populate no replication map to optimize data bandwidth before actually sending
598		bigDoc_noRep_map, err = capi.batchGetMeta(batch.vbno, batch.bigDoc_map)
599		if err != nil {
600			capi.Logger().Errorf("%v batchGetMeta failed. err=%v\n", capi.Id(), err)
601		} else {
602			// Attach the map to the batch before actually sending
603			batch.bigDoc_noRep_map = bigDoc_noRep_map
604		}
605
606		//batch send
607		err = capi.batchSendWithRetry(batch)
608		if err == nil {
609			capi.recordLastSentBatch(batch.vbno, count)
610		}
611	}
612	return err
613}
614
615/**
616 * batch call for document size larger than the optimistic threshold
617 * Returns a map of all the keys that are fed in bigDoc_map, with a boolean value
618 * The boolean value == true meaning that the document referred by key should *not* be replicated
619 */
620func (capi *CapiNozzle) batchGetMeta(vbno uint16, bigDoc_map base.McRequestMap) (map[string]bool, error) {
621	if capi.Logger().GetLogLevel() >= log.LogLevelDebug {
622		capi.Logger().Debugf("%v batchGetMeta called for vb %v and bigDoc_map with len %v, map=%v%v%v\n", capi.Id(), vbno, len(bigDoc_map), base.UdTagBegin, bigDoc_map, base.UdTagEnd)
623	}
624
625	bigDoc_noRep_map := make(BigDocNoRepMap)
626
627	if len(bigDoc_map) == 0 {
628		return bigDoc_noRep_map, nil
629	}
630
631	couchApiBaseHost, couchApiBasePath, err := capi.getCouchApiBaseHostAndPathForVB(vbno)
632	if err != nil {
633		return nil, err
634	}
635
636	// Used for sending to target
637	key_rev_map := make(map[string]string)
638	// Used for stats updating
639	key_seqnostarttime_map := make(map[string][]interface{})
640	sent_id_map := make(map[string]bool)
641	// Populate necessary data maps above from the passed in bigDoc_map to be able to query the target
642	for id, req := range bigDoc_map {
643		key := string(req.Req.Key)
644		if _, ok := key_rev_map[key]; !ok {
645			key_rev_map[key] = getSerializedRevision(req.Req)
646			key_seqnostarttime_map[key] = []interface{}{req.Seqno, time.Now()}
647			sent_id_map[id] = true
648		}
649	}
650
651	keysAndRevisions, err := json.Marshal(key_rev_map)
652	if err != nil {
653		return nil, err
654	}
655
656	// Query the Target by feeding it the current key -> revisions
657	var out interface{}
658	err, statusCode := capi.utils.QueryRestApiWithAuth(couchApiBaseHost, couchApiBasePath+base.RevsDiffPath, true, capi.config.username, capi.config.password, base.HttpAuthMechPlain, nil, false, nil, nil, base.MethodPost, base.JsonContentType,
659		keysAndRevisions, capi.config.connectionTimeout, &out, nil, false, capi.Logger())
660	capi.Logger().Debugf("%v results of _revs_diff query for vb %v: err=%v, status=%v\n", capi.Id(), vbno, err, statusCode)
661	if err != nil {
662		capi.Logger().Errorf("%v _revs_diff query for vb %v failed with err=%v\n", capi.Id(), vbno, err)
663		return nil, err
664	} else if statusCode != 200 {
665		errMsg := fmt.Sprintf("Received unexpected status code %v from _revs_diff query for vbucket %v.\n", statusCode, vbno)
666		capi.Logger().Errorf("%v %v", capi.Id(), errMsg)
667		return nil, errors.New(errMsg)
668	}
669
670	// Update stats
671	for key, seqnostarttime := range key_seqnostarttime_map {
672		additionalInfo := GetMetaReceivedEventAdditional{Key: key,
673			Seqno:       seqnostarttime[0].(uint64),
674			Commit_time: time.Since(seqnostarttime[1].(time.Time))}
675		capi.RaiseEvent(common.NewEvent(common.GetMetaReceived, nil, capi, nil, additionalInfo))
676	}
677
678	// Convert the result from sending key_rev to target into a map, which if a key exists, means "send me this document"
679	bigDoc_rep_map, ok := out.(base.InterfaceMap)
680	if capi.Logger().GetLogLevel() >= log.LogLevelDebug {
681		capi.Logger().Debugf("%v bigDoc_rep_map=%v%v%v\n", capi.Id(), base.UdTagBegin, bigDoc_rep_map, base.UdTagEnd)
682	}
683	if !ok {
684		capi.Logger().Errorf(fmt.Sprintf("Error parsing return value from _revs_diff query for vbucket %v. bigDoc_rep_map=%v%v%v", vbno, base.UdTagBegin, bigDoc_rep_map, base.UdTagEnd))
685		return nil, errors.New(fmt.Sprintf("Error parsing return value from _revs_diff query for vbucket %v.", vbno))
686	}
687
688	// bigDoc_noRep_map = bigDoc_map - bigDoc_rep_map
689	for id, req := range bigDoc_map {
690		if _, found := sent_id_map[id]; found {
691			docKey := string(req.Req.Key)
692			if _, ok = bigDoc_rep_map[docKey]; !ok {
693				// True == failed CR, else other reasons
694				bigDoc_noRep_map[id] = true
695			}
696		}
697	}
698
699	if capi.Logger().GetLogLevel() >= log.LogLevelDebug {
700		capi.Logger().Debugf("%v done with batchGetMeta,bigDoc_noRep_map=%v\n", capi.Id(), bigDoc_noRep_map.CloneAndRedact())
701	}
702	return bigDoc_noRep_map, nil
703}
704
705func (capi *CapiNozzle) batchSendWithRetry(batch *capiBatch) error {
706	var err error
707	vbno := batch.vbno
708	count := batch.count()
709	dataChan := capi.vb_dataChan_map[vbno]
710
711	// List to be sent
712	req_list := make([]*base.WrappedMCRequest, 0)
713
714	// Make sure only the items that are supposed to be sent are to be sent
715	for i := 0; i < int(count); i++ {
716		item, ok := <-dataChan
717		if !ok {
718			capi.Logger().Debugf("%v exiting batchSendWithRetry since data channel has been closed\n", capi.Id())
719			return nil
720		}
721
722		atomic.AddInt32(&capi.items_in_dataChan, -1)
723		atomic.AddInt64(&capi.bytes_in_dataChan, int64(0-item.Req.Size()))
724
725		needSendStatus := needSend(item, &batch.dataBatch, capi.Logger())
726		if needSendStatus == Send {
727			capi.adjustRequest(item)
728			req_list = append(req_list, item)
729		} else {
730			if needSendStatus == Not_Send_Failed_CR {
731				if capi.Logger().GetLogLevel() >= log.LogLevelDebug {
732					capi.Logger().Debugf("%v did not send doc with key %v since it failed conflict resolution\n", capi.Id(), base.TagUD(item.Req.Key))
733				}
734				additionalInfo := DataFailedCRSourceEventAdditional{Seqno: item.Seqno,
735					Opcode:      encodeOpCode(item.Req.Opcode),
736					IsExpirySet: (binary.BigEndian.Uint32(item.Req.Extras[4:8]) != 0),
737					VBucket:     item.Req.VBucket,
738				}
739				capi.RaiseEvent(common.NewEvent(common.DataFailedCRSource, nil, capi, nil, additionalInfo))
740			}
741
742			// recycle data obj so we don't have to keep allocating/deallocating MCRequests
743			capi.recycleDataObj(item)
744		}
745
746	}
747
748	err = capi.batchUpdateDocsWithRetry(vbno, &req_list)
749	if err == nil {
750		for _, req := range req_list {
751			// requests in req_list have strictly increasing seqnos
752			// each seqno is the new high seqno
753			additionalInfo := DataSentEventAdditional{Seqno: req.Seqno,
754				IsOptRepd:   capi.optimisticRep(req.Req),
755				Commit_time: time.Since(req.Start_time),
756				Opcode:      req.Req.Opcode,
757				IsExpirySet: (binary.BigEndian.Uint32(req.Req.Extras[4:8]) != 0),
758				VBucket:     req.Req.VBucket,
759				Req_size:    req.Req.Size(),
760			}
761			capi.RaiseEvent(common.NewEvent(common.DataSent, nil, capi, nil, additionalInfo))
762
763			//recycle the request object
764			capi.recycleDataObj(req)
765		}
766	} else {
767		capi.Logger().Errorf("%v error updating docs on target. err=%v\n", capi.Id(), err)
768		if err != PartStoppedError {
769			capi.handleGeneralError(err)
770		}
771	}
772
773	return err
774}
775
776func (capi *CapiNozzle) onExit() {
777	//in the process of stopping, no need to report any error to replication manager anymore
778	capi.disableHandleError()
779
780	//notify the data processing routine
781	close(capi.finish_ch)
782	capi.childrenWaitGrp.Wait()
783
784	//cleanup
785	capi.Logger().Infof("%v releasing capi client", capi.Id())
786	client := capi.getClient()
787	if client != nil {
788		client.Close()
789	}
790
791}
792
793func (capi *CapiNozzle) selfMonitor(finch chan bool, waitGrp *sync.WaitGroup) {
794	defer waitGrp.Done()
795	statsTicker := time.NewTicker(capi.config.statsInterval)
796	defer statsTicker.Stop()
797	for {
798		select {
799		case <-finch:
800			goto done
801		case <-statsTicker.C:
802			capi.RaiseEvent(common.NewEvent(common.StatsUpdate, nil, capi, nil, []int{int(atomic.LoadInt32(&capi.items_in_dataChan)), int(atomic.LoadInt64(&capi.bytes_in_dataChan))}))
803		}
804	}
805done:
806	capi.Logger().Infof("%v selfMonitor routine exits", capi.Id())
807
808}
809
810func (capi *CapiNozzle) validateRunningState() error {
811	state := capi.State()
812	if state == common.Part_Stopping || state == common.Part_Stopped || state == common.Part_Error {
813		return PartStoppedError
814	}
815	return nil
816}
817
818func (capi *CapiNozzle) adjustRequest(req *base.WrappedMCRequest) {
819	mc_req := req.Req
820	mc_req.Opcode = encodeOpCode(mc_req.Opcode)
821	mc_req.Cas = 0
822}
823
824//batch call to update docs on target
825func (capi *CapiNozzle) batchUpdateDocsWithRetry(vbno uint16, req_list *[]*base.WrappedMCRequest) error {
826	if len(*req_list) == 0 {
827		return nil
828	}
829
830	num_of_retry := 0
831	backoffTime := capi.config.retryInterval
832	for {
833		err := capi.validateRunningState()
834		if err != nil {
835			return err
836		}
837
838		err = capi.batchUpdateDocs(vbno, req_list)
839		if err == nil {
840			// success. no need to retry further
841			return nil
842		}
843
844		if num_of_retry < capi.config.maxRetry {
845			// reset connection to ensure a clean start
846			err = capi.resetConn()
847			if err != nil {
848				return err
849			}
850			num_of_retry++
851			base.WaitForTimeoutOrFinishSignal(backoffTime, capi.finish_ch)
852			backoffTime *= 2
853			capi.Logger().Infof("%v retrying update docs for vb %v for the %vth time\n", capi.Id(), vbno, num_of_retry)
854		} else {
855			// max retry reached. no need to call resetConn() since pipeline will get restarted
856			return errors.New(fmt.Sprintf("batch update docs failed for vb %v after %v retries", vbno, num_of_retry))
857		}
858	}
859}
860
861func (capi *CapiNozzle) batchUpdateDocs(vbno uint16, req_list *[]*base.WrappedMCRequest) (err error) {
862	capi.Logger().Debugf("%v batchUpdateDocs, vbno=%v, len(req_list)=%v\n", capi.Id(), vbno, len(*req_list))
863
864	couchApiBaseHost, couchApiBasePath, err := capi.getCouchApiBaseHostAndPathForVB(vbno)
865	if err != nil {
866		return
867	}
868
869	/**
870	 * construct docs to send
871	 * doc_list contains slices of documents represented by serialized buffers
872	 */
873	doc_list := make([][]byte, 0)
874	doc_length := 0
875	doc_map := make(map[string]interface{})
876	meta_map := make(map[string]interface{})
877	doc_map[MetaKey] = meta_map
878
879	for _, req := range *req_list {
880		// Populate doc_map with the information of request
881		getDocMap(req.Req, doc_map)
882		var doc_bytes []byte
883		doc_bytes, err = json.Marshal(doc_map)
884		if err != nil {
885			return
886		}
887		doc_bytes = append(doc_bytes, BodyPartsDelimiter...)
888		doc_length += len(doc_bytes)
889		doc_list = append(doc_list, doc_bytes)
890	}
891
892	// remove the unnecessary delimiter at the end of doc list
893	last_doc := doc_list[len(doc_list)-1]
894	doc_list[len(doc_list)-1] = last_doc[:len(last_doc)-len(BodyPartsDelimiter)]
895	doc_length -= len(BodyPartsDelimiter)
896
897	total_length := len(BodyPartsPrefix) + doc_length + len(BodyPartsSuffix)
898
899	http_req, _, err := capi.utils.ConstructHttpRequest(couchApiBaseHost, couchApiBasePath+base.BulkDocsPath, true, capi.config.username, capi.config.password, base.HttpAuthMechPlain, base.UserAuthModeBasic, base.MethodPost, base.JsonContentType,
900		nil, capi.Logger())
901	if err != nil {
902		return
903	}
904
905	// set content length.
906	http_req.Header.Set(base.ContentLength, strconv.Itoa(total_length))
907
908	// enable delayed commit
909	http_req.Header.Set(CouchFullCommitKey, "false")
910
911	// unfortunately request.Write() does not preserve Content-Length. have to encode the request ourselves
912	req_bytes, err := capi.utils.EncodeHttpRequest(http_req)
913	if err != nil {
914		return
915	}
916
917	resp_ch := make(chan bool, 1)
918	err_ch := make(chan error, 2)
919	fin_ch := make(chan bool)
920
921	// data channel for body parts. The per-defined size controls the flow between
922	// the two go routines below so as to reduce the chance of overwhelming the target server
923	part_ch := make(chan []byte, capi.config.uploadWindowSize)
924	waitGrp := &sync.WaitGroup{}
925	// start go routine which actually writes to and reads from tcp connection
926	waitGrp.Add(1)
927	go capi.tcpProxy(vbno, part_ch, resp_ch, err_ch, fin_ch, waitGrp)
928	// start go rountine that write body parts to tcpProxy()
929	waitGrp.Add(1)
930	go capi.writeDocs(vbno, req_bytes, doc_list, part_ch, err_ch, fin_ch, waitGrp)
931
932	ticker := time.NewTicker(capi.config.connectionTimeout)
933	defer ticker.Stop()
934	select {
935	case <-capi.finish_ch:
936		// capi is stopping.
937	case <-resp_ch:
938		// response received. everything is good
939		capi.Logger().Debugf("%v batchUpdateDocs for vb %v succesfully updated %v docs.\n", capi.Id(), vbno, len(*req_list))
940	case err = <-err_ch:
941		// error encountered
942		capi.Logger().Errorf("%v batchUpdateDocs for vb %v failed with err %v.\n", capi.Id(), vbno, err)
943	case <-ticker.C:
944		// connection timed out
945		errMsg := fmt.Sprintf("Connection timeout when updating docs for vb %v", vbno)
946		capi.Logger().Errorf("%v %v", capi.Id(), errMsg)
947		err = errors.New(errMsg)
948	}
949
950	// get all send routines to stop
951	close(fin_ch)
952
953	// wait for writeDocs and tcpProxy routines to stop before returning
954	// this way there are no concurrent writeDocs and tcpProxy routines running
955	// and no concurrent use of capi.client
956	waitGrp.Wait()
957
958	return err
959
960}
961
962func (capi *CapiNozzle) writeDocs(vbno uint16, req_bytes []byte, doc_list [][]byte, part_ch chan []byte,
963	err_ch chan error, fin_ch chan bool, waitGrp *sync.WaitGroup) {
964	defer waitGrp.Done()
965
966	partIndex := 0
967	for {
968		select {
969		case <-fin_ch:
970			capi.Logger().Debugf("%v terminating writeDocs because of closure of finch\n", capi.Id())
971			return
972		default:
973			// if no error, keep sending body parts
974			if partIndex == 0 {
975				// send initial request to tcp
976				if !capi.writeToPartCh(part_ch, req_bytes) {
977					return
978				}
979			} else if partIndex == 1 {
980				// write body part prefix
981				if !capi.writeToPartCh(part_ch, BodyPartsPrefix) {
982					return
983				}
984			} else if partIndex < len(doc_list)+2 {
985				// write individual doc
986				if !capi.writeToPartCh(part_ch, doc_list[partIndex-2]) {
987					return
988				}
989			} else {
990				// write body part suffix
991				if !capi.writeToPartCh(part_ch, BodyPartsSuffix) {
992					return
993				}
994				// all parts have been sent. terminate sendBodyPart rountine
995				close(part_ch)
996				return
997			}
998			partIndex++
999		}
1000
1001	}
1002}
1003
1004// use timeout to give it a chance to detect nozzle stop event and abort
1005func (capi *CapiNozzle) writeToPartCh(part_ch chan []byte, data []byte) bool {
1006	timeoutticker := time.NewTicker(capi.config.writeTimeout)
1007	defer timeoutticker.Stop()
1008	for {
1009		select {
1010		case part_ch <- data:
1011			return true
1012		case <-timeoutticker.C:
1013			if capi.validateRunningState() != nil {
1014				capi.Logger().Infof("%v is no longer running, aborting writing to part ch", capi.Id())
1015				return false
1016			}
1017		}
1018	}
1019}
1020
1021func (capi *CapiNozzle) tcpProxy(vbno uint16, part_ch chan []byte, resp_ch chan bool, err_ch chan error, fin_ch chan bool, waitGrp *sync.WaitGroup) {
1022	defer waitGrp.Done()
1023	capi.Logger().Debugf("%v tcpProxy routine for vb %v is starting\n", capi.Id(), vbno)
1024	for {
1025		select {
1026		case <-fin_ch:
1027			capi.Logger().Debugf("%v tcpProxy routine is exiting because of closure of finch\n", capi.Id())
1028			return
1029		case part, ok := <-part_ch:
1030
1031			if ok {
1032				client := capi.getClient()
1033				client.SetWriteDeadline(time.Now().Add(capi.config.writeTimeout))
1034				_, err := client.Write(part)
1035				if err != nil {
1036					capi.Logger().Errorf("%v Received error when writing boby part. err=%v\n", capi.Id(), err)
1037					err_ch <- err
1038					return
1039				}
1040			} else {
1041				// the closing of part_ch signals that all body parts have been sent. start receiving responses
1042
1043				// read response
1044				client := capi.getClient()
1045				client.SetReadDeadline(time.Now().Add(capi.config.readTimeout))
1046
1047				response, err := http.ReadResponse(bufio.NewReader(client), nil)
1048				if err != nil || response == nil {
1049					errMsg := fmt.Sprintf("Error reading response. vb=%v, err=%v\n", vbno, trimErrorMessage(err))
1050					capi.Logger().Errorf("%v %v", capi.Id(), errMsg)
1051					err_ch <- errors.New(errMsg)
1052					return
1053				}
1054
1055				defer response.Body.Close()
1056
1057				if response.StatusCode != 201 {
1058					errMsg := fmt.Sprintf("Received unexpected status code, %v, from update docs request for vb %v\n", response.StatusCode, vbno)
1059					capi.Logger().Errorf("%v %v", capi.Id(), errMsg)
1060					err_ch <- errors.New(errMsg)
1061
1062					// no need to read leftover bytes, if any, since connection will get reset soon
1063					return
1064				}
1065
1066				_, err = ioutil.ReadAll(response.Body)
1067				if err != nil {
1068					// if we get an error reading the entirety of response body, e.g., because of timeout
1069					// we need to reset connection to give subsequent requests a clean start
1070					// there is no need to return error, though, since the current batch has already
1071					// succeeded (as signaled by the 201 response status)
1072					errMsg := MalformedResponseError + fmt.Sprintf(" vb=%v, err=%v\n", vbno, trimErrorMessage(err))
1073					capi.Logger().Errorf("%v %v", capi.Id(), errMsg)
1074					capi.resetConn()
1075				}
1076
1077				// notify caller that write succeeded
1078				resp_ch <- true
1079
1080				return
1081			}
1082		}
1083	}
1084
1085}
1086
1087func (capi *CapiNozzle) getResponseBodyLength(response *http.Response) (int, error) {
1088	contents, err := ioutil.ReadAll(response.Body)
1089	if err != nil && (err == io.ErrUnexpectedEOF || strings.Contains(err.Error(), base.UnexpectedEOF)) {
1090		// unexpected EOF is expected when response.Body does not contain all response bytes, which happens often
1091		err = nil
1092	}
1093	return len(contents), err
1094}
1095
1096// malformed http response error may print the entire response buffer, which can be arbitrarily long
1097// trim the error message to at most 400 chars to avoid flooding the log file
1098func trimErrorMessage(err error) string {
1099	errMsg := err.Error()
1100	if len(errMsg) > MaxErrorMessageLength {
1101		errMsg = errMsg[:MaxErrorMessageLength]
1102	}
1103	return errMsg
1104}
1105
1106// produce a serialized document from mc request
1107func getDocMap(req *mc.MCRequest, doc_map map[string]interface{}) {
1108	doc_map[BodyKey] = req.Body
1109	meta_map := doc_map[MetaKey].(map[string]interface{})
1110
1111	//TODO need to handle Key being non-UTF8?
1112	meta_map[IdKey] = string(req.Key)
1113	meta_map[RevKey] = getSerializedRevision(req)
1114	meta_map[ExpirationKey] = binary.BigEndian.Uint32(req.Extras[4:8])
1115	meta_map[FlagsKey] = binary.BigEndian.Uint32(req.Extras[0:4])
1116	if req.Opcode == base.DELETE_WITH_META {
1117		meta_map[DeletedKey] = true
1118	} else {
1119		delete(meta_map, DeletedKey)
1120	}
1121
1122	if !base.IsJSON(req.Body) {
1123		meta_map[AttReasonKey] = InvalidJson
1124	} else {
1125		delete(meta_map, AttReasonKey)
1126	}
1127}
1128
1129// produce serialized revision info in the form of revSeq-Cas+Expiration+Flags
1130func getSerializedRevision(req *mc.MCRequest) string {
1131	var revId [16]byte
1132	// CAS
1133	copy(revId[0:8], req.Extras[16:24])
1134	// expiration
1135	copy(revId[8:12], req.Extras[4:8])
1136	// flags
1137	copy(revId[12:16], req.Extras[0:4])
1138
1139	revSeq := binary.BigEndian.Uint64(req.Extras[8:16])
1140	revSeqStr := strconv.FormatUint(revSeq, 10)
1141	revIdStr := hex.EncodeToString(revId[0:16])
1142	return revSeqStr + "-" + revIdStr
1143}
1144
1145func (capi *CapiNozzle) initNewBatch(vbno uint16) {
1146	capi.Logger().Debugf("%v init a new batch for vb %v\n", capi.Id(), vbno)
1147	capi.vb_batch_map[vbno] = &capiBatch{*newBatch(uint32(capi.config.maxCount), uint32(capi.config.maxSize), capi.Logger()), vbno}
1148}
1149
1150func (capi *CapiNozzle) initialize(settings metadata.ReplicationSettingsMap) error {
1151	err := capi.config.initializeConfig(settings, capi.utils)
1152	if err != nil {
1153		return err
1154	}
1155
1156	capi.vb_dataChan_map = make(map[uint16]chan *base.WrappedMCRequest)
1157	for vbno, _ := range capi.config.vbCouchApiBaseMap {
1158		capi.vb_dataChan_map[vbno] = make(chan *base.WrappedMCRequest, capi.config.maxCount*base.CapiDataChanSizeMultiplier)
1159	}
1160	capi.items_in_dataChan = 0
1161	capi.bytes_in_dataChan = 0
1162	capi.batches_ready = make(chan *capiBatch, len(capi.config.vbCouchApiBaseMap)*10)
1163
1164	//enable send
1165	//	capi.send_allow_ch <- true
1166
1167	//init new batches
1168	capi.vb_batch_map = make(map[uint16]*capiBatch)
1169	capi.vb_batch_map_lock = make(chan bool, 1)
1170	for vbno, _ := range capi.config.vbCouchApiBaseMap {
1171		capi.initNewBatch(vbno)
1172	}
1173
1174	capi.Logger().Debugf("%v about to start initializing connection", capi.Id())
1175	// resetConn() initializes the connection when called for the first time
1176	err = capi.resetConn()
1177	if err == nil {
1178		capi.Logger().Infof("%v connection initialization completed.", capi.Id())
1179	} else {
1180		capi.Logger().Errorf("%v connection initialization failed with err=%v.", capi.Id(), err)
1181	}
1182
1183	return err
1184}
1185
1186func (capi *CapiNozzle) PrintStatusSummary() {
1187	capi.Logger().Infof("%v received %v items, sent %v items, last sent batches = %v", capi.Id(), atomic.LoadUint32(&capi.counter_received), atomic.LoadUint32(&capi.counter_sent), capi.getLastSentBatches())
1188}
1189
1190func (capi *CapiNozzle) handleGeneralError(err error) {
1191	if capi.handleError() {
1192		capi.Logger().Errorf("%v raise error condition %v\n", capi.Id(), err)
1193		capi.RaiseEvent(common.NewEvent(common.ErrorEncountered, nil, capi, nil, err))
1194	} else {
1195		capi.Logger().Debugf("%v in shutdown process, err=%v is ignored\n", capi.Id(), err)
1196	}
1197}
1198
1199func (capi *CapiNozzle) optimisticRep(req *mc.MCRequest) bool {
1200	if req != nil {
1201		return uint32(req.Size()) < capi.getOptiRepThreshold()
1202	}
1203	return true
1204}
1205
1206func (capi *CapiNozzle) getOptiRepThreshold() uint32 {
1207	return atomic.LoadUint32(&(capi.config.optiRepThreshold))
1208}
1209
1210func (capi *CapiNozzle) getPoolName(config capiConfig) string {
1211	return "Couch_Capi_" + config.connectStr
1212}
1213
1214func (capi *CapiNozzle) getCouchApiBaseHostAndPathForVB(vbno uint16) (string, string, error) {
1215	couchApiBase, ok := capi.config.vbCouchApiBaseMap[vbno]
1216	if !ok {
1217		return "", "", errors.New(fmt.Sprintf("Cannot find couchApiBase for vbucket %v", vbno))
1218	}
1219
1220	index := strings.LastIndex(couchApiBase, base.UrlDelimiter)
1221	if index < 0 {
1222		return "", "", errors.New(fmt.Sprintf("Error parsing couchApiBase for vbucket %v", vbno))
1223	}
1224	couchApiBaseHost := couchApiBase[:index]
1225	couchApiBasePath := couchApiBase[index:]
1226
1227	return couchApiBaseHost, couchApiBasePath, nil
1228}
1229
1230// resetConn() initializes the connection when called for the first time
1231func (capi *CapiNozzle) resetConn() error {
1232	capi.Logger().Infof("%v resetting capi connection. \n", capi.Id())
1233
1234	if capi.validateRunningState() != nil {
1235		capi.Logger().Infof("%v is not running, no need to resetConn", capi.Id())
1236		return nil
1237	}
1238
1239	// the sole purpose of getClientOpFunc is to match the func signature required by ExponentialBackoffExecutorWithFinishSignal
1240	getClientOpFunc := func(param interface{}) (interface{}, error) {
1241		return capi.utils.NewTCPConn(param.(string))
1242	}
1243
1244	result, err := capi.utils.ExponentialBackoffExecutorWithFinishSignal("capi.resetConn", base.XmemBackoffTimeNewConn, base.XmemMaxRetryNewConn,
1245		base.MetaKvBackoffFactor, getClientOpFunc, capi.config.connectStr, capi.finish_ch)
1246	if err != nil {
1247		capi.Logger().Errorf("%v - Connection reset failed. err=%v\n", capi.Id(), err)
1248		capi.handleGeneralError(err)
1249		return err
1250	}
1251	newClient, ok := result.(*net.TCPConn)
1252	if !ok {
1253		// should never get here
1254		err = fmt.Errorf("%v resetConn returned wrong type of client", capi.Id())
1255		capi.Logger().Error(err.Error())
1256		capi.handleGeneralError(err)
1257		return err
1258	}
1259
1260	capi.setClient(newClient)
1261	capi.Logger().Infof("%v Connection reset succeeded", capi.Id())
1262
1263	return nil
1264}
1265
1266func (capi *CapiNozzle) UpdateSettings(settings metadata.ReplicationSettingsMap) error {
1267	optimisticReplicationThreshold, ok := settings[SETTING_OPTI_REP_THRESHOLD]
1268	if ok {
1269		optimisticReplicationThresholdInt := optimisticReplicationThreshold.(int)
1270		atomic.StoreUint32(&capi.config.optiRepThreshold, uint32(optimisticReplicationThresholdInt))
1271		capi.Logger().Infof("%v updated optimistic replication threshold to %v\n", capi.Id(), optimisticReplicationThresholdInt)
1272	}
1273
1274	return nil
1275}
1276
1277func (capi *CapiNozzle) recycleDataObj(req *base.WrappedMCRequest) {
1278	if capi.dataObj_recycler != nil {
1279		capi.dataObj_recycler(capi.topic, req)
1280	}
1281}
1282
1283func (capi *CapiNozzle) recordLastSentBatch(vbno uint16, batchSize uint32) {
1284	capi.last_sent_batches_lock.Lock()
1285	defer capi.last_sent_batches_lock.Unlock()
1286
1287	// oldestBatchInfo will be rotated out of the array
1288	// reuse it to hold info about the last sent batch
1289	oldestBatchInfo := capi.last_sent_batches[BatchHistorySize-1]
1290	oldestBatchInfo.vbno = vbno
1291	oldestBatchInfo.size = batchSize
1292	for i := BatchHistorySize - 1; i > 0; i-- {
1293		capi.last_sent_batches[i] = capi.last_sent_batches[i-1]
1294	}
1295	capi.last_sent_batches[0] = oldestBatchInfo
1296}
1297
1298func (capi *CapiNozzle) getLastSentBatches() string {
1299	capi.last_sent_batches_lock.RLock()
1300	defer capi.last_sent_batches_lock.RUnlock()
1301	var buffer bytes.Buffer
1302	buffer.WriteByte('[')
1303	first := true
1304	for _, last_sent_batch := range capi.last_sent_batches {
1305		if !first {
1306			buffer.WriteByte(',')
1307		} else {
1308			first = false
1309		}
1310		buffer.WriteString(fmt.Sprintf("%v", *last_sent_batch))
1311	}
1312	buffer.WriteByte(']')
1313	return buffer.String()
1314}
1315