// Copyright (c) 2013 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
//   http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.

package parts

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	mc "github.com/couchbase/gomemcached"
	base "github.com/couchbase/goxdcr/base"
	common "github.com/couchbase/goxdcr/common"
	gen_server "github.com/couchbase/goxdcr/gen_server"
	"github.com/couchbase/goxdcr/log"
	"github.com/couchbase/goxdcr/metadata"
	utilities "github.com/couchbase/goxdcr/utils"
	"io"
	"io/ioutil"
	"net"
	"net/http"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

const (
	SETTING_UPLOAD_WINDOW_SIZE = "upload_window_size"
	SETTING_CONNECTION_TIMEOUT = "connection_timeout"
	SETTING_RETRY_INTERVAL     = "retry_interval"

	//default configuration
	default_maxRetryInterval_capi                  = 30 * time.Second
	default_upload_window_size       int           = 3 // erlang xdcr value
	default_selfMonitorInterval_capi time.Duration = 300 * time.Millisecond
	default_maxIdleCount_capi        int           = 30
)

var capi_setting_defs base.SettingDefinitions = base.SettingDefinitions{
	SETTING_BATCHCOUNT:            base.NewSettingDef(reflect.TypeOf((*int)(nil)), true),
	SETTING_BATCHSIZE:             base.NewSettingDef(reflect.TypeOf((*int)(nil)), true),
	SETTING_OPTI_REP_THRESHOLD:    base.NewSettingDef(reflect.TypeOf((*int)(nil)), true),
	SETTING_BATCH_EXPIRATION_TIME: base.NewSettingDef(reflect.TypeOf((*time.Duration)(nil)), false),
	SETTING_NUMOFRETRY:            base.NewSettingDef(reflect.TypeOf((*int)(nil)), false),
	SETTING_RETRY_INTERVAL:        base.NewSettingDef(reflect.TypeOf((*time.Duration)(nil)), false),
	SETTING_WRITE_TIMEOUT:         base.NewSettingDef(reflect.TypeOf((*time.Duration)(nil)), false),
	SETTING_READ_TIMEOUT:          base.NewSettingDef(reflect.TypeOf((*time.Duration)(nil)), false),
	SETTING_MAX_RETRY_INTERVAL:    base.NewSettingDef(reflect.TypeOf((*time.Duration)(nil)), false),
	SETTING_UPLOAD_WINDOW_SIZE:    base.NewSettingDef(reflect.TypeOf((*int)(nil)), false),
	SETTING_CONNECTION_TIMEOUT:    base.NewSettingDef(reflect.TypeOf((*time.Duration)(nil)), false),
}

var NewEditsKey = "new_edits"
var DocsKey = "docs"
var MetaKey = "meta"
var BodyKey = "base64"
var IdKey = "id"
var RevKey = "rev"
var ExpirationKey = "expiration"
var FlagsKey = "flags"
var DeletedKey = "deleted"
var AttReasonKey = "att_reason"
var InvalidJson = "invalid_json"

var BodyPartsPrefix = []byte("{\"new_edits\":false,\"docs\":[")
var BodyPartsSuffix = []byte("]}")
var BodyPartsDelimiter = ","
var SizePartDelimiter = "\r\n"
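// BodyPartsPrefix/BodyPartsDelimiter/BodyPartsSuffix are assembled by
// batchUpdateDocs()/writeDocs() below into a single _bulk_docs request body of
// the (illustrative) form
//
//	{"new_edits":false,"docs":[<doc json>,<doc json>,...]}
//
// i.e., the prefix, the serialized docs joined by the delimiter, then the suffix.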
var CouchFullCommitKey = "X-Couch-Full-Commit"

var MalformedResponseError = "Received malformed response from tcp connection"
var MaxErrorMessageLength = 400

// number of last sent batches to remember and log
var BatchHistorySize = 20

/************************************
/* struct capiBatch
 * NOTE: see dataBatch comments for more info
*************************************/
type capiBatch struct {
	dataBatch
	vbno uint16
}

/************************************
/* struct capiConfig
*************************************/
type capiConfig struct {
	baseConfig
	uploadWindowSize int
	// timeout of capi rest calls
	connectionTimeout time.Duration
	retryInterval     time.Duration
	certificate       []byte
	// key = vbno; value = couchApiBase for capi calls, e.g., http://127.0.0.1:9500/target%2Baa3466851d268241d9465826d3d8dd11%2f13
	// this map serves two purposes: 1. provides a list of vbs that the capi is responsible for
	// 2. provides the couchApiBase for each of the vbs
	vbCouchApiBaseMap map[uint16]string
}

func newCapiConfig(logger *log.CommonLogger) capiConfig {
	return capiConfig{
		baseConfig: baseConfig{maxCount: -1,
			maxSize:             -1,
			maxRetry:            base.CapiMaxRetryBatchUpdateDocs,
			writeTimeout:        base.CapiWriteTimeout,
			readTimeout:         base.CapiReadTimeout,
			maxRetryInterval:    default_maxRetryInterval_capi,
			selfMonitorInterval: default_selfMonitorInterval_capi,
			connectStr:          "",
			username:            "",
			password:            "",
		},
		uploadWindowSize:  default_upload_window_size,
		connectionTimeout: base.CapiBatchTimeout,
		retryInterval:     base.CapiRetryInterval,
	}
}

func (config *capiConfig) initializeConfig(settings metadata.ReplicationSettingsMap, utils utilities.UtilsIface) error {
	err := utils.ValidateSettings(capi_setting_defs, settings, config.logger)

	if err == nil {
		config.baseConfig.initializeConfig(settings)

		if val, ok := settings[SETTING_UPLOAD_WINDOW_SIZE]; ok {
			config.uploadWindowSize = val.(int)
		}
		if val, ok := settings[SETTING_CONNECTION_TIMEOUT]; ok {
			config.connectionTimeout = val.(time.Duration)
		}
		if val, ok := settings[SETTING_RETRY_INTERVAL]; ok {
			config.retryInterval = val.(time.Duration)
		}
	}
	return err
}
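// For illustration only, a settings map overriding the CAPI-specific knobs might
// look like the following (assuming metadata.ReplicationSettingsMap is the usual
// string-keyed map; the generic batch settings are consumed by baseConfig):
//
//	settings := metadata.ReplicationSettingsMap{
//		SETTING_UPLOAD_WINDOW_SIZE: 5,
//		SETTING_CONNECTION_TIMEOUT: 60 * time.Second,
//		SETTING_RETRY_INTERVAL:     20 * time.Millisecond,
//	}
//	err := config.initializeConfig(settings, utils)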
/************************************
/* struct CapiNozzle
*************************************/
type CapiNozzle struct {

	//parent inheritance
	gen_server.GenServer
	AbstractPart

	bOpen      bool
	lock_bOpen sync.RWMutex

	//data channels to accept the incoming data, one for each vb
	vb_dataChan_map map[uint16]chan *base.WrappedMCRequest
	//the total number of items queued in all data channels
	items_in_dataChan int32
	//the total size of data (in bytes) queued in all data channels
	bytes_in_dataChan int64

	client      *net.TCPConn
	lock_client sync.RWMutex

	//configurable parameter
	config capiConfig

	//queue for ready batches
	batches_ready chan *capiBatch

	batches_nonempty_ch chan bool

	//batches to be accumulated, one for each vb
	vb_batch_map      map[uint16]*capiBatch
	vb_batch_map_lock chan bool

	childrenWaitGrp sync.WaitGroup

	finish_ch chan bool

	counter_sent      uint32
	counter_received  uint32
	start_time        time.Time
	handle_error      bool
	lock_handle_error sync.RWMutex
	dataObj_recycler  base.DataObjRecycler
	topic             string

	utils utilities.UtilsIface

	// stores a list of last sent batches in the form of []*BatchInfo
	last_sent_batches      []*BatchInfo
	last_sent_batches_lock sync.RWMutex
}

type BatchInfo struct {
	vbno uint16
	size uint32
}

func NewCapiNozzle(id string,
	topic string,
	connectString string,
	username string,
	password string,
	certificate []byte,
	vbCouchApiBaseMap map[uint16]string,
	dataObj_recycler base.DataObjRecycler,
	logger_context *log.LoggerContext,
	utilsIn utilities.UtilsIface) *CapiNozzle {

	//callback functions from GenServer
	var msg_callback_func gen_server.Msg_Callback_Func
	var exit_callback_func gen_server.Exit_Callback_Func
	var error_handler_func gen_server.Error_Handler_Func

	server := gen_server.NewGenServer(&msg_callback_func,
		&exit_callback_func, &error_handler_func, logger_context, "CapiNozzle", utilsIn)
	part := NewAbstractPartWithLogger(id, server.Logger())

	capi := &CapiNozzle{GenServer: server, /*gen_server.GenServer*/
		AbstractPart:        part,                           /*part.AbstractPart*/
		bOpen:               true,                           /*bOpen bool*/
		lock_bOpen:          sync.RWMutex{},                 /*lock_bOpen sync.RWMutex*/
		config:              newCapiConfig(server.Logger()), /*config capiConfig*/
		batches_ready:       nil,                            /*batches_ready chan *capiBatch*/
		childrenWaitGrp:     sync.WaitGroup{},               /*childrenWaitGrp sync.WaitGroup*/
		finish_ch:           make(chan bool, 1),
		batches_nonempty_ch: make(chan bool, 1),
		// send_allow_ch: make(chan bool, 1), /*send_allow_ch chan bool*/
		counter_sent:      0,
		handle_error:      true,
		lock_handle_error: sync.RWMutex{},
		counter_received:  0,
		dataObj_recycler:  dataObj_recycler,
		topic:             topic,
		utils:             utilsIn,
	}

	capi.last_sent_batches = make([]*BatchInfo, BatchHistorySize)
	for i := 0; i < BatchHistorySize; i++ {
		capi.last_sent_batches[i] = &BatchInfo{}
	}

	capi.config.connectStr = connectString
	capi.config.username = username
	capi.config.password = password
	capi.config.certificate = certificate
	capi.config.vbCouchApiBaseMap = vbCouchApiBaseMap

	msg_callback_func = nil
	exit_callback_func = capi.onExit
	error_handler_func = capi.handleGeneralError

	return capi
}
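// Sketch of the typical lifecycle, as driven by the pipeline (the names below
// mirror the exported methods in this file; the actual wiring lives in the
// pipeline code):
//
//	capi := NewCapiNozzle(id, topic, connectStr, user, pass, cert, vbMap, recycler, logCtx, utils)
//	err := capi.Start(settings) // validate settings, dial the target, spawn worker routines
//	err = capi.Receive(req)     // called by the Router for every mutation to ship
//	err = capi.Stop()           // drain, close channels, tear down the connection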
func (capi *CapiNozzle) IsOpen() bool {
	capi.lock_bOpen.RLock()
	defer capi.lock_bOpen.RUnlock()
	return capi.bOpen
}

func (capi *CapiNozzle) Open() error {
	capi.lock_bOpen.Lock()
	defer capi.lock_bOpen.Unlock()
	if !capi.bOpen {
		capi.bOpen = true
	}
	return nil
}

func (capi *CapiNozzle) Close() error {
	capi.lock_bOpen.Lock()
	defer capi.lock_bOpen.Unlock()
	if capi.bOpen {
		capi.bOpen = false
	}
	return nil
}

func (capi *CapiNozzle) handleError() bool {
	capi.lock_handle_error.RLock()
	defer capi.lock_handle_error.RUnlock()
	return capi.handle_error
}

func (capi *CapiNozzle) disableHandleError() {
	capi.lock_handle_error.Lock()
	defer capi.lock_handle_error.Unlock()
	capi.handle_error = false
}

func (capi *CapiNozzle) getClient() *net.TCPConn {
	capi.lock_client.RLock()
	defer capi.lock_client.RUnlock()
	return capi.client
}

func (capi *CapiNozzle) setClient(client *net.TCPConn) {
	capi.lock_client.Lock()
	defer capi.lock_client.Unlock()
	if capi.client != nil {
		capi.client.Close()
	}
	capi.client = client
}

func (capi *CapiNozzle) Start(settings metadata.ReplicationSettingsMap) error {
	capi.Logger().Infof("%v starting ....\n", capi.Id())

	err := capi.SetState(common.Part_Starting)
	if err != nil {
		return err
	}

	err = capi.initialize(settings)
	capi.Logger().Infof("%v initialized\n", capi.Id())
	if err == nil {
		capi.childrenWaitGrp.Add(1)
		go capi.selfMonitor(capi.finish_ch, &capi.childrenWaitGrp)

		capi.childrenWaitGrp.Add(1)
		go capi.processData_batch(capi.finish_ch, &capi.childrenWaitGrp)

		capi.start_time = time.Now()
		err = capi.Start_server()
	}

	if err == nil {
		err = capi.SetState(common.Part_Running)
		if err == nil {
			capi.Logger().Infof("%v has been started successfully\n", capi.Id())
		}
	}
	if err != nil {
		capi.Logger().Errorf("%v failed to start. err=%v\n", capi.Id(), err)
	}
	return err
}

func (capi *CapiNozzle) Stop() error {
	capi.Logger().Infof("%v stopping \n", capi.Id())

	err := capi.SetState(common.Part_Stopping)
	if err != nil {
		return err
	}

	capi.Logger().Debugf("%v processed %v items\n", capi.Id(), atomic.LoadUint32(&capi.counter_sent))

	//close data channels
	for _, dataChan := range capi.vb_dataChan_map {
		close(dataChan)
	}

	if capi.batches_ready != nil {
		capi.Logger().Infof("%v closing batches ready\n", capi.Id())
		close(capi.batches_ready)
	}

	err = capi.Stop_server()

	err = capi.SetState(common.Part_Stopped)
	if err == nil {
		capi.Logger().Infof("%v has been stopped\n", capi.Id())
	} else {
		capi.Logger().Errorf("%v failed to stop. err=%v\n", capi.Id(), err)
	}

	return err
}

func (capi *CapiNozzle) batchReady(vbno uint16) error {
	//move the batch to the ready batches channel
	defer func() {
		if r := recover(); r != nil {
			if capi.validateRunningState() == nil {
				// report error only when capi is still in running state
				capi.handleGeneralError(errors.New(fmt.Sprintf("%v", r)))
			}
		}

		capi.Logger().Debugf("%v End moving batch, %v batches ready\n", capi.Id(), len(capi.batches_ready))
	}()

	batch := capi.vb_batch_map[vbno]
	if batch.count() > 0 {
		capi.Logger().Debugf("%v move the batch (count=%d) for vb %v into ready queue\n", capi.Id(), batch.count(), vbno)
		select {
		case capi.batches_ready <- batch:
			capi.Logger().Debugf("%v There are %d batches in ready queue\n", capi.Id(), len(capi.batches_ready))

			capi.initNewBatch(vbno)
		}
	}
	return nil
}

// Coming from Router's Forward
func (capi *CapiNozzle) Receive(data interface{}) error {
	// the attempt to write to dataChan may panic if dataChan has been closed
	defer func() {
		if r := recover(); r != nil {
			capi.Logger().Errorf("%v recovered from %v", capi.Id(), r)
			if capi.validateRunningState() == nil {
				// report error only when capi is still in running state
				capi.handleGeneralError(errors.New(fmt.Sprintf("%v", r)))
			}
		}
	}()

	req := data.(*base.WrappedMCRequest)

	vbno := req.Req.VBucket

	dataChan, ok := capi.vb_dataChan_map[vbno]
	if !ok {
		capi.Logger().Errorf("%v received a request with unexpected vb %v\n", capi.Id(), vbno)
		// No need to redact vb_dataChan_map because the contents within the channels are not printed
		capi.Logger().Errorf("%v datachan map len=%v, map = %v \n", capi.Id(), len(capi.vb_dataChan_map), capi.vb_dataChan_map)
		// bail out instead of writing to the nil dataChan below, which would block forever
		return errors.New(fmt.Sprintf("%v received a request with unexpected vb %v", capi.Id(), vbno))
	}

	err := capi.validateRunningState()
	if err != nil {
		capi.Logger().Infof("%v is in %v state, Receive did no-op", capi.Id(), capi.State())
		return err
	}

	atomic.AddUint32(&capi.counter_received, 1)
	size := req.Req.Size()
	atomic.AddInt32(&capi.items_in_dataChan, 1)
	atomic.AddInt64(&capi.bytes_in_dataChan, int64(size))

	dataChan <- req

	//accumulate the batchCount and batchSize
	capi.accumuBatch(vbno, req)

	return nil
}

func (capi *CapiNozzle) accumuBatch(vbno uint16, request *base.WrappedMCRequest) {
	capi.vb_batch_map_lock <- true
	defer func() { <-capi.vb_batch_map_lock }()

	batch := capi.vb_batch_map[vbno]
	_, isFirst, isFull := batch.accumuBatch(request, capi.optimisticRep)
	if isFirst {
		select {
		case capi.batches_nonempty_ch <- true:
		default:
			// batches_nonempty_ch is already flagged.
		}
	}

	if isFull {
		capi.batchReady(vbno)
	}
}
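// Note on locking: vb_batch_map_lock is a buffered channel of capacity 1 used as
// a mutex. Sending a token (as in accumuBatch above) acquires the lock, receiving
// it back releases the lock, and a select with a default clause (as in
// getBatchWithMaxCount below) doubles as a non-blocking try-lock, which
// sync.Mutex does not provide.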
func (capi *CapiNozzle) processData_batch(finch chan bool, waitGrp *sync.WaitGroup) (err error) {
	capi.Logger().Infof("%v processData starts..........\n", capi.Id())
	defer waitGrp.Done()
	for {
		select {
		case <-finch:
			goto done
		// Take a batch and process it
		case batch, ok := <-capi.batches_ready:
			if !ok {
				capi.Logger().Infof("%v batches_ready closed. Exiting processData.", capi.Id())
				goto done
			}
			select {
			case <-finch:
				goto done
			default:
				if capi.validateRunningState() != nil {
					capi.Logger().Infof("%v has stopped. Exiting.", capi.Id())
					goto done
				}
				if capi.IsOpen() {
					capi.Logger().Debugf("%v Batch Send..., %v batches ready, %v items in queue, count_received=%v, count_sent=%v\n", capi.Id(), len(capi.batches_ready), atomic.LoadInt32(&capi.items_in_dataChan), atomic.LoadUint32(&capi.counter_received), atomic.LoadUint32(&capi.counter_sent))
					err = capi.send_internal(batch)
					if err != nil {
						capi.handleGeneralError(err)
						goto done
					}
				}
			}
		// Take a not-yet-full batch and start processing it
		case <-capi.batches_nonempty_ch:
			if capi.validateRunningState() != nil {
				capi.Logger().Infof("%v has stopped. Exiting", capi.Id())
				goto done
			}

			if len(capi.batches_ready) == 0 {
				// There's currently no ready batch in place; otherwise, piggy back off the batches_ready case above
				max_count, max_batch_vbno := capi.getBatchWithMaxCount()
				if max_count > 0 {
					select {
					case capi.vb_batch_map_lock <- true:
						capi.batchReady(max_batch_vbno)
						<-capi.vb_batch_map_lock
					default:
					}
				}
			}

			// check if a token needs to be put back into batches_nonempty_ch,
			// i.e., check if there is at least one non-empty batch remaining
			nonEmptyBatchExist := capi.checkIfNonEmptyBatchExist()

			if nonEmptyBatchExist {
				select {
				case capi.batches_nonempty_ch <- true:
				default:
					// batches_nonempty_ch is already flagged.
				}
			}
		}
	}

done:
	capi.Logger().Infof("%v processData_batch exits\n", capi.Id())
	return
}

func (capi *CapiNozzle) getBatchWithMaxCount() (max_count uint32, max_batch_vbno uint16) {
	max_count = 0
	max_batch_vbno = 0

	select {
	case capi.vb_batch_map_lock <- true:
		for vbno, batch := range capi.vb_batch_map {
			if batch.count() > max_count {
				max_count = batch.count()
				max_batch_vbno = vbno
			}
		}
		<-capi.vb_batch_map_lock
	default:
		// if we cannot acquire the lock on batch_map, return right away
	}

	return
}

func (capi *CapiNozzle) checkIfNonEmptyBatchExist() bool {
	nonEmptyBatchExist := false

	select {
	case capi.vb_batch_map_lock <- true:
	outer:
		for _, batch := range capi.vb_batch_map {
			select {
			case <-batch.batch_nonempty_ch:
				nonEmptyBatchExist = true
				break outer
			default:
				continue
			}
		}
		<-capi.vb_batch_map_lock
	default:
		// if we cannot acquire the lock on batch_map, return right away;
		// return true to ensure that we will be checking for non-empty batches in the next iteration
		nonEmptyBatchExist = true
	}

	return nonEmptyBatchExist
}
func (capi *CapiNozzle) send_internal(batch *capiBatch) error {
	var err error
	if batch != nil {
		count := batch.count()

		new_counter_sent := atomic.AddUint32(&capi.counter_sent, count)
		capi.Logger().Debugf("So far, capi %v processed %d items", capi.Id(), new_counter_sent)

		// A map of documents that should not be replicated
		var bigDoc_noRep_map map[string]bool
		// Populate the no-replication map to optimize data bandwidth before actually sending
		bigDoc_noRep_map, err = capi.batchGetMeta(batch.vbno, batch.bigDoc_map)
		if err != nil {
			capi.Logger().Errorf("%v batchGetMeta failed. err=%v\n", capi.Id(), err)
		} else {
			// Attach the map to the batch before actually sending
			batch.bigDoc_noRep_map = bigDoc_noRep_map
		}

		//batch send
		err = capi.batchSendWithRetry(batch)
		if err == nil {
			capi.recordLastSentBatch(batch.vbno, count)
		}
	}
	return err
}
/**
 * batch call for documents whose size is larger than the optimistic threshold
 * Returns a map of all the keys that are fed in via bigDoc_map, with a boolean value
 * A boolean value of true means that the document referred to by the key should *not* be replicated
 */
func (capi *CapiNozzle) batchGetMeta(vbno uint16, bigDoc_map base.McRequestMap) (map[string]bool, error) {
	if capi.Logger().GetLogLevel() >= log.LogLevelDebug {
		capi.Logger().Debugf("%v batchGetMeta called for vb %v and bigDoc_map with len %v, map=%v%v%v\n", capi.Id(), vbno, len(bigDoc_map), base.UdTagBegin, bigDoc_map, base.UdTagEnd)
	}

	bigDoc_noRep_map := make(BigDocNoRepMap)

	if len(bigDoc_map) == 0 {
		return bigDoc_noRep_map, nil
	}

	couchApiBaseHost, couchApiBasePath, err := capi.getCouchApiBaseHostAndPathForVB(vbno)
	if err != nil {
		return nil, err
	}

	// Used for sending to target
	key_rev_map := make(map[string]string)
	// Used for stats updating
	key_seqnostarttime_map := make(map[string][]interface{})
	sent_id_map := make(map[string]bool)
	// Populate the data maps above from the passed-in bigDoc_map so that we can query the target
	for id, req := range bigDoc_map {
		key := string(req.Req.Key)
		if _, ok := key_rev_map[key]; !ok {
			key_rev_map[key] = getSerializedRevision(req.Req)
			key_seqnostarttime_map[key] = []interface{}{req.Seqno, time.Now()}
			sent_id_map[id] = true
		}
	}

	keysAndRevisions, err := json.Marshal(key_rev_map)
	if err != nil {
		return nil, err
	}

	// Query the target by feeding it the current key -> revision map
	var out interface{}
	err, statusCode := capi.utils.QueryRestApiWithAuth(couchApiBaseHost, couchApiBasePath+base.RevsDiffPath, true, capi.config.username, capi.config.password, base.HttpAuthMechPlain, nil, false, nil, nil, base.MethodPost, base.JsonContentType,
		keysAndRevisions, capi.config.connectionTimeout, &out, nil, false, capi.Logger())
	capi.Logger().Debugf("%v results of _revs_diff query for vb %v: err=%v, status=%v\n", capi.Id(), vbno, err, statusCode)
	if err != nil {
		capi.Logger().Errorf("%v _revs_diff query for vb %v failed with err=%v\n", capi.Id(), vbno, err)
		return nil, err
	} else if statusCode != 200 {
		errMsg := fmt.Sprintf("Received unexpected status code %v from _revs_diff query for vbucket %v.\n", statusCode, vbno)
		capi.Logger().Errorf("%v %v", capi.Id(), errMsg)
		return nil, errors.New(errMsg)
	}

	// Update stats
	for key, seqnostarttime := range key_seqnostarttime_map {
		additionalInfo := GetMetaReceivedEventAdditional{Key: key,
			Seqno:       seqnostarttime[0].(uint64),
			Commit_time: time.Since(seqnostarttime[1].(time.Time))}
		capi.RaiseEvent(common.NewEvent(common.GetMetaReceived, nil, capi, nil, additionalInfo))
	}

	// Convert the query result into a map; if a key exists in it, the target is asking us to "send me this document"
	bigDoc_rep_map, ok := out.(base.InterfaceMap)
	if capi.Logger().GetLogLevel() >= log.LogLevelDebug {
		capi.Logger().Debugf("%v bigDoc_rep_map=%v%v%v\n", capi.Id(), base.UdTagBegin, bigDoc_rep_map, base.UdTagEnd)
	}
	if !ok {
		capi.Logger().Errorf("Error parsing return value from _revs_diff query for vbucket %v. bigDoc_rep_map=%v%v%v", vbno, base.UdTagBegin, bigDoc_rep_map, base.UdTagEnd)
		return nil, errors.New(fmt.Sprintf("Error parsing return value from _revs_diff query for vbucket %v.", vbno))
	}

	// bigDoc_noRep_map = bigDoc_map - bigDoc_rep_map
	for id, req := range bigDoc_map {
		if _, found := sent_id_map[id]; found {
			docKey := string(req.Req.Key)
			if _, ok = bigDoc_rep_map[docKey]; !ok {
				// True == failed CR; other values signal other reasons
				bigDoc_noRep_map[id] = true
			}
		}
	}

	if capi.Logger().GetLogLevel() >= log.LogLevelDebug {
		capi.Logger().Debugf("%v done with batchGetMeta, bigDoc_noRep_map=%v\n", capi.Id(), bigDoc_noRep_map.CloneAndRedact())
	}
	return bigDoc_noRep_map, nil
}
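// For illustration, the marshaled _revs_diff request body above is a flat
// key -> serialized-revision map, e.g.
//
//	{"doc_key_1":"3-00000000000000420000000000000000","doc_key_2":"7-..."}
//
// Only key presence in the parsed response is inspected: a key present in
// bigDoc_rep_map means "target wants this revision, replicate it"; a key absent
// from it means the document failed conflict resolution and is suppressed.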
func (capi *CapiNozzle) batchSendWithRetry(batch *capiBatch) error {
	var err error
	vbno := batch.vbno
	count := batch.count()
	dataChan := capi.vb_dataChan_map[vbno]

	// List to be sent
	req_list := make([]*base.WrappedMCRequest, 0)

	// Make sure only the items that are supposed to be sent are actually sent
	for i := 0; i < int(count); i++ {
		item, ok := <-dataChan
		if !ok {
			capi.Logger().Debugf("%v exiting batchSendWithRetry since data channel has been closed\n", capi.Id())
			return nil
		}

		atomic.AddInt32(&capi.items_in_dataChan, -1)
		atomic.AddInt64(&capi.bytes_in_dataChan, int64(0-item.Req.Size()))

		needSendStatus := needSend(item, &batch.dataBatch, capi.Logger())
		if needSendStatus == Send {
			capi.adjustRequest(item)
			req_list = append(req_list, item)
		} else {
			if needSendStatus == Not_Send_Failed_CR {
				if capi.Logger().GetLogLevel() >= log.LogLevelDebug {
					capi.Logger().Debugf("%v did not send doc with key %v since it failed conflict resolution\n", capi.Id(), base.TagUD(item.Req.Key))
				}
				additionalInfo := DataFailedCRSourceEventAdditional{Seqno: item.Seqno,
					Opcode:      encodeOpCode(item.Req.Opcode),
					IsExpirySet: (binary.BigEndian.Uint32(item.Req.Extras[4:8]) != 0),
					VBucket:     item.Req.VBucket,
				}
				capi.RaiseEvent(common.NewEvent(common.DataFailedCRSource, nil, capi, nil, additionalInfo))
			}

			// recycle the data obj so we don't have to keep allocating/deallocating MCRequests
			capi.recycleDataObj(item)
		}
	}

	err = capi.batchUpdateDocsWithRetry(vbno, &req_list)
	if err == nil {
		for _, req := range req_list {
			// requests in req_list have strictly increasing seqnos;
			// each seqno is the new high seqno
			additionalInfo := DataSentEventAdditional{Seqno: req.Seqno,
				IsOptRepd:   capi.optimisticRep(req.Req),
				Commit_time: time.Since(req.Start_time),
				Opcode:      req.Req.Opcode,
				IsExpirySet: (binary.BigEndian.Uint32(req.Req.Extras[4:8]) != 0),
				VBucket:     req.Req.VBucket,
				Req_size:    req.Req.Size(),
			}
			capi.RaiseEvent(common.NewEvent(common.DataSent, nil, capi, nil, additionalInfo))

			//recycle the request object
			capi.recycleDataObj(req)
		}
	} else {
		capi.Logger().Errorf("%v error updating docs on target. err=%v\n", capi.Id(), err)
		if err != PartStoppedError {
			capi.handleGeneralError(err)
		}
	}

	return err
}
func (capi *CapiNozzle) onExit() {
	//in the process of stopping, no need to report errors to replication manager anymore
	capi.disableHandleError()

	//notify the data processing routine
	close(capi.finish_ch)
	capi.childrenWaitGrp.Wait()

	//cleanup
	capi.Logger().Infof("%v releasing capi client", capi.Id())
	client := capi.getClient()
	if client != nil {
		client.Close()
	}
}

func (capi *CapiNozzle) selfMonitor(finch chan bool, waitGrp *sync.WaitGroup) {
	defer waitGrp.Done()
	statsTicker := time.NewTicker(capi.config.statsInterval)
	defer statsTicker.Stop()
	for {
		select {
		case <-finch:
			goto done
		case <-statsTicker.C:
			capi.RaiseEvent(common.NewEvent(common.StatsUpdate, nil, capi, nil, []int{int(atomic.LoadInt32(&capi.items_in_dataChan)), int(atomic.LoadInt64(&capi.bytes_in_dataChan))}))
		}
	}
done:
	capi.Logger().Infof("%v selfMonitor routine exits", capi.Id())
}

func (capi *CapiNozzle) validateRunningState() error {
	state := capi.State()
	if state == common.Part_Stopping || state == common.Part_Stopped || state == common.Part_Error {
		return PartStoppedError
	}
	return nil
}

func (capi *CapiNozzle) adjustRequest(req *base.WrappedMCRequest) {
	mc_req := req.Req
	mc_req.Opcode = encodeOpCode(mc_req.Opcode)
	mc_req.Cas = 0
}

//batch call to update docs on target
func (capi *CapiNozzle) batchUpdateDocsWithRetry(vbno uint16, req_list *[]*base.WrappedMCRequest) error {
	if len(*req_list) == 0 {
		return nil
	}

	num_of_retry := 0
	backoffTime := capi.config.retryInterval
	for {
		err := capi.validateRunningState()
		if err != nil {
			return err
		}

		err = capi.batchUpdateDocs(vbno, req_list)
		if err == nil {
			// success. no need to retry further
			return nil
		}

		if num_of_retry < capi.config.maxRetry {
			// reset connection to ensure a clean start
			err = capi.resetConn()
			if err != nil {
				return err
			}
			num_of_retry++
			base.WaitForTimeoutOrFinishSignal(backoffTime, capi.finish_ch)
			backoffTime *= 2
			capi.Logger().Infof("%v retrying update docs for vb %v for the %vth time\n", capi.Id(), vbno, num_of_retry)
		} else {
			// max retry reached. no need to call resetConn() since the pipeline will get restarted
			return errors.New(fmt.Sprintf("batch update docs failed for vb %v after %v retries", vbno, num_of_retry))
		}
	}
}
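// For illustration: with retry interval r (capi.config.retryInterval) and
// maxRetry n, the sleeps between attempts grow as r, 2r, 4r, ..., i.e., retry k
// waits r * 2^(k-1), and the whole call gives up after n failed retries.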
func (capi *CapiNozzle) batchUpdateDocs(vbno uint16, req_list *[]*base.WrappedMCRequest) (err error) {
	capi.Logger().Debugf("%v batchUpdateDocs, vbno=%v, len(req_list)=%v\n", capi.Id(), vbno, len(*req_list))

	couchApiBaseHost, couchApiBasePath, err := capi.getCouchApiBaseHostAndPathForVB(vbno)
	if err != nil {
		return
	}

	/**
	 * construct docs to send
	 * doc_list contains slices of documents represented by serialized buffers
	 */
	doc_list := make([][]byte, 0)
	doc_length := 0
	doc_map := make(map[string]interface{})
	meta_map := make(map[string]interface{})
	doc_map[MetaKey] = meta_map

	for _, req := range *req_list {
		// Populate doc_map with the information of the request
		getDocMap(req.Req, doc_map)
		var doc_bytes []byte
		doc_bytes, err = json.Marshal(doc_map)
		if err != nil {
			return
		}
		doc_bytes = append(doc_bytes, BodyPartsDelimiter...)
		doc_length += len(doc_bytes)
		doc_list = append(doc_list, doc_bytes)
	}

	// remove the unnecessary delimiter at the end of the doc list
	last_doc := doc_list[len(doc_list)-1]
	doc_list[len(doc_list)-1] = last_doc[:len(last_doc)-len(BodyPartsDelimiter)]
	doc_length -= len(BodyPartsDelimiter)

	total_length := len(BodyPartsPrefix) + doc_length + len(BodyPartsSuffix)

	http_req, _, err := capi.utils.ConstructHttpRequest(couchApiBaseHost, couchApiBasePath+base.BulkDocsPath, true, capi.config.username, capi.config.password, base.HttpAuthMechPlain, base.UserAuthModeBasic, base.MethodPost, base.JsonContentType,
		nil, capi.Logger())
	if err != nil {
		return
	}

	// set content length.
	http_req.Header.Set(base.ContentLength, strconv.Itoa(total_length))

	// enable delayed commit
	http_req.Header.Set(CouchFullCommitKey, "false")

	// unfortunately request.Write() does not preserve Content-Length. we have to encode the request ourselves
	req_bytes, err := capi.utils.EncodeHttpRequest(http_req)
	if err != nil {
		return
	}

	resp_ch := make(chan bool, 1)
	err_ch := make(chan error, 2)
	fin_ch := make(chan bool)

	// data channel for body parts. The pre-defined size controls the flow between
	// the two goroutines below so as to reduce the chance of overwhelming the target server
	part_ch := make(chan []byte, capi.config.uploadWindowSize)
	waitGrp := &sync.WaitGroup{}
	// start the goroutine which actually writes to and reads from the tcp connection
	waitGrp.Add(1)
	go capi.tcpProxy(vbno, part_ch, resp_ch, err_ch, fin_ch, waitGrp)
	// start the goroutine that writes body parts to tcpProxy()
	waitGrp.Add(1)
	go capi.writeDocs(vbno, req_bytes, doc_list, part_ch, err_ch, fin_ch, waitGrp)

	ticker := time.NewTicker(capi.config.connectionTimeout)
	defer ticker.Stop()
	select {
	case <-capi.finish_ch:
		// capi is stopping.
	case <-resp_ch:
		// response received. everything is good
		capi.Logger().Debugf("%v batchUpdateDocs for vb %v successfully updated %v docs.\n", capi.Id(), vbno, len(*req_list))
	case err = <-err_ch:
		// error encountered
		capi.Logger().Errorf("%v batchUpdateDocs for vb %v failed with err %v.\n", capi.Id(), vbno, err)
	case <-ticker.C:
		// connection timed out
		errMsg := fmt.Sprintf("Connection timeout when updating docs for vb %v", vbno)
		capi.Logger().Errorf("%v %v", capi.Id(), errMsg)
		err = errors.New(errMsg)
	}

	// get all send routines to stop
	close(fin_ch)

	// wait for the writeDocs and tcpProxy routines to stop before returning;
	// this way there are no concurrent writeDocs and tcpProxy routines running
	// and no concurrent use of capi.client
	waitGrp.Wait()

	return err
}
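// Design note: part_ch is a bounded pipe between the producer (writeDocs) and
// the consumer (tcpProxy). Its capacity, uploadWindowSize (default 3, a value
// carried over from the Erlang XDCR implementation), caps how many body parts
// can be buffered ahead of the tcp connection, so a slow target back-pressures
// the producer instead of letting parts accumulate in memory.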
func (capi *CapiNozzle) writeDocs(vbno uint16, req_bytes []byte, doc_list [][]byte, part_ch chan []byte,
	err_ch chan error, fin_ch chan bool, waitGrp *sync.WaitGroup) {
	defer waitGrp.Done()

	partIndex := 0
	for {
		select {
		case <-fin_ch:
			capi.Logger().Debugf("%v terminating writeDocs because of closure of finch\n", capi.Id())
			return
		default:
			// if no error, keep sending body parts
			if partIndex == 0 {
				// send the initial request to tcp
				if !capi.writeToPartCh(part_ch, req_bytes) {
					return
				}
			} else if partIndex == 1 {
				// write the body part prefix
				if !capi.writeToPartCh(part_ch, BodyPartsPrefix) {
					return
				}
			} else if partIndex < len(doc_list)+2 {
				// write an individual doc
				if !capi.writeToPartCh(part_ch, doc_list[partIndex-2]) {
					return
				}
			} else {
				// write the body part suffix
				if !capi.writeToPartCh(part_ch, BodyPartsSuffix) {
					return
				}
				// all parts have been sent. terminate the send routine
				close(part_ch)
				return
			}
			partIndex++
		}
	}
}
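// The partIndex sequence above maps onto the wire as follows:
//
//	partIndex 0                    -> req_bytes (the encoded HTTP request line + headers)
//	partIndex 1                    -> BodyPartsPrefix  {"new_edits":false,"docs":[
//	partIndex 2 .. len(doc_list)+1 -> doc_list[partIndex-2] (each delimiter-terminated except the last)
//	partIndex len(doc_list)+2      -> BodyPartsSuffix  ]}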
// use a timeout to give the routine a chance to detect the nozzle stop event and abort
func (capi *CapiNozzle) writeToPartCh(part_ch chan []byte, data []byte) bool {
	timeoutticker := time.NewTicker(capi.config.writeTimeout)
	defer timeoutticker.Stop()
	for {
		select {
		case part_ch <- data:
			return true
		case <-timeoutticker.C:
			if capi.validateRunningState() != nil {
				capi.Logger().Infof("%v is no longer running, aborting writing to part ch", capi.Id())
				return false
			}
		}
	}
}

func (capi *CapiNozzle) tcpProxy(vbno uint16, part_ch chan []byte, resp_ch chan bool, err_ch chan error, fin_ch chan bool, waitGrp *sync.WaitGroup) {
	defer waitGrp.Done()
	capi.Logger().Debugf("%v tcpProxy routine for vb %v is starting\n", capi.Id(), vbno)
	for {
		select {
		case <-fin_ch:
			capi.Logger().Debugf("%v tcpProxy routine is exiting because of closure of finch\n", capi.Id())
			return
		case part, ok := <-part_ch:
			if ok {
				client := capi.getClient()
				client.SetWriteDeadline(time.Now().Add(capi.config.writeTimeout))
				_, err := client.Write(part)
				if err != nil {
					capi.Logger().Errorf("%v Received error when writing body part. err=%v\n", capi.Id(), err)
					err_ch <- err
					return
				}
			} else {
				// the closing of part_ch signals that all body parts have been sent. start receiving responses

				// read response
				client := capi.getClient()
				client.SetReadDeadline(time.Now().Add(capi.config.readTimeout))

				response, err := http.ReadResponse(bufio.NewReader(client), nil)
				if err != nil || response == nil {
					errMsg := fmt.Sprintf("Error reading response. vb=%v, err=%v\n", vbno, trimErrorMessage(err))
					capi.Logger().Errorf("%v %v", capi.Id(), errMsg)
					err_ch <- errors.New(errMsg)
					return
				}

				defer response.Body.Close()

				if response.StatusCode != 201 {
					errMsg := fmt.Sprintf("Received unexpected status code, %v, from update docs request for vb %v\n", response.StatusCode, vbno)
					capi.Logger().Errorf("%v %v", capi.Id(), errMsg)
					err_ch <- errors.New(errMsg)

					// no need to read leftover bytes, if any, since the connection will get reset soon
					return
				}

				_, err = ioutil.ReadAll(response.Body)
				if err != nil {
					// if we get an error reading the entirety of the response body, e.g., because of timeout,
					// we need to reset the connection to give subsequent requests a clean start.
					// there is no need to return an error, though, since the current batch has already
					// succeeded (as signaled by the 201 response status)
					errMsg := MalformedResponseError + fmt.Sprintf(" vb=%v, err=%v\n", vbno, trimErrorMessage(err))
					capi.Logger().Errorf("%v %v", capi.Id(), errMsg)
					capi.resetConn()
				}

				// notify the caller that the write succeeded
				resp_ch <- true

				return
			}
		}
	}
}

func (capi *CapiNozzle) getResponseBodyLength(response *http.Response) (int, error) {
	contents, err := ioutil.ReadAll(response.Body)
	if err != nil && (err == io.ErrUnexpectedEOF || strings.Contains(err.Error(), base.UnexpectedEOF)) {
		// unexpected EOF is expected when response.Body does not contain all response bytes, which happens often
		err = nil
	}
	return len(contents), err
}

// a malformed http response error may print the entire response buffer, which can be arbitrarily long;
// trim the error message to at most MaxErrorMessageLength chars to avoid flooding the log file
func trimErrorMessage(err error) string {
	errMsg := err.Error()
	if len(errMsg) > MaxErrorMessageLength {
		errMsg = errMsg[:MaxErrorMessageLength]
	}
	return errMsg
}

// produce a serialized document from an mc request
func getDocMap(req *mc.MCRequest, doc_map map[string]interface{}) {
	doc_map[BodyKey] = req.Body
	meta_map := doc_map[MetaKey].(map[string]interface{})

	//TODO need to handle Key being non-UTF8?
	meta_map[IdKey] = string(req.Key)
	meta_map[RevKey] = getSerializedRevision(req)
	meta_map[ExpirationKey] = binary.BigEndian.Uint32(req.Extras[4:8])
	meta_map[FlagsKey] = binary.BigEndian.Uint32(req.Extras[0:4])
	if req.Opcode == base.DELETE_WITH_META {
		meta_map[DeletedKey] = true
	} else {
		delete(meta_map, DeletedKey)
	}

	if !base.IsJSON(req.Body) {
		meta_map[AttReasonKey] = InvalidJson
	} else {
		delete(meta_map, AttReasonKey)
	}
}
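// A single serialized document produced via getDocMap/json.Marshal looks roughly like
//
//	{"base64":"<body>","meta":{"expiration":0,"flags":0,"id":"<key>","rev":"<revSeq>-<32 hex chars>"}}
//
// encoding/json marshals the []byte body as a base64 string, which is why the
// body key is literally named "base64". "deleted" and "att_reason" appear only
// when set above.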
// produce serialized revision info in the form of revSeq-Cas+Expiration+Flags
func getSerializedRevision(req *mc.MCRequest) string {
	var revId [16]byte
	// CAS
	copy(revId[0:8], req.Extras[16:24])
	// expiration
	copy(revId[8:12], req.Extras[4:8])
	// flags
	copy(revId[12:16], req.Extras[0:4])

	revSeq := binary.BigEndian.Uint64(req.Extras[8:16])
	revSeqStr := strconv.FormatUint(revSeq, 10)
	revIdStr := hex.EncodeToString(revId[0:16])
	return revSeqStr + "-" + revIdStr
}
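// For illustration: a mutation with revSeq=5, CAS=0x1234, expiration=0 and
// flags=0 (assuming the usual big-endian layout of Extras) serializes to
//
//	"5-00000000000012340000000000000000"
//
// i.e., the decimal revSeq, a dash, then hex(CAS + expiration + flags) as 32 hex chars.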
func (capi *CapiNozzle) initNewBatch(vbno uint16) {
	capi.Logger().Debugf("%v init a new batch for vb %v\n", capi.Id(), vbno)
	capi.vb_batch_map[vbno] = &capiBatch{*newBatch(uint32(capi.config.maxCount), uint32(capi.config.maxSize), capi.Logger()), vbno}
}

func (capi *CapiNozzle) initialize(settings metadata.ReplicationSettingsMap) error {
	err := capi.config.initializeConfig(settings, capi.utils)
	if err != nil {
		return err
	}

	capi.vb_dataChan_map = make(map[uint16]chan *base.WrappedMCRequest)
	for vbno := range capi.config.vbCouchApiBaseMap {
		capi.vb_dataChan_map[vbno] = make(chan *base.WrappedMCRequest, capi.config.maxCount*base.CapiDataChanSizeMultiplier)
	}
	capi.items_in_dataChan = 0
	capi.bytes_in_dataChan = 0
	capi.batches_ready = make(chan *capiBatch, len(capi.config.vbCouchApiBaseMap)*10)

	//enable send
	// capi.send_allow_ch <- true

	//init new batches
	capi.vb_batch_map = make(map[uint16]*capiBatch)
	capi.vb_batch_map_lock = make(chan bool, 1)
	for vbno := range capi.config.vbCouchApiBaseMap {
		capi.initNewBatch(vbno)
	}

	capi.Logger().Debugf("%v about to start initializing connection", capi.Id())
	// resetConn() initializes the connection when called for the first time
	err = capi.resetConn()
	if err == nil {
		capi.Logger().Infof("%v connection initialization completed.", capi.Id())
	} else {
		capi.Logger().Errorf("%v connection initialization failed with err=%v.", capi.Id(), err)
	}

	return err
}

func (capi *CapiNozzle) PrintStatusSummary() {
	capi.Logger().Infof("%v received %v items, sent %v items, last sent batches = %v", capi.Id(), atomic.LoadUint32(&capi.counter_received), atomic.LoadUint32(&capi.counter_sent), capi.getLastSentBatches())
}

func (capi *CapiNozzle) handleGeneralError(err error) {
	if capi.handleError() {
		capi.Logger().Errorf("%v raise error condition %v\n", capi.Id(), err)
		capi.RaiseEvent(common.NewEvent(common.ErrorEncountered, nil, capi, nil, err))
	} else {
		capi.Logger().Debugf("%v in shutdown process, err=%v is ignored\n", capi.Id(), err)
	}
}

func (capi *CapiNozzle) optimisticRep(req *mc.MCRequest) bool {
	if req != nil {
		return uint32(req.Size()) < capi.getOptiRepThreshold()
	}
	return true
}

func (capi *CapiNozzle) getOptiRepThreshold() uint32 {
	return atomic.LoadUint32(&(capi.config.optiRepThreshold))
}

func (capi *CapiNozzle) getPoolName(config capiConfig) string {
	return "Couch_Capi_" + config.connectStr
}

func (capi *CapiNozzle) getCouchApiBaseHostAndPathForVB(vbno uint16) (string, string, error) {
	couchApiBase, ok := capi.config.vbCouchApiBaseMap[vbno]
	if !ok {
		return "", "", errors.New(fmt.Sprintf("Cannot find couchApiBase for vbucket %v", vbno))
	}

	index := strings.LastIndex(couchApiBase, base.UrlDelimiter)
	if index < 0 {
		return "", "", errors.New(fmt.Sprintf("Error parsing couchApiBase for vbucket %v", vbno))
	}
	couchApiBaseHost := couchApiBase[:index]
	couchApiBasePath := couchApiBase[index:]

	return couchApiBaseHost, couchApiBasePath, nil
}
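// Using the example couchApiBase from capiConfig,
//
//	http://127.0.0.1:9500/target%2Baa3466851d268241d9465826d3d8dd11%2f13
//
// splits at the last url delimiter into
//
//	host: http://127.0.0.1:9500
//	path: /target%2Baa3466851d268241d9465826d3d8dd11%2f13
//
// The vbucket suffix stays URL-escaped (%2f), so the last literal delimiter is
// the one in front of the bucket path (assuming base.UrlDelimiter is "/").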
// resetConn() initializes the connection when called for the first time
func (capi *CapiNozzle) resetConn() error {
	capi.Logger().Infof("%v resetting capi connection. \n", capi.Id())

	if capi.validateRunningState() != nil {
		capi.Logger().Infof("%v is not running, no need to resetConn", capi.Id())
		return nil
	}

	// the sole purpose of getClientOpFunc is to match the func signature required by ExponentialBackoffExecutorWithFinishSignal
	getClientOpFunc := func(param interface{}) (interface{}, error) {
		return capi.utils.NewTCPConn(param.(string))
	}

	result, err := capi.utils.ExponentialBackoffExecutorWithFinishSignal("capi.resetConn", base.XmemBackoffTimeNewConn, base.XmemMaxRetryNewConn,
		base.MetaKvBackoffFactor, getClientOpFunc, capi.config.connectStr, capi.finish_ch)
	if err != nil {
		capi.Logger().Errorf("%v - Connection reset failed. err=%v\n", capi.Id(), err)
		capi.handleGeneralError(err)
		return err
	}
	newClient, ok := result.(*net.TCPConn)
	if !ok {
		// should never get here
		err = fmt.Errorf("%v resetConn returned wrong type of client", capi.Id())
		capi.Logger().Error(err.Error())
		capi.handleGeneralError(err)
		return err
	}

	capi.setClient(newClient)
	capi.Logger().Infof("%v Connection reset succeeded", capi.Id())

	return nil
}

func (capi *CapiNozzle) UpdateSettings(settings metadata.ReplicationSettingsMap) error {
	optimisticReplicationThreshold, ok := settings[SETTING_OPTI_REP_THRESHOLD]
	if ok {
		optimisticReplicationThresholdInt := optimisticReplicationThreshold.(int)
		atomic.StoreUint32(&capi.config.optiRepThreshold, uint32(optimisticReplicationThresholdInt))
		capi.Logger().Infof("%v updated optimistic replication threshold to %v\n", capi.Id(), optimisticReplicationThresholdInt)
	}

	return nil
}

func (capi *CapiNozzle) recycleDataObj(req *base.WrappedMCRequest) {
	if capi.dataObj_recycler != nil {
		capi.dataObj_recycler(capi.topic, req)
	}
}

func (capi *CapiNozzle) recordLastSentBatch(vbno uint16, batchSize uint32) {
	capi.last_sent_batches_lock.Lock()
	defer capi.last_sent_batches_lock.Unlock()

	// oldestBatchInfo will be rotated out of the array;
	// reuse it to hold info about the last sent batch
	oldestBatchInfo := capi.last_sent_batches[BatchHistorySize-1]
	oldestBatchInfo.vbno = vbno
	oldestBatchInfo.size = batchSize
	for i := BatchHistorySize - 1; i > 0; i-- {
		capi.last_sent_batches[i] = capi.last_sent_batches[i-1]
	}
	capi.last_sent_batches[0] = oldestBatchInfo
}

func (capi *CapiNozzle) getLastSentBatches() string {
	capi.last_sent_batches_lock.RLock()
	defer capi.last_sent_batches_lock.RUnlock()
	var buffer bytes.Buffer
	buffer.WriteByte('[')
	first := true
	for _, last_sent_batch := range capi.last_sent_batches {
		if !first {
			buffer.WriteByte(',')
		} else {
			first = false
		}
		buffer.WriteString(fmt.Sprintf("%v", *last_sent_batch))
	}
	buffer.WriteByte(']')
	return buffer.String()
}