# Copyright 2019 Nokia # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import print_function import logging import pika from cmframework.apis import cmerror from cmframework.utils import cmactivationwork class CMActivationRMQ(object): EXCHANGE = 'cmframework.activator' def __init__(self, host, port): try: self.host = host self.port = port self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=self.host, port=self.port)) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=CMActivationRMQ.EXCHANGE, type='direct') except Exception as exp: # pylint: disable=broad-except raise cmerror.CMError(str(exp)) class CMActivationRMQPublisher(CMActivationRMQ): def __init__(self, host, port): CMActivationRMQ.__init__(self, host, port) def send(self, work): try: data = work.serialize() self.channel.basic_publish(exchange=CMActivationRMQ.EXCHANGE, routing_key=work.get_target(), body=data) logging.debug('Sent %s to activation exchange', str(work)) except Exception as exp: # pylint: disable=broad-except self.connection.close() raise cmerror.CMError(str(exp)) class CMActivationRMQConsumer(CMActivationRMQ): class WorkConsumer(object): # pylint: disable=no-self-use, unused-argument def consume(self, work): raise cmerror.CMError('Not implemented') def __init__(self, host, port, consumer, node): CMActivationRMQ.__init__(self, host, port) self.node = node result = self.channel.queue_declare(exclusive=True) self.queue_name = result.method.queue self.channel.queue_bind(exchange=CMActivationRMQ.EXCHANGE, queue=self.queue_name, routing_key=node) self.channel.queue_bind(exchange=CMActivationRMQ.EXCHANGE, queue=self.queue_name, routing_key='all') self.channel.basic_consume(self, queue=self.queue_name, no_ack=True) self.consumer = consumer def __call__(self, ch, method, properties, body): logging.debug('Received %r', body) work = cmactivationwork.CMActivationWork() work.deserialize(body) self.consumer.consume(work) def receive(self): try: self.channel.start_consuming() except Exception as exp: # pylint: disable=broad-except self.connection.close() raise cmerror.CMError(str(exp)) def main(): import sys import argparse class MyConsumer(CMActivationRMQConsumer.WorkConsumer): def consume(self, work): print('Got work %s' % work) parser = argparse.ArgumentParser(description='Test rabbitmq activator', prog=sys.argv[0]) parser.add_argument('--role', required=True, action='store') parser.add_argument('--host', required=True, action='store') parser.add_argument('--port', required=True, type=int, action='store') parser.add_argument('--operation', required=False, type=int, action='store') parser.add_argument('--csn', required=False, type=int, action='store') parser.add_argument('--node', required=True, type=str, action='store') parser.add_argument('--properties', required=False, nargs=2, action='append') args = parser.parse_args(sys.argv[1:]) if args.role == 'publisher': if not args.operation or not args.csn or not args.properties: print('Missing options') sys.exit(1) publisher = CMActivationRMQPublisher(args.host, args.port) props = {} for prop in args.properties: props[prop[0]] = prop[1] work = cmactivationwork.CMActivationWork(args.operation, args.csn, props) publisher.send(work) elif args.role == 'consumer': myconsumer = MyConsumer() consumer = CMActivationRMQConsumer(args.host, args.port, myconsumer, args.node) consumer.receive() else: print('Invalid role %s' % args.role) sys.exit(1) if __name__ == '__main__': main()