19cd578e3a9cdda5ce08a9fbe53f197d7ce8d1cf
[ta/config-manager.git] / cmactivationrmq.py
1 # Copyright 2019 Nokia
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import print_function
15 import logging
16 import pika
17
18 from cmframework.apis import cmerror
19 from cmframework.utils import cmactivationwork
20
21
class CMActivationRMQ(object):
    """Base class owning a RabbitMQ connection for the activation exchange.

    On construction it opens a blocking pika connection, creates a channel on
    it and declares the direct exchange named by ``EXCHANGE``.
    """

    # Name of the exchange declared by every instance.
    EXCHANGE = 'cmframework.activator'

    def __init__(self, host, port):
        """Connect to the broker and declare the activation exchange.

        Args:
            host: broker host handed to ``pika.ConnectionParameters``.
            port: broker port handed to ``pika.ConnectionParameters``.

        Raises:
            cmerror.CMError: if connecting, opening the channel or declaring
                the exchange fails. The message is the text of the underlying
                exception. A connection that was already opened is closed
                before the error is raised.
        """
        self.host = host
        self.port = port
        # Pre-set so cleanup can tell whether a connection was ever opened.
        self.connection = None
        self.channel = None
        try:
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=self.host, port=self.port))
            self.channel = self.connection.channel()
            # NOTE(review): the ``type`` keyword is kept exactly as before;
            # confirm it matches the pika release this project pins.
            self.channel.exchange_declare(exchange=CMActivationRMQ.EXCHANGE, type='direct')
        except Exception as exp:  # pylint: disable=broad-except
            # Do not leak a half-initialised connection when a later setup
            # step fails.
            self._close_quietly()
            raise cmerror.CMError(str(exp))

    def _close_quietly(self):
        """Close the connection, if one was opened, without raising."""
        if self.connection is None:
            return
        try:
            self.connection.close()
        except Exception as close_exp:  # pylint: disable=broad-except
            logging.debug('Ignoring error while closing connection: %s', close_exp)
35
36
class CMActivationRMQPublisher(CMActivationRMQ):
    """Publishes activation work items to the activation exchange."""

    def __init__(self, host, port):
        """Connect to the broker at ``host``:``port`` via the base class.

        Raises:
            cmerror.CMError: propagated from ``CMActivationRMQ.__init__``.
        """
        CMActivationRMQ.__init__(self, host, port)

    def send(self, work):
        """Serialize ``work`` and publish it to the activation exchange.

        The message body is ``work.serialize()`` and the routing key is
        ``work.get_target()``.

        Args:
            work: object providing ``serialize()`` and ``get_target()``.

        Raises:
            cmerror.CMError: if serializing or publishing fails. The
                connection is closed (best effort) before raising, so the
                publisher must not be reused after a failure.
        """
        try:
            data = work.serialize()
            self.channel.basic_publish(exchange=CMActivationRMQ.EXCHANGE,
                                       routing_key=work.get_target(),
                                       body=data)
            # Let logging format lazily: no str() cost when debug is off.
            logging.debug('Sent %s to activation exchange', work)
        except Exception as exp:  # pylint: disable=broad-except
            # Closing can fail too (e.g. the connection is already gone).
            # Guard it so the original failure is what the caller sees, always
            # as a CMError.
            try:
                self.connection.close()
            except Exception as close_exp:  # pylint: disable=broad-except
                logging.debug('Ignoring error while closing connection: %s', close_exp)
            raise cmerror.CMError(str(exp))
51
52
class CMActivationRMQConsumer(CMActivationRMQ):
    """Receives activation work items and hands them to a ``WorkConsumer``.

    The instance itself is registered as the pika message callback (see
    ``__call__``).
    """

    class WorkConsumer(object):
        """Interface for objects that process received work items."""

        # pylint: disable=no-self-use, unused-argument
        def consume(self, work):
            """Handle one work item; subclasses are expected to override.

            Raises:
                cmerror.CMError: always, in this base implementation.
            """
            raise cmerror.CMError('Not implemented')

    def __init__(self, host, port, consumer, node):
        """Connect, create an exclusive queue and subscribe to it.

        The queue is declared without a name and the name returned by the
        broker is stored in ``queue_name``. It is bound to the activation
        exchange with two routing keys: ``node`` and the literal ``'all'``.

        Args:
            host: broker host.
            port: broker port.
            consumer: object whose ``consume(work)`` is called per message.
            node: routing key identifying this consumer.

        Raises:
            cmerror.CMError: if connecting or any queue setup step fails. The
                connection is closed (best effort) before raising.
        """
        CMActivationRMQ.__init__(self, host, port)
        self.node = node
        # Set before the callback is registered so __call__ can never see a
        # half-initialised instance.
        self.consumer = consumer
        try:
            result = self.channel.queue_declare(exclusive=True)
            self.queue_name = result.method.queue
            for routing_key in (node, 'all'):
                self.channel.queue_bind(exchange=CMActivationRMQ.EXCHANGE,
                                        queue=self.queue_name,
                                        routing_key=routing_key)
            self.channel.basic_consume(self,
                                       queue=self.queue_name,
                                       no_ack=True)
        except Exception as exp:  # pylint: disable=broad-except
            # Match the base constructor: release the connection and report
            # the failure as CMError rather than a raw broker exception.
            self._close_connection_quietly()
            raise cmerror.CMError(str(exp))

    def __call__(self, ch, method, properties, body):
        """Message callback: deserialize ``body`` and pass it to the consumer.

        Only ``body`` is used; ``ch``, ``method`` and ``properties`` are
        accepted and ignored.
        """
        logging.debug('Received %r', body)
        work = cmactivationwork.CMActivationWork()
        work.deserialize(body)
        self.consumer.consume(work)

    def receive(self):
        """Run the consuming loop via ``channel.start_consuming()``.

        Raises:
            cmerror.CMError: if the loop raises, including exceptions raised
                from ``__call__``. The connection is closed (best effort)
                before raising.
        """
        try:
            self.channel.start_consuming()
        except Exception as exp:  # pylint: disable=broad-except
            self._close_connection_quietly()
            raise cmerror.CMError(str(exp))

    def _close_connection_quietly(self):
        """Close the connection without letting a close failure escape."""
        try:
            self.connection.close()
        except Exception as close_exp:  # pylint: disable=broad-except
            logging.debug('Ignoring error while closing connection: %s', close_exp)
85
86
def main():
    """Command-line harness for trying the publisher and consumer by hand.

    ``--role publisher`` sends one work item built from ``--operation``,
    ``--csn`` and the ``--properties`` name/value pairs; all three are then
    mandatory. ``--role consumer`` subscribes using ``--node`` and prints each
    received work item. Any other role, or missing publisher options, prints a
    message and exits with status 1.
    """
    import sys
    import argparse

    class MyConsumer(CMActivationRMQConsumer.WorkConsumer):
        """Consumer that just prints what it receives."""

        def consume(self, work):
            print('Got work %s' % work)

    parser = argparse.ArgumentParser(description='Test rabbitmq activator', prog=sys.argv[0])
    parser.add_argument('--role', required=True, action='store')
    parser.add_argument('--host', required=True, action='store')
    parser.add_argument('--port', required=True, type=int, action='store')
    parser.add_argument('--operation', required=False, type=int, action='store')
    parser.add_argument('--csn', required=False, type=int, action='store')
    parser.add_argument('--node', required=True, type=str, action='store')
    parser.add_argument('--properties', required=False, nargs=2, action='append')
    args = parser.parse_args(sys.argv[1:])

    if args.role == 'publisher':
        # Compare against None rather than truthiness: an explicit
        # "--operation 0" or "--csn 0" is a supplied value, not a missing one.
        if args.operation is None or args.csn is None or not args.properties:
            print('Missing options')
            sys.exit(1)
        publisher = CMActivationRMQPublisher(args.host, args.port)
        # Each --properties occurrence is a [name, value] pair; if a name is
        # repeated, the last value wins.
        props = {name: value for name, value in args.properties}
        work = cmactivationwork.CMActivationWork(args.operation, args.csn, props)
        publisher.send(work)
    elif args.role == 'consumer':
        myconsumer = MyConsumer()
        consumer = CMActivationRMQConsumer(args.host, args.port, myconsumer, args.node)
        consumer.receive()
    else:
        print('Invalid role %s' % args.role)
        sys.exit(1)
121
122
# Run the command-line test harness only when executed as a script.
if __name__ == '__main__':
    main()