Initial commit
[ta/config-manager.git] / cmdatahandlers / src / cmdatahandlers / api / utils.py
1 # Copyright 2019 Nokia
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import socket
16 import netaddr
17 import subprocess
18 import json
19
20 from cmdatahandlers.api import configerror
21
22 def validate_ipv4_address(address):
23     try:
24         socket.inet_pton(socket.AF_INET, address)
25     except AttributeError:
26         try:
27             socket.inet_aton(address)
28         except socket.error:
29             raise configerror.ConfigError('Invalid ip %s' % address)
30         if address.count('.') != 3:
31             raise configerror.ConfigError('Invalid ip %s' % address)
32     except socket.error:
33         raise configerror.ConfigError('Invalid ip %s' % address)
34
35
36 def validate_list_items_unique(l):
37     if len(l) != len(set(l)):
38         raise configerror.ConfigError('List is not unique')
39
40
41 def validate_cidr(cidr):
42     try:
43         tok = cidr.split('/')
44         if len(tok) != 2:
45             raise configerror.ConfigError('Invalid cidr address %s' % cidr)
46         validate_ipv4_address(tok[0])
47     except Exception as exp:
48         raise configerror.ConfigError(str(exp))
49
50 def validate_ip_in_network(ip, cidr):
51     try:
52         if netaddr.IPAddress(ip) not in netaddr.IPNetwork(cidr):
53             raise configerror.ConfigError('%s does not belong to network %s' % (ip, cidr))
54     except Exception as exp:
55         raise configerror.ConfigError(str(exp))
56
57 def validate_keys_in_dictionary(keys, dictionary):
58     for key in keys:
59         if key not in dictionary:
60             raise configerror.ConfigError('%s is not found' % key)
61
62 def validate_vlan(vlan):
63     if vlan < 0 or vlan > 4096:
64         raise configerror.ConfigError('Vlan %d is not valid' % vlan)
65
66 def get_own_hwmgmt_ip():
67     try:
68         #try both ipv4 and ipv6 addresses
69         ips=[]
70         output=subprocess.check_output(['sudo', 'ipmitool', 'lan', 'print'])
71         lines = output.split('\n')
72         for line in lines:
73             if 'IP Address' in line and 'IP Address Source' not in line:
74                 data = line.split(':')
75                 if len(data) != 2:
76                     raise configerror.ConfigError('Invalid hwmgmt ip configured')
77                 ip=data[1]
78                 import re
79                 ip=re.sub('[\s+]', '', ip)
80                 ips.append(ip)
81
82         output_lan6=subprocess.check_output(['sudo', 'ipmitool', 'lan6', 'print'],stderr=subprocess.STDOUT)
83         lines = output_lan6.split('\n')
84         num_static_addr = 0
85         num_dynamic_addr = 0
86         #get max number of ipv6 static address
87         for line in lines:
88             if 'Static address max' in line:
89                 data = line.split(':')
90                 static_address = data[1]
91                 import re
92                 num_static_addr = int(re.sub('[\s+]', '', static_address))
93             if 'Dynamic address max' in line:
94                 data = line.split(':')
95                 dynamic_address = data[1]
96                 import re
97                 num_dynamic_addr = int(re.sub('[\s+]', '', dynamic_address))
98
99         for x in range(num_static_addr):
100             address = 'IPv6 Static Address %s' %x
101             for idx,val in enumerate(lines):
102                 if address in val:
103                     if 'Address' in lines[idx+2]:
104                         data=lines[idx+2].split(':',1)
105                         ip=data[1]
106                         import re
107                         ip=re.sub('[\s+]', '', ip)
108                         ip=ip.split('/',1)[0]
109                         ips.append(ip.strip())
110
111         for x in range(num_dynamic_addr):
112             address = 'IPv6 Dynamic Address %s' %x
113             for idx,val in enumerate(lines):
114                 if address in val:
115                     if 'Address' in lines[idx+2]:
116                         data=lines[idx+2].split(':',1)
117                         ip=data[1]
118                         import re
119                         ip=re.sub('[\s+]', '', ip)
120                         ip=ip.split('/',1)[0]
121                         ips.append(ip.strip())
122         return ips
123
124     except Exception as exp:
125         raise configerror.ConfigError(str(exp))
126
127 def is_virtualized():
128     f=open('/proc/cpuinfo')
129     lines = f.readlines()
130     f.close()
131     for line in lines:
132         if line.startswith('flags') and 'hypervisor' in  line:
133             return True
134     return False
135
136 def flatten_config_data(jsondata):
137     result = {}
138     for key, value in jsondata.iteritems():
139         try:
140             result[key] = json.dumps(value)
141         except Exception as exp:
142             result[key] = value
143     return result
144
145
146 def unflatten_config_data(props):
147     propsjson = {}
148     for name, value in props.iteritems():
149         try:
150             propsjson[name] = json.loads(value)
151         except Exception as exp:
152             propsjson[name] = value
153     return propsjson
154
155 def add_lists(l1, l2):
156     result = l1
157     for v2 in l2:
158         if v2 not in result:
159             result.append(v2)
160     return result