xref: /3.0.3-GA/ep-engine/src/tapconnection.h (revision 054f2780)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2013 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#ifndef SRC_TAPCONNECTION_H_
19#define SRC_TAPCONNECTION_H_ 1
20
21#include "config.h"
22
23#include <list>
24#include <map>
25#include <queue>
26#include <set>
27#include <string>
28#include <vector>
29
30#include "atomic.h"
31#include "common.h"
32#include "locks.h"
33#include "mutex.h"
34#include "statwriter.h"
35
36// forward decl
37class ConnHandler;
38class EventuallyPersistentEngine;
39class TapConnMap;
40class TapProducer;
41class BGFetchCallback;
42class CompleteBackfillOperation;
43class Dispatcher;
44class Item;
45class VBucketFilter;
46
47struct TapStatBuilder;
48struct TapAggStatBuilder;
49struct PopulateEventsBody;
50
51
52
53#define MAX_TAP_KEEP_ALIVE 3600
54#define MAX_TAKEOVER_TAP_LOG_SIZE 10
55#define MINIMUM_BACKFILL_RESIDENT_THRESHOLD 0.7
56#define DEFAULT_BACKFILL_RESIDENT_THRESHOLD 0.9
57
58/**
59 * A tap event that represents a change to the state of a vbucket.
60 *
 * The tap stream may include events other than data mutation events,
 * but the data structures in the TapProducer only store a key
 * for each item. We don't want to add more data to those elements,
 * because that could potentially consume a lot of memory (the tap queue
 * may have a lot of elements).
66 */
67class VBucketEvent {
68public:
69    /**
70     * Create a new instance of the VBucketEvent and initialize
71     * its members.
72     * @param ev Type of event
73     * @param b The bucket this event belongs to
74     * @param s The state change for this event
75     */
76    VBucketEvent(uint16_t ev, uint16_t b, vbucket_state_t s) :
77        event(ev), vbucket(b), state(s) {}
78    uint16_t event;
79    uint16_t vbucket;
80    vbucket_state_t state;
81};
82
83/**
84 * Represents an item that has been sent over tap, but may need to be
85 * rolled back if acks fail.
86 */
87class TapLogElement {
88
89public:
90
91    TapLogElement(uint32_t seqno, const VBucketEvent &e)
92        : seqno_(seqno), event_(e.event), vbucket_(e.vbucket),
93          state_(e.state) { }
94
95    TapLogElement(uint32_t seqno, const queued_item &qi)
96    {
97        seqno_ = seqno;
98        event_ = TAP_MUTATION;
99        vbucket_ = qi->getVBucketId();
100        state_ = vbucket_state_active;
101        item_ = qi;
102
103        switch(item_->getOperation()) {
104        case queue_op_set:
105            event_ = TAP_MUTATION;
106            break;
107        case queue_op_del:
108            event_ = TAP_DELETION;
109            break;
110        case queue_op_flush:
111            event_ = TAP_FLUSH;
112            break;
113        case queue_op_checkpoint_start:
114            event_ = TAP_CHECKPOINT_START;
115            break;
116        case queue_op_checkpoint_end:
117            event_ = TAP_CHECKPOINT_END;
118            break;
119        default:
120            break;
121        }
122    }
123
124    uint32_t seqno_;
125    uint16_t event_;
126    uint16_t vbucket_;
127
128    vbucket_state_t state_;
129    queued_item item_;
130};
131
typedef enum {
    //! TAP connection
    TAP_CONN,
    //! UPR connection
    UPR_CONN
} conn_type_t;
136
137class ConnHandler : public RCValue {
138public:
139    ConnHandler(EventuallyPersistentEngine& engine, const void* c,
140                const std::string& name);
141
142    virtual ~ConnHandler() {}
143
144    virtual ENGINE_ERROR_CODE addStream(uint32_t opaque, uint16_t vbucket,
145                                        uint32_t flags);
146
147    virtual ENGINE_ERROR_CODE closeStream(uint32_t opaque, uint16_t vbucket);
148
149    virtual ENGINE_ERROR_CODE streamEnd(uint32_t opaque, uint16_t vbucket,
150                                        uint32_t flags);
151
152    virtual ENGINE_ERROR_CODE mutation(uint32_t opaque, const void* key,
153                                       uint16_t nkey, const void* value,
154                                       uint32_t nvalue, uint64_t cas,
155                                       uint16_t vbucket, uint32_t flags,
156                                       uint8_t datatype, uint32_t locktime,
157                                       uint64_t bySeqno, uint64_t revSeqno,
158                                       uint32_t exptime, uint8_t nru,
159                                       const void* meta, uint16_t nmeta);
160
161    virtual ENGINE_ERROR_CODE deletion(uint32_t opaque, const void* key,
162                                       uint16_t nkey, uint64_t cas,
163                                       uint16_t vbucket, uint64_t bySeqno,
164                                       uint64_t revSeqno, const void* meta,
165                                       uint16_t nmeta);
166
167    virtual ENGINE_ERROR_CODE expiration(uint32_t opaque, const void* key,
168                                         uint16_t nkey, uint64_t cas,
169                                         uint16_t vbucket, uint64_t bySeqno,
170                                         uint64_t revSeqno, const void* meta,
171                                         uint16_t nmeta);
172
173    virtual ENGINE_ERROR_CODE snapshotMarker(uint32_t opaque,
174                                             uint16_t vbucket,
175                                             uint64_t start_seqno,
176                                             uint64_t end_seqno,
177                                             uint32_t flags);
178
179    virtual ENGINE_ERROR_CODE flushall(uint32_t opaque, uint16_t vbucket);
180
181    virtual ENGINE_ERROR_CODE setVBucketState(uint32_t opaque, uint16_t vbucket,
182                                              vbucket_state_t state);
183
184    virtual ENGINE_ERROR_CODE getFailoverLog(uint32_t opaque, uint16_t vbucket,
185                                             upr_add_failover_log callback);
186
187    virtual ENGINE_ERROR_CODE streamRequest(uint32_t flags,
188                                            uint32_t opaque,
189                                            uint16_t vbucket,
190                                            uint64_t start_seqno,
191                                            uint64_t end_seqno,
192                                            uint64_t vbucket_uuid,
193                                            uint64_t snapStartSeqno,
194                                            uint64_t snapEndSeqno,
195                                            uint64_t *rollback_seqno,
196                                            upr_add_failover_log callback);
197
198    virtual ENGINE_ERROR_CODE noop(uint32_t opaque);
199
200    virtual ENGINE_ERROR_CODE bufferAcknowledgement(uint32_t opaque,
201                                                    uint16_t vbucket,
202                                                    uint32_t buffer_bytes);
203
204    virtual ENGINE_ERROR_CODE control(uint32_t opaque, const void* key,
205                                      uint16_t nkey, const void* value,
206                                      uint32_t nvalue);
207
208    virtual ENGINE_ERROR_CODE step(struct upr_message_producers* producers);
209
210    virtual ENGINE_ERROR_CODE handleResponse(
211                                        protocol_binary_response_header *resp);
212
213    EventuallyPersistentEngine& engine() {
214        return engine_;
215    }
216
217    const char* logHeader() {
218        return logString.c_str();
219    }
220
221    void setLogHeader(const std::string &header) {
222        logString = header;
223    }
224
225    void releaseReference(bool force = false);
226
227    void setSupportAck(bool ack) {
228        supportAck = ack;
229    }
230
231    bool supportsAck() const {
232        return supportAck;
233    }
234
235    void setSupportCheckpointSync(bool checkpointSync) {
236        supportCheckpointSync_ = checkpointSync;
237    }
238
239    bool supportsCheckpointSync() const {
240        return supportCheckpointSync_;
241    }
242
243    virtual const char *getType() const = 0;
244
245    template <typename T>
246    void addStat(const char *nm, const T &val, ADD_STAT add_stat, const void *c) {
247        std::stringstream tap;
248        tap << name << ":" << nm;
249        std::stringstream value;
250        value << val;
251        std::string n = tap.str();
252        add_casted_stat(n.data(), value.str().data(), add_stat, c);
253    }
254
255    void addStat(const char *nm, bool val, ADD_STAT add_stat, const void *c) {
256        addStat(nm, val ? "true" : "false", add_stat, c);
257    }
258
259    virtual void addStats(ADD_STAT add_stat, const void *c) {
260        addStat("type", getType(), add_stat, c);
261        addStat("created", created, add_stat, c);
262        addStat("connected", connected, add_stat, c);
263        addStat("pending_disconnect", disconnect, add_stat, c);
264        addStat("supports_ack", supportAck, add_stat, c);
265        addStat("reserved", reserved.load(), add_stat, c);
266
267        if (numDisconnects > 0) {
268            addStat("disconnects", numDisconnects.load(), add_stat, c);
269        }
270    }
271
272    virtual void processedEvent(uint16_t event, ENGINE_ERROR_CODE ret) {
273        (void) event;
274        (void) ret;
275    }
276
277    const std::string &getName() const {
278        return name;
279    }
280
281    void setName(const std::string &n) {
282        name.assign(n);
283    }
284
285    bool setReserved(bool r) {
286        bool inverse = !r;
287        return reserved.compare_exchange_strong(inverse, r);
288    }
289
290    bool isReserved() const {
291        return reserved;
292    }
293
294    const void *getCookie() const {
295        return cookie;
296    }
297
298    void setCookie(const void *c) {
299        cookie = c;
300    }
301
302    void setExpiryTime(rel_time_t t) {
303        expiryTime = t;
304    }
305
306    rel_time_t getExpiryTime() {
307        return expiryTime;
308    }
309
310    void setLastWalkTime() {
311        lastWalkTime = ep_current_time();
312    }
313
314    rel_time_t getLastWalkTime() {
315        return lastWalkTime;
316    }
317
318    void setConnected(bool s) {
319        if (!s) {
320            ++numDisconnects;
321        }
322        connected = s;
323    }
324
325    bool isConnected() {
326        return connected;
327    }
328
329    bool doDisconnect() {
330        return disconnect;
331    }
332
333    virtual void setDisconnect(bool val) {
334        disconnect = val;
335    }
336
337    static std::string getAnonName() {
338        uint64_t nextConnId = counter_++;
339        std::stringstream s;
340        s << "eq_tapq:anon_";
341        s << nextConnId;
342        return s.str();
343    }
344
345    hrtime_t getConnectionToken() const {
346        return connToken;
347    }
348
349protected:
350    EventuallyPersistentEngine &engine_;
351    EPStats &stats;
352    bool supportCheckpointSync_;
353
354private:
355
356     //! The name for this connection
357    std::string name;
358
359    //! The string used to prefix all log messages for this connection
360    std::string logString;
361
362    //! The cookie representing this connection (provided by the memcached code)
363    const void* cookie;
364
365    //! Whether or not the connection is reserved in the memcached layer
366    AtomicValue<bool> reserved;
367
368    //! Connection token created at connection instantiation time
369    hrtime_t connToken;
370
371    //! Connection creation time
372    rel_time_t created;
373
374    //! The last time this connection's step function was called
375    rel_time_t lastWalkTime;
376
377    //! Should we disconnect as soon as possible?
378    bool disconnect;
379
380    //! Is this tap conenction connected?
381    bool connected;
382
383    //! Number of times this connection was disconnected
384    AtomicValue<size_t> numDisconnects;
385
386    //! when this tap conneciton expires.
387    rel_time_t expiryTime;
388
389    //! Whether or not this connection supports acking
390    bool supportAck;
391
392    //! A counter used to generate unique names
393    static AtomicValue<uint64_t> counter_;
394};
395
396
397/**
398 * Aggregator object to count all tap stats.
399 */
struct ConnCounter {
    //! Start every counter at zero.
    ConnCounter()
        : conn_queue(0),
          totalConns(0),
          totalProducers(0),
          conn_queueFill(0),
          conn_queueDrain(0),
          conn_totalBytes(0),
          conn_queueRemaining(0),
          conn_queueBackoff(0),
          conn_queueBackfillRemaining(0),
          conn_queueItemOnDisk(0),
          conn_totalBacklogSize(0)
    {
    }

    // Connection-level totals
    size_t conn_queue;
    size_t totalConns;
    size_t totalProducers;

    // Queue-level totals
    size_t conn_queueFill;
    size_t conn_queueDrain;
    size_t conn_totalBytes;
    size_t conn_queueRemaining;
    size_t conn_queueBackoff;
    size_t conn_queueBackfillRemaining;
    size_t conn_queueItemOnDisk;
    size_t conn_totalBacklogSize;
};
421
typedef enum {
    backfill,
    checkpoint_start,
    checkpoint_end,
    checkpoint_end_synced
} proto_checkpoint_state;


/**
 * Checkpoint state of each vbucket in TAP or UPR stream.
 */
class CheckpointState {
public:
    /**
     * Create a state with every counter at zero, vbucket id 0 and the
     * protocol state set to backfill.
     *
     * vbucket is initialized explicitly so that a default-constructed
     * object never exposes an indeterminate vbucket id.
     */
    CheckpointState() :
        vbucket(0), currentCheckpointId(0), lastSeqNum(0), bgResultSize(0),
        bgJobIssued(0), bgJobCompleted(0), lastItem(false), state(backfill) {}

    /**
     * @param vb           vbucket id this state belongs to
     * @param checkpointId id stored as the current checkpoint id
     * @param s            initial protocol checkpoint state
     */
    CheckpointState(uint16_t vb, uint64_t checkpointId, proto_checkpoint_state s) :
        vbucket(vb), currentCheckpointId(checkpointId), lastSeqNum(0),
        bgResultSize(0), bgJobIssued(0), bgJobCompleted(0),
        lastItem(false), state(s) {}

    /**
     * @return true when no bg-fetched results are waiting and the number of
     *         issued jobs equals the number of completed jobs
     */
    bool isBgFetchCompleted(void) const {
        return bgResultSize == 0 && (bgJobIssued - bgJobCompleted) == 0;
    }

    uint16_t vbucket;
    // Id of the checkpoint that is currently referenced by the given TAP client's cursor.
    uint64_t currentCheckpointId;
    // Last sequence number sent to the slave.
    uint32_t lastSeqNum;

    // Number of bg-fetched items for a given vbucket, which are ready for streaming.
    size_t bgResultSize;
    // Number of bg-fetched jobs issued for a given vbucket.
    size_t bgJobIssued;
    // Number of bg-fetched jobs completed for a given vbucket
    size_t bgJobCompleted;

    // True if the TAP cursor reaches to the last item at its current checkpoint.
    bool lastItem;
    proto_checkpoint_state state;
};
465
466
467/**
468 * A class containing the config parameters for TAP module.
469 */
470class TapConfig {
471public:
472    TapConfig(EventuallyPersistentEngine &e);
473    uint32_t getAckWindowSize() const {
474        return ackWindowSize;
475    }
476
477    uint32_t getAckInterval() const {
478        return ackInterval;
479    }
480
481    rel_time_t getAckGracePeriod() const {
482        return ackGracePeriod;
483    }
484
485    uint32_t getAckInitialSequenceNumber() const {
486        return ackInitialSequenceNumber;
487    }
488
489    size_t getBgMaxPending() const {
490        return bgMaxPending;
491    }
492
493    double getBackoffSleepTime() const {
494        return backoffSleepTime;
495    }
496
497    double getRequeueSleepTime() const {
498        return requeueSleepTime;
499    }
500
501    size_t getBackfillBacklogLimit() const {
502        return backfillBacklogLimit;
503    }
504
505    double getBackfillResidentThreshold() const {
506        return backfillResidentThreshold;
507    }
508
509protected:
510    friend class TapConfigChangeListener;
511    friend class EventuallyPersistentEngine;
512
513    void setAckWindowSize(size_t value) {
514        ackWindowSize = static_cast<uint32_t>(value);
515    }
516
517    void setAckInterval(size_t value) {
518        ackInterval = static_cast<uint32_t>(value);
519    }
520
521    void setAckGracePeriod(size_t value) {
522        ackGracePeriod = static_cast<rel_time_t>(value);
523    }
524
525    void setAckInitialSequenceNumber(size_t value) {
526        ackInitialSequenceNumber = static_cast<uint32_t>(value);
527    }
528
529    void setBgMaxPending(size_t value) {
530        bgMaxPending = value;
531    }
532
533    void setBackoffSleepTime(double value) {
534        backoffSleepTime = value;
535    }
536
537    void setRequeueSleepTime(double value) {
538        requeueSleepTime = value;
539    }
540
541    void setBackfillBacklogLimit(size_t value) {
542        backfillBacklogLimit = value;
543    }
544
545    void setBackfillResidentThreshold(double value) {
546        if (value < MINIMUM_BACKFILL_RESIDENT_THRESHOLD) {
547            value = DEFAULT_BACKFILL_RESIDENT_THRESHOLD;
548        }
549        backfillResidentThreshold = value;
550    }
551
552    static void addConfigChangeListener(EventuallyPersistentEngine &engine);
553
554private:
555    // Constants used to enforce the tap ack protocol
556    uint32_t ackWindowSize;
557    uint32_t ackInterval;
558    rel_time_t ackGracePeriod;
559
560    /**
561     * To ease testing of corner cases we need to be able to seed the
562     * initial tap sequence numbers (if not we would have to wrap an uin32_t)
563     */
564    uint32_t ackInitialSequenceNumber;
565
566    // Parameters to control the backoff behavior of TAP producer
567    size_t bgMaxPending;
568    double backoffSleepTime;
569    double requeueSleepTime;
570
571    // Parameters to control the backfill
572    size_t backfillBacklogLimit;
573    double backfillResidentThreshold;
574
575    EventuallyPersistentEngine &engine;
576};
577
578/**
579 * TAP stream ep-engine specific data payload
580 */
581class TapEngineSpecific {
582public:
583
584    // size of item revision seq number
585    static const short int sizeRevSeqno;
586    // size of item specific extra data
587    static const short int sizeExtra;
588    // size of complete specific data
589    static const short int sizeTotal;
590
591    /**
592     * Read engine specific data for a given tap event type
593     *
594     * @param ev tap event
595     * @param engine_specific input tap engine specific data
596     * @param nengine size of input data (bytes)
597     * @param output sequence number
598     * @param extra additional item specific data
599     */
600    static void readSpecificData(uint16_t ev, void *engine_specific, uint16_t nengine,
601                                 uint64_t *seqnum, uint8_t *extra = NULL);
602
603    /**
604     * Pack engine specific data for a given tap event type
605     *
606     * @param ev tap event
607     * @param tp tap producer connection
608     * @param seqnum item sequence number
609     * @param nru value of the item replicated
610     * @return size of tap engine specific data (bytes)
611     */
612    static uint16_t packSpecificData(uint16_t ev, TapProducer *tp, uint64_t seqnum,
613                                     uint8_t nru = 0xff);
614};
615
616
617/**
618 */
619class Consumer : public ConnHandler {
620private:
621    AtomicValue<size_t> numDelete;
622    AtomicValue<size_t> numDeleteFailed;
623    AtomicValue<size_t> numFlush;
624    AtomicValue<size_t> numFlushFailed;
625    AtomicValue<size_t> numMutation;
626    AtomicValue<size_t> numMutationFailed;
627    AtomicValue<size_t> numOpaque;
628    AtomicValue<size_t> numOpaqueFailed;
629    AtomicValue<size_t> numVbucketSet;
630    AtomicValue<size_t> numVbucketSetFailed;
631    AtomicValue<size_t> numCheckpointStart;
632    AtomicValue<size_t> numCheckpointStartFailed;
633    AtomicValue<size_t> numCheckpointEnd;
634    AtomicValue<size_t> numCheckpointEndFailed;
635    AtomicValue<size_t> numUnknown;
636
637public:
638    Consumer(EventuallyPersistentEngine &theEngine, const void* cookie,
639             const std::string& name);
640    virtual void processedEvent(uint16_t event, ENGINE_ERROR_CODE ret);
641    virtual void addStats(ADD_STAT add_stat, const void *c);
642    virtual const char *getType() const { return "consumer"; };
643    virtual void checkVBOpenCheckpoint(uint16_t);
644    void setBackfillPhase(bool isBackfill, uint16_t vbucket);
645    bool isBackfillPhase(uint16_t vbucket);
646    ENGINE_ERROR_CODE setVBucketState(uint32_t opaque, uint16_t vbucket,
647                                      vbucket_state_t state);
648};
649
650
651/*
652 * auxIODispatcher/GIO task that performs a background fetch on behalf
653 * of TAP/UPR.
654 */
655class BGFetchCallback : public GlobalTask {
656public:
657    BGFetchCallback(EventuallyPersistentEngine *e, const std::string &n,
658                    const std::string &k, uint16_t vbid,
659                    uint64_t r, hrtime_t token, const Priority &p,
660                    double sleeptime = 0) :
661        GlobalTask(e, p, sleeptime, false), name(n), key(k), epe(e),
662        init(gethrtime()), connToken(token), rowid(r), vbucket(vbid)
663    {
664        cb_assert(epe);
665    }
666
667    bool run();
668
669    std::string getDescription() {
670        std::stringstream ss;
671        ss << "Fetching item from disk for tap: " << key;
672        return ss.str();
673    }
674
675private:
676    const std::string name;
677    const std::string key;
678    EventuallyPersistentEngine *epe;
679    hrtime_t init;
680    hrtime_t connToken;
681    uint64_t rowid;
682    uint16_t vbucket;
683};
684
685
686class TapConsumer : public Consumer {
687public:
688    TapConsumer(EventuallyPersistentEngine &e, const void *c,
689                const std::string &n);
690
691    ~TapConsumer() {}
692
693    ENGINE_ERROR_CODE mutation(uint32_t opaque, const void* key, uint16_t nkey,
694                               const void* value, uint32_t nvalue, uint64_t cas,
695                               uint16_t vbucket, uint32_t flags,
696                               uint8_t datatype, uint32_t locktime,
697                               uint64_t bySeqno, uint64_t revSeqno,
698                               uint32_t exptime, uint8_t nru, const void* meta,
699                               uint16_t nmeta);
700
701    ENGINE_ERROR_CODE deletion(uint32_t opaque, const void* key, uint16_t nkey,
702                               uint64_t cas, uint16_t vbucket, uint64_t bySeqno,
703                               uint64_t revSeqno, const void* meta,
704                               uint16_t nmeta);
705
706    bool processCheckpointCommand(uint8_t event, uint16_t vbucket,
707                                  uint64_t checkpointId);
708};
709
710class Notifiable {
711public:
712    Notifiable()
713      : suspended(false), paused(false),
714        notificationScheduled(false), notifySent(false) {}
715
716    bool isPaused() {
717        return paused;
718    }
719
720    void setPaused(bool p) {
721        paused.store(p);
722    }
723
724    bool isNotificationScheduled() {
725        return notificationScheduled;
726    }
727
728    bool setNotificationScheduled(bool val) {
729        bool inverse = !val;
730        return notificationScheduled.compare_exchange_strong(inverse, val);
731    }
732
733    bool setNotifySent(bool val) {
734        bool inverse = !val;
735        return notifySent.compare_exchange_strong(inverse, val);
736    }
737
738    bool sentNotify() {
739        return notifySent;
740    }
741
742    virtual void setSuspended(bool value) {
743        suspended = value;
744    }
745
746    bool isSuspended() {
747        return suspended;
748    }
749
750private:
751    //! Is this tap connection in a suspended state
752    bool suspended;
753    //! Connection is temporarily paused?
754    AtomicValue<bool> paused;
755    //! Flag indicating if the notification event is scheduled
756    AtomicValue<bool> notificationScheduled;
757        //! Flag indicating if the pending memcached connection is notified
758    AtomicValue<bool> notifySent;
759};
760
761class Producer : public ConnHandler, public Notifiable {
762public:
763    Producer(EventuallyPersistentEngine &engine, const void* cookie,
764             const std::string& name) :
765        ConnHandler(engine, cookie, name),
766        Notifiable(),
767        vbucketFilter(),
768        totalBackfillBacklogs(0),
769        reconnects(0) {}
770
771    void addStats(ADD_STAT add_stat, const void *c);
772
773    virtual void aggregateQueueStats(ConnCounter* stats_aggregator) = 0;
774
775    bool isReconnected() const {
776        return reconnects > 0;
777    }
778
779    void reconnected() {
780        ++reconnects;
781    }
782
783    virtual bool isTimeForNoop() = 0;
784
785    virtual void setTimeForNoop() = 0;
786
787    const char *getType() const { return "producer"; }
788
789    virtual void clearQueues() = 0;
790
791    virtual void appendQueue(std::list<queued_item> *q) = 0;
792
793    virtual size_t getBackfillQueueSize() = 0;
794
795    void incrBackfillRemaining(size_t incr) {
796        LockHolder lh(queueLock);
797        totalBackfillBacklogs += incr;
798    }
799
800    virtual void flush() = 0;
801
802    virtual bool windowIsFull() = 0;
803
804    const VBucketFilter &getVBucketFilter() {
805        LockHolder lh(queueLock);
806        return vbucketFilter;
807    }
808
809    virtual ~Producer() {}
810
811protected:
812    friend class ConnMap;
813
814    //! Lock held during queue operations.
815    Mutex queueLock;
816    //! Filter for the vbuckets we want.
817    VBucketFilter vbucketFilter;
818    //! Total backfill backlogs
819    size_t totalBackfillBacklogs;
820
821private:
822    //! Number of times this client reconnected
823    uint32_t reconnects;
824};
825
826/**
827 * Class used by the EventuallyPersistentEngine to keep track of all
828 * information needed per Tap or Upr connection.
829 */
830class TapProducer : public Producer {
831public:
832    TapProducer(EventuallyPersistentEngine &engine,
833             const void *cookie,
834             const std::string &n,
835             uint32_t f);
836
837    virtual ~TapProducer() {
838        delete queue;
839        delete []specificData;
840        delete []transmitted;
841        cb_assert(!isReserved());
842    }
843
844    virtual void addStats(ADD_STAT add_stat, const void *c);
845    virtual void processedEvent(uint16_t event, ENGINE_ERROR_CODE ret);
846
847    void aggregateQueueStats(ConnCounter* stats_aggregator);
848
849    void suspendedConnection_UNLOCKED(bool value);
850    void suspendedConnection(bool value);
851
852    bool isTimeForNoop();
853    void setTimeForNoop();
854
855    void completeBackfill() {
856        LockHolder lh(queueLock);
857        if (pendingBackfillCounter > 0) {
858            --pendingBackfillCounter;
859        }
860        completeBackfillCommon_UNLOCKED();
861    }
862
863    void scheduleDiskBackfill() {
864        LockHolder lh(queueLock);
865        ++diskBackfillCounter;
866    }
867
868    void completeDiskBackfill() {
869        LockHolder lh(queueLock);
870        if (diskBackfillCounter > 0) {
871            --diskBackfillCounter;
872        }
873        completeBackfillCommon_UNLOCKED();
874    }
875
876    /**
877     * Invoked each time a background item fetch completes.
878     */
879    void completeBGFetchJob(Item *item, uint16_t vbid, bool implicitEnqueue);
880
881    /**
882     * Get the next item (e.g., checkpoint_start, checkpoint_end, tap_mutation, or
883     * tap_deletion) to be transmitted.
884     */
885    Item *getNextItem(const void *c, uint16_t *vbucket, uint16_t &ret,
886                      uint8_t &nru);
887
888    /**
889     * Find out how many items are still remaining from backfill.
890     */
891    size_t getBackfillRemaining() {
892        LockHolder lh(queueLock);
893        return getBackfillRemaining_UNLOCKED();
894    }
895
896    /**
897     * Return the current backfill queue size.
898     * This differs from getBackfillRemaining() that returns the approximated size
899     * of total backfill backlogs.
900     */
901    size_t getBackfillQueueSize() {
902        LockHolder lh(queueLock);
903        return getBackfillQueueSize_UNLOCKED();
904    }
905
906    /**
907     * Return the live replication queue size.
908     */
909    size_t getQueueSize() {
910        LockHolder lh(queueLock);
911        return getQueueSize_UNLOCKED();
912    }
913
914    void setFlagByteorderSupport(bool enable) {
915        flagByteorderSupport = enable;
916    }
917    bool haveFlagByteorderSupport(void) const {
918        return flagByteorderSupport;
919    }
920
921    void clearQueues() {
922        LockHolder lh(queueLock);
923        clearQueues_UNLOCKED();
924    }
925
926    static const char* opaqueCmdToString(uint32_t opaque_code);
927
928protected:
929    friend class EventuallyPersistentEngine;
930    friend class ConnMap;
931    friend class TapConnMap;
932    friend class BGFetchCallback;
933    friend struct TapStatBuilder;
934    friend struct TapAggStatBuilder;
935    friend struct PopulateEventsBody;
936    friend class TapEngineSpecific;
937
938    /**
939     * Check if TAP_DUMP or TAP_TAKEOVER is completed and close the connection if
940     * all messages including vbucket_state change commands are sent.
941     */
942    VBucketEvent checkDumpOrTakeOverCompletion();
943
944    void completeBackfillCommon_UNLOCKED() {
945        if (mayCompleteDumpOrTakeover_UNLOCKED() && idle_UNLOCKED()) {
946            // There is no data for this connection..
947            // Just go ahead and disconnect it.
948            setDisconnect(true);
949        }
950    }
951
952    /**
953     * Add a new item to the tap queue. You need to hold the queue lock
954     * before calling this function
955     * The item may be ignored if the TapProducer got a vbucket filter
956     * associated and the item's vbucket isn't part of the filter.
957     *
     * @return true if the queue was empty
959     */
960    bool addEvent_UNLOCKED(const queued_item &it);
961
962    /**
963     * Add a new item to the tap queue.
964     * The item may be ignored if the TapProducer got a vbucket filter
965     * associated and the item's vbucket isn't part of the filter.
966     *
     * @return true if the queue was empty
968     */
969    bool addEvent(const queued_item &it) {
970        LockHolder lh(queueLock);
971        return addEvent_UNLOCKED(it);
972    }
973
974    void addLogElement_UNLOCKED(const queued_item &qi) {
975        if (supportsAck()) {
976            TapLogElement log(seqno, qi);
977            ackLog_.push_back(log);
978            stats.memOverhead.fetch_add(sizeof(TapLogElement));
979            cb_assert(stats.memOverhead.load() < GIGANTOR);
980        }
981    }
982
983    void addLogElement(const queued_item &qi) {
984        LockHolder lh(queueLock);
985        addLogElement_UNLOCKED(qi);
986    }
987
988    void addLogElement_UNLOCKED(const VBucketEvent &e) {
989        if (supportsAck()) {
990            // add to the log!
991            TapLogElement log(seqno, e);
992            ackLog_.push_back(log);
993            stats.memOverhead.fetch_add(sizeof(TapLogElement));
994            cb_assert(stats.memOverhead.load() < GIGANTOR);
995        }
996    }
997
998    /**
999     * Get the next item from the queue that has items fetched from memory.
1000     */
1001    queued_item nextFgFetched_UNLOCKED(bool &shouldPause);
1002
1003    void addVBucketHighPriority_UNLOCKED(VBucketEvent &ev) {
1004        vBucketHighPriority.push(ev);
1005    }
1006
1007
1008    /**
1009     * Add a new high priority VBucketEvent to this TapProducer. A high
1010     * priority VBucketEvent will bypass the the normal queue of events to
1011     * be sent to the client, and be sent the next time it is possible to
1012     * send data over the tap connection.
1013     */
1014    void addVBucketHighPriority(VBucketEvent &ev) {
1015        LockHolder lh(queueLock);
1016        addVBucketHighPriority_UNLOCKED(ev);
1017    }
1018
    /**
     * Get the next high priority VBucketEvent for this TapProducer.
     * Declared here and defined out of line; the locking wrapper
     * nextVBucketHighPriority() acquires queueLock before calling it.
     */
    VBucketEvent nextVBucketHighPriority_UNLOCKED();
1023
1024    VBucketEvent nextVBucketHighPriority() {
1025        LockHolder lh(queueLock);
1026        return nextVBucketHighPriority_UNLOCKED();
1027    }
1028
1029    void addVBucketLowPriority_UNLOCKED(VBucketEvent &ev) {
1030        vBucketLowPriority.push(ev);
1031    }
1032
1033    /**
1034     * Add a new low priority VBucketEvent to this TapProducer. A low
1035     * priority VBucketEvent will only be sent when the tap connection
1036     * doesn't have any other events to send.
1037     */
1038    void addVBucketLowPriority(VBucketEvent &ev) {
1039        LockHolder lh(queueLock);
1040        addVBucketLowPriority_UNLOCKED(ev);
1041    }
1042
    /**
     * Get the next low priority VBucketEvent for this TapProducer.
     * Declared here and defined out of line; the locking wrapper
     * nextVBucketLowPriority() acquires queueLock before calling it.
     */
    VBucketEvent nextVBucketLowPriority_UNLOCKED();
1047
1048    VBucketEvent nextVBucketLowPriority() {
1049        LockHolder lh(queueLock);
1050        return nextVBucketLowPriority_UNLOCKED();
1051    }
1052
1053    void addCheckpointMessage_UNLOCKED(const queued_item &qi) {
1054        checkpointMsgs.push(qi);
1055    }
1056
1057    /**
1058     * Add a checkpoint start / end message to the checkpoint message queue. These messages
1059     * are used for synchronizing checkpoints between tap producer and consumer.
1060     */
1061    void addCheckpointMessage(const queued_item &qi) {
1062        LockHolder lh(queueLock);
1063        addCheckpointMessage_UNLOCKED(qi);
1064    }
1065
    /**
     * Unlocked variant of nextCheckpointMessage(); defined out of line.
     * Presumably pops the next entry from the checkpoint message queue
     * (NOTE(review): confirm against the definition). The caller is
     * expected to hold queueLock.
     */
    queued_item nextCheckpointMessage_UNLOCKED();
1067
1068    queued_item nextCheckpointMessage() {
1069        LockHolder lh(queueLock);
1070        return nextCheckpointMessage_UNLOCKED();
1071    }
1072
1073    bool hasItemFromVBHashtable_UNLOCKED() {
1074        return !queue->empty() || hasNextFromCheckpoints_UNLOCKED();
1075    }
1076
1077    bool hasItemFromDisk_UNLOCKED() {
1078        return !backfilledItems.empty();
1079    }
1080
1081    bool emptyQueue_UNLOCKED() {
1082        return !hasItemFromDisk_UNLOCKED() && (bgJobIssued - bgJobCompleted) == 0 &&
1083            !hasItemFromVBHashtable_UNLOCKED();
1084    }
1085
1086    bool idle_UNLOCKED() {
1087        return emptyQueue_UNLOCKED() && vBucketLowPriority.empty() &&
1088            vBucketHighPriority.empty() && checkpointMsgs.empty() && ackLog_.empty();
1089    }
1090
1091    bool idle() {
1092        LockHolder lh(queueLock);
1093        return idle_UNLOCKED();
1094    }
1095
1096    bool hasItemFromDisk() {
1097        LockHolder lh(queueLock);
1098        return hasItemFromDisk_UNLOCKED();
1099    }
1100
1101    bool hasItemFromVBHashtable() {
1102        LockHolder lh(queueLock);
1103        return hasItemFromVBHashtable_UNLOCKED();
1104    }
1105
1106    bool emptyQueue() {
1107        LockHolder lh(queueLock);
1108        return emptyQueue_UNLOCKED();
1109    }
1110
    // The three size getters below are declared here and defined out of
    // line. By the class's _UNLOCKED convention the caller is expected to
    // hold queueLock.

    //! Presumably the number of items still to be backfilled (TODO confirm)
    size_t getBackfillRemaining_UNLOCKED();

    //! Presumably the size of the backfill queue (TODO confirm)
    size_t getBackfillQueueSize_UNLOCKED();

    //! Presumably the size of the live stream queue (TODO confirm)
    size_t getQueueSize_UNLOCKED();
1116
1117    size_t getQueueMemory() {
1118        return queueMemSize;
1119    }
1120
1121    size_t getRemaingOnDisk() {
1122        LockHolder lh(queueLock);
1123        return bgJobIssued - bgJobCompleted;
1124    }
1125
1126    size_t getQueueFillTotal() {
1127        return queueFill;
1128    }
1129
1130    size_t getQueueDrainTotal() {
1131        return queueDrain;
1132    }
1133
1134    size_t getQueueBackoff() {
1135        return numTapNack;
1136    }
1137
    /**
     * Get the total number of remaining items from all checkpoints.
     * Declared here and defined out of line; the locking wrapper
     * getRemainingOnCheckpoints() acquires queueLock before calling it.
     */
    size_t getRemainingOnCheckpoints_UNLOCKED();
1142    size_t getRemainingOnCheckpoints() {
1143        LockHolder lh(queueLock);
1144        return getRemainingOnCheckpoints_UNLOCKED();
1145    }
1146
    /**
     * Unlocked variant of hasNextFromCheckpoints(); defined out of line.
     * Presumably reports whether any checkpoint still has items for this
     * producer (NOTE(review): confirm against the definition). The caller
     * is expected to hold queueLock.
     */
    bool hasNextFromCheckpoints_UNLOCKED();
1148    bool hasNextFromCheckpoints() {
1149        LockHolder lh(queueLock);
1150        return hasNextFromCheckpoints_UNLOCKED();
1151    }
1152
    /**
     * Get the next item from the queue that has items fetched from disk.
     * Declared here and defined out of line; by the class's _UNLOCKED
     * convention the caller is expected to hold queueLock.
     * NOTE(review): ownership of the returned raw pointer is not visible
     * from this declaration — confirm against the definition.
     */
    Item* nextBgFetchedItem_UNLOCKED();

    //! Defined out of line. Presumably related to the pendingFlush flag
    //! consumed by shouldFlush() (TODO confirm).
    void flush();
1159
1160    bool shouldFlush() {
1161        bool ret = pendingFlush;
1162        pendingFlush = false;
1163        return ret;
1164    }
1165
    // This method is called while holding the tapNotifySync lock.
    // Defined out of line; presumably appends the items of q to the live
    // stream queue (NOTE(review): confirm, including whether q is consumed).
    void appendQueue(std::list<queued_item> *q);
1168
1169    bool isPendingDiskBackfill() {
1170        LockHolder lh(queueLock);
1171        return diskBackfillCounter > 0;
1172    }
1173
1174    /**
1175     * A backfill is pending if the backfill thread is still running
1176     */
1177    bool isPendingBackfill_UNLOCKED() {
1178        return doRunBackfill || pendingBackfillCounter > 0 || diskBackfillCounter > 0;
1179    }
1180
1181    bool isPendingBackfill() {
1182        LockHolder lh(queueLock);
1183        return isPendingBackfill_UNLOCKED();
1184    }
1185
1186    /**
1187     * Items from backfill are all successfully transmitted to the destination?
1188     */
1189    bool isBackfillCompleted_UNLOCKED() {
1190        return backfillCompleted;
1191    }
1192
1193    bool isBackfillCompleted() {
1194        LockHolder lh(queueLock);
1195        return isBackfillCompleted_UNLOCKED();
1196    }
1197
    /**
     * Unlocked variant of scheduleBackfill(); defined out of line.
     * The caller is expected to hold queueLock.
     *
     * @param vblist vbucket ids, presumably those to schedule for backfill
     *        (TODO confirm against the definition)
     */
    void scheduleBackfill_UNLOCKED(const std::vector<uint16_t> &vblist);
1199
1200    void scheduleBackfill(const std::vector<uint16_t> &vblist) {
1201        LockHolder lh(queueLock);
1202        scheduleBackfill_UNLOCKED(vblist);
1203    }
1204
    /**
     * Defined out of line.
     * NOTE(review): vbFilter is taken by non-const reference, so it is
     * presumably filled in by the implementation; the meaning of the
     * return value is not visible here — confirm against the definition.
     */
    bool runBackfill(VBucketFilter &vbFilter);
1206
1207    /**
1208     * True if the TAP producer doesn't have any queued items and is ready for
1209     * for completing TAP_DUMP or TAP_VBUCKET_TAKEOVER.
1210     */
1211    bool mayCompleteDumpOrTakeover_UNLOCKED(void) {
1212        return (dumpQueue || doTakeOver) && isBackfillCompleted_UNLOCKED() &&
1213            emptyQueue_UNLOCKED();
1214    }
1215
1216    bool mayCompleteDumpOrTakeover(void) {
1217        LockHolder lh(queueLock);
1218        return mayCompleteDumpOrTakeover_UNLOCKED();
1219    }
1220
    /**
     * Queue an item to be background fetched.
     * Declared here and defined out of line; by the class's _UNLOCKED
     * convention the caller is expected to hold queueLock.
     *
     * @param key the item's key
     * @param id the disk id of the item to fetch
     * @param vb the vbucket ID
     */
    void queueBGFetch_UNLOCKED(const std::string &key, uint64_t id,
                               uint16_t vb);
1230
    /**
     * Defined out of line; presumably handles a tap ack received from the
     * client (NOTE(review): confirm against the definition).
     *
     * @param seqno presumably the tap sequence number being acknowledged
     * @param status presumably the status code carried by the ack
     * @param msg presumably a message accompanying the ack
     * @return an ENGINE_ERROR_CODE; its meaning is defined out of line
     */
    ENGINE_ERROR_CODE processAck(uint32_t seqno, uint16_t status, const std::string &msg);

    /**
     * Is the tap ack window full?
     * @return true if the window is full and no more items should be sent
     */
    bool windowIsFull();

    /**
     * Should we request an ack for this message?
     * Virtual, so a derived class may override the decision.
     * @param event the event type for this message
     * @param vbucket the vbucket Id for this message
     * @return true if we should request an ack (and start a new sequence)
     */
    virtual bool requestAck(uint16_t event, uint16_t vbucket);
1246
1247    /**
1248     * Get the current tap sequence number.
1249     */
1250    uint32_t getSeqno() {
1251        return seqno;
1252    }
1253
    /**
     * Rollback the tap stream to the last ack
     */
    void rollback();


    /**
     * Defined out of line. Presumably encodes the vbucket state transition
     * described by ev into the output parameters (NOTE(review): confirm,
     * including who owns the buffer returned through es).
     *
     * @param ev the vbucket event to encode
     * @param es output: pointer to the encoded data
     * @param nes output: presumably the length of the encoded data
     * @param vbucket output: presumably the vbucket id of the event
     */
    void encodeVBucketStateTransition(const VBucketEvent &ev, void **es,
                                      uint16_t *nes, uint16_t *vbucket) const;

    //! Defined out of line; presumably derives state from the client
    //! supplied flags (TODO confirm).
    void evaluateFlags();

    //! Defined out of line; presumably true while an ack for a checkpoint
    //! message is still outstanding (TODO confirm).
    bool waitForCheckpointMsgAck();

    //! Defined out of line; presumably true while an ack for an opaque
    //! message is still outstanding (TODO confirm).
    bool waitForOpaqueMsgAck();
1267    bool waitForOpaqueMsgAck();
1268
1269    void setTakeOverCompletionPhase(bool completionPhase) {
1270        takeOverCompletionPhase = completionPhase;
1271    }
1272
    /**
     * Unlocked variant of checkBackfillCompletion(); defined out of line.
     * The caller is expected to hold queueLock.
     * NOTE(review): the meaning of the return value is not visible here.
     */
    bool checkBackfillCompletion_UNLOCKED();
1274    bool checkBackfillCompletion() {
1275        LockHolder lh(queueLock);
1276        return checkBackfillCompletion_UNLOCKED();
1277    }
1278
    //! Defined out of line; presumably records the backfill age for the
    //! connection, with `reconnect` indicating a reconnecting client
    //! (TODO confirm).
    void setBackfillAge(uint64_t age, bool reconnect);

    //! Defined out of line; presumably installs the vbucket filter for this
    //! connection. notifyCompletion defaults to false; its effect is not
    //! visible here (TODO confirm).
    void setVBucketFilter(const std::vector<uint16_t> &vbuckets,
                          bool notifyCompletion = false);
1283
1284    bool checkVBucketFilter(uint16_t vbucket) {
1285        LockHolder lh(queueLock);
1286        return vbucketFilter(vbucket);
1287    }
1288
    /**
     * Register the unified queue cursor for this producer.
     *
     * @param lastCheckpointIds map keyed by uint16_t with uint64_t values;
     *        presumably vbucket id to last checkpoint id (TODO confirm)
     */
    void registerCursor(const std::map<uint16_t, uint64_t> &lastCheckpointIds);
1293
1294    size_t getTapAckLogSize(void) {
1295        LockHolder lh(queueLock);
1296        return ackLog_.size();
1297    }
1298
    //! Defined out of line; presumably re-queues the ack log entry that
    //! iter points at (TODO confirm). The caller is expected to hold
    //! queueLock.
    void reschedule_UNLOCKED(const std::list<TapLogElement>::iterator &iter);

    //! Defined out of line; presumably empties this producer's queues
    //! (TODO confirm which ones). The caller is expected to hold queueLock.
    void clearQueues_UNLOCKED();
1302
    //! Queue of live stream items that needs to be sent
    //! (held by raw pointer; NOTE(review): ownership is not visible here)
    std::list<queued_item> *queue;
    //! Live stream queue size
    size_t queueSize;
    //! Queue of items backfilled from disk
    std::queue<Item*> backfilledItems;
    //! List of items that are waiting for acks from the client
    std::list<TapLogElement> ackLog_;

    //! Keeps track of items transmitted per VBucket
    //! (presumably an array indexed by vbucket id — TODO confirm)
    AtomicValue<size_t> *transmitted;

    //! VBucket status messages sent immediately (before userdata)
    std::queue<VBucketEvent> vBucketHighPriority;
    //! VBucket status messages sent when there is nothing else to send
    std::queue<VBucketEvent> vBucketLowPriority;

    //! Checkpoint start and end messages
    std::queue<queued_item> checkpointMsgs;
    //! Checkpoint state per vbucket
    std::map<uint16_t, CheckpointState> checkpointState_;

    //! Flags passed by the client
    uint32_t flags;
    //! Dump and disconnect?
    bool dumpQueue;
    //! Number of records fetched from this stream (the original comment is
    //! truncated here; presumably "since the connection was established"
    //! — TODO confirm)
    size_t recordsFetched;
    //! Number of records skipped due to changing the filter on the connection
    AtomicValue<size_t> recordsSkipped;
    //! Do we have a pending flush command? (read and cleared by shouldFlush())
    bool pendingFlush;
    //! Backfill age for the connection
    uint64_t backfillAge;

    //! Take over and disconnect?
    bool doTakeOver;
    //! Take over completion phase?
    bool takeOverCompletionPhase;

    //! Should a new backfill task be scheduled now?
    bool doRunBackfill;
    //! True if items from backfill are all successfully transmitted to the destination.
    bool backfillCompleted;
    //! Number of pending backfill tasks
    size_t pendingBackfillCounter;
    //! Number of vbuckets that are currently scheduled for disk backfill.
    size_t diskBackfillCounter;

    //! Filter for the vbuckets that require backfill by the next backfill task
    VBucketFilter backFillVBucketFilter;
    //! vbuckets that are being backfilled by the current backfill session
    std::set<uint16_t> backfillVBuckets;

    //! Presumably the accumulated size of background fetch results (TODO confirm)
    AtomicValue<size_t> bgResultSize;
    //! Background jobs issued; issued minus completed is what
    //! getRemaingOnDisk() reports
    AtomicValue<size_t> bgJobIssued;
    //! Background jobs completed
    AtomicValue<size_t> bgJobCompleted;
    //! Returned by getQueueBackoff()
    AtomicValue<size_t> numTapNack;
    //! Returned by getQueueMemory()
    AtomicValue<size_t> queueMemSize;
    //! Returned by getQueueFillTotal()
    AtomicValue<size_t> queueFill;
    //! Returned by getQueueDrainTotal()
    AtomicValue<size_t> queueDrain;
    //! Presumably counts checkpoint messages awaiting an ack (TODO confirm)
    AtomicValue<size_t> checkpointMsgCounter;
    //! Presumably counts opaque messages awaiting an ack (TODO confirm)
    AtomicValue<size_t> opaqueMsgCounter;

    //! Current tap sequence number (for ack's)
    uint32_t seqno;
    //! The last tap sequence number received
    uint32_t seqnoReceived;
    //! The last tap sequence number for which an ack is requested
    uint32_t seqnoAckRequested;

    //! tap opaque command code.
    uint32_t opaqueCommandCode;

    //! Textual representation of the vbucket filter.
    std::string filterText;
    //! Textual representation of the flags.
    std::string flagsText;

    //! Presumably the time the last message was sent or received (TODO confirm)
    AtomicValue<rel_time_t> lastMsgTime;

    //! Presumably whether the most recent ack reported success (TODO confirm)
    bool isLastAckSucceed;
    //! Presumably whether the tap sequence number has wrapped around (TODO confirm)
    bool isSeqNumRotated;

    //! Should we send a NOOP message now?
    AtomicValue<bool> noop;
    //! Presumably the number of NOOP messages sent (TODO confirm)
    size_t numNoops;

    //! Does the Tap Consumer know about the byteorder bug for the flags
    bool flagByteorderSupport;

    //! EP-engine specific item info
    //! (raw pointer; NOTE(review): ownership is not visible here)
    uint8_t *specificData;
    //! Timestamp of backfill start
    time_t backfillTimestamp;

    DISALLOW_COPY_AND_ASSIGN(TapProducer);
1400};
1401
1402#endif  // SRC_TAPCONNECTION_H_
1403