xref: /trunk/testrunner/pysystests/app/config.py (revision b95e9814)
1
2from kombu import Exchange, Queue
3from datetime import timedelta
4from celery.task.schedules import crontab
5import uuid
6import testcfg as cfg
7
8
class BaseConfig(object):
    """Celery configuration for the system-test workers of one cluster tag.

    Every instance starts with the shared broker/serializer settings plus the
    queues, routes and beat entries for the system-test manager and the
    cluster/phase status tasks.  The ``types`` constructor argument then
    selects which optional sections (kv, query, admin) are layered on top.

    Queue names are suffixed, and routing keys are prefixed, with
    ``cfg.CB_CLUSTER_TAG`` (see ``_queue_key_bindings``).
    """

    def __init__(self, types):
        """Build the base settings and apply the requested worker sections.

        Args:
            types: iterable of worker type names.  Recognised values are
                ``"kv"``, ``"query"``, ``"admin"`` and ``"all"`` (which
                enables all three); any other value is ignored.  A bare
                string is treated as a single type name.
        """
        self.BROKER_URL = 'librabbitmq://' + cfg.RABBITMQ_IP + '/' + cfg.CB_CLUSTER_TAG
        self.CELERY_ACKS_LATE = True
        self.CELERYD_PREFETCH_MULTIPLIER = 1
        # NOTE(review): with the pickle serializer workers unpickle whatever
        # arrives on the broker, so the broker must only be reachable by
        # trusted producers.
        self.CELERY_TASK_SERIALIZER = 'pickle'
        self.CELERY_DISABLE_RATE_LIMITS = True
        self.CELERY_TASK_RESULT_EXPIRES = 5
        self.CELERY_DEFAULT_EXCHANGE = 'default'
        self.CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
        self.CELERY_DEFAULT_ROUTING_KEY = 'default'
        self.CB_CLUSTER_TAG = cfg.CB_CLUSTER_TAG
        self.CELERY_DEFAULT_QUEUE = cfg.CB_CLUSTER_TAG
        self.CELERYBEAT_SCHEDULE = {
            'systest_manager': {
                'task': 'app.systest_manager.systestManager',
                'schedule': timedelta(seconds=3),
                'args': ('systest_manager_' + cfg.CB_CLUSTER_TAG,),
            },
        }

        default_ex = Exchange(self.CELERY_DEFAULT_EXCHANGE,
                              routing_key=self.CB_CLUSTER_TAG,
                              auto_delete=True,
                              durable=True)

        self.CELERY_QUEUES = (
            # queue for default routing
            Queue(self.CB_CLUSTER_TAG, default_ex, auto_delete=True),

            # queue for system test-case execution
            self.make_queue('systest_mgr_consumer', 'test.mgr', default_ex),

            # queue for cluster status tasks
            self.make_queue('cluster_status', 'cluster.status', default_ex),

            # queues for phase status / phase execution tasks
            self.make_queue('phase_status', 'phase.status', default_ex),
            self.make_queue('run_phase', 'run.phase', default_ex),
        )

        self.CELERY_ROUTES = (
            {'app.systest_manager.systestManager':
                self.route_args('systest_mgr_consumer', 'test.mgr')},
            {'app.systest_manager.get_phase_status':
                self.route_args('phase_status', 'phase.status')},
            {'app.systest_manager.runPhase':
                self.route_args('run_phase', 'run.phase')},
            {'app.workload_manager.updateClusterStatus':
                self.route_args('cluster_status', 'cluster.status')},
        )

        self._configure_worker_types(types)

    def _configure_worker_types(self, types):
        """Apply each optional section requested by ``types`` at most once.

        Sections are installed in the order the caller lists them.  Repeated
        or overlapping entries (e.g. ``["all", "kv"]``) no longer register
        the same queues and routes twice.
        """
        if isinstance(types, str):
            # Iterating a bare string would walk its characters and match
            # nothing; treat it as one type name instead.
            types = (types,)

        sections = (
            ("kv", (self.add_kvconfig,
                    self.add_kv_ops_manager,
                    self.add_report_kv_latency)),
            ("query", (self.add_queryconfig,)),
            ("admin", (self.add_adminconfig,)),
        )

        applied = set()
        saw_type = False
        for type_ in types:
            saw_type = True
            for name, installers in sections:
                if name in applied:
                    continue
                if type_ == name or type_ == "all":
                    applied.add(name)
                    for install in installers:
                        install()

        # The cluster status beat entry has always been added only when at
        # least one worker type was supplied (recognised or not); keep that
        # contract, but write the entry once instead of once per type.
        if saw_type:
            self.CELERYBEAT_SCHEDULE.update({
                'update_cluster_status': {
                    'task': 'app.workload_manager.updateClusterStatus',
                    'schedule': timedelta(seconds=10),
                },
            })

    def make_queue(self, queue, routing_key=None, exchange=None):
        """Return an auto-delete, durable ``Queue`` scoped to the cluster tag.

        Args:
            queue: base queue name; the cluster tag is appended to it.
            routing_key: base routing key, or ``None`` to reuse the tagged
                queue name as the routing key.
            exchange: exchange to bind to.  When ``None`` a direct exchange
                named after ``CELERY_DEFAULT_EXCHANGE`` is created.
        """
        if exchange is None:
            exchange = Exchange(self.CELERY_DEFAULT_EXCHANGE,
                                type="direct",
                                auto_delete=True,
                                durable=True)

        queue, routing_key = self._queue_key_bindings(queue, routing_key)

        return Queue(queue,
                     exchange,
                     routing_key=routing_key,
                     auto_delete=True,
                     durable=True)

    def route_args(self, queue, routing_key):
        """Return the route dict (``queue`` / ``routing_key``) for a task.

        Both values are run through ``_queue_key_bindings`` so they match
        the names produced by ``make_queue`` for the same arguments.
        """
        queue, routing_key = self._queue_key_bindings(queue, routing_key)
        return {
            'queue': queue,
            'routing_key': routing_key,
        }

    def _queue_key_bindings(self, queue, key):
        """Return ``(tagged_queue, tagged_routing_key)`` for this cluster.

        The queue becomes ``"<queue>_<tag>"``.  Every queue has a routing
        key for message delivery: when ``key`` is ``None`` the tagged queue
        name is used, otherwise the key becomes ``"<tag>.<key>"``.
        """
        queue = "%s_%s" % (queue, self.CB_CLUSTER_TAG)
        if key is None:
            return queue, queue
        return queue, "%s.%s" % (self.CB_CLUSTER_TAG, key)

    def add_kvconfig(self):
        """Register the beat entries, queues and routes for kv workloads."""
        direct_ex = Exchange("kv_direct", type="direct",
                             auto_delete=True, durable=True)

        self.CELERYBEAT_SCHEDULE.update({
            'task_scheduler': {
                'task': 'app.workload_manager.taskScheduler',
                'schedule': timedelta(seconds=1),
            },
            'workload_consumer': {
                'task': 'app.workload_manager.workloadConsumer',
                'schedule': timedelta(seconds=2),
                'args': ("workload_" + self.CB_CLUSTER_TAG,
                         "workload_template_" + self.CB_CLUSTER_TAG),
            },
            'postcondition_handler': {
                'task': 'app.workload_manager.postcondition_handler',
                'schedule': timedelta(seconds=2),
            },
        })

        self.CELERY_QUEUES = self.CELERY_QUEUES + (
            self.make_queue('delete', 'kv.delete', direct_ex),
            self.make_queue('set', 'kv.set', direct_ex),
            self.make_queue('get', 'kv.get', direct_ex),
            self.make_queue('kv_consumer', 'kv.consumer', direct_ex),
            self.make_queue('kv_scheduler', 'kv.scheduler', direct_ex),
            self.make_queue('kv_postcondition', 'kv.postcondition', direct_ex),
            self.make_queue('kv_prerun', 'kv.prerun', direct_ex),
            self.make_queue('kv_postrun', 'kv.postrun', direct_ex),
            self.make_queue('kv_queueops', 'kv.queueops', direct_ex),
            self.make_queue('kv_task_gen', 'kv.taskgen', direct_ex),
            self.make_queue('kv_systestrunner', 'kv.systestrunner', direct_ex),
        )

        self.CELERY_ROUTES = self.CELERY_ROUTES + (
            {'app.sdk_client_tasks.mdelete':
                self.route_args('delete', 'kv.delete')},
            {'app.sdk_client_tasks.mset':
                self.route_args('set', 'kv.set')},
            {'app.sdk_client_tasks.mget':
                self.route_args('get', 'kv.get')},
            {'app.workload_manager.workloadConsumer':
                self.route_args('kv_consumer', 'kv.consumer')},
            {'app.workload_manager.taskScheduler':
                self.route_args('kv_scheduler', 'kv.scheduler')},
            {'app.workload_manager.postcondition_handler':
                self.route_args('kv_postcondition', 'kv.postcondition')},
            {'app.workload_manager.task_prerun_handler':
                self.route_args('kv_prerun', 'kv.prerun')},
            {'app.workload_manager.postrun':
                self.route_args('kv_postrun', 'kv.postrun')},
            {'app.workload_manager.queue_op_cycles':
                self.route_args('kv_queueops', 'kv.queueops')},
            {'app.workload_manager.generate_pending_tasks':
                self.route_args('kv_task_gen', 'kv.taskgen')},
            {'app.workload_manager.sysTestRunner':
                self.route_args('kv_systestrunner', 'kv.systestrunner')},
        )

    def add_queryconfig(self):
        """Register the beat entries, queues and routes for query workloads."""
        direct_ex = Exchange("query_direct", type="direct",
                             auto_delete=True, durable=True)

        self.CELERYBEAT_SCHEDULE.update({
            'query_consumer': {
                'task': 'app.query.queryConsumer',
                'schedule': timedelta(seconds=2),
                'args': ('query_' + self.CB_CLUSTER_TAG,),
            },
            'query_runner': {
                'task': 'app.query.queryRunner',
                'schedule': timedelta(seconds=1),
                'args': (10,),  # no. of msgs to trigger throttling
            },
            'query_ops_manager': {
                'task': 'app.query.query_ops_manager',
                'schedule': timedelta(seconds=10),  # every 10s
                'args': (10,),  # no. of msgs to trigger throttling
            },
        })

        self.CELERY_QUEUES = self.CELERY_QUEUES + (
            # schedulable queue for the consumer task
            self.make_queue('query_consumer', 'query.consumer', direct_ex),

            # high performance direct exchange for multi_query tasks
            self.make_queue('query_multi', 'query.multi', direct_ex),

            # dedicated queues
            self.make_queue('query_runner', 'query.runner', direct_ex),
            self.make_queue('query_upt_builder', 'query.updateqb', direct_ex),
            self.make_queue('query_upt_workload', 'query.updateqw', direct_ex),
            self.make_queue('query_ops_manager', 'query.opmanager', direct_ex),
        )

        # each query task is routed to its own dedicated queue
        self.CELERY_ROUTES = self.CELERY_ROUTES + (
            {'app.query.queryConsumer':
                self.route_args('query_consumer', 'query.consumer')},
            {'app.query.queryRunner':
                self.route_args('query_runner', 'query.runner')},
            {'app.query.updateQueryBuilders':
                self.route_args('query_upt_builder', 'query.updateqb')},
            {'app.query.updateQueryWorkload':
                self.route_args('query_upt_workload', 'query.updateqw')},
            {'app.rest_client_tasks.multi_query':
                self.route_args('query_multi', 'query.multi')},
            {'app.query.query_ops_manager':
                self.route_args('query_ops_manager', 'query.opmanager')},
        )

    def add_adminconfig(self):
        """Register the beat entries, queue and routes for admin/xdcr tasks."""
        topic_ex = Exchange("admin_topic", type="topic",
                            auto_delete=True, durable=True)

        self.CELERYBEAT_SCHEDULE.update({
            'admin_consumer': {
                'task': 'app.admin_manager.adminConsumer',
                'schedule': timedelta(seconds=2),
                'args': ('admin_' + self.CB_CLUSTER_TAG,),
            },
            'xdcr_consumer': {
                'task': 'app.admin_manager.xdcrConsumer',
                'schedule': timedelta(seconds=2),
                'args': ('xdcr_' + self.CB_CLUSTER_TAG,),
            },
            'do_backup': {
                'task': 'app.admin_manager.backup_task',
                'schedule': crontab(minute=0, hour=0),  # daily at midnight
                'args': [cfg.ENABLE_BACKUPS],
            },
        })

        self.CELERY_QUEUES = self.CELERY_QUEUES + (
            # single queue bound with a topic wildcard; every admin route
            # below uses an 'admin_tasks.<name>' key under that wildcard
            self.make_queue('admin_tasks', 'admin_tasks.#', topic_ex),
        )

        # route all admin tasks to the same internal task queue
        self.CELERY_ROUTES = self.CELERY_ROUTES + (
            {'app.admin_manager.adminConsumer':
                self.route_args('admin_tasks', 'admin_tasks.adminconsumer')},
            {'app.admin_manager.xdcrConsumer':
                self.route_args('admin_tasks', 'admin_tasks.xdcrconsumer')},
            {'app.admin_manager.backup_task':
                self.route_args('admin_tasks', 'admin_tasks.backuptasks')},
            {'app.rest_client_tasks.perform_admin_tasks':
                self.route_args('admin_tasks', 'admin_tasks.performadmin')},
            {'app.rest_client_tasks.perform_xdcr_tasks':
                self.route_args('admin_tasks', 'admin_tasks.performxdcr')},
        )

    def add_kv_ops_manager(self):
        """Register the beat entry, queue and route for the kv ops manager."""
        direct_ex = Exchange("kv_ops_direct", type="direct",
                             auto_delete=True, durable=True)

        self.CELERYBEAT_SCHEDULE.update({
            'kv_ops_manager': {
                'task': 'app.workload_manager.kv_ops_manager',
                'schedule': timedelta(seconds=10),  # every 10s
                'args': (1000,),  # no. of msgs to trigger throttling
            },
        })

        self.CELERY_QUEUES = self.CELERY_QUEUES + (
            self.make_queue('kv_ops_mgr', 'kv.ops_mgr', direct_ex),
        )

        self.CELERY_ROUTES = self.CELERY_ROUTES + (
            {'app.workload_manager.kv_ops_manager':
                self.route_args('kv_ops_mgr', 'kv.ops_mgr')},
        )

    def add_report_kv_latency(self):
        """Register the beat entry, queues and routes for kv latency tasks."""
        # Uses the same exchange name ("kv_ops_direct") as add_kv_ops_manager.
        direct_ex = Exchange("kv_ops_direct", type="direct",
                             auto_delete=True, durable=True)

        self.CELERYBEAT_SCHEDULE.update({
            'report_kv_latency': {
                'task': 'app.workload_manager.report_kv_latency',
                'schedule': timedelta(seconds=10),  # every 10s
            },
        })

        self.CELERY_QUEUES = self.CELERY_QUEUES + (
            self.make_queue('kv_mng_latency', 'kv.mnglatency', direct_ex),
            self.make_queue('kv_sdk_latency', 'kv.sdklatency', direct_ex),
            self.make_queue('kv_mc_latency', 'kv.mclatency', direct_ex),
        )

        self.CELERY_ROUTES = self.CELERY_ROUTES + (
            {'app.workload_manager.report_kv_latency':
                self.route_args('kv_mng_latency', 'kv.mnglatency')},
            {'app.sdk_client_tasks.mc_op_latency':
                self.route_args('kv_mc_latency', 'kv.mclatency')},
            {'app.sdk_client_tasks.sdk_op_latency':
                self.route_args('kv_sdk_latency', 'kv.sdklatency')},
        )
312
313