3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
23 from dss.server import dss_rpc_processor
25 class Connection(object):
def __init__(self, sock, address, rpc_processor):
    """Bind a connected socket to the RPC processor that serves it.

    Args:
        sock: Connected socket object; must provide ``makefile``.
        address: Peer address, kept for log messages.
        rpc_processor: Object whose ``process(msg)`` produces the reply
            for each received message.
    """
    self.rpc_processor = rpc_processor
    self.address = address
    # The other methods read self.sock, so it must be stored before
    # makefile() is called on it.
    self.sock = sock
    self.fd = self.sock.makefile('rw', bufsize=0)
    # Accumulates received data until a full newline-terminated message
    # is available; process_recv_buffer() appends to it.
    self.recvbuffer = ''
    logging.info("Received connection from %r %r", address, self)
35 return self.sock.fileno()
def send_all(self, rep):
    """Send all of ``rep`` over ``self.sock``, retrying after partial sends.

    Args:
        rep: Payload to transmit. It is sliced from the current offset
            and handed to ``self.sock.send`` until ``len(rep)`` units
            have been accepted.

    Raises:
        RuntimeError: If ``send`` reports 0, meaning the connection closed.
        socket.error: For any socket failure other than ``EWOULDBLOCK``.
    """
    totalsent = 0
    msglen = len(rep)
    try:
        while totalsent < msglen:
            logging.info("Sending %s %r", rep, self)
            sent = self.sock.send(rep[totalsent:])
            if sent == 0:
                raise RuntimeError('connection closed!')
            totalsent += sent
            if totalsent == msglen:
                logging.info("Sent successfully %r", self)
    except socket.error as e:
        # Fixed: this previously read ``e.argus[0]``, which raised
        # AttributeError and masked every socket error.
        if e.args[0] != errno.EWOULDBLOCK:
            raise
        # NOTE(review): the intended EWOULDBLOCK handling is not visible in
        # the reviewed code; here the remainder is dropped with a warning
        # instead of busy-waiting. Confirm against the caller's expectations.
        logging.warning("Send would block after %d of %d %r",
                        totalsent, msglen, self)
def process_recv_buffer(self, data):
    """Append ``data`` to the receive buffer and answer every complete message.

    Messages are delimited by ``'\\n'``. Each complete message is stripped
    of leading/trailing NUL characters, passed to
    ``self.rpc_processor.process``, and the reply is sent back with a
    trailing newline via ``self.send_all``. Any trailing partial message
    stays in ``self.recvbuffer`` for the next call.

    Args:
        data: Newly received chunk to add to ``self.recvbuffer``.
    """
    self.recvbuffer += data

    # After splitting, the last element is whatever follows the final
    # newline: '' when the buffer ended on a delimiter, otherwise the
    # incomplete tail that must wait for more data.
    parts = self.recvbuffer.split('\n')
    self.recvbuffer = parts.pop()

    for msg in parts:
        logging.debug("Received %s %r", msg, self)

        msg = msg.strip('\x00')
        rep = self.rpc_processor.process(msg)
        self.send_all(rep + '\n')
82 data = self.sock.recv(1024)
84 logging.info("Client %r %r disconnected" % (self.address, self))
86 self.process_recv_buffer(data)
87 except socket.error as e:
88 if e.args[0] == errno.EWOULDBLOCK:
90 except Exception as exp:
91 logging.warning('Failed when processing %s got exp %s' % (data, exp))
def recv_dgram(sock, req, address, rpc_processor):
    """Answer a single datagram request.

    Args:
        sock: Socket used to reply; must provide ``sendto``.
        req: The received request payload.
        address: Address the reply is sent to.
        rpc_processor: Object whose ``process(req)`` produces the reply.

    Any exception raised while processing or replying is logged as a
    warning and suppressed, so one bad datagram does not stop the caller.
    """
    logging.debug("Received %s address is %r", req, address)
    try:
        # Hand the raw request to the RPC processor and send its reply
        # back newline-terminated.
        rep = rpc_processor.process(req)
        sock.sendto(rep + '\n', address)
    except Exception as exp:
        logging.warning('Failed when processing %s got exp %s', req, exp)