robot tcs, test charts, robot container added
[ta/cloudtaf.git] / libraries / common / execute_command.py
diff --git a/libraries/common/execute_command.py b/libraries/common/execute_command.py
new file mode 100644 (file)
index 0000000..8df0240
--- /dev/null
@@ -0,0 +1,255 @@
+import re
+import time
+from datetime import datetime
+from datetime import timedelta
+from robot.api import logger
+from robot.libraries.BuiltIn import BuiltIn
+from robot.libraries.String import String
+from decorators_for_robot_functionalities import *
+from users import *
+from test_constants import *
+
+
+@robot_log
+def check_if_login_was_successful(login_output):
+    login_errors = ['authentication failure', 'name or service not known', 'permission denied']
+    for login_error in login_errors:
+        if re.search(login_error, login_output, re.IGNORECASE):
+            return False
+    return True
+
+
+class execute_command:  # pylint: disable=old-style-class
+    def __init__(self):
+        self._builtin = BuiltIn()
+        self._string = String()
+        self._builtin.import_library('SSHLibrary')
+        self._sshlibrary = self._builtin.get_library_instance('SSHLibrary')
+        self._default_user = {}
+        self._prompt = ':prompt:'
+        self._local_infra_int_ip_ipv4 = ''
+        self._key_exists = False
+
+    def get_ssh_library_instance(self):
+        return self._sshlibrary
+
+    def get_default_user(self):
+        return self._default_user
+
+    @robot_log
+    def open_connection_and_log_in(self, host, user, private_key=None, timeout="90s"):
+        self._sshlibrary.open_connection(host=host, timeout=timeout)
+        login_output = ''
+        wait_until = datetime.now() + timedelta(seconds=60)
+        while datetime.now() < wait_until:
+            time.sleep(1)
+            try:
+                if private_key is None:
+                    login_output = self._sshlibrary.login(user['username'], user['password'])
+                else:
+                    login_output = self._sshlibrary.login_with_public_key(user['username'],
+                                                                          private_key, user['password'])
+            except Exception:
+                logger.warn("Login was unsuccessful, trying again.")
+                continue
+            if check_if_login_was_successful(login_output):
+                self._configure_prompt()
+                logger.info("Login was successful.")
+                break
+        return login_output
+
+    @robot_log
+    def set_basic_connection(self, user, private_key=None):
+        self._default_user = user
+        stack_infos = self._builtin.get_library_instance('stack_infos')
+        self.open_connection_and_log_in(stack_infos.get_floating_ip(), user, private_key)
+        self._local_infra_int_ip_ipv4 = self.get_interface_ipv4_address(stack_infos.get_crf_nodes())
+        self._key_exists = self.check_id_rsa_exists()
+        self.stop_auditd_service()
+
+    @robot_log
+    def _configure_prompt(self):
+        self._sshlibrary.write('export PS1=' + self._prompt)
+        self._sshlibrary.read_until_regexp('(?m)^' + self._prompt + '.*')
+
+    @robot_log
+    def su_as(self, user):
+        def check_if_su_was_succesful(login_output):
+            if 'incorrect password' in login_output or 'Authentication failure' in login_output:
+                return False
+            return True
+
+        self._sshlibrary.write('su ' + user['username'])
+        self._sshlibrary.read_until('Password:')
+        self._sshlibrary.write(user['password'])
+        output = self._sshlibrary.read_until(self._prompt)
+        if not check_if_su_was_succesful(output):
+            raise Exception(output)
+
+    @robot_log
+    def sudo_as_root(self):
+        self._sshlibrary.write('sudo -s')
+        self._sshlibrary.read_until(self._prompt)
+
+    @robot_log
+    def exit_from_user(self):
+        self._sshlibrary.write('exit')
+        self._sshlibrary.read_until(self._prompt)
+
+    @robot_log
+    def execute_unix_command(self,
+                             command,
+                             fail_on_non_zero_rc=True,
+                             delay="90s",
+                             skip_prompt_in_command_output=False,
+                             user={}):
+        """
+        This method executes a linux command via the SSHlibrary connection.
+        The user account which issues the command, is the same as which the connection has opened for (by default)
+        The command can be also executed by switching (su) to another user (e.g. parameter usage: user = "root")
+
+        :param command:
+        :param fail_on_non_zero_rc: the command will fail if return code is nonzero
+        :param delay:
+        :param skip_prompt_in_command_output:
+        :param user: switch to user, by default the command is executed with the current user
+        for which the ssh connection was opened
+
+        :return: stdout: command output is returned
+        """
+        user_changed = False
+        self._sshlibrary.set_client_configuration(timeout=delay)
+        if user == root:
+            self.sudo_as_root()
+            user_changed = True
+        elif bool(user) and user != self._default_user:
+            self.su_as(user)
+            user_changed = True
+
+        self._sshlibrary.write(command)
+        try:
+            if skip_prompt_in_command_output:
+                stdout = self._sshlibrary.read_until_regexp("(^|\n| )" + self._prompt + "$")
+            else:
+                stdout = self._sshlibrary.read_until(prompt)
+        except Exception as err:
+            stdout = unicode(err)
+            ctrl_c = self._builtin.evaluate('chr(int(3))')
+            self._sshlibrary.write_bare(ctrl_c)
+            self._sshlibrary.read_until(prompt)
+        stdout = re.sub(prompt + '$', '', stdout).strip()
+        self._sshlibrary.write('echo error code: $?')
+        error_code = self._sshlibrary.read_until(prompt)
+        logger.trace("Error code variable value (befor processing)=" + error_code)
+        error_code = self._string.get_lines_matching_regexp(error_code,
+                                                            pattern='error code: \\d+').split(':')[1].strip()
+        logger.trace("Error code variable value (after processing)=" + error_code)
+        self._sshlibrary.set_client_configuration(timeout="60s")
+        if user_changed:
+            self.exit_from_user()
+        fail_on_non_zero_rc = self._builtin.convert_to_boolean(fail_on_non_zero_rc)
+        if fail_on_non_zero_rc:
+            if error_code != '0':
+                raise Exception('command: ' + command + '\nreturn code: ' + error_code + '\noutput: ' + stdout)
+            return stdout
+        else:
+            return [stdout, error_code]
+
+    @robot_log
+    def execute_unix_command_as_root(self,
+                                     command,
+                                     fail_on_non_zero_rc=True,
+                                     delay="90s",
+                                     skip_prompt_in_command_output=False):
+        return self.execute_unix_command(command, fail_on_non_zero_rc, delay, skip_prompt_in_command_output, root)
+
+    @robot_log
+    def ssh_to_another_node(self, host, user):
+        self._sshlibrary.write('ssh ' + user['username'] + '@' + host + ' -o "StrictHostKeyChecking no"')
+        if not self._key_exists:
+            logger.info("Login with password")
+            self._sshlibrary.read_until("'s password:")
+            self._sshlibrary.write(user['password'])
+        ssh_regexp = re.compile(r"\[{0}@.*$|authentication failure|name or service not known|permission denied"
+                                .format(user["username"]), re.IGNORECASE)
+        stdout = self._sshlibrary.read_until_regexp(ssh_regexp)
+        if not check_if_login_was_successful(stdout):
+            raise Exception("Login to another node FAILED")
+        self._configure_prompt()
+
+    @robot_log
+    def execute_unix_command_on_remote_as_root(self,
+                                               command,
+                                               host,
+                                               user={},
+                                               fail_on_non_zero_rc=True,
+                                               delay="90s",
+                                               skip_prompt_in_command_output=False):
+        if self._local_infra_int_ip_ipv4 != host:
+            if not user:
+                user = self._default_user
+            self.ssh_to_another_node(host, user)
+            output = self.execute_unix_command_as_root(command, fail_on_non_zero_rc, delay,
+                                                       skip_prompt_in_command_output)
+            self._sshlibrary.write('exit')
+            self._sshlibrary.read_until(self._prompt)
+        else:
+            output = self.execute_unix_command_as_root(command, fail_on_non_zero_rc, delay,
+                                                       skip_prompt_in_command_output)
+        return output
+
+    @robot_log
+    def execute_unix_command_on_remote_as_user(self,
+                                               command,
+                                               host,
+                                               user={},
+                                               fail_on_non_zero_rc=True,
+                                               delay="90s",
+                                               skip_prompt_in_command_output=False):
+        if not user:
+            user = self._default_user
+        if self._local_infra_int_ip_ipv4 != host:
+            self.ssh_to_another_node(host, user)
+            output = self.execute_unix_command(command, fail_on_non_zero_rc, delay, skip_prompt_in_command_output,
+                                               user=user)
+            self._sshlibrary.write('exit')
+            self._sshlibrary.read_until(self._prompt)
+        else:
+            output = self.execute_unix_command(command, fail_on_non_zero_rc, delay, skip_prompt_in_command_output,
+                                               user=user)
+        return output
+
+    @robot_log
+    def get_interface_ipv4_address(self, nodes):
+        for key in nodes:
+            if self.execute_unix_command("ip a | grep " + nodes[key] + " | wc -l") == "1":
+                return nodes[key]
+        return None
+
+    @robot_log
+    def get_interface_ipv6_address(self, interface):
+        return self.execute_unix_command_as_root('ip addr list ' + interface +
+                                                 ' | grep --color=no -o -P "(?<=inet6 ).*(?=/.*)"')
+
+    @robot_log
+    def check_id_rsa_exists(self):
+        _, err_code = self.execute_unix_command("ls /home/{0}/.ssh/id_rsa".format(self._default_user["username"]),
+                                                fail_on_non_zero_rc=False)
+        return err_code == '0'
+
+    @robot_log
+    def stop_auditd_service(self):
+        stack_infos = self._builtin.get_library_instance('stack_infos')
+        if stack_infos.get_virtual_env():
+            all_nodes = stack_infos.get_all_nodes()
+            if not all_nodes:
+                logger.info("Nodes dictionary is empty, nothing to check.")
+                return
+            for node in all_nodes.itervalues():
+                command = "sed -i \"s#RefuseManualStop=yes#RefuseManualStop=no#g\" " \
+                          "/usr/lib/systemd/system/auditd.service"
+                self.execute_unix_command_on_remote_as_root(command, node)
+                command = "systemctl daemon-reload"
+                self.execute_unix_command_on_remote_as_root(command, node)
+                command = "systemctl stop auditd.service"
+                self.execute_unix_command_on_remote_as_root(command, node)