Add cloudtaf framework
[ta/cloudtaf.git] / libraries / cluster / testutils / fakehostcli.py
diff --git a/libraries/cluster/testutils/fakehostcli.py b/libraries/cluster/testutils/fakehostcli.py
new file mode 100644 (file)
index 0000000..1de73df
--- /dev/null
@@ -0,0 +1,330 @@
+# Copyright 2019 Nokia
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+from collections import namedtuple
+import six
+
+BASIC_MEMBER = 'basic_member'
+
+LINUX_USER = 'linux_user'
+
+ROLES = ['role{}'.format(i) for i in range(4)]
+
+
+class UserTuple(namedtuple('UserTuple', ['uuid',
+                                         'username',
+                                         'email',
+                                         'password',
+                                         'roles'])):
+    def __hash__(self):
+        d = self._asdict().copy()
+        d['roles'] = sorted(d['roles'])
+        return hash(repr(sorted(d.items())))
+
+
+class User(object):
+    def __init__(self, username, email='user@email.me', password=None):
+        self.username = username
+        self.email = email
+        self._password = password
+        self.added_roles = set([])
+        self._uuid = None
+        self._password_history = [self.password]
+
+    def set_password(self, password):
+        self._password = password
+        self._password_history.append(password)
+
+    @property
+    def password(self):
+        return self._password
+
+    @property
+    def password_history(self):
+        return self._password_history
+
+    @property
+    def uuid(self):
+        return self._uuid if self._uuid else 'UUID_{}'.format(self.username)
+
+    def set_uuid(self, uuid):
+        self._uuid = uuid
+
+    def set_added_roles(self, roles):
+        self.added_roles = set(roles)
+
+    def add_role(self, role):
+        assert role not in self.roles, 'Role {role} already in {username}'.format(
+            role=role,
+            username=self.username)
+        self.added_roles.add(role)
+
+    @property
+    def roles(self):
+        return self.added_roles.union(set([BASIC_MEMBER]))
+
+    def __repr__(self):
+        return str(self.usertuple)  # pragma: no cover
+
+    def __eq__(self, other):
+        return self.usertuple == other
+
+    def __hash__(self):
+        return hash(self.usertuple)
+
+    @property
+    def usertuple(self):
+        return UserTuple(uuid=self.uuid,
+                         username=self.username,
+                         email=self.email,
+                         password=self.password,
+                         roles=self.roles)
+
+
+class HandlerError(Exception):
+    pass
+
+
+@six.add_metaclass(abc.ABCMeta)
+class HandlerBase(object):
+    def __init__(self, users):
+        self._users = users
+
+    def handle(self, cmd, target):
+        self._handle_target(target)
+
+        if not cmd.startswith(self._expected_startswith):
+            raise HandlerError()
+
+        return self._handle(cmd.split())
+
+    @abc.abstractmethod
+    def _handle_target(self, target):
+        """Handle target
+
+        Raises:
+            HandlerError: if target cannot be handled by handler
+        """
+
+    @abc.abstractproperty
+    def _expected_startswith(self):
+        """Return the expected start of the command.
+        """
+
+    @abc.abstractmethod
+    def _handle(self, args):
+        """Handle and return the command
+
+        Raises:
+           HandlerError: if command cannot be handled by handler
+        """
+
+
+class SetPasswordHandler(HandlerBase):
+    """Handler for HostCli command
+    user set password --opassword OPASSWORD --npassword NPASSWORD
+    """
+    def __init__(self, users, envcreator):
+        super(SetPasswordHandler, self).__init__(users)
+        self._envcreator = envcreator
+        self._envname = None
+
+    @property
+    def _expected_startswith(self):
+        return 'user set password'
+
+    def _handle_target(self, target):
+        if not target.startswith('default.set_password:'):
+            raise HandlerError()
+        self._envname = target.split('.')[1]
+
+    def _handle(self, args):
+        pwds = self._get_passwords(args)
+        openrc_dict = self._envcreator.create(target='default', envname=self._envname)
+        assert pwds.old == openrc_dict['OS_PASSWORD'], (pwds.old,
+                                                        openrc_dict['OS_PASSWORD'])
+        user = self._get_user_for_openrc_dict(openrc_dict)
+        assert pwds.old == user.password
+        user.set_password(pwds.new)
+        return 'Your password has been changed.'
+
+    @staticmethod
+    def _get_passwords(args):
+        assert args[3] == '--opassword'
+        assert args[5] == '--npassword'
+        p = Passwords(old=args[4], new=args[6])
+        assert p.old != p.new, p
+        return p
+
+    def _get_user_for_openrc_dict(self, openrc_dict):
+        username = openrc_dict['OS_USERNAME']
+        for _, user in self._users.items():
+            if user.username == username:
+                return user
+
+        raise AssertionError('User {} not found'.format(username))  # pragma: no cover
+
+
+class Passwords(namedtuple('Passwords', ['old', 'new'])):
+    pass
+
+
+class AdminHandlerBase(HandlerBase):  # pylint: disable=abstract-method
+    def _handle_target(self, target):
+        if target != 'default.um_admin':
+            raise HandlerError()
+
+
+class UserCreateHandler(AdminHandlerBase):
+    @property
+    def _expected_startswith(self):
+        return 'user create'
+
+    def _handle(self, args):
+        assert args[3] == '--email'
+        assert args[5] == '--password'
+        user = User(username=args[2],
+                    email=args[4],
+                    password=args[6])
+        assert user.uuid not in self._users, 'User {} already created'.format(user)
+        self._users[user.uuid] = user
+        return 'User created. The UUID is {uuid}'.format(uuid=user.uuid)
+
+
+class UserListHandler(AdminHandlerBase):
+    @property
+    def _expected_startswith(self):
+        return 'user list'
+
+    def _handle(self, args):
+        return [{'Password-Expires': None,
+                 'User-ID': u.uuid,
+                 'Enabled': True,
+                 'User-Name': u.username} for _, u in self._users.items()]
+
+
+class UserDeleteHandler(AdminHandlerBase):
+    @property
+    def _expected_startswith(self):
+        return 'user delete'
+
+    def _handle(self, args):
+        assert len(args) == 3, 'Wrong number of arguments for delete {}'.format(args)
+        del self._users[args[2]]
+        return 'User deleted.'
+
+
+class CorruptedUserListHandler(UserListHandler):
+    def _handle(self, args):
+        return []
+
+
+class UserAddRoleHandler(AdminHandlerBase):
+    @property
+    def _expected_startswith(self):
+        return 'user add role'
+
+    def _handle(self, args):
+        self._users[args[3]].add_role(args[4])
+        return 'Role has been added to the user.'
+
+
+class RoleListAllHandler(AdminHandlerBase):
+    def __init__(self, users, role_attr):
+        super(RoleListAllHandler, self).__init__(users)
+        self._role_attr = role_attr
+
+    @property
+    def _expected_startswith(self):
+        return 'role list all'
+
+    def _handle(self, args):
+        assert len(args) == 3, 'Expected: {expected}, actual {actual}'.format(
+            expected=self._expected_startswith.split(),
+            actual=args)
+        return [{'Role-Description': 'Role Description {}'.format(role),
+                 'Chroot': False,
+                 'Is-Service-Role': True,
+                 self._role_attr: role} for role in self._roles]
+
+    @property
+    def _roles(self):
+        return ROLES + [BASIC_MEMBER, LINUX_USER]
+
+
+class Handlers(object):  # pylint: disable=too-few-public-methods
+    def __init__(self, handlers):
+        self._handlers = handlers
+
+    def handle(self, cmd, target):
+        for h in self._handlers:
+            try:
+                return h.handle(cmd=cmd, target=target)
+            except HandlerError:
+                pass
+
+        assert 0, 'User Command {!r} not found'.format(cmd)  # pragma: no cover
+
+
+class FakeHostCliUser(object):
+    def __init__(self, mock_hostcli, role_attr):
+        self._mock_hostcli = mock_hostcli
+        self._role_attr = role_attr
+        self._envcreator = None
+        self._users = {}
+        self._run_raw_handlers = None
+        self._run_handlers = None
+
+    def set_envcreator(self, envcreator):
+        self._envcreator = envcreator
+
+    def initialize(self):
+        self._run_raw_handlers = self._create_handlers(UserCreateHandler,
+                                                       UserAddRoleHandler,
+                                                       UserDeleteHandler,
+                                                       self._create_set_password_handler)
+        self._run_handlers = self._create_handlers(UserListHandler,
+                                                   self._create_role_handler)
+        self._set_side_effects()
+
+    @property
+    def users(self):
+        return set([u for _, u in self._users.items()])
+
+    def get_user(self, user_id):
+        return self._users[user_id]
+
+    def set_corrupted_user_list(self):
+        self._run_handlers = self._create_handlers(CorruptedUserListHandler,
+                                                   self._create_role_handler)
+
+    def _create_role_handler(self, users):
+        return RoleListAllHandler(users, role_attr=self._role_attr)
+
+    def _create_set_password_handler(self, users):
+        return SetPasswordHandler(users, envcreator=self._envcreator)
+
+    def _create_handlers(self, *handler_factories):
+        return Handlers([h(self._users) for h in handler_factories])
+
+    def _set_side_effects(self):
+        self._mock_hostcli.return_value.run.side_effect = self._run
+        self._mock_hostcli.return_value.run_raw.side_effect = self._run_raw
+
+    def _run_raw(self, cmd, target):
+        return self._run_raw_handlers.handle(cmd, target=target)
+
+    def _run(self, cmd, target):
+        return self._run_handlers.handle(cmd, target=target)