xref: /6.0.3/testrunner/pytests/xdcr/lww.py (revision b73aabad)
1import zlib
2
3from couchbase_helper.documentgenerator import BlobGenerator, DocumentGenerator
4from xdcrnewbasetests import XDCRNewBaseTest, FloatingServers
5from xdcrnewbasetests import NodeHelper
6from membase.api.rest_client import RestConnection
7from testconstants import STANDARD_BUCKET_PORT
8from remote.remote_util import RemoteMachineShellConnection
9from couchbase.exceptions import NotFoundError
10from couchbase.bucket import Bucket
11from couchbase_helper.cluster import Cluster
12from membase.helper.cluster_helper import ClusterOperationHelper
13from membase.api.exception import XDCRCheckpointException
14from memcached.helper.data_helper import VBucketAwareMemcached
15from security.rbac_base import RbacBase
16
17
18class Lww(XDCRNewBaseTest):
19
20    def setUp(self):
21        super(Lww, self).setUp()
22        self.cluster = Cluster()
23        self.c1_cluster = self.get_cb_cluster_by_name('C1')
24        self.c2_cluster = self.get_cb_cluster_by_name('C2')
25
26        self.skip_ntp = self._input.param("skip_ntp", False)
27        self.clean_backup = self._input.param("clean_backup", False)
28        self.bucketType = self._input.param("bucket_type", "membase")
29        self.evictionPolicy = self._input.param("eviction_policy", "valueOnly")
30
31        if not self.skip_ntp:
32            self._enable_ntp_and_sync()
33
34    def tearDown(self):
35        super(Lww, self).tearDown()
36        if self.clean_backup:
37            remote_client = RemoteMachineShellConnection(self._input.servers[6])
38            command = "rm -rf /data/lww-backup"
39            output, error = remote_client.execute_command(command)
40            remote_client.log_command_output(output, error)
41        if not self.skip_ntp:
42            self._disable_ntp()
43
44    def _enable_ntp_and_sync(self, nodes=[], ntp_server="0.north-america.pool.ntp.org"):
45        if not nodes:
46            nodes = self._input.servers
47        for node in nodes:
48            conn = RemoteMachineShellConnection(node)
49            output, error = conn.execute_command("chkconfig ntpd on")
50            conn.log_command_output(output, error)
51            output, error = conn.execute_command("/etc/init.d/ntpd start")
52            conn.log_command_output(output, error)
53            output, error = conn.execute_command("systemctl start ntpd")
54            conn.log_command_output(output, error)
55            output, error = conn.execute_command("ntpdate -q " + ntp_server)
56            conn.log_command_output(output, error)
57
58    def _disable_ntp(self):
59        for node in self._input.servers:
60            conn = RemoteMachineShellConnection(node)
61            output, error = conn.execute_command("chkconfig ntpd off")
62            conn.log_command_output(output, error)
63            output, error = conn.execute_command("/etc/init.d/ntpd stop")
64            conn.log_command_output(output, error)
65            output, error = conn.execute_command("systemctl stop ntpd")
66            conn.log_command_output(output, error)
67
68    def _offset_wall_clock(self, cluster=None, offset_secs=0, inc=True, offset_drift=-1):
69        counter = 1
70        for node in cluster.get_nodes():
71            conn = RemoteMachineShellConnection(node)
72            output, error = conn.execute_command("date +%s")
73            conn.log_command_output(output, error)
74            curr_time = int(output[-1])
75            if inc:
76                new_time = curr_time + (offset_secs * counter)
77            else:
78                new_time = curr_time - (offset_secs * counter)
79            output, error = conn.execute_command("date --date @" + str(new_time))
80            conn.log_command_output(output, error)
81            output, error = conn.execute_command("date --set='" + output[-1] + "'")
82            conn.log_command_output(output, error)
83            if offset_drift > 0 and counter < offset_drift:
84                counter = counter + 1
85
86    def _change_time_zone(self, cluster=None, time_zone="America/Los_Angeles"):
87        for node in cluster.get_nodes():
88            conn = RemoteMachineShellConnection(node)
89            output, error = conn.execute_command("timedatectl set-timezone " + time_zone)
90            conn.log_command_output(output, error)
91
92    def _create_buckets(self, bucket='',
93                       ramQuotaMB=1,
94                       authType='none',
95                       saslPassword='',
96                       replicaNumber=1,
97                       proxyPort=11211,
98                       replica_index=1,
99                       threadsNumber=3,
100                       flushEnabled=1,
101                       src_lww=True,
102                       dst_lww=True,
103                       skip_src=False,
104                       skip_dst=False):
105        if not skip_src:
106            src_rest = RestConnection(self.c1_cluster.get_master_node())
107            if src_lww:
108                src_rest.create_bucket(bucket=bucket, ramQuotaMB=ramQuotaMB, authType=authType, saslPassword=saslPassword,
109                                       replicaNumber=replicaNumber, proxyPort=proxyPort, bucketType=self.bucketType,
110                                       replica_index=replica_index, flushEnabled=flushEnabled, evictionPolicy=self.evictionPolicy,
111                                       lww=True)
112            else:
113                src_rest.create_bucket(bucket=bucket, ramQuotaMB=ramQuotaMB, authType=authType, saslPassword=saslPassword,
114                                       replicaNumber=replicaNumber, proxyPort=proxyPort, bucketType=self.bucketType,
115                                       replica_index=replica_index, flushEnabled=flushEnabled, evictionPolicy=self.evictionPolicy)
116            self.c1_cluster.add_bucket(ramQuotaMB=ramQuotaMB, bucket=bucket, authType=authType,
117                                       saslPassword=saslPassword, replicaNumber=replicaNumber,
118                                       proxyPort=proxyPort, bucketType=self.bucketType, evictionPolicy=self.evictionPolicy)
119        if not skip_dst:
120            dst_rest = RestConnection(self.c2_cluster.get_master_node())
121            if dst_lww:
122                dst_rest.create_bucket(bucket=bucket, ramQuotaMB=ramQuotaMB, authType=authType, saslPassword=saslPassword,
123                                       replicaNumber=replicaNumber, proxyPort=proxyPort, bucketType=self.bucketType,
124                                       replica_index=replica_index, flushEnabled=flushEnabled, evictionPolicy=self.evictionPolicy,
125                                       lww=True)
126            else:
127                dst_rest.create_bucket(bucket=bucket, ramQuotaMB=ramQuotaMB, authType=authType, saslPassword=saslPassword,
128                                       replicaNumber=replicaNumber, proxyPort=proxyPort, bucketType=self.bucketType,
129                                       replica_index=replica_index, flushEnabled=flushEnabled, evictionPolicy=self.evictionPolicy)
130            self.c2_cluster.add_bucket(ramQuotaMB=ramQuotaMB, bucket=bucket, authType=authType,
131                                       saslPassword=saslPassword, replicaNumber=replicaNumber,
132                                       proxyPort=proxyPort, bucketType=self.bucketType, evictionPolicy=self.evictionPolicy)
133
134    def _get_python_sdk_client(self, ip, bucket, cluster):
135        try:
136            role_del = [bucket]
137            RbacBase().remove_user_role(role_del, RestConnection(cluster.get_master_node()))
138        except Exception, ex:
139            self.log.info(str(ex))
140            self.assertTrue(str(ex) == '"User was not found."', str(ex))
141
142        testuser = [{'id': bucket, 'name': bucket, 'password': 'password'}]
143        RbacBase().create_user_source(testuser, 'builtin', cluster.get_master_node())
144        self.sleep(10)
145
146        role_list = [{'id': bucket, 'name': bucket, 'roles': 'admin'}]
147        RbacBase().add_user_role(role_list, RestConnection(cluster.get_master_node()), 'builtin')
148        self.sleep(10)
149
150        try:
151            cb = Bucket('couchbase://' + ip + '/' + bucket, password='password')
152            if cb is not None:
153                self.log.info("Established connection to bucket " + bucket + " on " + ip + " using python SDK")
154            else:
155                self.fail("Failed to connect to bucket " + bucket + " on " + ip + " using python SDK")
156            return cb
157        except Exception, ex:
158            self.fail(str(ex))
159
160    def _upsert(self, conn, doc_id, old_key, new_key, new_val):
161        obj = conn.get(key=doc_id)
162        value = obj.value
163        value[new_key] = value.pop(old_key)
164        value[new_key] = new_val
165        conn.upsert(key=doc_id, value=value)
166
167    def _kill_processes(self, crashed_nodes=[]):
168        try:
169            NodeHelper.kill_erlang(node)
170        except:
171            self.log.info('Could not kill erlang process on node, continuing..')
172
173    def _start_cb_server(self, node):
174        shell = RemoteMachineShellConnection(node)
175        shell.start_couchbase()
176        shell.disconnect()
177
178    def _get_max_cas(self, node, bucket, vbucket_id=0):
179        max_cas = 0
180        conn = RemoteMachineShellConnection(node)
181        command = "/opt/couchbase/bin/cbstats -u cbadminbucket -p password " + node.ip + ":11210 vbucket-details " + str(vbucket_id) + " -b " + bucket
182        output, error = conn.execute_command(command)
183        conn.log_command_output(output, error)
184        for line in output:
185            if "max_cas" in line:
186                max_cas = line.split()[1]
187                break
188        return long(max_cas)
189
190    def _get_vbucket_id(self, key, num_vbuckets=1024):
191        vbucket_id = ((zlib.crc32(key) >> 16) & 0x7FFF) % num_vbuckets
192        return vbucket_id
193
194    def test_lww_enable(self):
195        src_conn = RestConnection(self.c1_cluster.get_master_node())
196        dest_conn = RestConnection(self.c2_cluster.get_master_node())
197
198        self._create_buckets(bucket='default', ramQuotaMB=100, src_lww=False, dst_lww=False)
199        self.assertFalse(src_conn.is_lww_enabled(), "LWW enabled on source bucket")
200        self.log.info("LWW not enabled on source bucket as expected")
201        self.assertFalse(dest_conn.is_lww_enabled(), "LWW enabled on dest bucket")
202        self.log.info("LWW not enabled on dest bucket as expected")
203
204        src_conn.delete_bucket()
205        dest_conn.delete_bucket()
206
207        self._create_buckets(bucket='default', ramQuotaMB=100)
208        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
209        self.log.info("LWW enabled on source bucket as expected")
210        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
211        self.log.info("LWW enabled on dest bucket as expected")
212
213    def test_replication_with_lww_default(self):
214        src_conn = RestConnection(self.c1_cluster.get_master_node())
215        dest_conn = RestConnection(self.c2_cluster.get_master_node())
216
217        self._create_buckets(bucket='default', ramQuotaMB=100)
218        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
219        self.log.info("LWW enabled on source bucket as expected")
220        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
221        self.log.info("LWW enabled on dest bucket as expected")
222
223        self.sleep(10)
224
225        self.setup_xdcr()
226        self.merge_all_buckets()
227        self.c1_cluster.pause_all_replications_by_id()
228
229        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
230        self.c2_cluster.load_all_buckets_from_generator(gen1)
231        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
232        self.c1_cluster.load_all_buckets_from_generator(gen2)
233
234        self.c1_cluster.resume_all_replications_by_id()
235
236        self.verify_results()
237
238    def test_replication_with_lww_sasl(self):
239        src_conn = RestConnection(self.c1_cluster.get_master_node())
240        dest_conn = RestConnection(self.c2_cluster.get_master_node())
241
242        self._create_buckets(bucket='sasl_bucket', ramQuotaMB=100, authType='sasl', saslPassword='password')
243        self.assertTrue(src_conn.is_lww_enabled('sasl_bucket'), "LWW not enabled on source bucket")
244        self.log.info("LWW enabled on source bucket as expected")
245        self.assertTrue(dest_conn.is_lww_enabled('sasl_bucket'), "LWW not enabled on dest bucket")
246        self.log.info("LWW enabled on dest bucket as expected")
247
248        self.sleep(10)
249
250        self.setup_xdcr()
251        self.merge_all_buckets()
252        self.c1_cluster.pause_all_replications_by_id()
253
254        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
255        self.c2_cluster.load_all_buckets_from_generator(gen1)
256        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
257        self.c1_cluster.load_all_buckets_from_generator(gen2)
258
259        self.c1_cluster.resume_all_replications_by_id()
260
261        self.verify_results()
262
263    def test_replication_with_lww_standard(self):
264        src_conn = RestConnection(self.c1_cluster.get_master_node())
265        dest_conn = RestConnection(self.c2_cluster.get_master_node())
266
267        self._create_buckets(bucket='standard_bucket', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
268        self.assertTrue(src_conn.is_lww_enabled('standard_bucket'), "LWW not enabled on source bucket")
269        self.log.info("LWW enabled on source bucket as expected")
270        self.assertTrue(dest_conn.is_lww_enabled('standard_bucket'), "LWW not enabled on dest bucket")
271        self.log.info("LWW enabled on dest bucket as expected")
272
273        self.sleep(10)
274
275        self.setup_xdcr()
276        self.merge_all_buckets()
277        self.c1_cluster.pause_all_replications_by_id()
278
279        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
280        self.c2_cluster.load_all_buckets_from_generator(gen1)
281        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
282        self.c1_cluster.load_all_buckets_from_generator(gen2)
283
284        self.c1_cluster.resume_all_replications_by_id()
285
286        self.verify_results()
287
288    def test_replication_with_lww_and_no_lww(self):
289        src_conn = RestConnection(self.c1_cluster.get_master_node())
290        dest_conn = RestConnection(self.c2_cluster.get_master_node())
291
292        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
293        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
294        self.log.info("LWW enabled on source bucket as expected")
295        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
296        self.log.info("LWW enabled on dest bucket as expected")
297
298
299        self._create_buckets(bucket='nolww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT + 1, src_lww=False,
300                            dst_lww=False)
301        self.assertFalse(src_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on source bucket")
302        self.log.info("LWW not enabled on source bucket as expected")
303        self.assertFalse(dest_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on dest bucket")
304        self.log.info("LWW not enabled on dest bucket as expected")
305
306        self.sleep(10)
307
308        self.setup_xdcr()
309        self.merge_all_buckets()
310        self.c1_cluster.pause_all_replications_by_id()
311
312        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
313        self.c2_cluster.load_all_buckets_from_generator(gen1)
314        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
315        self.c1_cluster.load_all_buckets_from_generator(gen2)
316
317        self.c1_cluster.resume_all_replications_by_id()
318
319        self.verify_results()
320
321    def test_seq_upd_on_uni_with_src_wins(self):
322        src_conn = RestConnection(self.c1_cluster.get_master_node())
323        dest_conn = RestConnection(self.c2_cluster.get_master_node())
324
325        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
326        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
327        self.log.info("LWW enabled on source bucket as expected")
328        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
329        self.log.info("LWW enabled on dest bucket as expected")
330
331        self._create_buckets(bucket='nolww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT + 1, src_lww=False,
332                            dst_lww=False)
333        self.assertFalse(src_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on source bucket")
334        self.log.info("LWW not enabled on source bucket as expected")
335        self.assertFalse(dest_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on dest bucket")
336        self.log.info("LWW not enabled on dest bucket as expected")
337
338        self.sleep(30)
339
340        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
341        self.sleep(10)
342        src_nolww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'nolww', self.c1_cluster)
343        self.sleep(10)
344        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
345        self.sleep(10)
346        dest_nolww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'nolww', self.c2_cluster)
347        self.sleep(10)
348
349        self.setup_xdcr()
350        self.merge_all_buckets()
351        self.c1_cluster.pause_all_replications_by_id()
352        self.sleep(10)
353
354        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
355        self.c2_cluster.load_all_buckets_from_generator(gen)
356        self.sleep(10)
357        self._upsert(conn=dest_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
358        self.sleep(10)
359        self._upsert(conn=dest_nolww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
360        self.sleep(10)
361        gen = DocumentGenerator('lww', '{{"key2":"value2"}}', xrange(100), start=0, end=1)
362        self.c1_cluster.load_all_buckets_from_generator(gen)
363        self.sleep(10)
364
365        self.c1_cluster.resume_all_replications_by_id()
366        self.sleep(10)
367        self._wait_for_replication_to_catchup()
368
369        obj = src_lww.get(key='lww-0')
370        self.assertDictContainsSubset({'key2':'value2'}, obj.value, "Src doc did not win using LWW")
371        obj = dest_lww.get(key='lww-0')
372        self.assertDictContainsSubset({'key2':'value2'}, obj.value, "Src doc did not win using LWW")
373        self.log.info("Src doc won using LWW as expected")
374
375        obj = src_nolww.get(key='lww-0')
376        self.assertDictContainsSubset({'key2':'value2'}, obj.value, "Src doc did not win using LWW")
377        obj = dest_nolww.get(key='lww-0')
378        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Target doc did not win using Rev Id")
379        self.log.info("Target doc won using Rev Id as expected")
380
381        self.verify_results(skip_verify_data=['nolww'], skip_verify_revid=['nolww'])
382
383    def test_seq_upd_on_uni_with_dest_wins(self):
384        src_conn = RestConnection(self.c1_cluster.get_master_node())
385        dest_conn = RestConnection(self.c2_cluster.get_master_node())
386
387        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
388        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
389        self.log.info("LWW enabled on source bucket as expected")
390        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
391        self.log.info("LWW enabled on dest bucket as expected")
392
393        self._create_buckets(bucket='nolww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT + 1, src_lww=False,
394                            dst_lww=False)
395        self.assertFalse(src_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on source bucket")
396        self.log.info("LWW not enabled on source bucket as expected")
397        self.assertFalse(dest_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on dest bucket")
398        self.log.info("LWW not enabled on dest bucket as expected")
399
400        self.sleep(10)
401
402        self.setup_xdcr()
403        self.merge_all_buckets()
404        self.c1_cluster.pause_all_replications_by_id()
405
406        self.sleep(30)
407
408        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
409        self.sleep(10)
410        src_nolww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'nolww', self.c1_cluster)
411        self.sleep(10)
412        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
413        self.sleep(10)
414        dest_nolww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'nolww', self.c2_cluster)
415        self.sleep(10)
416
417        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
418        self.c1_cluster.load_all_buckets_from_generator(gen)
419        self.sleep(10)
420        self._upsert(conn=src_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
421        self.sleep(10)
422        self._upsert(conn=src_nolww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
423        self.sleep(10)
424        gen = DocumentGenerator('lww', '{{"key2":"value2"}}', xrange(100), start=0, end=1)
425        self.c2_cluster.load_all_buckets_from_generator(gen)
426        self.sleep(10)
427
428        self.c1_cluster.resume_all_replications_by_id()
429        self._wait_for_replication_to_catchup()
430
431        obj = src_lww.get(key='lww-0')
432        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Target doc did not win using LWW")
433        obj = dest_lww.get(key='lww-0')
434        self.assertDictContainsSubset({'key2':'value2'}, obj.value, "Target doc did not win using LWW")
435        self.log.info("Target doc won using LWW as expected")
436
437        obj = src_nolww.get(key='lww-0')
438        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Src doc did not win using Rev Id")
439        obj = dest_nolww.get(key='lww-0')
440        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Src doc did not win using Rev Id")
441        self.log.info("Src doc won using Rev Id as expected")
442
443        self.verify_results(skip_verify_data=['lww','nolww'], skip_verify_revid=['lww'])
444
445    def test_seq_upd_on_bi_with_src_wins(self):
446        src_conn = RestConnection(self.c1_cluster.get_master_node())
447        dest_conn = RestConnection(self.c2_cluster.get_master_node())
448
449        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
450        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
451        self.log.info("LWW enabled on source bucket as expected")
452        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
453        self.log.info("LWW enabled on dest bucket as expected")
454
455        self._create_buckets(bucket='nolww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT + 1, src_lww=False,
456                            dst_lww=False)
457        self.assertFalse(src_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on source bucket")
458        self.log.info("LWW not enabled on source bucket as expected")
459        self.assertFalse(dest_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on dest bucket")
460        self.log.info("LWW not enabled on dest bucket as expected")
461
462        self.sleep(10)
463
464        self.setup_xdcr()
465        self.merge_all_buckets()
466        self.c1_cluster.pause_all_replications_by_id()
467        self.c2_cluster.pause_all_replications_by_id()
468
469        self.sleep(30)
470
471        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
472        self.sleep(10)
473        src_nolww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'nolww', self.c1_cluster)
474        self.sleep(10)
475        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
476        self.sleep(10)
477        dest_nolww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'nolww', self.c2_cluster)
478        self.sleep(10)
479
480        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
481        self.c2_cluster.load_all_buckets_from_generator(gen)
482        self.sleep(10)
483        self._upsert(conn=dest_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
484        self.sleep(10)
485        self._upsert(conn=dest_nolww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
486        self.sleep(10)
487        gen = DocumentGenerator('lww', '{{"key2":"value2"}}', xrange(100), start=0, end=1)
488        self.c1_cluster.load_all_buckets_from_generator(gen)
489        self.sleep(10)
490
491        self.c1_cluster.resume_all_replications_by_id()
492        self.c2_cluster.resume_all_replications_by_id()
493        self._wait_for_replication_to_catchup()
494
495        obj = src_lww.get(key='lww-0')
496        self.assertDictContainsSubset({'key2':'value2'}, obj.value, "Src doc did not win using LWW")
497        obj = dest_lww.get(key='lww-0')
498        self.assertDictContainsSubset({'key2':'value2'}, obj.value, "Src doc did not win using LWW")
499        self.log.info("Src doc won using LWW as expected")
500
501        obj = dest_nolww.get(key='lww-0')
502        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Target doc did not win using Rev Id")
503        obj = src_nolww.get(key='lww-0')
504        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Target doc did not win using Rev Id")
505        self.log.info("Target doc won using Rev Id as expected")
506
507        self.verify_results(skip_verify_data=['nolww'])
508
509    def test_seq_upd_on_bi_with_dest_wins(self):
510        src_conn = RestConnection(self.c1_cluster.get_master_node())
511        dest_conn = RestConnection(self.c2_cluster.get_master_node())
512
513        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
514        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
515        self.log.info("LWW enabled on source bucket as expected")
516        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
517        self.log.info("LWW enabled on dest bucket as expected")
518
519        self._create_buckets(bucket='nolww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT + 1, src_lww=False,
520                            dst_lww=False)
521        self.assertFalse(src_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on source bucket")
522        self.log.info("LWW not enabled on source bucket as expected")
523        self.assertFalse(dest_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on dest bucket")
524        self.log.info("LWW not enabled on dest bucket as expected")
525
526        self.sleep(10)
527
528        self.setup_xdcr()
529        self.merge_all_buckets()
530        self.c1_cluster.pause_all_replications_by_id()
531        self.c2_cluster.pause_all_replications_by_id()
532
533        self.sleep(30)
534
535        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
536        self.sleep(10)
537        src_nolww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'nolww', self.c1_cluster)
538        self.sleep(10)
539        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
540        self.sleep(10)
541        dest_nolww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'nolww', self.c2_cluster)
542        self.sleep(10)
543
544        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
545        self.c1_cluster.load_all_buckets_from_generator(gen)
546        self.sleep(10)
547        self._upsert(conn=src_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
548        self.sleep(10)
549        self._upsert(conn=src_nolww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
550        self.sleep(10)
551        gen = DocumentGenerator('lww', '{{"key2":"value2"}}', xrange(100), start=0, end=1)
552        self.c2_cluster.load_all_buckets_from_generator(gen)
553        self.sleep(10)
554
555        self.c1_cluster.resume_all_replications_by_id()
556        self.c2_cluster.resume_all_replications_by_id()
557        self._wait_for_replication_to_catchup()
558
559        obj = src_lww.get(key='lww-0')
560        self.assertDictContainsSubset({'key2':'value2'}, obj.value, "Target doc did not win using LWW")
561        obj = dest_lww.get(key='lww-0')
562        self.assertDictContainsSubset({'key2':'value2'}, obj.value, "Target doc did not win using LWW")
563        self.log.info("Target doc won using LWW as expected")
564
565        obj = src_nolww.get(key='lww-0')
566        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Src doc did not win using Rev Id")
567        obj = dest_nolww.get(key='lww-0')
568        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Src doc did not win using Rev Id")
569        self.log.info("Src doc won using Rev Id as expected")
570
571        self.verify_results(skip_verify_data=['lww','nolww'])
572
573    def test_seq_add_del_on_bi_with_src_wins(self):
574        src_conn = RestConnection(self.c1_cluster.get_master_node())
575        dest_conn = RestConnection(self.c2_cluster.get_master_node())
576
577        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
578        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
579        self.log.info("LWW enabled on source bucket as expected")
580        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
581        self.log.info("LWW enabled on dest bucket as expected")
582
583        self.sleep(10)
584
585        self.setup_xdcr()
586        self.merge_all_buckets()
587
588        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
589        self.c1_cluster.load_all_buckets_from_generator(gen)
590        self._wait_for_replication_to_catchup()
591
592        self.c1_cluster.pause_all_replications_by_id()
593        self.c2_cluster.pause_all_replications_by_id()
594
595        self.sleep(30)
596
597        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
598        self.sleep(10)
599        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
600        self.sleep(10)
601
602        dest_lww.remove(key='lww-0')
603        self.sleep(10)
604        self._upsert(conn=src_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
605
606        self.c1_cluster.resume_all_replications_by_id()
607        self.c2_cluster.resume_all_replications_by_id()
608        self._wait_for_replication_to_catchup()
609
610        obj = src_lww.get(key='lww-0')
611        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Source doc did not win using LWW")
612        obj = dest_lww.get(key='lww-0')
613        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Source doc did not win using LWW")
614        self.log.info("Source doc won using LWW as expected")
615
616        self.verify_results(skip_verify_data=['lww'])
617
618    def test_seq_add_del_on_bi_with_dest_wins(self):
619        src_conn = RestConnection(self.c1_cluster.get_master_node())
620        dest_conn = RestConnection(self.c2_cluster.get_master_node())
621
622        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
623        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
624        self.log.info("LWW enabled on source bucket as expected")
625        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
626        self.log.info("LWW enabled on dest bucket as expected")
627
628        self.sleep(10)
629
630        self.setup_xdcr()
631        self.merge_all_buckets()
632
633        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
634        self.c1_cluster.load_all_buckets_from_generator(gen)
635        self._wait_for_replication_to_catchup()
636
637        self.c1_cluster.pause_all_replications_by_id()
638        self.c2_cluster.pause_all_replications_by_id()
639
640        self.sleep(30)
641
642        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
643        self.sleep(10)
644        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
645        self.sleep(10)
646
647        self._upsert(conn=src_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
648        self.sleep(10)
649        dest_lww.remove(key='lww-0')
650
651        self.c1_cluster.resume_all_replications_by_id()
652        self.c2_cluster.resume_all_replications_by_id()
653        self._wait_for_replication_to_catchup()
654
655        try:
656            obj = src_lww.get(key='lww-0')
657            if obj:
658                self.fail("Doc not deleted in src cluster using LWW")
659        except NotFoundError:
660            self.log.info("Doc deleted in src cluster using LWW as expected")
661
662        try:
663            obj = dest_lww.get(key='lww-0')
664            if obj:
665                self.fail("Doc not deleted in target cluster using LWW")
666        except NotFoundError:
667            self.log.info("Doc deleted in target cluster using LWW as expected")
668
669        # TODO - figure out how to verify results in this case
670        # self.verify_results(skip_verify_data=['lww'])
671
672    def test_seq_upd_on_uni_with_lww_disabled_target_and_src_wins(self):
673        src_conn = RestConnection(self.c1_cluster.get_master_node())
674        dest_conn = RestConnection(self.c2_cluster.get_master_node())
675
676        self._create_buckets(bucket='default', ramQuotaMB=100, src_lww=True, dst_lww=False)
677        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
678        self.log.info("LWW enabled on source bucket as expected")
679        self.assertFalse(dest_conn.is_lww_enabled(), "LWW enabled on dest bucket")
680        self.log.info("LWW not enabled on dest bucket as expected")
681
682        try:
683            self.setup_xdcr()
684        except Exception as e:
685            self.assertTrue("Replication between buckets with different ConflictResolutionType setting is not allowed" in str(e),
686                            "ConflictResolutionType mismatch message not thrown as expected")
687            self.log.info("ConflictResolutionType mismatch message thrown as expected")
688
689    def test_seq_upd_on_uni_with_lww_disabled_source_and_target_wins(self):
690        src_conn = RestConnection(self.c1_cluster.get_master_node())
691        dest_conn = RestConnection(self.c2_cluster.get_master_node())
692
693        self._create_buckets(bucket='default', ramQuotaMB=100, src_lww=False, dst_lww=True)
694        self.assertFalse(src_conn.is_lww_enabled(), "LWW enabled on source bucket")
695        self.log.info("LWW not enabled on source bucket as expected")
696        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
697        self.log.info("LWW enabled on dest bucket as expected")
698        self.sleep(10)
699
700        try:
701            self.setup_xdcr()
702        except Exception as e:
703            self.assertTrue("Replication between buckets with different ConflictResolutionType setting is not allowed" in str(e),
704                            "ConflictResolutionType mismatch message not thrown as expected")
705            self.log.info("ConflictResolutionType mismatch message thrown as expected")
706
707    def test_seq_upd_on_bi_with_lww_disabled_on_both_clusters(self):
708        src_conn = RestConnection(self.c1_cluster.get_master_node())
709        dest_conn = RestConnection(self.c2_cluster.get_master_node())
710
711        self._create_buckets(bucket='default', ramQuotaMB=100, src_lww=False, dst_lww=False)
712        self.assertFalse(src_conn.is_lww_enabled(), "LWW enabled on source bucket")
713        self.log.info("LWW not enabled on source bucket as expected")
714        self.assertFalse(dest_conn.is_lww_enabled(), "LWW enabled on dest bucket")
715        self.log.info("LWW not enabled on dest bucket as expected")
716
717        self.sleep(10)
718
719        self.setup_xdcr()
720        self.merge_all_buckets()
721
722        self.c1_cluster.pause_all_replications_by_id()
723        self.c2_cluster.pause_all_replications_by_id()
724
725        self.sleep(30)
726
727        src_def = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'default', self.c1_cluster)
728        self.sleep(10)
729        dst_def = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'default', self.c2_cluster)
730        self.sleep(10)
731
732        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
733        self.c1_cluster.load_all_buckets_from_generator(gen)
734        self.sleep(10)
735        self._upsert(conn=src_def, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
736        self.sleep(10)
737        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
738        self.c2_cluster.load_all_buckets_from_generator(gen)
739        self.sleep(10)
740        self._upsert(conn=dst_def, doc_id='lww-0', old_key='key', new_key='key2', new_val='value2')
741        self.sleep(10)
742
743        self.c1_cluster.resume_all_replications_by_id()
744        self.c2_cluster.resume_all_replications_by_id()
745        self._wait_for_replication_to_catchup()
746
747        obj = src_def.get(key='lww-0')
748        self.assertDictContainsSubset({'key2':'value2'}, obj.value, "Doc with greater rev id did not win")
749        obj = dst_def.get(key='lww-0')
750        self.assertDictContainsSubset({'key2':'value2'}, obj.value, "Doc with greater rev id did not win")
751        self.log.info("Doc with greater rev id won as expected")
752
753        self.verify_results(skip_verify_data=['default'])
754
755    def test_seq_upd_on_uni_with_src_failover(self):
756        src_conn = RestConnection(self.c1_cluster.get_master_node())
757        dest_conn = RestConnection(self.c2_cluster.get_master_node())
758
759        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
760        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
761        self.log.info("LWW enabled on source bucket as expected")
762        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
763        self.log.info("LWW enabled on dest bucket as expected")
764
765        self._create_buckets(bucket='nolww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT + 1, src_lww=False,
766                            dst_lww=False)
767        self.assertFalse(src_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on source bucket")
768        self.log.info("LWW not enabled on source bucket as expected")
769        self.assertFalse(dest_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on dest bucket")
770        self.log.info("LWW not enabled on dest bucket as expected")
771
772        self.sleep(10)
773
774        self.setup_xdcr()
775        self.merge_all_buckets()
776
777        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
778        self.c2_cluster.load_all_buckets_from_generator(gen)
779
780        self.c1_cluster.pause_all_replications_by_id()
781
782        self.sleep(30)
783
784        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
785        self.sleep(10)
786        dest_nolww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'nolww', self.c2_cluster)
787        self.sleep(10)
788
789        self.c1_cluster.failover_and_rebalance_master(graceful=True, rebalance=True)
790
791        self._upsert(conn=dest_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
792        self._upsert(conn=dest_nolww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
793
794        gen = DocumentGenerator('lww', '{{"key3":"value3"}}', xrange(100), start=0, end=1)
795        self.c1_cluster.load_all_buckets_from_generator(gen)
796
797        self.c1_cluster.resume_all_replications_by_id()
798        self._wait_for_replication_to_catchup()
799
800        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
801        src_nolww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'nolww', self.c1_cluster)
802
803        obj = src_lww.get(key='lww-0')
804        self.assertDictContainsSubset({'key3':'value3'}, obj.value, "Src doc did not win using LWW")
805        obj = dest_lww.get(key='lww-0')
806        self.assertDictContainsSubset({'key3':'value3'}, obj.value, "Src doc did not win using LWW")
807        self.log.info("Src doc won using LWW as expected")
808
809        obj = src_nolww.get(key='lww-0')
810        self.assertDictContainsSubset({'key3':'value3'}, obj.value, "Target doc did not win using Rev Id")
811        obj = dest_nolww.get(key='lww-0')
812        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Target doc did not win using Rev Id")
813        self.log.info("Target doc won using Rev Id as expected")
814
815        self.verify_results(skip_verify_data=['nolww'], skip_verify_revid=['nolww'])
816
817    def test_seq_upd_on_uni_with_src_rebalance(self):
818        src_conn = RestConnection(self.c1_cluster.get_master_node())
819        dest_conn = RestConnection(self.c2_cluster.get_master_node())
820
821        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
822        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
823        self.log.info("LWW enabled on source bucket as expected")
824        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
825        self.log.info("LWW enabled on dest bucket as expected")
826
827        self._create_buckets(bucket='nolww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT + 1, src_lww=False,
828                            dst_lww=False)
829        self.assertFalse(src_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on source bucket")
830        self.log.info("LWW not enabled on source bucket as expected")
831        self.assertFalse(dest_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on dest bucket")
832        self.log.info("LWW not enabled on dest bucket as expected")
833
834        self.sleep(10)
835
836        self.setup_xdcr()
837        self.merge_all_buckets()
838
839        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
840        self.c2_cluster.load_all_buckets_from_generator(gen)
841
842        self.c1_cluster.pause_all_replications_by_id()
843
844        self.sleep(30)
845
846        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
847        self.sleep(10)
848        dest_nolww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'nolww', self.c2_cluster)
849        self.sleep(10)
850
851        self.c1_cluster.rebalance_out_master()
852
853        self._upsert(conn=dest_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
854        self._upsert(conn=dest_nolww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
855
856        gen = DocumentGenerator('lww', '{{"key3":"value3"}}', xrange(100), start=0, end=1)
857        self.c1_cluster.load_all_buckets_from_generator(gen)
858
859        self.c1_cluster.resume_all_replications_by_id()
860        self._wait_for_replication_to_catchup()
861
862        self.sleep(30)
863
864        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
865        self.sleep(10)
866        src_nolww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'nolww', self.c1_cluster)
867        self.sleep(10)
868
869        obj = src_lww.get(key='lww-0')
870        self.assertDictContainsSubset({'key3':'value3'}, obj.value, "Src doc did not win using LWW")
871        obj = dest_lww.get(key='lww-0')
872        self.assertDictContainsSubset({'key3':'value3'}, obj.value, "Src doc did not win using LWW")
873        self.log.info("Src doc won using LWW as expected")
874
875        obj = src_nolww.get(key='lww-0')
876        self.assertDictContainsSubset({'key3':'value3'}, obj.value, "Target doc did not win using Rev Id")
877        obj = dest_nolww.get(key='lww-0')
878        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Target doc did not win using Rev Id")
879        self.log.info("Target doc won using Rev Id as expected")
880
881        self.verify_results(skip_verify_data=['nolww'], skip_verify_revid=['nolww'])
882
883    def test_seq_add_del_on_bi_with_rebalance(self):
884        src_conn = RestConnection(self.c1_cluster.get_master_node())
885        dest_conn = RestConnection(self.c2_cluster.get_master_node())
886
887        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
888        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
889        self.log.info("LWW enabled on source bucket as expected")
890        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
891        self.log.info("LWW enabled on dest bucket as expected")
892
893        self.sleep(10)
894
895        self.setup_xdcr()
896        self.merge_all_buckets()
897
898        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
899        self.c1_cluster.load_all_buckets_from_generator(gen)
900        self._wait_for_replication_to_catchup()
901
902        self.c1_cluster.pause_all_replications_by_id()
903        self.c2_cluster.pause_all_replications_by_id()
904
905        self.sleep(30)
906
907        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
908        self.sleep(10)
909        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
910        self.sleep(10)
911
912        dest_lww.remove(key='lww-0')
913        self.c2_cluster.rebalance_out_master()
914        self._upsert(conn=src_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
915        self.c1_cluster.rebalance_out_master()
916
917        self.c1_cluster.resume_all_replications_by_id()
918        self.c2_cluster.resume_all_replications_by_id()
919        self._wait_for_replication_to_catchup()
920
921        self.sleep(30)
922
923        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
924        self.sleep(10)
925        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
926        self.sleep(10)
927
928        obj = src_lww.get(key='lww-0')
929        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Source doc did not win using LWW")
930        obj = dest_lww.get(key='lww-0')
931        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Source doc did not win using LWW")
932        self.log.info("Source doc won using LWW as expected")
933
934        self.verify_results(skip_verify_data=['lww'])
935
936    def test_seq_add_del_on_bi_with_failover(self):
937        src_conn = RestConnection(self.c1_cluster.get_master_node())
938        dest_conn = RestConnection(self.c2_cluster.get_master_node())
939
940        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
941        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
942        self.log.info("LWW enabled on source bucket as expected")
943        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
944        self.log.info("LWW enabled on dest bucket as expected")
945
946        self.sleep(10)
947
948        self.setup_xdcr()
949        self.merge_all_buckets()
950
951        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
952        self.c1_cluster.load_all_buckets_from_generator(gen)
953        self._wait_for_replication_to_catchup()
954
955        self.c1_cluster.pause_all_replications_by_id()
956        self.c2_cluster.pause_all_replications_by_id()
957
958        self.sleep(30)
959
960        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
961        self.sleep(10)
962        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
963        self.sleep(10)
964
965        dest_lww.remove(key='lww-0')
966        self.c2_cluster.failover_and_rebalance_master(graceful=True, rebalance=True)
967        self._upsert(conn=src_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
968        self.c1_cluster.failover_and_rebalance_master(graceful=True, rebalance=True)
969
970        self.c1_cluster.resume_all_replications_by_id()
971        self.c2_cluster.resume_all_replications_by_id()
972        self._wait_for_replication_to_catchup()
973
974        self.sleep(30)
975
976        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
977        self.sleep(10)
978        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
979        self.sleep(10)
980
981        obj = src_lww.get(key='lww-0')
982        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Source doc did not win using LWW")
983        obj = dest_lww.get(key='lww-0')
984        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Source doc did not win using LWW")
985        self.log.info("Source doc won using LWW as expected")
986
987        self.verify_results(skip_verify_data=['lww'])
988
989    def test_simult_upd_on_bi(self):
990        src_conn = RestConnection(self.c1_cluster.get_master_node())
991        dest_conn = RestConnection(self.c2_cluster.get_master_node())
992
993        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
994        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
995        self.log.info("LWW enabled on source bucket as expected")
996        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
997        self.log.info("LWW enabled on dest bucket as expected")
998
999        self._create_buckets(bucket='nolww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT + 1, src_lww=False,
1000                            dst_lww=False)
1001        self.assertFalse(src_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on source bucket")
1002        self.log.info("LWW not enabled on source bucket as expected")
1003        self.assertFalse(dest_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on dest bucket")
1004        self.log.info("LWW not enabled on dest bucket as expected")
1005
1006        self.sleep(10)
1007
1008        self.setup_xdcr()
1009        self.merge_all_buckets()
1010        self.c1_cluster.pause_all_replications_by_id()
1011        self.c2_cluster.pause_all_replications_by_id()
1012
1013        self.sleep(30)
1014
1015        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
1016        self.sleep(10)
1017        src_nolww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'nolww', self.c1_cluster)
1018        self.sleep(10)
1019        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
1020        self.sleep(10)
1021        dest_nolww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'nolww', self.c2_cluster)
1022        self.sleep(10)
1023
1024        tasks = []
1025
1026        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
1027        tasks += self.c1_cluster.async_load_all_buckets_from_generator(gen)
1028
1029        gen = DocumentGenerator('lww', '{{"key2":"value2"}}', xrange(100), start=0, end=1)
1030        tasks += self.c2_cluster.async_load_all_buckets_from_generator(gen)
1031
1032        for task in tasks:
1033            task.result()
1034
1035        #update doc at C1 thrice
1036        self._upsert(conn=src_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
1037        self._upsert(conn=src_lww, doc_id='lww-0', old_key='key1', new_key='key2', new_val='value2')
1038        self._upsert(conn=src_lww, doc_id='lww-0', old_key='key2', new_key='key3', new_val='value3')
1039
1040        self._upsert(conn=src_nolww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
1041        self._upsert(conn=src_nolww, doc_id='lww-0', old_key='key1', new_key='key2', new_val='value2')
1042        self._upsert(conn=src_nolww, doc_id='lww-0', old_key='key2', new_key='key3', new_val='value3')
1043
1044        #update doc at C2 twice
1045        self._upsert(conn=dest_lww, doc_id='lww-0', old_key='key2', new_key='key3', new_val='value3')
1046        self._upsert(conn=dest_lww, doc_id='lww-0', old_key='key3', new_key='key4', new_val='value4')
1047
1048        self._upsert(conn=dest_nolww, doc_id='lww-0', old_key='key2', new_key='key3', new_val='value3')
1049        self._upsert(conn=dest_nolww, doc_id='lww-0', old_key='key3', new_key='key4', new_val='value4')
1050
1051        self.c1_cluster.resume_all_replications_by_id()
1052        self.c2_cluster.resume_all_replications_by_id()
1053        self._wait_for_replication_to_catchup()
1054
1055        obj = src_lww.get(key='lww-0')
1056        self.assertDictContainsSubset({'key4':'value4'}, obj.value, "Target doc did not win using LWW")
1057        obj = dest_lww.get(key='lww-0')
1058        self.assertDictContainsSubset({'key4':'value4'}, obj.value, "Target doc did not win using LWW")
1059        self.log.info("Target doc won using LWW as expected")
1060
1061        obj = dest_nolww.get(key='lww-0')
1062        self.assertDictContainsSubset({'key3':'value3'}, obj.value, "Src doc did not win using Rev Id")
1063        obj = src_nolww.get(key='lww-0')
1064        self.assertDictContainsSubset({'key3':'value3'}, obj.value, "Src doc did not win using Rev Id")
1065        self.log.info("Src doc won using Rev Id as expected")
1066
1067        self.verify_results(skip_verify_data=['lww', 'nolww'])
1068
1069    def test_lww_with_optimistic_threshold_change(self):
1070        src_conn = RestConnection(self.c1_cluster.get_master_node())
1071        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1072
1073        self._create_buckets(bucket='default', ramQuotaMB=100)
1074        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1075        self.log.info("LWW enabled on source bucket as expected")
1076        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1077        self.log.info("LWW enabled on dest bucket as expected")
1078
1079        self.sleep(10)
1080
1081        self.setup_xdcr()
1082        self.merge_all_buckets()
1083
1084        src_conn.set_xdcr_param('default', 'default', 'optimisticReplicationThreshold', self._optimistic_threshold)
1085
1086        self.c1_cluster.pause_all_replications_by_id()
1087
1088        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1089        self.c2_cluster.load_all_buckets_from_generator(gen1)
1090        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1091        self.c1_cluster.load_all_buckets_from_generator(gen2)
1092
1093        self.c1_cluster.resume_all_replications_by_id()
1094
1095        self.verify_results()
1096
1097    def test_lww_with_master_warmup(self):
1098        src_conn = RestConnection(self.c1_cluster.get_master_node())
1099        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1100
1101        self._create_buckets(bucket='default', ramQuotaMB=100)
1102        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1103        self.log.info("LWW enabled on source bucket as expected")
1104        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1105        self.log.info("LWW enabled on dest bucket as expected")
1106
1107        self.sleep(10)
1108
1109        self.setup_xdcr()
1110        self.merge_all_buckets()
1111
1112        self.sleep(self._wait_timeout)
1113
1114        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1115        self.c2_cluster.async_load_all_buckets_from_generator(gen1)
1116        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1117        self.c1_cluster.async_load_all_buckets_from_generator(gen2)
1118
1119        self.sleep(self._wait_timeout / 2)
1120
1121        NodeHelper.wait_warmup_completed([self.c1_cluster.warmup_node(master=True)])
1122
1123        self.verify_results()
1124
1125    def test_lww_with_cb_restart_at_master(self):
1126        src_conn = RestConnection(self.c1_cluster.get_master_node())
1127        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1128
1129        self._create_buckets(bucket='default', ramQuotaMB=100)
1130        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1131        self.log.info("LWW enabled on source bucket as expected")
1132        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1133        self.log.info("LWW enabled on dest bucket as expected")
1134
1135        self.sleep(10)
1136
1137        self.setup_xdcr()
1138        self.merge_all_buckets()
1139
1140        self.sleep(self._wait_timeout)
1141
1142        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1143        self.c2_cluster.async_load_all_buckets_from_generator(gen1)
1144        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1145        self.c1_cluster.async_load_all_buckets_from_generator(gen2)
1146
1147        self.sleep(self._wait_timeout / 2)
1148
1149        conn = RemoteMachineShellConnection(self.c1_cluster.get_master_node())
1150        conn.stop_couchbase()
1151        self.sleep(5)
1152        conn.start_couchbase()
1153        self.wait_service_started(self.c1_cluster.get_master_node())
1154        self.sleep(600, "Slepping so that vBuckets are ready and to avoid \
1155        MemcachedError: Memcached error #1 'Not found':   for vbucket :0")
1156        self.verify_results()
1157
1158    def test_lww_with_erlang_restart_at_master(self):
1159        src_conn = RestConnection(self.c1_cluster.get_master_node())
1160        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1161
1162        self._create_buckets(bucket='default', ramQuotaMB=100)
1163        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1164        self.log.info("LWW enabled on source bucket as expected")
1165        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1166        self.log.info("LWW enabled on dest bucket as expected")
1167
1168        self.sleep(10)
1169
1170        self.setup_xdcr()
1171        self.merge_all_buckets()
1172
1173        self.sleep(self._wait_timeout)
1174
1175        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1176        self.c2_cluster.async_load_all_buckets_from_generator(gen1)
1177        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1178        self.c1_cluster.async_load_all_buckets_from_generator(gen2)
1179
1180        self.sleep(self._wait_timeout / 2)
1181
1182        conn = RemoteMachineShellConnection(self.c1_cluster.get_master_node())
1183        conn.kill_erlang()
1184        conn.start_couchbase()
1185        self.wait_service_started(self.c1_cluster.get_master_node())
1186        self.sleep(600, "Slepping so that vBuckets are ready and to avoid \
1187        MemcachedError: Memcached error #1 'Not found':   for vbucket :0")
1188        self.verify_results()
1189
1190    def test_lww_with_memcached_restart_at_master(self):
1191        src_conn = RestConnection(self.c1_cluster.get_master_node())
1192        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1193
1194        self._create_buckets(bucket='default', ramQuotaMB=100)
1195        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1196        self.log.info("LWW enabled on source bucket as expected")
1197        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1198        self.log.info("LWW enabled on dest bucket as expected")
1199
1200        self.sleep(10)
1201
1202        self.setup_xdcr()
1203        self.merge_all_buckets()
1204
1205        self.sleep(self._wait_timeout)
1206
1207        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1208        self.c2_cluster.async_load_all_buckets_from_generator(gen1)
1209        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1210        self.c1_cluster.async_load_all_buckets_from_generator(gen2)
1211
1212        self.sleep(self._wait_timeout / 2)
1213
1214        conn = RemoteMachineShellConnection(self.c1_cluster.get_master_node())
1215        conn.pause_memcached()
1216        conn.unpause_memcached()
1217        self.sleep(600,"Wait such that any replication happening should get completed after memcached restart.")
1218        self.verify_results()
1219
1220    def test_seq_upd_on_bi_with_target_clock_faster(self):
1221        src_conn = RestConnection(self.c1_cluster.get_master_node())
1222        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1223
1224        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
1225        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
1226        self.log.info("LWW enabled on source bucket as expected")
1227        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
1228        self.log.info("LWW enabled on dest bucket as expected")
1229
1230        self._create_buckets(bucket='nolww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT + 1, src_lww=False,
1231                            dst_lww=False)
1232        self.assertFalse(src_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on source bucket")
1233        self.log.info("LWW not enabled on source bucket as expected")
1234        self.assertFalse(dest_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on dest bucket")
1235        self.log.info("LWW not enabled on dest bucket as expected")
1236
1237        self._offset_wall_clock(self.c2_cluster, offset_secs=3600)
1238
1239        self.sleep(10)
1240
1241        self.setup_xdcr()
1242        self.merge_all_buckets()
1243        self.c1_cluster.pause_all_replications_by_id()
1244        self.c2_cluster.pause_all_replications_by_id()
1245
1246        self.sleep(30)
1247
1248        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
1249        self.sleep(10)
1250        src_nolww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'nolww', self.c1_cluster)
1251        self.sleep(10)
1252        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
1253        self.sleep(10)
1254        dest_nolww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'nolww', self.c2_cluster)
1255        self.sleep(10)
1256
1257        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
1258        self.c2_cluster.load_all_buckets_from_generator(gen)
1259        self._upsert(conn=dest_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
1260        self._upsert(conn=dest_nolww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
1261        gen = DocumentGenerator('lww', '{{"key2":"value2"}}', xrange(100), start=0, end=1)
1262        self.c1_cluster.load_all_buckets_from_generator(gen)
1263
1264        self.c1_cluster.resume_all_replications_by_id()
1265        self.c2_cluster.resume_all_replications_by_id()
1266        self._wait_for_replication_to_catchup()
1267
1268        obj = src_lww.get(key='lww-0')
1269        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Target doc did not win using LWW")
1270        obj = dest_lww.get(key='lww-0')
1271        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Target doc did not win using LWW")
1272        self.log.info("Target doc won using LWW as expected")
1273
1274        obj = dest_nolww.get(key='lww-0')
1275        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Target doc did not win using Rev Id")
1276        obj = src_nolww.get(key='lww-0')
1277        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Target doc did not win using Rev Id")
1278        self.log.info("Target doc won using Rev Id as expected")
1279
1280        self.verify_results(skip_verify_data=['lww','nolww'])
1281
1282        conn = RemoteMachineShellConnection(self.c1_cluster.get_master_node())
1283        conn.stop_couchbase()
1284
1285        self._enable_ntp_and_sync()
1286        self._disable_ntp()
1287
1288        conn.start_couchbase()
1289
1290    def test_seq_upd_on_bi_with_src_clock_faster(self):
1291        src_conn = RestConnection(self.c1_cluster.get_master_node())
1292        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1293
1294        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
1295        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
1296        self.log.info("LWW enabled on source bucket as expected")
1297        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
1298        self.log.info("LWW enabled on dest bucket as expected")
1299
1300        self._create_buckets(bucket='nolww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT + 1, src_lww=False,
1301                            dst_lww=False)
1302        self.assertFalse(src_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on source bucket")
1303        self.log.info("LWW not enabled on source bucket as expected")
1304        self.assertFalse(dest_conn.is_lww_enabled(bucket='nolww'), "LWW enabled on dest bucket")
1305        self.log.info("LWW not enabled on dest bucket as expected")
1306
1307        self._offset_wall_clock(self.c1_cluster, offset_secs=3600)
1308
1309        self.sleep(10)
1310
1311        self.setup_xdcr()
1312        self.merge_all_buckets()
1313        self.c1_cluster.pause_all_replications_by_id()
1314        self.c2_cluster.pause_all_replications_by_id()
1315
1316        self.sleep(30)
1317
1318        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
1319        self.sleep(10)
1320        src_nolww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'nolww', self.c1_cluster)
1321        self.sleep(10)
1322        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
1323        self.sleep(10)
1324        dest_nolww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'nolww', self.c2_cluster)
1325        self.sleep(10)
1326
1327        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
1328        self.c1_cluster.load_all_buckets_from_generator(gen)
1329        self._upsert(conn=src_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
1330        self._upsert(conn=src_nolww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
1331        gen = DocumentGenerator('lww', '{{"key2":"value2"}}', xrange(100), start=0, end=1)
1332        self.c2_cluster.load_all_buckets_from_generator(gen)
1333
1334        self.c1_cluster.resume_all_replications_by_id()
1335        self.c2_cluster.resume_all_replications_by_id()
1336        self._wait_for_replication_to_catchup()
1337
1338        obj = src_lww.get(key='lww-0')
1339        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Src doc did not win using LWW")
1340        obj = dest_lww.get(key='lww-0')
1341        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Src doc did not win using LWW")
1342        self.log.info("Src doc won using LWW as expected")
1343
1344        obj = dest_nolww.get(key='lww-0')
1345        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Src doc did not win using Rev Id")
1346        obj = src_nolww.get(key='lww-0')
1347        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Src doc did not win using Rev Id")
1348        self.log.info("Src doc won using Rev Id as expected")
1349
1350        self.verify_results(skip_verify_data=['lww','nolww'])
1351
1352        conn = RemoteMachineShellConnection(self.c1_cluster.get_master_node())
1353        conn.stop_couchbase()
1354
1355        self._enable_ntp_and_sync()
1356        self._disable_ntp()
1357
1358        conn.start_couchbase()
1359
1360    def test_seq_add_del_on_bi_with_target_clock_faster(self):
1361        src_conn = RestConnection(self.c1_cluster.get_master_node())
1362        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1363
1364        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
1365        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
1366        self.log.info("LWW enabled on source bucket as expected")
1367        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
1368        self.log.info("LWW enabled on dest bucket as expected")
1369
1370        self._offset_wall_clock(self.c2_cluster, offset_secs=3600)
1371
1372        self.sleep(10)
1373
1374        self.setup_xdcr()
1375        self.merge_all_buckets()
1376
1377        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
1378        self.c1_cluster.load_all_buckets_from_generator(gen)
1379        self._wait_for_replication_to_catchup()
1380
1381        self.c1_cluster.pause_all_replications_by_id()
1382        self.c2_cluster.pause_all_replications_by_id()
1383
1384        self.sleep(30)
1385
1386        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
1387        self.sleep(10)
1388        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
1389        self.sleep(10)
1390
1391        dest_lww.remove(key='lww-0')
1392        self._upsert(conn=src_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
1393
1394        self.c1_cluster.resume_all_replications_by_id()
1395        self.c2_cluster.resume_all_replications_by_id()
1396        self._wait_for_replication_to_catchup()
1397
1398        try:
1399            obj = src_lww.get(key='lww-0')
1400            if obj:
1401                self.fail("Doc not deleted in src cluster using LWW")
1402        except NotFoundError:
1403            self.log.info("Doc deleted in src cluster using LWW as expected")
1404
1405        try:
1406            obj = dest_lww.get(key='lww-0')
1407            if obj:
1408                self.fail("Doc not deleted in target cluster using LWW")
1409        except NotFoundError:
1410            self.log.info("Doc deleted in target cluster using LWW as expected")
1411
1412        # TODO - figure out how to verify results in this case
1413        # self.verify_results(skip_verify_data=['lww'])
1414
1415        conn = RemoteMachineShellConnection(self.c1_cluster.get_master_node())
1416        conn.stop_couchbase()
1417
1418        self._enable_ntp_and_sync()
1419        self._disable_ntp()
1420
1421        conn.start_couchbase()
1422
1423    def test_seq_del_add_on_bi_with_target_clock_faster(self):
1424        src_conn = RestConnection(self.c1_cluster.get_master_node())
1425        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1426
1427        self._create_buckets(bucket='lww', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
1428        self.assertTrue(src_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on source bucket")
1429        self.log.info("LWW enabled on source bucket as expected")
1430        self.assertTrue(dest_conn.is_lww_enabled(bucket='lww'), "LWW not enabled on dest bucket")
1431        self.log.info("LWW enabled on dest bucket as expected")
1432
1433        self._offset_wall_clock(self.c2_cluster, offset_secs=3600)
1434
1435        self.sleep(10)
1436
1437        self.setup_xdcr()
1438        self.merge_all_buckets()
1439
1440        gen = DocumentGenerator('lww', '{{"key":"value"}}', xrange(100), start=0, end=1)
1441        self.c1_cluster.load_all_buckets_from_generator(gen)
1442        self._wait_for_replication_to_catchup()
1443
1444        self.c1_cluster.pause_all_replications_by_id()
1445        self.c2_cluster.pause_all_replications_by_id()
1446
1447        self.sleep(30)
1448
1449        src_lww = self._get_python_sdk_client(self.c1_cluster.get_master_node().ip, 'lww', self.c1_cluster)
1450        self.sleep(10)
1451        dest_lww = self._get_python_sdk_client(self.c2_cluster.get_master_node().ip, 'lww', self.c2_cluster)
1452        self.sleep(10)
1453
1454        self._upsert(conn=dest_lww, doc_id='lww-0', old_key='key', new_key='key1', new_val='value1')
1455        src_lww.remove(key='lww-0')
1456
1457        self.c1_cluster.resume_all_replications_by_id()
1458        self.c2_cluster.resume_all_replications_by_id()
1459        self._wait_for_replication_to_catchup()
1460
1461        obj = src_lww.get(key='lww-0')
1462        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Target doc did not win using LWW")
1463        obj = dest_lww.get(key='lww-0')
1464        self.assertDictContainsSubset({'key1':'value1'}, obj.value, "Target doc did not win using LWW")
1465        self.log.info("Target doc won using LWW as expected")
1466
1467        self.verify_results(skip_verify_data=['lww'])
1468
1469        conn = RemoteMachineShellConnection(self.c1_cluster.get_master_node())
1470        conn.stop_couchbase()
1471
1472        self._enable_ntp_and_sync()
1473        self._disable_ntp()
1474
1475        conn.start_couchbase()
1476
1477    def test_lww_with_bucket_recreate(self):
1478        src_conn = RestConnection(self.c1_cluster.get_master_node())
1479        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1480
1481        self._create_buckets(bucket='default', ramQuotaMB=100)
1482        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1483        self.log.info("LWW enabled on source bucket as expected")
1484        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1485        self.log.info("LWW enabled on dest bucket as expected")
1486
1487        self.c1_cluster.delete_bucket(bucket_name='default')
1488        self._create_buckets(bucket='default', ramQuotaMB=100, skip_dst=True)
1489
1490        self.sleep(10)
1491
1492        self.setup_xdcr()
1493        self.merge_all_buckets()
1494        self.c1_cluster.pause_all_replications_by_id()
1495
1496        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1497        self.c2_cluster.load_all_buckets_from_generator(gen1)
1498        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1499        self.c1_cluster.load_all_buckets_from_generator(gen2)
1500
1501        self.c1_cluster.resume_all_replications_by_id()
1502
1503        self.verify_results()
1504
1505    def test_lww_while_rebalancing_node_at_src(self):
1506        src_conn = RestConnection(self.c1_cluster.get_master_node())
1507        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1508
1509        self._create_buckets(bucket='default', ramQuotaMB=100)
1510        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1511        self.log.info("LWW enabled on source bucket as expected")
1512        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1513        self.log.info("LWW enabled on dest bucket as expected")
1514
1515        self.sleep(10)
1516
1517        self.setup_xdcr()
1518        self.merge_all_buckets()
1519        self.c1_cluster.pause_all_replications_by_id()
1520
1521        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1522        self.c2_cluster.load_all_buckets_from_generator(gen1)
1523        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1524        self.c1_cluster.load_all_buckets_from_generator(gen2)
1525
1526        self.c1_cluster.resume_all_replications_by_id()
1527
1528        self.async_perform_update_delete()
1529
1530        task = self.c1_cluster.async_rebalance_out()
1531        task.result()
1532
1533        FloatingServers._serverlist.append(self._input.servers[1])
1534
1535        task = self.c1_cluster.async_rebalance_in()
1536        task.result()
1537        self.sleep(300)
1538        self.verify_results()
1539
1540    def test_lww_while_failover_node_at_src(self):
1541        src_conn = RestConnection(self.c1_cluster.get_master_node())
1542        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1543
1544        self._create_buckets(bucket='default', ramQuotaMB=100)
1545        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1546        self.log.info("LWW enabled on source bucket as expected")
1547        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1548        self.log.info("LWW enabled on dest bucket as expected")
1549
1550        self.sleep(10)
1551
1552        self.setup_xdcr()
1553        self.merge_all_buckets()
1554        self.c1_cluster.pause_all_replications_by_id()
1555
1556        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1557        self.c2_cluster.load_all_buckets_from_generator(gen1)
1558        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1559        self.c1_cluster.load_all_buckets_from_generator(gen2)
1560
1561        self.c1_cluster.resume_all_replications_by_id()
1562
1563        self.async_perform_update_delete()
1564
1565        graceful = self._input.param("graceful", False)
1566        self.recoveryType = self._input.param("recoveryType", None)
1567        task = self.c1_cluster.async_failover(graceful=graceful)
1568        task.result()
1569
1570        self.sleep(30)
1571
1572        if self.recoveryType:
1573            server_nodes = src_conn.node_statuses()
1574            for node in server_nodes:
1575                if node.ip == self._input.servers[1].ip:
1576                    src_conn.set_recovery_type(otpNode=node.id, recoveryType=self.recoveryType)
1577                    self.sleep(30)
1578                    src_conn.add_back_node(otpNode=node.id)
1579            rebalance = self.cluster.async_rebalance(self.c1_cluster.get_nodes(), [], [])
1580            rebalance.result()
1581        self.sleep(300)
1582        self.verify_results()
1583
1584    def test_lww_with_rebalance_in_and_simult_upd_del(self):
1585        src_conn = RestConnection(self.c1_cluster.get_master_node())
1586        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1587
1588        self._create_buckets(bucket='default', ramQuotaMB=100)
1589        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1590        self.log.info("LWW enabled on source bucket as expected")
1591        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1592        self.log.info("LWW enabled on dest bucket as expected")
1593
1594        self.sleep(10)
1595
1596        self.setup_xdcr()
1597        self.merge_all_buckets()
1598        self.c1_cluster.pause_all_replications_by_id()
1599
1600        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1601        self.c2_cluster.load_all_buckets_from_generator(gen1)
1602        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1603        self.c1_cluster.load_all_buckets_from_generator(gen2)
1604
1605        self.c1_cluster.resume_all_replications_by_id()
1606
1607        task = self.c1_cluster.async_rebalance_in(num_nodes=1)
1608
1609        self.async_perform_update_delete()
1610
1611        task.result()
1612
1613        self._wait_for_replication_to_catchup(timeout=600)
1614
1615        self.verify_results()
1616
1617    def test_lww_with_rebalance_out_and_simult_upd_del(self):
1618        src_conn = RestConnection(self.c1_cluster.get_master_node())
1619        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1620
1621        self._create_buckets(bucket='default', ramQuotaMB=100)
1622        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1623        self.log.info("LWW enabled on source bucket as expected")
1624        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1625        self.log.info("LWW enabled on dest bucket as expected")
1626
1627        self.sleep(10)
1628
1629        self.setup_xdcr()
1630        self.merge_all_buckets()
1631        self.c1_cluster.pause_all_replications_by_id()
1632
1633        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1634        self.c2_cluster.load_all_buckets_from_generator(gen1)
1635        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1636        self.c1_cluster.load_all_buckets_from_generator(gen2)
1637
1638        self.c1_cluster.resume_all_replications_by_id()
1639
1640        task = self.c1_cluster.async_rebalance_out()
1641
1642        self.async_perform_update_delete()
1643
1644        task.result()
1645
1646        FloatingServers._serverlist.append(self._input.servers[1])
1647
1648        task = self.c1_cluster.async_rebalance_in()
1649        task.result()
1650
1651        self._wait_for_replication_to_catchup(timeout=1200)
1652
1653        self.verify_results()
1654
1655    def test_lww_with_failover_and_simult_upd_del(self):
1656        src_conn = RestConnection(self.c1_cluster.get_master_node())
1657        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1658
1659        self._create_buckets(bucket='default', ramQuotaMB=100)
1660        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1661        self.log.info("LWW enabled on source bucket as expected")
1662        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1663        self.log.info("LWW enabled on dest bucket as expected")
1664
1665        self.sleep(10)
1666
1667        self.setup_xdcr()
1668        self.merge_all_buckets()
1669        self.c1_cluster.pause_all_replications_by_id()
1670
1671        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1672        self.c2_cluster.load_all_buckets_from_generator(gen1)
1673        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1674        self.c1_cluster.load_all_buckets_from_generator(gen2)
1675
1676        self.c1_cluster.resume_all_replications_by_id()
1677
1678        graceful = self._input.param("graceful", False)
1679        self.recoveryType = self._input.param("recoveryType", None)
1680        task = self.c1_cluster.async_failover(graceful=graceful)
1681
1682        self.async_perform_update_delete()
1683
1684        task.result()
1685
1686        self.sleep(30)
1687
1688        if self.recoveryType:
1689            server_nodes = src_conn.node_statuses()
1690            for node in server_nodes:
1691                if node.ip == self._input.servers[1].ip:
1692                    src_conn.set_recovery_type(otpNode=node.id, recoveryType=self.recoveryType)
1693                    self.sleep(30)
1694                    src_conn.add_back_node(otpNode=node.id)
1695            rebalance = self.cluster.async_rebalance(self.c1_cluster.get_nodes(), [], [])
1696            rebalance.result()
1697
1698        self._wait_for_replication_to_catchup(timeout=1200)
1699
1700        self.verify_results()
1701
1702    def test_mixed_mode(self):
1703        src_conn = RestConnection(self.c1_cluster.get_master_node())
1704        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1705
1706        self._create_buckets(bucket='default', ramQuotaMB=100, src_lww=True, dst_lww=False)
1707        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1708        self.log.info("LWW enabled on source bucket as expected")
1709        self.assertFalse(dest_conn.is_lww_enabled(), "LWW enabled on dest bucket")
1710        self.log.info("LWW not enabled on dest bucket as expected")
1711        self.sleep(10)
1712
1713        try:
1714            self.setup_xdcr()
1715        except Exception as e:
1716            self.assertTrue("Replication between buckets with different ConflictResolutionType setting is not allowed" in str(e),
1717                            "ConflictResolutionType mismatch message not thrown as expected")
1718            self.log.info("ConflictResolutionType mismatch message thrown as expected")
1719
1720    def test_lww_with_nodes_reshuffle(self):
1721        src_conn = RestConnection(self.c1_cluster.get_master_node())
1722        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1723
1724        self._create_buckets(bucket='default', ramQuotaMB=100)
1725        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1726        self.log.info("LWW enabled on source bucket as expected")
1727        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1728        self.log.info("LWW enabled on dest bucket as expected")
1729
1730        self.sleep(10)
1731
1732        self.setup_xdcr()
1733        self.merge_all_buckets()
1734        self.c1_cluster.pause_all_replications_by_id()
1735
1736        zones = src_conn.get_zone_names().keys()
1737        source_zone = zones[0]
1738        target_zone = "test_lww"
1739
1740        try:
1741            self.log.info("Current nodes in group {0} : {1}".format(source_zone,
1742                                                                    str(src_conn.get_nodes_in_zone(source_zone).keys())))
1743            self.log.info("Creating new zone " + target_zone)
1744            src_conn.add_zone(target_zone)
1745            self.log.info("Moving {0} to new zone {1}".format(str(src_conn.get_nodes_in_zone(source_zone).keys()),
1746                                                              target_zone))
1747            src_conn.shuffle_nodes_in_zones(["{0}".format(str(src_conn.get_nodes_in_zone(source_zone).keys()))],
1748                                            source_zone,target_zone)
1749
1750            gen = DocumentGenerator('lww-', '{{"age": {0}}}', xrange(100), start=0, end=self._num_items)
1751            self.c2_cluster.load_all_buckets_from_generator(gen)
1752            gen = DocumentGenerator('lww-', '{{"age": {0}}}', xrange(100), start=0, end=self._num_items)
1753            self.c1_cluster.load_all_buckets_from_generator(gen)
1754
1755            self.c1_cluster.resume_all_replications_by_id()
1756
1757            self._wait_for_replication_to_catchup(timeout=600)
1758        except Exception as e:
1759            self.log.info(e)
1760        finally:
1761            self.log.info("Moving {0} back to old zone {1}".format(str(src_conn.get_nodes_in_zone(source_zone).keys()),
1762                                                                   source_zone))
1763            src_conn.shuffle_nodes_in_zones(["{0}".format(str(src_conn.get_nodes_in_zone(source_zone).keys()))],
1764                                            target_zone,source_zone)
1765            self.log.info("Deleting new zone " + target_zone)
1766            src_conn.delete_zone(target_zone)
1767
1768    def test_lww_with_dst_failover_and_rebalance(self):
1769        src_conn = RestConnection(self.c1_cluster.get_master_node())
1770        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1771
1772        self._create_buckets(bucket='default', ramQuotaMB=100)
1773        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1774        self.log.info("LWW enabled on source bucket as expected")
1775        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1776        self.log.info("LWW enabled on dest bucket as expected")
1777
1778        self.sleep(10)
1779
1780        self.setup_xdcr()
1781        self.merge_all_buckets()
1782
1783        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1784        self.c2_cluster.async_load_all_buckets_from_generator(gen1)
1785        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1786        self.c1_cluster.async_load_all_buckets_from_generator(gen2)
1787
1788        graceful = self._input.param("graceful", False)
1789        self.recoveryType = self._input.param("recoveryType", None)
1790        task = self.c2_cluster.async_failover(graceful=graceful)
1791
1792        task.result()
1793
1794        if self.recoveryType:
1795            server_nodes = src_conn.node_statuses()
1796            for node in server_nodes:
1797                if node.ip == self._input.servers[3].ip:
1798                    dest_conn.set_recovery_type(otpNode=node.id, recoveryType=self.recoveryType)
1799                    self.sleep(30)
1800                    dest_conn.add_back_node(otpNode=node.id)
1801            rebalance = self.cluster.async_rebalance(self.c2_cluster.get_nodes(), [], [])
1802            rebalance.result()
1803
1804        self.sleep(60)
1805
1806        self._wait_for_replication_to_catchup(timeout=600)
1807
1808        self.verify_results()
1809
1810    def test_lww_with_rebooting_non_master_node(self):
1811        src_conn = RestConnection(self.c1_cluster.get_master_node())
1812        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1813
1814        self._create_buckets(bucket='default', ramQuotaMB=100)
1815        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1816        self.log.info("LWW enabled on source bucket as expected")
1817        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1818        self.log.info("LWW enabled on dest bucket as expected")
1819
1820        self.sleep(10)
1821
1822        self.setup_xdcr()
1823        self.merge_all_buckets()
1824
1825        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1826        self.c2_cluster.async_load_all_buckets_from_generator(gen1)
1827        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1828        self.c1_cluster.async_load_all_buckets_from_generator(gen2)
1829
1830        rebooted_node_src = self.c1_cluster.reboot_one_node(self)
1831        NodeHelper.wait_node_restarted(rebooted_node_src, self, wait_time=self._wait_timeout * 4, wait_if_warmup=True)
1832
1833        rebooted_node_dst = self.c2_cluster.reboot_one_node(self)
1834        NodeHelper.wait_node_restarted(rebooted_node_dst, self, wait_time=self._wait_timeout * 4, wait_if_warmup=True)
1835
1836        self.sleep(120)
1837
1838        ClusterOperationHelper.wait_for_ns_servers_or_assert([rebooted_node_dst], self, wait_if_warmup=True)
1839        ClusterOperationHelper.wait_for_ns_servers_or_assert([rebooted_node_src], self, wait_if_warmup=True)
1840
1841        self.verify_results()
1842
1843    def test_lww_with_firewall(self):
1844        src_conn = RestConnection(self.c1_cluster.get_master_node())
1845        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1846
1847        self._create_buckets(bucket='default', ramQuotaMB=100)
1848        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1849        self.log.info("LWW enabled on source bucket as expected")
1850        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1851        self.log.info("LWW enabled on dest bucket as expected")
1852
1853        self.sleep(10)
1854
1855        self.setup_xdcr()
1856        self.merge_all_buckets()
1857
1858        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1859        self.c2_cluster.async_load_all_buckets_from_generator(gen1)
1860        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1861        self.c1_cluster.async_load_all_buckets_from_generator(gen2)
1862
1863        NodeHelper.enable_firewall(self.c2_cluster.get_master_node())
1864        self.sleep(30)
1865        NodeHelper.disable_firewall(self.c2_cluster.get_master_node())
1866
1867        self.sleep(30)
1868
1869        self.verify_results()
1870
1871    def test_lww_with_node_crash_cluster(self):
1872        src_conn = RestConnection(self.c1_cluster.get_master_node())
1873        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1874
1875        self._create_buckets(bucket='default', ramQuotaMB=100)
1876        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1877        self.log.info("LWW enabled on source bucket as expected")
1878        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1879        self.log.info("LWW enabled on dest bucket as expected")
1880
1881        self.sleep(10)
1882
1883        self.setup_xdcr()
1884        self.merge_all_buckets()
1885
1886        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1887        self.c2_cluster.async_load_all_buckets_from_generator(gen1)
1888        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1889        self.c1_cluster.async_load_all_buckets_from_generator(gen2)
1890
1891        crashed_nodes = []
1892        crash = self._input.param("crash", "").split('-')
1893        if "C1" in crash:
1894            crashed_nodes += self.c1_cluster.get_nodes()
1895            self._kill_processes(crashed_nodes)
1896            self.sleep(30)
1897        if "C2" in crash:
1898            crashed_nodes += self.c2_cluster.get_nodes()
1899            self._kill_processes(crashed_nodes)
1900
1901        for crashed_node in crashed_nodes:
1902            self._start_cb_server(crashed_node)
1903
1904        if "C1" in crash:
1905            NodeHelper.wait_warmup_completed(self.c1_cluster.get_nodes())
1906            gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1907            self.c1_cluster.load_all_buckets_from_generator(gen1)
1908
1909        self.async_perform_update_delete()
1910
1911        if "C2" in crash:
1912            NodeHelper.wait_warmup_completed(self.c2_cluster.get_nodes())
1913
1914        self.verify_results()
1915
1916    def test_lww_with_auto_failover(self):
1917        src_conn = RestConnection(self.c1_cluster.get_master_node())
1918        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1919
1920        self._create_buckets(bucket='default', ramQuotaMB=100)
1921        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1922        self.log.info("LWW enabled on source bucket as expected")
1923        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1924        self.log.info("LWW enabled on dest bucket as expected")
1925
1926        self.log.info("Enabling auto failover on " + str(self.c1_cluster.get_master_node()))
1927        src_conn.update_autofailover_settings(enabled=True, timeout=30)
1928
1929        self.sleep(10)
1930
1931        self.setup_xdcr()
1932        self.merge_all_buckets()
1933        self.c1_cluster.pause_all_replications_by_id()
1934
1935        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1936        self.c2_cluster.load_all_buckets_from_generator(gen1)
1937        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1938        self.c1_cluster.load_all_buckets_from_generator(gen2)
1939
1940        self.c1_cluster.resume_all_replications_by_id()
1941
1942        self.verify_results()
1943
1944    def test_lww_with_mixed_buckets(self):
1945        src_conn = RestConnection(self.c1_cluster.get_master_node())
1946        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1947
1948        self._create_buckets(bucket='default', ramQuotaMB=100)
1949        self._create_buckets(bucket='sasl_bucket_1', ramQuotaMB=100, authType='sasl', saslPassword='password')
1950        self._create_buckets(bucket='sasl_bucket_2', ramQuotaMB=100, authType='sasl', saslPassword='password')
1951        self._create_buckets(bucket='standard_bucket_1', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT)
1952        self._create_buckets(bucket='standard_bucket_2', ramQuotaMB=100, proxyPort=STANDARD_BUCKET_PORT + 1)
1953
1954        for bucket in self.c1_cluster.get_buckets():
1955            self.assertTrue(src_conn.is_lww_enabled(bucket=bucket.name), "LWW not enabled on source bucket " + str(bucket.name))
1956            self.log.info("LWW enabled on source bucket " + str(bucket.name) + " as expected")
1957            self.assertTrue(dest_conn.is_lww_enabled(bucket=bucket.name), "LWW not enabled on source bucket " + str(bucket.name))
1958            self.log.info("LWW enabled on source bucket " + str(bucket.name) + " as expected")
1959
1960        self.sleep(10)
1961
1962        self.setup_xdcr()
1963        self.merge_all_buckets()
1964        self.c1_cluster.pause_all_replications_by_id()
1965
1966        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1967        self.c2_cluster.load_all_buckets_from_generator(gen1)
1968        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
1969        self.c1_cluster.load_all_buckets_from_generator(gen2)
1970
1971        self.c1_cluster.resume_all_replications_by_id()
1972
1973        self.verify_results()
1974
1975    def test_lww_with_diff_time_zones(self):
1976        self.c3_cluster = self.get_cb_cluster_by_name('C3')
1977
1978        src_conn = RestConnection(self.c1_cluster.get_master_node())
1979        dest_conn = RestConnection(self.c2_cluster.get_master_node())
1980        c3_conn = RestConnection(self.c3_cluster.get_master_node())
1981
1982        self._create_buckets(bucket='default', ramQuotaMB=100)
1983        c3_conn.create_bucket(bucket='default', ramQuotaMB=100, authType='none', saslPassword='', replicaNumber=1,
1984                                proxyPort=11211, replica_index=1, threadsNumber=3,
1985                                flushEnabled=1, lww=True)
1986        self.c3_cluster.add_bucket(ramQuotaMB=100, bucket='default', authType='none',
1987                                   saslPassword='', replicaNumber=1, proxyPort=11211,
1988                                   )
1989        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
1990        self.log.info("LWW enabled on source bucket as expected")
1991        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
1992        self.log.info("LWW enabled on dest bucket as expected")
1993        self.assertTrue(c3_conn.is_lww_enabled(), "LWW not enabled on c3 bucket")
1994        self.log.info("LWW enabled on c3 bucket as expected")
1995
1996        self.sleep(10)
1997
1998        self.setup_xdcr()
1999        self.merge_all_buckets()
2000        self.c1_cluster.pause_all_replications_by_id()
2001
2002        self._change_time_zone(self.c2_cluster, time_zone="America/Chicago")
2003        self._change_time_zone(self.c3_cluster, time_zone="America/New_York")
2004
2005        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
2006        self.c3_cluster.load_all_buckets_from_generator(gen1)
2007        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
2008        self.c2_cluster.load_all_buckets_from_generator(gen1)
2009        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
2010        self.c1_cluster.load_all_buckets_from_generator(gen2)
2011
2012        self.c1_cluster.resume_all_replications_by_id()
2013
2014        self.verify_results()
2015
2016    def test_lww_with_dest_shutdown(self):
2017        src_conn = RestConnection(self.c1_cluster.get_master_node())
2018        dest_conn = RestConnection(self.c2_cluster.get_master_node())
2019
2020        self._create_buckets(bucket='default', ramQuotaMB=100)
2021        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
2022        self.log.info("LWW enabled on source bucket as expected")
2023        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
2024        self.log.info("LWW enabled on dest bucket as expected")
2025
2026        self.sleep(10)
2027
2028        self.setup_xdcr()
2029        self.merge_all_buckets()
2030
2031        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
2032        self.c2_cluster.async_load_all_buckets_from_generator(gen1)
2033        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
2034        self.c1_cluster.async_load_all_buckets_from_generator(gen2)
2035
2036        crashed_nodes = self.c2_cluster.get_nodes()
2037
2038        self._kill_processes(crashed_nodes=crashed_nodes)
2039
2040        self.sleep(timeout=180)
2041
2042        for crashed_node in crashed_nodes:
2043            self._start_cb_server(crashed_node)
2044
2045        self.async_perform_update_delete()
2046
2047        NodeHelper.wait_warmup_completed(crashed_nodes)
2048
2049        self.verify_results()
2050
2051    def test_disk_full(self):
2052        src_conn = RestConnection(self.c1_cluster.get_master_node())
2053        dest_conn = RestConnection(self.c2_cluster.get_master_node())
2054
2055        self._create_buckets(bucket='default', ramQuotaMB=100)
2056        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
2057        self.log.info("LWW enabled on source bucket as expected")
2058        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
2059        self.log.info("LWW enabled on dest bucket as expected")
2060
2061        self.sleep(10)
2062
2063        self.setup_xdcr()
2064        self.merge_all_buckets()
2065        self.c1_cluster.pause_all_replications_by_id()
2066
2067        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
2068        self.c2_cluster.load_all_buckets_from_generator(gen1)
2069        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
2070        self.c1_cluster.load_all_buckets_from_generator(gen2)
2071
2072        self.c1_cluster.resume_all_replications_by_id()
2073
2074        self._wait_for_replication_to_catchup()
2075
2076        self.verify_results()
2077
2078        self.sleep(self._wait_timeout)
2079
2080        zip_file = "%s.zip" % (self._input.param("file_name", "collectInfo"))
2081        try:
2082            for node in [self.src_master, self.dest_master]:
2083                self.shell = RemoteMachineShellConnection(node)
2084                self.shell.execute_cbcollect_info(zip_file)
2085                if self.shell.extract_remote_info().type.lower() != "windows":
2086                    command = "unzip %s" % (zip_file)
2087                    output, error = self.shell.execute_command(command)
2088                    self.shell.log_command_output(output, error)
2089                    if len(error) > 0:
2090                        raise Exception("unable to unzip the files. Check unzip command output for help")
2091                    cmd = 'grep -R "Approaching full disk warning." cbcollect_info*/'
2092                    output, _ = self.shell.execute_command(cmd)
2093                else:
2094                    cmd = "curl -0 http://{1}:{2}@{0}:8091/diag 2>/dev/null | grep 'Approaching full disk warning.'".format(
2095                                                        self.src_master.ip,
2096                                                        self.src_master.rest_username,
2097                                                        self.src_master.rest_password)
2098                    output, _ = self.shell.execute_command(cmd)
2099                self.assertNotEquals(len(output), 0, "Full disk warning not generated as expected in %s" % node.ip)
2100                self.log.info("Full disk warning generated as expected in %s" % node.ip)
2101
2102                self.shell.delete_files(zip_file)
2103                self.shell.delete_files("cbcollect_info*")
2104        except Exception as e:
2105            self.log.info(e)
2106
2107    def test_lww_with_checkpoint_validation(self):
2108        src_conn = RestConnection(self.c1_cluster.get_master_node())
2109        dest_conn = RestConnection(self.c2_cluster.get_master_node())
2110
2111        self._create_buckets(bucket='default', ramQuotaMB=100)
2112        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
2113        self.log.info("LWW enabled on source bucket as expected")
2114        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
2115        self.log.info("LWW enabled on dest bucket as expected")
2116
2117        self.sleep(10)
2118
2119        self.setup_xdcr()
2120        self.merge_all_buckets()
2121        self.c1_cluster.pause_all_replications_by_id()
2122
2123        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
2124        self.c2_cluster.load_all_buckets_from_generator(gen1)
2125        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
2126        self.c1_cluster.load_all_buckets_from_generator(gen2)
2127
2128        self.c1_cluster.resume_all_replications_by_id()
2129
2130        self._wait_for_replication_to_catchup()
2131
2132        self.sleep(60)
2133
2134        vb0_node = None
2135        nodes = self.c1_cluster.get_nodes()
2136        ip = VBucketAwareMemcached(src_conn,'default').vBucketMap[0].split(':')[0]
2137        for node in nodes:
2138            if ip == node.ip:
2139                vb0_node = node
2140        if not vb0_node:
2141            raise XDCRCheckpointException("Error determining the node containing active vb0")
2142        rest_con = RestConnection(vb0_node)
2143        repl = rest_con.get_replication_for_buckets('default', 'default')
2144        try:
2145            checkpoint_record = rest_con.get_recent_xdcr_vb_ckpt(repl['id'])
2146            self.log.info("Checkpoint record : {0}".format(checkpoint_record))
2147        except Exception as e:
2148            raise XDCRCheckpointException("Error retrieving last checkpoint document - {0}".format(e))
2149
2150        self.verify_results()
2151
2152    def test_lww_with_backup_and_restore(self):
2153        src_conn = RestConnection(self.c1_cluster.get_master_node())
2154        dest_conn = RestConnection(self.c2_cluster.get_master_node())
2155
2156        self._create_buckets(bucket='default', ramQuotaMB=100)
2157        self.assertTrue(src_conn.is_lww_enabled(), "LWW not enabled on source bucket")
2158        self.log.info("LWW enabled on source bucket as expected")
2159
2160        backup_host_conn = RemoteMachineShellConnection(self._input.servers[6])
2161        output, error = backup_host_conn.execute_command("cbbackupmgr config --archive /data/lww-backup --repo lww")
2162        backup_host_conn.log_command_output(output, error)
2163        output, error = backup_host_conn.execute_command("cbbackupmgr backup --archive /data/lww-backup --repo lww "
2164                                                         "--host couchbase://{0} --username Administrator "
2165                                                         "--password password".format(self._input.servers[0].ip))
2166        backup_host_conn.log_command_output(output, error)
2167        output, error = backup_host_conn.execute_command("cbbackupmgr restore --archive /data/lww-backup --repo lww "
2168                                                         "--host couchbase://{0} --username Administrator "
2169                                                         "--password password".format(self._input.servers[2].ip))
2170        backup_host_conn.log_command_output(output, error)
2171
2172        self.assertTrue(dest_conn.is_lww_enabled(), "LWW not enabled on dest bucket")
2173        self.log.info("LWW enabled on dest bucket as expected")
2174
2175        self.sleep(10)
2176
2177        self.setup_xdcr()
2178        self.merge_all_buckets()
2179        self.c1_cluster.pause_all_replications_by_id()
2180
2181        gen1 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
2182        self.c2_cluster.load_all_buckets_from_generator(gen1)
2183        gen2 = BlobGenerator("lww-", "lww-", self._value_size, end=self._num_items)
2184        self.c1_cluster.load_all_buckets_from_generator(gen2)
2185
2186        self.c1_cluster.resume_all_replications_by_id()
2187
2188        self._wait_for_replication_to_catchup()
2189
2190        self.verify_results()
2191
2192    def test_lww_with_time_diff_in_src_nodes(self):
2193        src_conn = RestConnection(self.