# Copyright 2019 Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
from collections import namedtuple

import six


BASIC_MEMBER = 'basic_member'
LINUX_USER = 'linux_user'
ROLES = ['role{}'.format(i) for i in range(4)]


class UserTuple(namedtuple('UserTuple', ['uuid', 'username', 'email', 'password', 'roles'])):
    """Value snapshot of a user's uuid, username, email, password and roles.

    ``User.usertuple`` passes ``roles`` as a set, which is unhashable, so the
    default tuple hash cannot be used and ``__hash__`` is overridden.
    """

    def __hash__(self):
        """Return a hash computed from the fields with ``roles`` put in sorted order."""
        # _asdict() already returns a fresh mapping, so it is safe to modify.
        fields = self._asdict()
        fields['roles'] = sorted(fields['roles'])
        return hash(repr(sorted(fields.items())))


class User(object):
    """Fake user record that also remembers every password it has had."""

    def __init__(self, username, email='user@email.me', password=None):
        self.username = username
        self.email = email
        self._password = password
        self.added_roles = set()
        self._uuid = None
        # The initial password (possibly None) is the first history entry.
        self._password_history = [self.password]

    def set_password(self, password):
        """Replace the current password and append it to the history."""
        self._password = password
        self._password_history.append(password)

    @property
    def password(self):
        """Current password."""
        return self._password

    @property
    def password_history(self):
        """List of all passwords in the order they were set, initial one first."""
        return self._password_history

    @property
    def uuid(self):
        """Explicitly set UUID if it is truthy, otherwise ``'UUID_<username>'``."""
        return self._uuid if self._uuid else 'UUID_{}'.format(self.username)

    def set_uuid(self, uuid):
        """Override the generated UUID."""
        self._uuid = uuid

    def set_added_roles(self, roles):
        """Replace the explicitly added roles with a set built from *roles*."""
        self.added_roles = set(roles)

    def add_role(self, role):
        """Add *role* to the user; the role must not already be among ``roles``."""
        assert role not in self.roles, 'Role {role} already in {username}'.format(
            role=role, username=self.username)
        self.added_roles.add(role)

    @property
    def roles(self):
        """Explicitly added roles plus ``BASIC_MEMBER``, as a new set."""
        return self.added_roles.union({BASIC_MEMBER})

    def __repr__(self):
        return str(self.usertuple)  # pragma: no cover

    def __eq__(self, other):
        # Comparison is delegated to the UserTuple snapshot. When *other* is a
        # User as well, the tuple comparison returns NotImplemented and Python
        # retries with the reflected operand, i.e. other.__eq__(<our usertuple>).
        return self.usertuple == other

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__; without this method two equal
        # users would still compare as "not equal" there.
        return not self == other

    def __hash__(self):
        return hash(self.usertuple)

    @property
    def usertuple(self):
        """Return a ``UserTuple`` built from the user's current attribute values."""
        return UserTuple(uuid=self.uuid,
                         username=self.username,
                         email=self.email,
                         password=self.password,
                         roles=self.roles)


class HandlerError(Exception):
    """Raised by a handler when the target or command is not meant for it."""


@six.add_metaclass(abc.ABCMeta)
class HandlerBase(object):
    """Base class for fake command handlers operating on a shared users mapping."""

    def __init__(self, users):
        self._users = users

    def handle(self, cmd, target):
        """Check *target* and the command prefix, then run the command.

        The command is split on whitespace before it is given to ``_handle``.

        Raises:
            HandlerError: if the target is rejected by ``_handle_target`` or
                *cmd* does not start with ``_expected_startswith``.
        """
        self._handle_target(target)
        if not cmd.startswith(self._expected_startswith):
            raise HandlerError()
        return self._handle(cmd.split())

    @abc.abstractmethod
    def _handle_target(self, target):
        """Handle target

        Raises:
            HandlerError: if target cannot be handled by handler
        """

    @abc.abstractproperty
    def _expected_startswith(self):
        """Return the expected start of the command.
        """

    @abc.abstractmethod
    def _handle(self, args):
        """Handle and return the command

        Raises:
            HandlerError: if command cannot be handled by handler
        """


class SetPasswordHandler(HandlerBase):
    """Handler for HostCli command

    user set password --opassword OPASSWORD --npassword NPASSWORD
    """

    def __init__(self, users, envcreator):
        super(SetPasswordHandler, self).__init__(users)
        self._envcreator = envcreator
        self._envname = None

    @property
    def _expected_startswith(self):
        return 'user set password'

    def _handle_target(self, target):
        """Accept targets starting with ``'default.set_password:'``.

        The second dot-separated component of *target* is stored as the
        environment name used later by ``_handle``.
        """
        if not target.startswith('default.set_password:'):
            raise HandlerError()
        self._envname = target.split('.')[1]

    def _handle(self, args):
        """Change the password of the user named by the environment's ``OS_USERNAME``.

        The old password given on the command line must match both the
        environment's ``OS_PASSWORD`` and the stored user's current password.
        """
        pwds = self._get_passwords(args)
        openrc_dict = self._envcreator.create(target='default', envname=self._envname)
        assert pwds.old == openrc_dict['OS_PASSWORD'], (pwds.old, openrc_dict['OS_PASSWORD'])
        user = self._get_user_for_openrc_dict(openrc_dict)
        assert pwds.old == user.password
        user.set_password(pwds.new)
        return 'Your password has been changed.'

    @staticmethod
    def _get_passwords(args):
        """Extract the old and new password from the split command arguments."""
        assert args[3] == '--opassword'
        assert args[5] == '--npassword'
        p = Passwords(old=args[4], new=args[6])
        assert p.old != p.new, p
        return p

    def _get_user_for_openrc_dict(self, openrc_dict):
        """Return the first stored user whose username equals ``OS_USERNAME``."""
        username = openrc_dict['OS_USERNAME']
        for user in self._users.values():
            if user.username == username:
                return user
        raise AssertionError('User {} not found'.format(username))  # pragma: no cover


class Passwords(namedtuple('Passwords', ['old', 'new'])):
    """Pair of old and new password parsed from a set-password command."""


class AdminHandlerBase(HandlerBase):  # pylint: disable=abstract-method
    """Base for handlers that only accept the ``'default.um_admin'`` target."""

    def _handle_target(self, target):
        if target != 'default.um_admin':
            raise HandlerError()


class UserCreateHandler(AdminHandlerBase):
    """Handler for ``user create USERNAME --email EMAIL --password PASSWORD``."""

    @property
    def _expected_startswith(self):
        return 'user create'

    def _handle(self, args):
        """Create a user and store it in the users mapping under its UUID."""
        assert args[3] == '--email'
        assert args[5] == '--password'
        user = User(username=args[2], email=args[4], password=args[6])
        assert user.uuid not in self._users, 'User {} already created'.format(user)
        self._users[user.uuid] = user
        return 'User created. The UUID is {uuid}'.format(uuid=user.uuid)


class UserListHandler(AdminHandlerBase):
    """Handler for ``user list``."""

    @property
    def _expected_startswith(self):
        return 'user list'

    def _handle(self, args):
        """Return one dictionary per stored user."""
        return [{'Password-Expires': None,
                 'User-ID': u.uuid,
                 'Enabled': True,
                 'User-Name': u.username} for u in self._users.values()]


class UserDeleteHandler(AdminHandlerBase):
    """Handler for ``user delete USER_ID``."""

    @property
    def _expected_startswith(self):
        return 'user delete'

    def _handle(self, args):
        """Remove the user stored under key ``args[2]`` from the users mapping."""
        assert len(args) == 3, 'Wrong number of arguments for delete {}'.format(args)
        del self._users[args[2]]
        return 'User deleted.'


class CorruptedUserListHandler(UserListHandler):
    """``user list`` handler that always returns an empty list."""

    def _handle(self, args):
        return []


class UserAddRoleHandler(AdminHandlerBase):
    """Handler for ``user add role USER_ID ROLE``."""

    @property
    def _expected_startswith(self):
        return 'user add role'

    def _handle(self, args):
        """Add role ``args[4]`` to the user stored under key ``args[3]``."""
        self._users[args[3]].add_role(args[4])
        return 'Role has been added to the user.'


class RoleListAllHandler(AdminHandlerBase):
    """Handler for ``role list all``.

    The key under which each role name is reported is configurable through
    *role_attr*.
    """

    def __init__(self, users, role_attr):
        super(RoleListAllHandler, self).__init__(users)
        self._role_attr = role_attr

    @property
    def _expected_startswith(self):
        return 'role list all'

    def _handle(self, args):
        """Return one dictionary per role in ``_roles``."""
        assert len(args) == 3, 'Expected: {expected}, actual {actual}'.format(
            expected=self._expected_startswith.split(), actual=args)
        return [{'Role-Description': 'Role Description {}'.format(role),
                 'Chroot': False,
                 'Is-Service-Role': True,
                 self._role_attr: role} for role in self._roles]

    @property
    def _roles(self):
        """All role names: ``ROLES`` followed by ``BASIC_MEMBER`` and ``LINUX_USER``."""
        return ROLES + [BASIC_MEMBER, LINUX_USER]


class Handlers(object):  # pylint: disable=too-few-public-methods
    """Ordered collection of handlers; the first one that accepts a command wins."""

    def __init__(self, handlers):
        self._handlers = handlers

    def handle(self, cmd, target):
        """Return the result of the first handler that does not raise HandlerError.

        Raises:
            AssertionError: if every handler rejected the command.
        """
        for h in self._handlers:
            try:
                return h.handle(cmd=cmd, target=target)
            except HandlerError:
                # This handler is not responsible for the command: try the next one.
                pass
        # Raised explicitly instead of ``assert 0`` so that the failure is not
        # stripped (turning into a silent ``None`` return) when running with -O.
        raise AssertionError('User Command {!r} not found'.format(cmd))  # pragma: no cover


class FakeHostCliUser(object):
    """Fake of the HostCli user commands, installed as side effects on a mock.

    After ``initialize``, calls to ``mock_hostcli.return_value.run`` and
    ``mock_hostcli.return_value.run_raw`` are dispatched to handlers that
    operate on an internal users dictionary.
    """

    def __init__(self, mock_hostcli, role_attr):
        self._mock_hostcli = mock_hostcli
        self._role_attr = role_attr
        self._envcreator = None
        self._users = {}
        self._run_raw_handlers = None
        self._run_handlers = None

    def set_envcreator(self, envcreator):
        """Set the object handed to the set-password handler as its env creator."""
        self._envcreator = envcreator

    def initialize(self):
        """Create the handlers and install the side effects on the mock.

        The set-password handler receives the env creator that is set at the
        time of this call.
        """
        self._run_raw_handlers = self._create_handlers(UserCreateHandler,
                                                       UserAddRoleHandler,
                                                       UserDeleteHandler,
                                                       self._create_set_password_handler)
        self._run_handlers = self._create_handlers(UserListHandler,
                                                   self._create_role_handler)
        self._set_side_effects()

    @property
    def users(self):
        """Set of all currently stored users."""
        return set(self._users.values())

    def get_user(self, user_id):
        """Return the user stored under *user_id*."""
        return self._users[user_id]

    def set_corrupted_user_list(self):
        """Make ``user list`` return an empty list regardless of stored users."""
        self._run_handlers = self._create_handlers(CorruptedUserListHandler,
                                                   self._create_role_handler)

    def _create_role_handler(self, users):
        return RoleListAllHandler(users, role_attr=self._role_attr)

    def _create_set_password_handler(self, users):
        return SetPasswordHandler(users, envcreator=self._envcreator)

    def _create_handlers(self, *handler_factories):
        """Build a ``Handlers`` by calling each factory with the shared users dict."""
        return Handlers([factory(self._users) for factory in handler_factories])

    def _set_side_effects(self):
        self._mock_hostcli.return_value.run.side_effect = self._run
        self._mock_hostcli.return_value.run_raw.side_effect = self._run_raw

    def _run_raw(self, cmd, target):
        return self._run_raw_handlers.handle(cmd, target=target)

    def _run(self, cmd, target):
        return self._run_handlers.handle(cmd, target=target)