1from multiprocessing.queues import Queue
2import sys
3from threading import Thread
4import uuid
5import time
6from mc_bin_client import MemcachedClient
7
8import memcacheConstants
9from memcached.helper.data_helper import MemcachedClientHelper
10from resourceparser import ServerInfo
11from membase.api.tap import TapConnection
12
13
14class TapListener(Thread):
15    def __init__(self, queue, server):
16        Thread.__init__(self)
17        self.queue = queue
18        self.server = server
19        self.stats = []
20        self.aborted = False
21
22    def run(self):
23        self.tap()
24
25    def callback(self, identifier, cmd, extra, key, vb, val, cas):
26
27    #        if key == 'farshid':
28    #        if cmd != 65 and cmd != 68:
29    #            print cmd
30        command_names = memcacheConstants.COMMAND_NAMES[cmd]
31        if command_names != "CMD_TAP_MUTATION":
32            print "%s: ``%s'' (vb:%d) -> (%d bytes from %s)" % (
33            memcacheConstants.COMMAND_NAMES[cmd],
34            key, vb, len(val), identifier)
35            print extra, cas
36
37
38    def tap(self):
39        print "starting tap process"
40        t = TapConnection(self.server, 11210, callback=self.callback, clientId=str(uuid.uuid4()),
41#        opts={})
42                          opts={memcacheConstants.TAP_FLAG_BACKFILL: 0xffffffff})
43        while True and not self.aborted:
44            t.receive()
45
46sys.path.append("lib")
47sys.path.append("pytests")
48
49def tap(server, queue):
50    listen = TapListener(queue, server)
51    listen.tap()
52
53
54queue = Queue(maxsize=10000)
55
56server = ServerInfo()
57server.ip = "10.17.12.20"
58
59bucket = {'name': 'default', 'port': 11220, 'password': ''}
60#vam = VBucketAwareMemcached(RestConnection(server), bucket)
61#print vam.memcacheds
62#print vam.vBucketMap
63payload = MemcachedClientHelper.create_value('*', 10240)
64keys = ["key_%d" % (i) for i in range(4000)]
65#keys = ["key_%s_%d" % (str(uuid.uuid4()), i) for i in range(4)]
66total_size = 0
67#mc = MemcachedClientHelper.create_memcached_client("172.16.75.128","default",11210,"default")
68mc = MemcachedClient("10.17.12.20", 11210)
69#for key in keys:
70#    vam.memcached(key).set(key, 1, 0, payload)
71#    total_size += len(key) + len(payload) + 200
72#time.sleep(10)
73#for i in range(0,1023):
74#    mc.set_vbucket_state(i, 'active')
75new_thread = TapListener(queue, server)
76new_thread.start()
77
78
79i = 0
80while i < 4000:
81    for key in keys:
82    #    vam.memcached(key).get(key)
83        mc.set(key, 10, 0, payload, vbucket=0)
84#    for key in keys:
85    #    vam.memcached(key).get(key)
86#        mc.set(key, 1, 0, payload, vbucket=0)
87        try:
88            a,b,c = mc.get(key, vbucket=0)
89#            print c
90        except:
91            pass
92    i += 1
93#    print i
94
95
96
97#for key in keys:
98#    vam.memcached(key).get(key)
99#    mc.set(key, 1, 0, payload, vbucket=0)
100#    mc.get(key, vbucket=0)
101
102#for key in keys:
103#    vam.memcached(key).get(key)
104#    mc.delete(key,vbucket=0)
105
106time.sleep(10)
107
108#    vam.memcached(key).delete(key)
109#vam.done()
110new_thread.aborted = True
111time.sleep(30)
112new_thread.join()
113print "total_size", total_size
114#reader = Process(target=tap, args=(server, queue))
115#reader.start()
116#time.sleep(10)
117#keys = []
118#keys_count = 0
119#was_empty = 0
120#while was_empty < 50:
121#    try:
122#        key = queue.get(False, 5)
123#
124#        keys_count += 1
125#        print key
126#        keys.append(key)
127#    except Empty:
128#        print "exception thrown"
129#        print "how many keys ? {0}".format(keys_count)
130#        was_empty += 1
131#
132#reader.terminate()
133#
134
135
136
137
138
139