1from librabbitmq import Connection, Message
2from pyrabbit.api import Client
3import json
4import pickle
5from celery import Task
6import testcfg as cfg
7from celery.utils.log import get_task_logger
# Module-level logger obtained from Celery's task-logger factory, named after this module.
logger = get_task_logger(__name__)
9
class PersistedMQ(Task):
    """Celery Task base class that lazily creates and caches a RabbitHelper.

    The helper is built on first access of ``rabbitHelper`` and reused on
    later accesses until ``close`` is called.
    """

    # Cached RabbitHelper; None until first use and again after close().
    _conn = None

    @property
    def rabbitHelper(self):
        """Return the cached RabbitHelper, creating it on first access."""
        if self._conn is None:
            self._conn = RabbitHelper()
        return self._conn

    def close(self):
        """Drop the cached helper so the next access creates a fresh one.

        Safe to call when no helper was ever created. The previous
        ``del self._conn`` raised AttributeError in that case, because
        ``_conn`` then only exists as a class attribute and there is no
        instance attribute to delete.
        """
        # Rebinding releases this object's reference to the helper, which is
        # what the old del + reassign pair achieved when a helper existed.
        self._conn = None
22
class RabbitHelper(object):
    """Convenience wrapper around a librabbitmq connection.

    Most operations open a short-lived channel on the shared connection and
    close it again when they finish, even if the broker call raises.
    ``manager`` is a pyrabbit client for the same host (port 55672) and is
    used for exchange lookups.
    """

    def __init__(self, mq_server=None, virtual_host=cfg.CB_CLUSTER_TAG):
        """Connect to the broker as guest/guest.

        mq_server: broker host; cfg.RABBITMQ_IP is used when None.
        virtual_host: vhost to connect to.
        """
        if mq_server is None:
            mq_server = cfg.RABBITMQ_IP

        self.connection = Connection(host=mq_server,
                                     userid="guest",
                                     password="guest",
                                     virtual_host=virtual_host)
        self.manager = Client(mq_server + ":55672", "guest", "guest")

    @staticmethod
    def _as_str(name):
        """Return name unchanged when it is a str, otherwise str(name)."""
        return name if isinstance(name, str) else str(name)

    def _on_channel(self, operation):
        """Run operation(channel) on a fresh channel and always close it."""
        channel = self.connection.channel()
        try:
            return operation(channel)
        finally:
            channel.close()

    def declare(self, queue=None, durable=True):
        """Declare a queue and return the broker's queue_declare reply.

        With a truthy queue name, an auto-delete queue of that name is
        declared (durable as requested). Otherwise an exclusive temporary
        queue is declared.
        """
        if queue:
            name = self._as_str(queue)
            return self._on_channel(
                lambda ch: ch.queue_declare(queue=name,
                                            durable=durable,
                                            auto_delete=True))
        # tmp queue
        return self._on_channel(lambda ch: ch.queue_declare(exclusive=True))

    def exchange_declare(self, exchange, type_='direct'):
        """Declare an exchange of the given type."""
        self._on_channel(
            lambda ch: ch.exchange_declare(exchange=exchange, type=type_))

    def bind(self, exchange, queue):
        """Bind queue to exchange."""
        self._on_channel(
            lambda ch: ch.queue_bind(exchange=exchange, queue=queue))

    def delete(self, queue):
        """Delete the named queue."""
        name = self._as_str(queue)
        self._on_channel(lambda ch: ch.queue_delete(queue=name))

    def purge(self, queue):
        """Remove all messages from the named queue."""
        name = self._as_str(queue)
        self._on_channel(lambda ch: ch.queue_purge(queue=name))

    def channel(self):
        """Return a (new channel, connection) pair.

        The channel is not closed here; that is left to the caller.
        """
        return self.connection.channel(), self.connection

    def qsize(self, queue):
        """Return the size reported for queue, or 0 when queue is None.

        The size is element 1 of the reply from ``declare`` (presumably the
        message count -- confirm against the librabbitmq reply layout).
        Note that this (re)declares the queue as a side effect.
        """
        if queue is None:
            return 0
        response = self.declare(queue=self._as_str(queue))
        return response[1]

    def broadcastMsg(self, routing_key, body):
        """Publish body to the default exchange under routing_key."""
        self._on_channel(
            lambda ch: ch.basic_publish(exchange='',
                                        routing_key=routing_key,
                                        body=body))

    def getExchange(self, vhost, exchange):
        """Return the manager client's description of an exchange."""
        return self.manager.get_exchange(vhost, exchange)

    def numExchangeQueues(self, vhost, exchange):
        """Return the number of 'outgoing' entries of an exchange.

        Falls back to 1 on any error, as a deliberate best-effort.
        """
        try:
            ex = self.getExchange(vhost, exchange)
            return len(ex['outgoing'])
        except Exception:
            # todo: sometimes the broker doesn't return expected response
            return 1

    def putMsg(self, routing_key, body, exchange=''):
        """Publish body to exchange under routing_key (coerced to str)."""
        key = self._as_str(routing_key)
        self._on_channel(
            lambda ch: ch.basic_publish(exchange=exchange,
                                        routing_key=key,
                                        body=body))

    def getMsg(self, queue, no_ack=False, requeue=False):
        """Fetch one message from queue and return its body, or None.

        no_ack: when false (default) the fetched message is acknowledged;
                when true it is left unacknowledged.
        requeue: when true the body is published back to the same queue
                 via putMsg after it has been fetched.
        """
        channel = self.connection.channel()
        try:
            message = channel.basic_get(queue=queue)
            if message is None:
                return None

            body = message.body
            # Handle data receipt acknowledgement
            if not no_ack:
                message.ack()

            if requeue:
                self.putMsg(queue, body)

            return body
        finally:
            channel.close()

    def getJsonMsg(self, queue, no_ack=False, requeue=False):
        """Fetch one message and return it decoded from JSON.

        Returns an empty dict when the queue has no message or the body is
        not valid JSON.
        """
        msg = self.getMsg(queue, no_ack, requeue)
        if msg is None:
            return {}
        try:
            return json.loads(msg)
        except ValueError:
            # Non-JSON payloads are treated the same as "no message".
            return {}

    def close(self):
        """Close the broker connection."""
        self.connection.close()

    def __del__(self):
        # __init__ can fail before self.connection is assigned (e.g. the
        # broker is unreachable); there is nothing to close in that case.
        if getattr(self, 'connection', None) is not None:
            self.close()
148
149
150"""
151" rawTaskPublisher
152"
" This function assembles a celery task and sends it to the requested broker.
" It can be useful for special cases (i.e. cli or xdcr) where the sender wants to
" trigger a task that is listening for messages from outside to perform work
156"
157" task: full task string method. this must be a string with the name of a
158"       defined method with @celery.task annotations. i.e (app.systest_manager.getWorkloadStatus)
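" args: tuple of task args  ('abc',)
" queue: name of the queue the task is addressed to; also used as the routing key
"        when routing_key is not given
" broker: broker to run task against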
161" vhost: vhost in broker where task should be routed to
162" userid: userid to access broker
163" password: password to access broker
164" exchange: name of exchange in broker to send message
" routing_key: routing key that will put message in appropriate queue.  see app.config for a map
"              of queues to routing keys
167"
168" example:
169"    task = "app.systest_manager.getWorkloadStatus"
170"    args = ('abc',)
171"    rabbit_helper.rawTaskPublisher(task, args, 'kv_workload_status_default',
172"                                   exchange="kv_direct",
173"                                   routing_key="default.kv.workloadstatus")
174"
175"""
def rawTaskPublisher(task, args, queue,
                     broker=cfg.RABBITMQ_IP,
                     vhost=cfg.CB_CLUSTER_TAG,
                     exchange="",
                     userid="guest",
                     password="guest",
                     routing_key=None,
                     task_id=None):
    """Build a pickled celery task message and publish it to a broker.

    task: dotted name of the celery task to run.
    args: tuple of positional arguments for the task.
    queue: queue name placed in the message header; also used as the
           routing key when routing_key is None.
    broker: broker host to connect to.
    vhost: virtual host on the broker.
    exchange: exchange to publish to.
    userid / password: broker credentials.
    routing_key: routing key for the message; defaults to queue.
    task_id: id to stamp on the task; a fresh UUID is generated when None.
             (Previously every message carried the same hard-coded id, so
             distinct tasks were indistinguishable by id.)

    Returns None. The connection and channel opened here are always closed
    before returning, including when publishing fails.
    """
    # Local import keeps this function self-contained without touching the
    # module's import block.
    import uuid

    if routing_key is None:
        routing_key = queue
    if task_id is None:
        task_id = str(uuid.uuid4())

    # construct task body
    body = {'retries': 0, 'task': task, 'errbacks': None, 'callbacks': None,
            'kwargs': {}, 'eta': None, 'args': args, 'id': task_id,
            'expires': None, 'utc': True}
    header = {'exclusive': False, 'name': queue, 'headers': {},
              'durable': True, 'delivery_mode': 2, 'no_ack': False,
              'priority': None, 'alias': None, 'queue_arguments': None,
              'content_encoding': 'binary',
              'content_type': 'application/x-python-serialize',
              'binding_arguments': None, 'auto_delete': True}

    # prepare message: (serialized body, properties) tuple
    message = (pickle.dumps(body), header)

    # setup broker connection
    connection = Connection(host=broker, userid=userid, password=password,
                            virtual_host=vhost)
    try:
        channel = connection.channel()
        try:
            # publish!
            channel.basic_publish(message, exchange=exchange,
                                  routing_key=routing_key)
        finally:
            channel.close()
    finally:
        connection.close()
207