Initial commit
[ta/config-manager.git] / cmframework / src / cmframework / utils / cmactivationrmq.py
diff --git a/cmframework/src/cmframework/utils/cmactivationrmq.py b/cmframework/src/cmframework/utils/cmactivationrmq.py
new file mode 100644 (file)
index 0000000..19cd578
--- /dev/null
@@ -0,0 +1,124 @@
+# Copyright 2019 Nokia
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import print_function
+import logging
+import pika
+
+from cmframework.apis import cmerror
+from cmframework.utils import cmactivationwork
+
+
+class CMActivationRMQ(object):
+    EXCHANGE = 'cmframework.activator'
+
+    def __init__(self, host, port):
+        try:
+            self.host = host
+            self.port = port
+            self.connection = pika.BlockingConnection(
+                pika.ConnectionParameters(host=self.host, port=self.port))
+            self.channel = self.connection.channel()
+            self.channel.exchange_declare(exchange=CMActivationRMQ.EXCHANGE, type='direct')
+        except Exception as exp:  # pylint: disable=broad-except
+            raise cmerror.CMError(str(exp))
+
+
+class CMActivationRMQPublisher(CMActivationRMQ):
+    def __init__(self, host, port):
+        CMActivationRMQ.__init__(self, host, port)
+
+    def send(self, work):
+        try:
+            data = work.serialize()
+            self.channel.basic_publish(exchange=CMActivationRMQ.EXCHANGE,
+                                       routing_key=work.get_target(),
+                                       body=data)
+            logging.debug('Sent %s to activation exchange', str(work))
+        except Exception as exp:  # pylint: disable=broad-except
+            self.connection.close()
+            raise cmerror.CMError(str(exp))
+
+
+class CMActivationRMQConsumer(CMActivationRMQ):
+    class WorkConsumer(object):
+        # pylint: disable=no-self-use, unused-argument
+        def consume(self, work):
+            raise cmerror.CMError('Not implemented')
+
+    def __init__(self, host, port, consumer, node):
+        CMActivationRMQ.__init__(self, host, port)
+        self.node = node
+        result = self.channel.queue_declare(exclusive=True)
+        self.queue_name = result.method.queue
+        self.channel.queue_bind(exchange=CMActivationRMQ.EXCHANGE, queue=self.queue_name,
+                                routing_key=node)
+        self.channel.queue_bind(exchange=CMActivationRMQ.EXCHANGE, queue=self.queue_name,
+                                routing_key='all')
+        self.channel.basic_consume(self,
+                                   queue=self.queue_name,
+                                   no_ack=True)
+        self.consumer = consumer
+
+    def __call__(self, ch, method, properties, body):
+        logging.debug('Received %r', body)
+        work = cmactivationwork.CMActivationWork()
+        work.deserialize(body)
+        self.consumer.consume(work)
+
+    def receive(self):
+        try:
+            self.channel.start_consuming()
+        except Exception as exp:  # pylint: disable=broad-except
+            self.connection.close()
+            raise cmerror.CMError(str(exp))
+
+
+def main():
+    import sys
+    import argparse
+
+    class MyConsumer(CMActivationRMQConsumer.WorkConsumer):
+        def consume(self, work):
+            print('Got work %s' % work)
+
+    parser = argparse.ArgumentParser(description='Test rabbitmq activator', prog=sys.argv[0])
+    parser.add_argument('--role', required=True, action='store')
+    parser.add_argument('--host', required=True, action='store')
+    parser.add_argument('--port', required=True, type=int, action='store')
+    parser.add_argument('--operation', required=False, type=int, action='store')
+    parser.add_argument('--csn', required=False, type=int, action='store')
+    parser.add_argument('--node', required=True, type=str, action='store')
+    parser.add_argument('--properties', required=False, nargs=2, action='append')
+    args = parser.parse_args(sys.argv[1:])
+    if args.role == 'publisher':
+        if not args.operation or not args.csn or not args.properties:
+            print('Missing options')
+            sys.exit(1)
+        publisher = CMActivationRMQPublisher(args.host, args.port)
+        props = {}
+        for prop in args.properties:
+            props[prop[0]] = prop[1]
+        work = cmactivationwork.CMActivationWork(args.operation, args.csn, props)
+        publisher.send(work)
+    elif args.role == 'consumer':
+        myconsumer = MyConsumer()
+        consumer = CMActivationRMQConsumer(args.host, args.port, myconsumer, args.node)
+        consumer.receive()
+    else:
+        print('Invalid role %s' % args.role)
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    main()