Initial commit
[ta/distributed-state-server.git] / src / dss / server / dss_connection.py
1 # Copyright 2019 Nokia
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15
16 import logging
17 import json
18 import Queue
19 import socket
20 import errno
21 import time
22
23 from dss.server import dss_rpc_processor
24
25 class Connection(object):
26     def __init__(self, sock, address, rpc_processor):
27         self.rpc_processor = rpc_processor
28         self.address = address
29         self.sock = sock
30         self.fd = self.sock.makefile('rw', bufsize=0)
31         self.recvbuffer = ""
32         logging.info("Received connection from %r %r" % (address, self))
33
34     def fileno(self):
35         return self.sock.fileno()
36
37     def send_all(self, rep):
38         totalsent = 0
39         msglen = len(rep)
40         while True:
41             try:
42                 logging.info("Sending %s %r" % (rep, self))
43                 total = self.sock.send(rep[totalsent:])
44                 if total == 0:
45                     raise RuntimeError('connection closed!')
46                 totalsent += total
47                 if totalsent == msglen:
48                     logging.info("Sent successfully %r" % self)
49                     return
50             except socket.error as e:
51                 if e.argus[0] == errno.EWOULDBLOCK:
52                     time.sleep(1)
53                 else:
54                     raise
55
56     def process_recv_buffer(self, data):
57         self.recvbuffer += data
58         #check for '\n'
59         c = self.recvbuffer.count('\n')
60         parts = None
61         if c:
62             parts = self.recvbuffer.split('\n')
63             if self.recvbuffer.endswith('\n'):
64                 self.recvbuffer = ""
65             else:
66                 self.recvbuffer = parts[c]
67
68         for i in range(0, c):
69             msg = parts[i]
70             logging.debug("Received %s %r" % (msg, self))
71             # strip the req
72             msg = msg.lstrip('\x00').rstrip('\x00')
73             rep = self.rpc_processor.process(msg)
74             self.send_all(rep+'\n')
75
76
77
78
79     def recv(self):
80         data = None
81         try:
82             data = self.sock.recv(1024)
83             if not data:
84                 logging.info("Client %r %r disconnected" % (self.address, self))
85                 return False
86             self.process_recv_buffer(data)
87         except socket.error as e:
88             if e.args[0] == errno.EWOULDBLOCK:
89                 return True
90         except Exception as exp:
91             logging.warning('Failed when processing %s got exp %s' % (data, exp))
92             return False
93
94         return True
95
96     @staticmethod
97     def recv_dgram(sock, req, address, rpc_processor):
98         logging.debug("Received %s address is %r" % (req, address))
99         try:
100             # convert to json structure
101             rep = rpc_processor.process(req)
102             # add '/n'
103             sock.sendto(rep+'\n', address)
104         except Exception as exp:
105             logging.warning('Failed when processing %s got exp %s' % (req, exp))