# Copyright 2019 Nokia # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from collections import namedtuple from .usergen import UserGen INITIAL_PASSWORD = 'intl_AM10srt' PASSWORD = 'um_UM0scrt' class UserRecord(namedtuple('UserRecord', ['uuid', 'username', 'password', 'roles'])): def create_with_password(self, password): d = self._asdict().copy() d.update({'password': password}) return UserRecord(**d) class UserManagerError(Exception): pass class UserManager(object): _target = 'default.um_admin' _all_roles = 'all_roles' _no_roles = 'no_roles' _special_roles = [_all_roles, _no_roles] _basic_member = 'basic_member' _linux_user = 'linux_user' def __init__(self, hostcli_factory): self._hostcli_factory = hostcli_factory self._hostcli_inst = None self._roles_users = dict() self._roles_cache = set() self._user_gen_inst = None def create_user_with_roles(self, *roles): r = self._get_and_verify_roles(roles) key = self._get_roles_users_key(r) if key not in self._roles_users: self._roles_users[key] = self._create_with_roles(r) self._change_password(roles) return self._roles_users[key] def delete_users(self): """Delete all users created by :meth:`.create_user_with_roles`. """ for _, userrecord in self._roles_users.items(): self._hostcli.run_raw('user delete {}'.format(userrecord.uuid), target=self._target) def get_user_and_set_password(self, *roles): """Get user with INITIAL_PASSWORD and set password to PASSWORD. .. note:: This method should be called only by :class:`cluster.envcreator.EnvCreator`. """ upd_roles = self._get_roles(roles) key = self._get_roles_users_key(upd_roles) old_userrecord = self._roles_users[key] self._roles_users[key] = old_userrecord.create_with_password(PASSWORD) return old_userrecord @staticmethod def _get_roles_users_key(roles): return frozenset(roles) @property def _hostcli(self): if self._hostcli_inst is None: self._hostcli_inst = self._hostcli_factory() return self._hostcli_inst def _get_uuid(self, username): users = self._hostcli.run('user list', target=self._target) for u in users: if u['User-Name'] == username: return u['User-ID'] raise UserManagerError('User {} does not exist in target'.format(username)) def _get_and_verify_roles(self, roles): self._verify_special_roles(roles) r = self._get_roles(roles) self._verify_roles(r) return r def _verify_special_roles(self, roles): if len(roles) == 1: return for special_role in self._special_roles: if special_role in roles: raise UserManagerError( 'Special role {special_role!r} and other roles in {roles}'.format( special_role=special_role, roles=roles)) def _get_roles(self, roles): if set(roles) == set([self._all_roles]): return self._roles if set(roles) == set([self._no_roles]): return [] return roles def _verify_roles(self, roles): given_roles = set(roles) if len(roles) > len(given_roles): raise UserManagerError('Duplicate roles in {}'.format(roles)) target_roles = set(self._roles) notexisting = given_roles - target_roles if notexisting: raise UserManagerError('Roles {} not found'.format(notexisting)) def _create_with_roles(self, roles): username = self._user_gen.create_username(roles) uuid = self._create_user_from_user(username, roles) return UserRecord(uuid=uuid, username=username, password=INITIAL_PASSWORD, roles=roles) def _create_user_from_user(self, username, roles): self._hostcli.run_raw('user create {username} ' '--email user@email.me ' '--password {password}'.format( username=username, password=INITIAL_PASSWORD), target=self._target) uuid = self._get_uuid(username) for role in roles: self._hostcli.run_raw('user add role {uuid} {role}'.format(uuid=uuid, role=role), target=self._target) return uuid def _change_password(self, roles): self._hostcli.run_raw( 'user set password --opassword {old} --npassword {new}'.format( old=INITIAL_PASSWORD, new=PASSWORD), target='default.set_password:{}'.format(','.join(roles))) @property def _user_gen(self): if self._user_gen_inst is None: self._user_gen_inst = UserGen(len(self._roles)) return self._user_gen_inst @property def _roles(self): if not self._roles_cache: self._roles_cache = self._get_roles_via_hostcli() return self._roles_cache def _get_roles_via_hostcli(self): roles = self._hostcli.run('role list all', target=self._target) role_attr = 'Role-Name' if 'Role-Name' in roles[0] else 'Role' return [role[role_attr] for role in roles if role[role_attr] not in [self._basic_member, self._linux_user]]