3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import print_function
18 from cmframework.apis import cmerror
19 from cmframework.utils import cmactivationwork
class CMActivationRMQ(object):
    """Common RabbitMQ setup shared by the activation publisher and consumer.

    Creating an instance opens a blocking pika connection to the given
    broker, opens a channel on it and declares the direct exchange named by
    EXCHANGE on that channel.
    """

    # Name of the exchange that activation messages are routed through.
    EXCHANGE = 'cmframework.activator'

    def __init__(self, host, port):
        """Connect to the broker and declare the activation exchange.

        Args:
            host: Broker host, passed to pika.ConnectionParameters.
            port: Broker port, passed to pika.ConnectionParameters.

        Raises:
            cmerror.CMError: If connecting, opening the channel or declaring
                the exchange fails; the message is the text of the
                underlying exception.
        """
        # NOTE(review): the `try:` line and the host/port assignments were
        # absent from the reviewed text; they are restored here because the
        # `except` clause and the `self.host`/`self.port` reads below require
        # them. Confirm against the original file.
        self.host = host
        self.port = port
        # Pre-set so the error path can tell whether a connection was opened.
        self.connection = None
        self.channel = None
        try:
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=self.host, port=self.port))
            self.channel = self.connection.channel()
            # NOTE(review): newer pika releases appear to name this keyword
            # `exchange_type` -- confirm against the deployed pika version.
            self.channel.exchange_declare(exchange=CMActivationRMQ.EXCHANGE, type='direct')
        except Exception as exp:  # pylint: disable=broad-except
            # Do not leave a half-initialised connection open when the channel
            # or exchange setup fails after the connection was established.
            if self.connection is not None:
                try:
                    self.connection.close()
                except Exception:  # pylint: disable=broad-except
                    # Best effort only: the original failure is what is reported.
                    pass
            raise cmerror.CMError(str(exp))
37 class CMActivationRMQPublisher(CMActivationRMQ):
def __init__(self, host, port):
    """Set up the publisher by running the shared CMActivationRMQ setup.

    Args:
        host: Broker host, forwarded unchanged to CMActivationRMQ.
        port: Broker port, forwarded unchanged to CMActivationRMQ.
    """
    # All the work is done by the base-class initialiser.
    CMActivationRMQ.__init__(self, host=host, port=port)
43 data = work.serialize()
44 self.channel.basic_publish(exchange=CMActivationRMQ.EXCHANGE,
45 routing_key=work.get_target(),
47 logging.debug('Sent %s to activation exchange', str(work))
48 except Exception as exp: # pylint: disable=broad-except
49 self.connection.close()
50 raise cmerror.CMError(str(exp))
53 class CMActivationRMQConsumer(CMActivationRMQ):
class WorkConsumer(object):
    """Base type for objects that are handed received activation work.

    The default consume() only raises, so a subclass has to provide its own
    implementation to do anything useful.
    """

    # pylint: disable=no-self-use, unused-argument
    def consume(self, work):
        """Handle one work item; this base version always raises CMError.

        Args:
            work: The work item to handle (ignored here).

        Raises:
            cmerror.CMError: Always, with the message 'Not implemented'.
        """
        reason = 'Not implemented'
        raise cmerror.CMError(reason)
59 def __init__(self, host, port, consumer, node):
60 CMActivationRMQ.__init__(self, host, port)
62 result = self.channel.queue_declare(exclusive=True)
63 self.queue_name = result.method.queue
64 self.channel.queue_bind(exchange=CMActivationRMQ.EXCHANGE, queue=self.queue_name,
66 self.channel.queue_bind(exchange=CMActivationRMQ.EXCHANGE, queue=self.queue_name,
68 self.channel.basic_consume(self,
69 queue=self.queue_name,
71 self.consumer = consumer
def __call__(self, ch, method, properties, body):
    """Decode one received message and hand it to the registered consumer.

    The instance itself is what gets passed to basic_consume, so this is
    the entry point invoked for each delivered message.

    Args:
        ch: Unused here.
        method: Unused here.
        properties: Unused here.
        body: Raw message payload; fed to CMActivationWork.deserialize().
    """
    logging.debug('Received %r', body)
    # Rebuild the work object from the payload before dispatching it.
    received = cmactivationwork.CMActivationWork()
    received.deserialize(body)
    target = self.consumer
    target.consume(received)
81 self.channel.start_consuming()
82 except Exception as exp: # pylint: disable=broad-except
83 self.connection.close()
84 raise cmerror.CMError(str(exp))
class MyConsumer(CMActivationRMQConsumer.WorkConsumer):
    """Work consumer for the command-line test: prints what it receives."""

    def consume(self, work):
        """Print the given work item to standard output.

        Args:
            work: The received work item; formatted with '%s'.
        """
        line = 'Got work %s' % work
        print(line)
95 parser = argparse.ArgumentParser(description='Test rabbitmq activator', prog=sys.argv[0])
96 parser.add_argument('--role', required=True, action='store')
97 parser.add_argument('--host', required=True, action='store')
98 parser.add_argument('--port', required=True, type=int, action='store')
99 parser.add_argument('--operation', required=False, type=int, action='store')
100 parser.add_argument('--csn', required=False, type=int, action='store')
101 parser.add_argument('--node', required=True, type=str, action='store')
102 parser.add_argument('--properties', required=False, nargs=2, action='append')
103 args = parser.parse_args(sys.argv[1:])
104 if args.role == 'publisher':
105 if not args.operation or not args.csn or not args.properties:
106 print('Missing options')
108 publisher = CMActivationRMQPublisher(args.host, args.port)
110 for prop in args.properties:
111 props[prop[0]] = prop[1]
112 work = cmactivationwork.CMActivationWork(args.operation, args.csn, props)
114 elif args.role == 'consumer':
115 myconsumer = MyConsumer()
116 consumer = CMActivationRMQConsumer(args.host, args.port, myconsumer, args.node)
119 print('Invalid role %s' % args.role)
123 if __name__ == '__main__':