3 from datetime import datetime
4 from datetime import timedelta
5 from robot.api import logger
6 from robot.libraries.BuiltIn import BuiltIn
7 from robot.libraries.String import String
8 from decorators_for_robot_functionalities import *
10 from test_constants import *
14 def check_if_login_was_successful(login_output):
15 login_errors = ['authentication failure', 'name or service not known', 'permission denied']
16 for login_error in login_errors:
17 if re.search(login_error, login_output, re.IGNORECASE):
22 class execute_command: # pylint: disable=old-style-class
24 self._builtin = BuiltIn()
25 self._string = String()
26 self._builtin.import_library('SSHLibrary')
27 self._sshlibrary = self._builtin.get_library_instance('SSHLibrary')
28 self._default_user = {}
29 self._prompt = ':prompt:'
30 self._local_infra_int_ip_ipv4 = ''
31 self._key_exists = False
33 def get_ssh_library_instance(self):
34 return self._sshlibrary
36 def get_default_user(self):
37 return self._default_user
40 def open_connection_and_log_in(self, host, user, private_key=None, timeout="90s"):
41 self._sshlibrary.open_connection(host=host, timeout=timeout)
43 wait_until = datetime.now() + timedelta(seconds=60)
44 while datetime.now() < wait_until:
47 if private_key is None:
48 login_output = self._sshlibrary.login(user['username'], user['password'])
50 login_output = self._sshlibrary.login_with_public_key(user['username'],
51 private_key, user['password'])
53 logger.warn("Login was unsuccessful, trying again.")
55 if check_if_login_was_successful(login_output):
56 self._configure_prompt()
57 logger.info("Login was successful.")
62 def set_basic_connection(self, user, private_key=None):
63 self._default_user = user
64 stack_infos = self._builtin.get_library_instance('stack_infos')
65 self.open_connection_and_log_in(stack_infos.get_floating_ip(), user, private_key)
66 self._local_infra_int_ip_ipv4 = self.get_interface_ipv4_address(stack_infos.get_crf_nodes())
67 self._key_exists = self.check_id_rsa_exists()
68 self.stop_auditd_service()
71 def _configure_prompt(self):
72 self._sshlibrary.write('export PS1=' + self._prompt)
73 self._sshlibrary.read_until_regexp('(?m)^' + self._prompt + '.*')
76 def su_as(self, user):
77 def check_if_su_was_succesful(login_output):
78 if 'incorrect password' in login_output or 'Authentication failure' in login_output:
82 self._sshlibrary.write('su ' + user['username'])
83 self._sshlibrary.read_until('Password:')
84 self._sshlibrary.write(user['password'])
85 output = self._sshlibrary.read_until(self._prompt)
86 if not check_if_su_was_succesful(output):
87 raise Exception(output)
90 def sudo_as_root(self):
91 self._sshlibrary.write('sudo -s')
92 self._sshlibrary.read_until(self._prompt)
95 def exit_from_user(self):
96 self._sshlibrary.write('exit')
97 self._sshlibrary.read_until(self._prompt)
100 def execute_unix_command(self,
102 fail_on_non_zero_rc=True,
104 skip_prompt_in_command_output=False,
107 This method executes a linux command via the SSHlibrary connection.
108 The user account which issues the command, is the same as which the connection has opened for (by default)
109 The command can be also executed by switching (su) to another user (e.g. parameter usage: user = "root")
112 :param fail_on_non_zero_rc: the command will fail if return code is nonzero
114 :param skip_prompt_in_command_output:
115 :param user: switch to user, by default the command is executed with the current user
116 for which the ssh connection was opened
118 :return: stdout: command output is returned
121 self._sshlibrary.set_client_configuration(timeout=delay)
125 elif bool(user) and user != self._default_user:
129 self._sshlibrary.write(command)
131 if skip_prompt_in_command_output:
132 stdout = self._sshlibrary.read_until_regexp("(^|\n| )" + self._prompt + "$")
134 stdout = self._sshlibrary.read_until(prompt)
135 except Exception as err:
136 stdout = unicode(err)
137 ctrl_c = self._builtin.evaluate('chr(int(3))')
138 self._sshlibrary.write_bare(ctrl_c)
139 self._sshlibrary.read_until(prompt)
140 stdout = re.sub(prompt + '$', '', stdout).strip()
141 self._sshlibrary.write('echo error code: $?')
142 error_code = self._sshlibrary.read_until(prompt)
143 logger.trace("Error code variable value (befor processing)=" + error_code)
144 error_code = self._string.get_lines_matching_regexp(error_code,
145 pattern='error code: \\d+').split(':')[1].strip()
146 logger.trace("Error code variable value (after processing)=" + error_code)
147 self._sshlibrary.set_client_configuration(timeout="60s")
149 self.exit_from_user()
150 fail_on_non_zero_rc = self._builtin.convert_to_boolean(fail_on_non_zero_rc)
151 if fail_on_non_zero_rc:
152 if error_code != '0':
153 raise Exception('command: ' + command + '\nreturn code: ' + error_code + '\noutput: ' + stdout)
156 return [stdout, error_code]
159 def execute_unix_command_as_root(self,
161 fail_on_non_zero_rc=True,
163 skip_prompt_in_command_output=False):
164 return self.execute_unix_command(command, fail_on_non_zero_rc, delay, skip_prompt_in_command_output, root)
167 def ssh_to_another_node(self, host, user):
168 self._sshlibrary.write('ssh ' + user['username'] + '@' + host + ' -o "StrictHostKeyChecking no"')
169 if not self._key_exists:
170 logger.info("Login with password")
171 self._sshlibrary.read_until("'s password:")
172 self._sshlibrary.write(user['password'])
173 ssh_regexp = re.compile(r"\[{0}@.*$|authentication failure|name or service not known|permission denied"
174 .format(user["username"]), re.IGNORECASE)
175 stdout = self._sshlibrary.read_until_regexp(ssh_regexp)
176 if not check_if_login_was_successful(stdout):
177 raise Exception("Login to another node FAILED")
178 self._configure_prompt()
181 def execute_unix_command_on_remote_as_root(self,
185 fail_on_non_zero_rc=True,
187 skip_prompt_in_command_output=False):
188 if self._local_infra_int_ip_ipv4 != host:
190 user = self._default_user
191 self.ssh_to_another_node(host, user)
192 output = self.execute_unix_command_as_root(command, fail_on_non_zero_rc, delay,
193 skip_prompt_in_command_output)
194 self._sshlibrary.write('exit')
195 self._sshlibrary.read_until(self._prompt)
197 output = self.execute_unix_command_as_root(command, fail_on_non_zero_rc, delay,
198 skip_prompt_in_command_output)
202 def execute_unix_command_on_remote_as_user(self,
206 fail_on_non_zero_rc=True,
208 skip_prompt_in_command_output=False):
210 user = self._default_user
211 if self._local_infra_int_ip_ipv4 != host:
212 self.ssh_to_another_node(host, user)
213 output = self.execute_unix_command(command, fail_on_non_zero_rc, delay, skip_prompt_in_command_output,
215 self._sshlibrary.write('exit')
216 self._sshlibrary.read_until(self._prompt)
218 output = self.execute_unix_command(command, fail_on_non_zero_rc, delay, skip_prompt_in_command_output,
223 def get_interface_ipv4_address(self, nodes):
225 if self.execute_unix_command("ip a | grep " + nodes[key] + " | wc -l") == "1":
230 def get_interface_ipv6_address(self, interface):
231 return self.execute_unix_command_as_root('ip addr list ' + interface +
232 ' | grep --color=no -o -P "(?<=inet6 ).*(?=/.*)"')
235 def check_id_rsa_exists(self):
236 _, err_code = self.execute_unix_command("ls /home/{0}/.ssh/id_rsa".format(self._default_user["username"]),
237 fail_on_non_zero_rc=False)
238 return err_code == '0'
241 def stop_auditd_service(self):
242 stack_infos = self._builtin.get_library_instance('stack_infos')
243 if stack_infos.get_virtual_env():
244 all_nodes = stack_infos.get_all_nodes()
246 logger.info("Nodes dictionary is empty, nothing to check.")
248 for node in all_nodes.itervalues():
249 command = "sed -i \"s#RefuseManualStop=yes#RefuseManualStop=no#g\" " \
250 "/usr/lib/systemd/system/auditd.service"
251 self.execute_unix_command_on_remote_as_root(command, node)
252 command = "systemctl daemon-reload"
253 self.execute_unix_command_on_remote_as_root(command, node)
254 command = "systemctl stop auditd.service"
255 self.execute_unix_command_on_remote_as_root(command, node)