import re import time from datetime import datetime from datetime import timedelta from robot.api import logger from robot.libraries.BuiltIn import BuiltIn from robot.libraries.String import String from decorators_for_robot_functionalities import * from users import * from test_constants import * @robot_log def check_if_login_was_successful(login_output): login_errors = ['authentication failure', 'name or service not known', 'permission denied'] for login_error in login_errors: if re.search(login_error, login_output, re.IGNORECASE): return False return True class execute_command: # pylint: disable=old-style-class def __init__(self): self._builtin = BuiltIn() self._string = String() self._builtin.import_library('SSHLibrary') self._sshlibrary = self._builtin.get_library_instance('SSHLibrary') self._default_user = {} self._prompt = ':prompt:' self._local_infra_int_ip_ipv4 = '' self._key_exists = False def get_ssh_library_instance(self): return self._sshlibrary def get_default_user(self): return self._default_user @robot_log def open_connection_and_log_in(self, host, user, private_key=None, timeout="90s"): self._sshlibrary.open_connection(host=host, timeout=timeout) login_output = '' wait_until = datetime.now() + timedelta(seconds=60) while datetime.now() < wait_until: time.sleep(1) try: if private_key is None: login_output = self._sshlibrary.login(user['username'], user['password']) else: login_output = self._sshlibrary.login_with_public_key(user['username'], private_key, user['password']) except Exception: logger.warn("Login was unsuccessful, trying again.") continue if check_if_login_was_successful(login_output): self._configure_prompt() logger.info("Login was successful.") break return login_output @robot_log def set_basic_connection(self, user, private_key=None): self._default_user = user stack_infos = self._builtin.get_library_instance('stack_infos') self.open_connection_and_log_in(stack_infos.get_floating_ip(), user, private_key) self._local_infra_int_ip_ipv4 = self.get_interface_ipv4_address(stack_infos.get_crf_nodes()) self._key_exists = self.check_id_rsa_exists() self.stop_auditd_service() @robot_log def _configure_prompt(self): self._sshlibrary.write('export PS1=' + self._prompt) self._sshlibrary.read_until_regexp('(?m)^' + self._prompt + '.*') @robot_log def su_as(self, user): def check_if_su_was_succesful(login_output): if 'incorrect password' in login_output or 'Authentication failure' in login_output: return False return True self._sshlibrary.write('su ' + user['username']) self._sshlibrary.read_until('Password:') self._sshlibrary.write(user['password']) output = self._sshlibrary.read_until(self._prompt) if not check_if_su_was_succesful(output): raise Exception(output) @robot_log def sudo_as_root(self): self._sshlibrary.write('sudo -s') self._sshlibrary.read_until(self._prompt) @robot_log def exit_from_user(self): self._sshlibrary.write('exit') self._sshlibrary.read_until(self._prompt) @robot_log def execute_unix_command(self, command, fail_on_non_zero_rc=True, delay="90s", skip_prompt_in_command_output=False, user={}): """ This method executes a linux command via the SSHlibrary connection. The user account which issues the command, is the same as which the connection has opened for (by default) The command can be also executed by switching (su) to another user (e.g. parameter usage: user = "root") :param command: :param fail_on_non_zero_rc: the command will fail if return code is nonzero :param delay: :param skip_prompt_in_command_output: :param user: switch to user, by default the command is executed with the current user for which the ssh connection was opened :return: stdout: command output is returned """ user_changed = False self._sshlibrary.set_client_configuration(timeout=delay) if user == root: self.sudo_as_root() user_changed = True elif bool(user) and user != self._default_user: self.su_as(user) user_changed = True self._sshlibrary.write(command) try: if skip_prompt_in_command_output: stdout = self._sshlibrary.read_until_regexp("(^|\n| )" + self._prompt + "$") else: stdout = self._sshlibrary.read_until(prompt) except Exception as err: stdout = unicode(err) ctrl_c = self._builtin.evaluate('chr(int(3))') self._sshlibrary.write_bare(ctrl_c) self._sshlibrary.read_until(prompt) stdout = re.sub(prompt + '$', '', stdout).strip() self._sshlibrary.write('echo error code: $?') error_code = self._sshlibrary.read_until(prompt) logger.trace("Error code variable value (befor processing)=" + error_code) error_code = self._string.get_lines_matching_regexp(error_code, pattern='error code: \\d+').split(':')[1].strip() logger.trace("Error code variable value (after processing)=" + error_code) self._sshlibrary.set_client_configuration(timeout="60s") if user_changed: self.exit_from_user() fail_on_non_zero_rc = self._builtin.convert_to_boolean(fail_on_non_zero_rc) if fail_on_non_zero_rc: if error_code != '0': raise Exception('command: ' + command + '\nreturn code: ' + error_code + '\noutput: ' + stdout) return stdout else: return [stdout, error_code] @robot_log def execute_unix_command_as_root(self, command, fail_on_non_zero_rc=True, delay="90s", skip_prompt_in_command_output=False): return self.execute_unix_command(command, fail_on_non_zero_rc, delay, skip_prompt_in_command_output, root) @robot_log def ssh_to_another_node(self, host, user): self._sshlibrary.write('ssh ' + user['username'] + '@' + host + ' -o "StrictHostKeyChecking no"') if not self._key_exists: logger.info("Login with password") self._sshlibrary.read_until("'s password:") self._sshlibrary.write(user['password']) ssh_regexp = re.compile(r"\[{0}@.*$|authentication failure|name or service not known|permission denied" .format(user["username"]), re.IGNORECASE) stdout = self._sshlibrary.read_until_regexp(ssh_regexp) if not check_if_login_was_successful(stdout): raise Exception("Login to another node FAILED") self._configure_prompt() @robot_log def execute_unix_command_on_remote_as_root(self, command, host, user={}, fail_on_non_zero_rc=True, delay="90s", skip_prompt_in_command_output=False): if self._local_infra_int_ip_ipv4 != host: if not user: user = self._default_user self.ssh_to_another_node(host, user) output = self.execute_unix_command_as_root(command, fail_on_non_zero_rc, delay, skip_prompt_in_command_output) self._sshlibrary.write('exit') self._sshlibrary.read_until(self._prompt) else: output = self.execute_unix_command_as_root(command, fail_on_non_zero_rc, delay, skip_prompt_in_command_output) return output @robot_log def execute_unix_command_on_remote_as_user(self, command, host, user={}, fail_on_non_zero_rc=True, delay="90s", skip_prompt_in_command_output=False): if not user: user = self._default_user if self._local_infra_int_ip_ipv4 != host: self.ssh_to_another_node(host, user) output = self.execute_unix_command(command, fail_on_non_zero_rc, delay, skip_prompt_in_command_output, user=user) self._sshlibrary.write('exit') self._sshlibrary.read_until(self._prompt) else: output = self.execute_unix_command(command, fail_on_non_zero_rc, delay, skip_prompt_in_command_output, user=user) return output @robot_log def get_interface_ipv4_address(self, nodes): for key in nodes: if self.execute_unix_command("ip a | grep " + nodes[key] + " | wc -l") == "1": return nodes[key] return None @robot_log def get_interface_ipv6_address(self, interface): return self.execute_unix_command_as_root('ip addr list ' + interface + ' | grep --color=no -o -P "(?<=inet6 ).*(?=/.*)"') @robot_log def check_id_rsa_exists(self): _, err_code = self.execute_unix_command("ls /home/{0}/.ssh/id_rsa".format(self._default_user["username"]), fail_on_non_zero_rc=False) return err_code == '0' @robot_log def stop_auditd_service(self): stack_infos = self._builtin.get_library_instance('stack_infos') if stack_infos.get_virtual_env(): all_nodes = stack_infos.get_all_nodes() if not all_nodes: logger.info("Nodes dictionary is empty, nothing to check.") return for node in all_nodes.itervalues(): command = "sed -i \"s#RefuseManualStop=yes#RefuseManualStop=no#g\" " \ "/usr/lib/systemd/system/auditd.service" self.execute_unix_command_on_remote_as_root(command, node) command = "systemctl daemon-reload" self.execute_unix_command_on_remote_as_root(command, node) command = "systemctl stop auditd.service" self.execute_unix_command_on_remote_as_root(command, node)