from xdcrbasetests import XDCRReplicationBaseTest, XDCRConstants
from remote.remote_util import RemoteMachineShellConnection
from lib.membase.api.rest_client import RestConnection
from membase.api.exception import XDCRCheckpointException
from mc_bin_client import MemcachedClient, MemcachedError
from memcached.helper.data_helper import MemcachedClientHelper, VBucketAwareMemcached


class XDCRCheckpointUnitTest(XDCRReplicationBaseTest):
    def setUp(self):
        super(XDCRCheckpointUnitTest, self).setUp()
        if not self._default_bucket:
            self.fail("Remove 'default_bucket=false'; these unit tests are designed to run on the default bucket")
        self.init()

    def tearDown(self):
        self.log.info("Checkpoints recorded in this run -")
        for record in self.chkpt_records:
            self.log.info(record)
        super(XDCRCheckpointUnitTest, self).tearDown()

    def init(self):
        self.keys_loaded = []
        self.key_counter = 0
        # some keys that will always hash to vb0
        self.vb0_keys = ['pymc1098', 'pymc1108', 'pymc2329', 'pymc4019', 'pymc4189', 'pymc7238', 'pymc10031', 'pymc10743',
                         'pymc11935', 'pymc13210', 'pymc13380', 'pymc13562', 'pymc14824', 'pymc15120', 'pymc15652',
                         'pymc16291', 'pymc16301', 'pymc16473', 'pymc18254', 'pymc18526']
        self.chkpt_records = []
        self.num_commit_for_chkpt_calls_so_far = 0
        self.num_successful_chkpts_so_far = 0
        self.num_failed_chkpts_so_far = 0
        self.num_pre_replicate_calls_so_far = 0
        self.num_successful_prereps_so_far = 0
        self.num_failed_prereps_so_far = 0
        self.read_chkpt_history_new_vb0node()

    """ * Call this every time the active vb0 on the destination moves *
        We do not reinstall before every test, so it is important to know the checkpoint history already on the node;
        hence we call this at the beginning of every run.
        We determine whether checkpointing succeeded from the CAPI POSTs and their return codes in the destination couchdb log.
        When the active vb0 moves, we start reading a new set of logs, so to rely on was_pre_rep_successful()
        and was_checkpointing_successful() we must first take a snapshot of the log counts to compare against.
    """
    def read_chkpt_history_new_vb0node(self):
        # since we do not install before every test, discount checkpoints and pre_replicates already recorded on the node
        self.num_commit_for_chkpt_beginning = self.num_successful_chkpts_beginning = self.num_failed_chkpts_beginning = 0
        self.num_pre_replicates_beginning = self.num_successful_prereps_beginning = self.num_failed_prereps_beginning = 0
        # get these numbers from logs
        node = self.get_active_vb0_node(self.dest_master)
        self.num_commit_for_chkpt_beginning, self.num_successful_chkpts_beginning, self.num_failed_chkpts_beginning = \
            self.get_checkpoint_call_history(node)
        self.num_pre_replicates_beginning, self.num_successful_prereps_beginning, self.num_failed_prereps_beginning = \
            self.get_pre_replicate_call_history(node)
        self.log.info("From previous runs on {} : Num of commit calls : {} ; num of successful commits : {} ; "
                      "num of failed commits : {}".format(node.ip, self.num_commit_for_chkpt_beginning,
                                                          self.num_successful_chkpts_beginning, self.num_failed_chkpts_beginning))
        self.log.info("From previous runs on {} : Num of pre_replicate calls : {} ; num of successful pre_replicates : {} ; "
                      "num of failed pre_replicates : {}".format(node.ip, self.num_pre_replicates_beginning,
                                                                 self.num_successful_prereps_beginning, self.num_failed_prereps_beginning))

        self.num_commit_for_chkpt_calls_so_far = self.num_commit_for_chkpt_beginning
        self.num_successful_chkpts_so_far = self.num_successful_chkpts_beginning
        self.num_failed_chkpts_so_far = self.num_failed_chkpts_beginning
        self.num_pre_replicate_calls_so_far = self.num_pre_replicates_beginning
        self.num_successful_prereps_so_far = self.num_successful_prereps_beginning
        self.num_failed_prereps_so_far = self.num_failed_prereps_beginning

    """ Returns node containing active vb0 """
    def get_active_vb0_node(self, master):
        nodes = self.src_nodes
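        # vBucketMap[0] is an "ip:port" string for the server hosting the active copy of vb0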
        ip = VBucketAwareMemcached(RestConnection(master), 'default').vBucketMap[0].split(':')[0]
        if master == self.dest_master:
            nodes = self.dest_nodes
        for node in nodes:
            if ip == node.ip:
                return node
        raise XDCRCheckpointException("Error determining the node containing active vb0")

    """ Sample XDCR checkpoint record -
      {u'total_docs_checked': 1,                        : number of docs checked for replication
       u'upr_snapshot_end_seqno': 1,                    : UPR snapshot end sequence number
       u'upr_snapshot_seqno': 1,                        : UPR snapshot starting sequence number
       u'seqno': 1,                                     : the sequence number we checkpointed at
       u'start_time': u'Tue, 20 May 2014 22:17:51 GMT', : start time of ep_engine
       u'total_data_replicated': 151,                   : number of bytes replicated to dest
       u'commitopaque': [169224017468010, 2],           : remote failover log
       u'total_docs_written': 1,                        : number of docs replicated to dest
       u'end_time': u'Tue, 20 May 2014 22:18:56 GMT',   : time at checkpointing
       u'failover_uuid': 77928303208376}                : local vb_uuid

        Main method that validates a checkpoint record """
    def get_and_validate_latest_checkpoint(self):
        rest_con = RestConnection(self.src_master)
        try:
            checkpoint_record = rest_con.get_recent_xdcr_vb_ckpt('default')
            self.log.info("Checkpoint record : {}".format(checkpoint_record))
            self.chkpt_records.append(checkpoint_record)
        except Exception as e:
            raise XDCRCheckpointException("Error retrieving last checkpoint document - {}".format(e))

        commit_opaque = checkpoint_record["commitopaque"]
        failover_uuid = checkpoint_record["failover_uuid"]
        seqno = checkpoint_record["seqno"]

        self.log.info("Verifying commitopaque/remote failover log ...")
        if seqno != 0:
            self.validate_remote_failover_log(commit_opaque[0], commit_opaque[1])
            self.log.info("Verifying local failover uuid ...")
            local_vb_uuid, _ = self.get_failover_log(self.src_master)
            self.assertTrue(int(local_vb_uuid) == int(failover_uuid),
                            "local failover_uuid is wrong in checkpoint record! Expected: {0} seen: {1}".
                            format(local_vb_uuid, failover_uuid))
            self.log.info("Checkpoint record verified")
        else:
            self.log.info("Skipping checkpoint record checks for checkpoint-0")
        return True

    """ Checks if commitopaque in a checkpoint record matches the remote failover log """
    def validate_remote_failover_log(self, vb_uuid, high_seqno):
        # TAP based validation
        remote_uuid, remote_highseq = self.get_failover_log(self.dest_master)
        self.log.info("Remote failover log = [{},{}]".format(remote_uuid, remote_highseq))
        if int(remote_uuid) != int(vb_uuid):
            raise XDCRCheckpointException("vb_uuid in commitopaque {} does not match remote vb_uuid {}"
                                          .format(vb_uuid, remote_uuid))

    """ Gets failover log [vb_uuid, high_seqno] from node containing vb0 """
    def get_failover_log(self, master):
        vb0_active_node = self.get_active_vb0_node(master)
        stats = MemcachedClientHelper.direct_client(vb0_active_node, 'default').stats('vbucket-seqno')
        return stats['vb_0:uuid'], stats['vb_0:high_seqno']

    """ Gets _commit_for_checkpoint call history recorded so far on a node """
    def get_checkpoint_call_history(self, node):
        shell = RemoteMachineShellConnection(node)
        os_type = shell.extract_remote_info().distribution_type
        if os_type.lower() == 'windows':
            couchdb_log = "C:/Program Files/Couchbase/Server/var/lib/couchbase/logs/couchdb.log"
        else:
            couchdb_log = "/opt/couchbase/var/lib/couchbase/logs/couchdb.log"
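        # count all _commit_for_checkpoint POSTs vs. those that returned HTTP 200, as recorded in the destination couchdb log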
        total_chkpt_calls, error = shell.execute_command("grep \"POST /_commit_for_checkpoint\" \"{}\" | wc -l"
                                                          .format(couchdb_log))
        total_successful_chkpts, error = shell.execute_command("grep \"POST /_commit_for_checkpoint 200\" \"{}\" | wc -l"
                                                               .format(couchdb_log))
        self.log.info("Total successful commits on {} so far: {}".format(node.ip, int(total_successful_chkpts[0])))
        if self.num_successful_chkpts_so_far != 0:
            checkpoint_number = int(total_successful_chkpts[0]) - self.num_successful_chkpts_beginning
            self.log.info("Checkpoints on this node (this run): {}".format(checkpoint_number))
        shell.disconnect()
        total_commit_failures = int(total_chkpt_calls[0]) - int(total_successful_chkpts[0])
        return int(total_chkpt_calls[0]), int(total_successful_chkpts[0]), total_commit_failures

    """ Gets total number of pre_replicate responses made from dest, number of
        successful and failed pre_replicate calls so far on the current dest node """
    def get_pre_replicate_call_history(self, node):
        shell = RemoteMachineShellConnection(node)
        os_type = shell.extract_remote_info().distribution_type
        if os_type.lower() == 'windows':
            couchdb_log = "C:/Program Files/Couchbase/Server/var/lib/couchbase/logs/couchdb.log"
        else:
            couchdb_log = "/opt/couchbase/var/lib/couchbase/logs/couchdb.log"
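        # same approach as _commit_for_checkpoint: total _pre_replicate POSTs vs. those that returned HTTP 200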
        total_prerep_calls, error = shell.execute_command("grep \"POST /_pre_replicate\" \"{}\" | wc -l"
                                                          .format(couchdb_log))
        total_successful_prereps, error = shell.execute_command("grep \"POST /_pre_replicate 200\" \"{}\" | wc -l"
                                                                .format(couchdb_log))
        shell.disconnect()
        total_prerep_failures = int(total_prerep_calls[0]) - int(total_successful_prereps[0])
        return int(total_prerep_calls[0]), int(total_successful_prereps[0]), total_prerep_failures

    """ From destination couchdb log tells if checkpointing was successful """
    def was_checkpointing_successful(self):
        node = self.get_active_vb0_node(self.dest_master)
        total_commit_calls, success, failures = self.get_checkpoint_call_history(node)
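        # compare against running totals: new successes mean a checkpoint happened; new failures or no new calls mean it did not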
        if success > self.num_successful_chkpts_so_far:
            self.log.info("_commit_for_checkpoint was successful: last recorded success :{} , now :{}".
                          format(self.num_successful_chkpts_so_far, success))
            self.num_successful_chkpts_so_far = success
            return True
        elif failures > self.num_failed_chkpts_so_far:
            self.log.info("_commit_for_checkpoint was NOT successful: last recorded failure :{} , now :{}".
                          format(self.num_failed_chkpts_so_far, failures))
            self.num_failed_chkpts_so_far = failures
        elif total_commit_calls == self.num_commit_for_chkpt_calls_so_far:
            self.log.info("Checkpointing did not happen: last recorded call :{} , now :{}".
                          format(self.num_commit_for_chkpt_calls_so_far, total_commit_calls))
        return False

    """ Tells if pre-replicate was successful based on source->dest _pre_replicate CAPI posts """
    def was_pre_rep_successful(self):
        node = self.get_active_vb0_node(self.dest_master)
        total_prerep_calls, success, failures = self.get_pre_replicate_call_history(node)
        if success > self.num_successful_prereps_so_far:
            self.log.info("_pre_replicate was successful: last recorded success :{} , now :{}".
                          format(self.num_successful_prereps_so_far, success))
            self.num_successful_prereps_so_far = success
            return True
        elif failures > self.num_failed_prereps_so_far:
            self.log.error("_pre_replicate was NOT successful: last recorded failure :{} , now :{}".
                           format(self.num_failed_prereps_so_far, failures))
            self.num_failed_prereps_so_far = failures
        elif total_prerep_calls == self.num_pre_replicate_calls_so_far:
            self.log.error("ERROR: Pre-replication did NOT happen!")
        return False

    """ Load one mutation into the source node containing active vb0 """
    def load_one_mutation_into_source_vb0(self, vb0_active_src_node):
        key = self.vb0_keys[self.key_counter]
        memc_client = MemcachedClient(vb0_active_src_node.ip, 11210)
        try:
            memc_client.set(key, exp=0, flags=0, val="dummy val")
            self.key_counter += 1
            self.keys_loaded.append(key)
            self.log.info("Loaded key {} onto vb0 in {}".format(key, vb0_active_src_node.ip))
            self.log.info("deleted, flags, exp, rev_id, cas for key {} = {}".format(key, memc_client.getMeta(key)))
        except MemcachedError as e:
            self.log.error(e)

    """ Loads n mutations onto vb0 (one key per mutation) and validates checkpointing after each one
        Note: Checkpointing happens only if the checkpoint interval has elapsed since the last one """
    def mutate_and_checkpoint(self, n=3):
        count = 1
        # get vb0 active source node
        active_src_node = self.get_active_vb0_node(self.src_master)
        while count <= n:
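            # allow one full checkpoint interval (plus a margin) to elapse so the replicator gets a chance to checkpoint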
            self.sleep(self._checkpoint_interval + 10)
            remote_vbuuid, remote_highseqno = self.get_failover_log(self.dest_master)
            local_vbuuid, local_highseqno = self.get_failover_log(self.src_master)
            self.log.info("Local failover log: [{}, {}]".format(local_vbuuid, local_highseqno))
            self.log.info("Remote failover log: [{}, {}]".format(remote_vbuuid, remote_highseqno))
            self.log.info("################ New mutation:{} ##################".format(self.key_counter + 1))
            self.load_one_mutation_into_source_vb0(active_src_node)
            if self.was_checkpointing_successful():
                self.log.info("Validating checkpoint record ...")
                self.get_and_validate_latest_checkpoint()
            else:
                self.log.info("Checkpointing failed - may not be an error if vb_uuid changed")
                return False
            count += 1
        return True

    """ Verify that a checkpoint 404 error is thrown when the dest node containing vb0 is no longer part of the cluster """
    def mutate_and_check_error404(self, n=1):
        # get vb0 active source node
        active_src_node = self.get_active_vb0_node(self.src_master)
        shell = RemoteMachineShellConnection(active_src_node)
        os_type = shell.extract_remote_info().distribution_type
        if os_type.lower() == 'windows':
            trace_log = "C:/Program Files/Couchbase/Server/var/lib/couchbase/logs/xdcr_trace.log"
        else:
            trace_log = "/opt/couchbase/var/lib/couchbase/logs/xdcr_trace.*"
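        # snapshot the error counts before the mutation so any increase afterwards can be attributed to this load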
        num_404_errors_before_load, error = shell.execute_command("grep \"error,404\" {} | wc -l"
                                                                   .format(trace_log))
        num_get_remote_bkt_failed_before_load, error = shell.execute_command("grep \"get_remote_bucket_failed\" \"{}\" | wc -l"
                                                                             .format(trace_log))
        self.log.info("404 errors: {}, get_remote_bucket_failed errors : {}".
                      format(num_404_errors_before_load, num_get_remote_bkt_failed_before_load))
        self.sleep(60)
        self.log.info("################ New mutation:{} ##################".format(self.key_counter + 1))
        self.load_one_mutation_into_source_vb0(active_src_node)
        self.sleep(5)
        num_404_errors_after_load, error = shell.execute_command("grep \"error,404\" {} | wc -l"
                                                                  .format(trace_log))
        num_get_remote_bkt_failed_after_load, error = shell.execute_command("grep \"get_remote_bucket_failed\" \"{}\" | wc -l"
                                                                            .format(trace_log))
        self.log.info("404 errors: {}, get_remote_bucket_failed errors : {}".
                      format(num_404_errors_after_load, num_get_remote_bkt_failed_after_load))
        shell.disconnect()
        if (int(num_404_errors_after_load[0]) > int(num_404_errors_before_load[0])) or \
           (int(num_get_remote_bkt_failed_after_load[0]) > int(num_get_remote_bkt_failed_before_load[0])):
            self.log.info("Checkpointing error-404 verified after dest failover/rebalance out")
            return True
        else:
            self.log.info("404 errors on source node before the last load : {}, after the last load : {}".
                          format(int(num_404_errors_before_load[0]), int(num_404_errors_after_load[0])))
            self.log.error("Checkpoint 404 error NOT recorded at source following dest failover or rebalance!")

    """ Rebalance-out active vb0 node from a cluster """
    def rebalance_out_activevb0_node(self, master):
        pre_rebalance_uuid, _ = self.get_failover_log(master)
        self.log.info("Starting rebalance-out ...")
        # find which node contains vb0
        node = self.get_active_vb0_node(master)
        self.log.info("Node {} contains active vb0".format(node))
        if master == self.src_master:
            tasks = self._async_rebalance(self.src_nodes, [], [node])
            for task in tasks:
                task.result()
            if master == node:
                self.src_nodes.remove(self.src_master)
                self.src_master = self.src_nodes[0]
            post_rebalance_uuid, _ = self.get_failover_log(self.get_active_vb0_node(self.src_master))
            self.log.info("Source uuid before rebalance :{}, after rebalance : {}".
                          format(pre_rebalance_uuid, post_rebalance_uuid))
            # source rebalance on tap?
            if RestConnection(self.src_master).get_internal_replication_type() == 'tap':
                self.assertTrue(int(pre_rebalance_uuid) != int(post_rebalance_uuid),
                                "vb_uuid of vb0 is same before and after TAP rebalance")
            else:
                self.log.info("Current internal replication = UPR, hence vb_uuid did not change, "
                              "subsequent _commit_for_checkpoints are expected to pass")
            self.verify_next_checkpoint_passes()
        else:
            tasks = self._async_rebalance(self.dest_nodes, [], [node])
            for task in tasks:
                task.result()
            if master == node:
                self.dest_nodes.remove(self.dest_master)
                self.dest_master = self.dest_nodes[0]
            post_rebalance_uuid, _ = self.get_failover_log(self.get_active_vb0_node(self.dest_master))
            self.log.info("Remote uuid before rebalance :{}, after rebalance : {}".
                          format(pre_rebalance_uuid, post_rebalance_uuid))
            # destination rebalance on tap?
            if RestConnection(self.dest_master).get_internal_replication_type() == 'tap':
                self.assertTrue(int(pre_rebalance_uuid) != int(post_rebalance_uuid),
                                "vb_uuid of vb0 is same before and after TAP rebalance")
                self.read_chkpt_history_new_vb0node()
                self.verify_next_checkpoint_fails_after_dest_uuid_change()
            else:
                self.log.info("Current internal replication = UPR, hence destination vb_uuid did not change, "
                              "subsequent _commit_for_checkpoints are expected to pass")
                self.read_chkpt_history_new_vb0node()
                self.mutate_and_check_error404()
                # the replicator might still be awake, ensure adequate time gap
                self.sleep(self.wait_timeout * 2)
                self.verify_next_checkpoint_passes()

    """ Failover active vb0 node from a cluster """
    def failover_activevb0_node(self, master):
        pre_failover_uuid, _ = self.get_failover_log(master)
        self.log.info("Starting failover ...")
        # find which node contains vb0, we will failover that node
        node = self.get_active_vb0_node(master)
        self.log.info("Node {} contains active vb0".format(node))
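        # failover the vb0 node, then rebalance so the cluster ejects it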
        if master == self.src_master:
            tasks = self._async_failover(self.src_nodes, [node])
            tasks = self._async_rebalance(self.src_nodes, [], [])
            for task in tasks:
                task.result()
            self.src_nodes.remove(node)
            self.src_master = self.src_nodes[0]
        else:
            tasks = self._async_failover(self.dest_nodes, [node])
            tasks = self._async_rebalance(self.dest_nodes, [], [])
            for task in tasks:
                task.result()
            self.dest_nodes.remove(node)
            self.dest_master = self.dest_nodes[0]

        if "source" in self._failover:
            post_failover_uuid, _ = self.get_failover_log(self.get_active_vb0_node(self.src_master))
        else:
            post_failover_uuid, _ = self.get_failover_log(self.get_active_vb0_node(self.dest_master))
        self.log.info("vb0 uuid before failover :{}, after failover : {}".format(pre_failover_uuid, post_failover_uuid))
        self.assertTrue(int(pre_failover_uuid) != int(post_failover_uuid), "vb_uuid is same before and after failover")

    """ Crash node, check uuid before and after crash """
    def crash_node(self, master):
        count = 0
        pre_crash_uuid, _ = self.get_failover_log(master)
        node = self.get_active_vb0_node(master)
        self.log.info("Crashing node {} containing vb0 ...".format(node))
        shell = RemoteMachineShellConnection(node)
        shell.terminate_process(process_name='memcached')
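        # when memcached restarts, vb0 gets a new failover log entry, so its vb_uuid is expected to change (asserted below)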
        shell.disconnect()
        # If we are killing dest node, try to mutate key at source to cause xdcr activity
        if master == self.dest_master:
            while count < 5:
                self.load_one_mutation_into_source_vb0(self.get_active_vb0_node(self.src_master))
                count += 1
        self.sleep(10)
        post_crash_uuid, _ = self.get_failover_log(master)
        self.log.info("vb_uuid before crash :{}, after crash : {}".format(pre_crash_uuid, post_crash_uuid))
        self.assertTrue(int(pre_crash_uuid) != int(post_crash_uuid),
                        "vb_uuid is same before and after erlang crash - MB-11085")

    """ Tests dest node (containing vb0) crash """
    def test_dest_node_crash(self):
        self.mutate_and_checkpoint()
        self.crash_node(self.dest_master)
        self.verify_next_checkpoint_fails_after_dest_uuid_change()
        self.verify_revid()

    """ Tests if pre_replicate and commit_for_checkpoint following a source crash are successful """
    def test_source_node_crash(self):
        self.mutate_and_checkpoint(n=2)
        self.crash_node(self.src_master)
        if self.was_pre_rep_successful():
            self.log.info("_pre_replicate following the source crash was successful: {}".
                          format(self.num_successful_prereps_so_far))
            # the replicator might still be awake, ensure adequate time gap
            self.sleep(self.wait_timeout * 2)
            if self.mutate_and_checkpoint(n=1):
                self.log.info("Checkpointing resumed normally after source crash")
            else:
                self.fail("Checkpointing failed once again after the last uuid change")
        else:
            self.fail("ERROR: _pre_replicate following source crash was unsuccessful")
        self.verify_revid()

    """ Tests that the vb_uuid changes after a dest bucket flush, that the subsequent checkpoint fails
        as a result, and that the next checkpoint is successful """
    def test_dest_bucket_flush(self):
        self.mutate_and_checkpoint()
        self.cluster.async_bucket_flush(self.dest_master, 'default')
        self.verify_next_checkpoint_fails_after_dest_uuid_change()
        self.verify_revid()

    """ Tests if vb_uuid at destination changes, next checkpoint fails and then recovers eventually """
    def test_dest_bucket_delete_recreate(self):
        self.mutate_and_checkpoint()
        self.cluster.bucket_delete(self.dest_master, 'default')
        self._create_buckets(self.dest_nodes)
        self.verify_next_checkpoint_fails_after_dest_uuid_change()
        self.verify_revid()

    """ Checks if _pre_replicate and _commit_for_checkpoint are successful after source bucket recreate """
    def test_source_bucket_delete_recreate(self):
        self.mutate_and_checkpoint(n=2)
        self.cluster.bucket_delete(self.src_master, 'default')
        self._create_buckets(self.src_nodes)
        dest_cluster_name = self._get_cluster_names()[1]
        RestConnection(self.src_master).start_replication(XDCRConstants.REPLICATION_TYPE_CONTINUOUS,
                                                          'default', dest_cluster_name, self.rep_type)
        self.sleep(5)
        self.key_counter = 0
        self.keys_loaded = []
        if self.was_pre_rep_successful():
            self.log.info("_pre_replicate following the source bucket recreate was successful: {}".
                          format(self.num_successful_prereps_so_far))
            self.verify_next_checkpoint_passes()
        else:
            self.fail("ERROR: _pre_replicate following source bucket recreate was unsuccessful")
        self.verify_revid()

    """ Test rebalance-out of vb0 node at source/destination and checkpointing behavior """
    def test_rebalance(self):
        self.mutate_and_checkpoint(n=2)
        if "destination" in self._rebalance:
            self.rebalance_out_activevb0_node(self.dest_master)
        elif "source" in self._rebalance:
            self.rebalance_out_activevb0_node(self.src_master)
        self.verify_revid()

    """ Test failover of vb0 node at source/destination and checkpointing behavior """
    def test_failover(self):
        self.mutate_and_checkpoint(n=2)
        if "destination" in self._failover:
            self.failover_activevb0_node(self.dest_master)
            self.read_chkpt_history_new_vb0node()
            self.mutate_and_check_error404()
            # the replicator might still be awake, ensure adequate time gap
            self.sleep(self.wait_timeout * 2)
            self.verify_next_checkpoint_passes()
        elif "source" in self._failover:
            self.failover_activevb0_node(self.src_master)
            self.verify_next_checkpoint_passes()
        self.verify_revid()

    """ Checks that the subsequent _commit_for_checkpoint and _pre_replicate
        fail after a dest vb_uuid change, and that the checkpoint call after that
        is successful
    """
    def verify_next_checkpoint_fails_after_dest_uuid_change(self):
        if not self.mutate_and_checkpoint(n=1):
            self.log.info("Checkpointing failed as expected after remote uuid change, not a bug")
            if not self.was_pre_rep_successful():
                self.log.info("_pre_replicate following the failed checkpoint was unsuccessful, but this is expected")
                self.verify_next_checkpoint_passes()
            else:
                self.log.info("_pre_replicate following the failed checkpoint was successful")
        else:
            self.log.info("Checkpointing passed after remote_uuid change following the most recent crash/topology change")

    """ Checks that the subsequent _commit_for_checkpoint and _pre_replicate pass,
        as expected when the dest vb_uuid did not change or only the source uuid changed
    """
    def verify_next_checkpoint_passes(self):
        if self.mutate_and_checkpoint(n=1):
            self.log.info("Checkpointing was successful")
        else:
            self.fail("Checkpointing failed unexpectedly")

    """ Checks revIDs of loaded keys and logs missing keys """
    def verify_revid(self):
        missing_keys = False
        src_node = self.get_active_vb0_node(self.src_master)
        dest_node = self.get_active_vb0_node(self.dest_master)
        src_client = MemcachedClient(src_node.ip, 11210)
        dest_client = MemcachedClient(dest_node.ip, 11210)
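        # getMeta returns (deleted, flags, exp, rev_id, cas); identical tuples on source and dest mean the metadata replicated correctly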
        for key in self.keys_loaded:
            try:
                src_meta = src_client.getMeta(key)
                dest_meta = dest_client.getMeta(key)
                self.log.info("deleted, flags, exp, rev_id, cas for key from Source({0}) {1} = {2}"
                              .format(src_node.ip, key, src_meta))
                self.log.info("deleted, flags, exp, rev_id, cas for key from Destination({0}) {1} = {2}"
                              .format(dest_node.ip, key, dest_meta))
                if src_meta == dest_meta:
                    self.log.info("RevID verification successful for key {}".format(key))
                else:
                    self.fail("RevID verification failed for key {}".format(key))
            except MemcachedError:
                self.log.error("Key {} is missing at destination".format(key))
                missing_keys = True
        if missing_keys:
            self.fail("Some keys are missing at destination")