X-Git-Url: https://gerrit.akraino.org/r/gitweb?a=blobdiff_plain;f=cmframework%2Fsrc%2Fcmframework%2Futils%2Fcmactivationrmq.py;fp=cmframework%2Fsrc%2Fcmframework%2Futils%2Fcmactivationrmq.py;h=19cd578e3a9cdda5ce08a9fbe53f197d7ce8d1cf;hb=c389bdee7b3845b55f443dbf04c0ce4083a55886;hp=0000000000000000000000000000000000000000;hpb=5030f0c004701dd422c78c71c014ef60f48139fc;p=ta%2Fconfig-manager.git diff --git a/cmframework/src/cmframework/utils/cmactivationrmq.py b/cmframework/src/cmframework/utils/cmactivationrmq.py new file mode 100644 index 0000000..19cd578 --- /dev/null +++ b/cmframework/src/cmframework/utils/cmactivationrmq.py @@ -0,0 +1,124 @@ +# Copyright 2019 Nokia + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import print_function +import logging +import pika + +from cmframework.apis import cmerror +from cmframework.utils import cmactivationwork + + +class CMActivationRMQ(object): + EXCHANGE = 'cmframework.activator' + + def __init__(self, host, port): + try: + self.host = host + self.port = port + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.host, port=self.port)) + self.channel = self.connection.channel() + self.channel.exchange_declare(exchange=CMActivationRMQ.EXCHANGE, type='direct') + except Exception as exp: # pylint: disable=broad-except + raise cmerror.CMError(str(exp)) + + +class CMActivationRMQPublisher(CMActivationRMQ): + def __init__(self, host, port): + CMActivationRMQ.__init__(self, host, port) + + def send(self, work): + try: + data = work.serialize() + self.channel.basic_publish(exchange=CMActivationRMQ.EXCHANGE, + routing_key=work.get_target(), + body=data) + logging.debug('Sent %s to activation exchange', str(work)) + except Exception as exp: # pylint: disable=broad-except + self.connection.close() + raise cmerror.CMError(str(exp)) + + +class CMActivationRMQConsumer(CMActivationRMQ): + class WorkConsumer(object): + # pylint: disable=no-self-use, unused-argument + def consume(self, work): + raise cmerror.CMError('Not implemented') + + def __init__(self, host, port, consumer, node): + CMActivationRMQ.__init__(self, host, port) + self.node = node + result = self.channel.queue_declare(exclusive=True) + self.queue_name = result.method.queue + self.channel.queue_bind(exchange=CMActivationRMQ.EXCHANGE, queue=self.queue_name, + routing_key=node) + self.channel.queue_bind(exchange=CMActivationRMQ.EXCHANGE, queue=self.queue_name, + routing_key='all') + self.channel.basic_consume(self, + queue=self.queue_name, + no_ack=True) + self.consumer = consumer + + def __call__(self, ch, method, properties, body): + logging.debug('Received %r', body) + work = cmactivationwork.CMActivationWork() + work.deserialize(body) + self.consumer.consume(work) + + def receive(self): + try: + self.channel.start_consuming() + except Exception as exp: # pylint: disable=broad-except + self.connection.close() + raise cmerror.CMError(str(exp)) + + +def main(): + import sys + import argparse + + class MyConsumer(CMActivationRMQConsumer.WorkConsumer): + def consume(self, work): + print('Got work %s' % work) + + parser = argparse.ArgumentParser(description='Test rabbitmq activator', prog=sys.argv[0]) + parser.add_argument('--role', required=True, action='store') + parser.add_argument('--host', required=True, action='store') + parser.add_argument('--port', required=True, type=int, action='store') + parser.add_argument('--operation', required=False, type=int, action='store') + parser.add_argument('--csn', required=False, type=int, action='store') + parser.add_argument('--node', required=True, type=str, action='store') + parser.add_argument('--properties', required=False, nargs=2, action='append') + args = parser.parse_args(sys.argv[1:]) + if args.role == 'publisher': + if not args.operation or not args.csn or not args.properties: + print('Missing options') + sys.exit(1) + publisher = CMActivationRMQPublisher(args.host, args.port) + props = {} + for prop in args.properties: + props[prop[0]] = prop[1] + work = cmactivationwork.CMActivationWork(args.operation, args.csn, props) + publisher.send(work) + elif args.role == 'consumer': + myconsumer = MyConsumer() + consumer = CMActivationRMQConsumer(args.host, args.port, myconsumer, args.node) + consumer.receive() + else: + print('Invalid role %s' % args.role) + sys.exit(1) + + +if __name__ == '__main__': + main()