X-Git-Url: https://gerrit.akraino.org/r/gitweb?p=ta%2Fcloudtaf.git;a=blobdiff_plain;f=libraries%2Fcluster%2Fusermanager.py;fp=libraries%2Fcluster%2Fusermanager.py;h=903d02b9da9720b30c7e3fedb9e948c3120c3557;hp=0000000000000000000000000000000000000000;hb=d448b9388fd9cb3732e35996b98f493a5a5921d4;hpb=07c5f13d2429236a603c867e09c4cc3b42e75826 diff --git a/libraries/cluster/usermanager.py b/libraries/cluster/usermanager.py new file mode 100644 index 0000000..903d02b --- /dev/null +++ b/libraries/cluster/usermanager.py @@ -0,0 +1,178 @@ +# Copyright 2019 Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import namedtuple +from .usergen import UserGen + +INITIAL_PASSWORD = 'intl_AM10srt' +PASSWORD = 'um_UM0scrt' + + +class UserRecord(namedtuple('UserRecord', ['uuid', + 'username', + 'password', + 'roles'])): + + def create_with_password(self, password): + d = self._asdict().copy() + d.update({'password': password}) + return UserRecord(**d) + + +class UserManagerError(Exception): + pass + + +class UserManager(object): + + _target = 'default.um_admin' + _all_roles = 'all_roles' + _no_roles = 'no_roles' + _special_roles = [_all_roles, _no_roles] + _basic_member = 'basic_member' + _linux_user = 'linux_user' + + def __init__(self, hostcli_factory): + self._hostcli_factory = hostcli_factory + self._hostcli_inst = None + self._roles_users = dict() + self._roles_cache = set() + self._user_gen_inst = None + + def create_user_with_roles(self, *roles): + r = self._get_and_verify_roles(roles) + + key = self._get_roles_users_key(r) + if key not in self._roles_users: + self._roles_users[key] = self._create_with_roles(r) + self._change_password(roles) + + return self._roles_users[key] + + def delete_users(self): + """Delete all users created by :meth:`.create_user_with_roles`. + """ + for _, userrecord in self._roles_users.items(): + self._hostcli.run_raw('user delete {}'.format(userrecord.uuid), + target=self._target) + + def get_user_and_set_password(self, *roles): + """Get user with INITIAL_PASSWORD and set password to PASSWORD. + + .. note:: + + This method should be called only by :class:`cluster.envcreator.EnvCreator`. + """ + upd_roles = self._get_roles(roles) + key = self._get_roles_users_key(upd_roles) + old_userrecord = self._roles_users[key] + self._roles_users[key] = old_userrecord.create_with_password(PASSWORD) + return old_userrecord + + @staticmethod + def _get_roles_users_key(roles): + return frozenset(roles) + + @property + def _hostcli(self): + if self._hostcli_inst is None: + self._hostcli_inst = self._hostcli_factory() + return self._hostcli_inst + + def _get_uuid(self, username): + users = self._hostcli.run('user list', target=self._target) + for u in users: + if u['User-Name'] == username: + return u['User-ID'] + + raise UserManagerError('User {} does not exist in target'.format(username)) + + def _get_and_verify_roles(self, roles): + self._verify_special_roles(roles) + r = self._get_roles(roles) + self._verify_roles(r) + return r + + def _verify_special_roles(self, roles): + if len(roles) == 1: + return + + for special_role in self._special_roles: + if special_role in roles: + raise UserManagerError( + 'Special role {special_role!r} and other roles in {roles}'.format( + special_role=special_role, + roles=roles)) + + def _get_roles(self, roles): + if set(roles) == set([self._all_roles]): + return self._roles + if set(roles) == set([self._no_roles]): + return [] + return roles + + def _verify_roles(self, roles): + given_roles = set(roles) + if len(roles) > len(given_roles): + raise UserManagerError('Duplicate roles in {}'.format(roles)) + target_roles = set(self._roles) + notexisting = given_roles - target_roles + if notexisting: + raise UserManagerError('Roles {} not found'.format(notexisting)) + + def _create_with_roles(self, roles): + username = self._user_gen.create_username(roles) + uuid = self._create_user_from_user(username, roles) + return UserRecord(uuid=uuid, + username=username, + password=INITIAL_PASSWORD, + roles=roles) + + def _create_user_from_user(self, username, roles): + self._hostcli.run_raw('user create {username} ' + '--email user@email.me ' + '--password {password}'.format( + username=username, + password=INITIAL_PASSWORD), target=self._target) + uuid = self._get_uuid(username) + for role in roles: + self._hostcli.run_raw('user add role {uuid} {role}'.format(uuid=uuid, + role=role), + target=self._target) + return uuid + + def _change_password(self, roles): + self._hostcli.run_raw( + 'user set password --opassword {old} --npassword {new}'.format( + old=INITIAL_PASSWORD, + new=PASSWORD), + target='default.set_password:{}'.format(','.join(roles))) + + @property + def _user_gen(self): + if self._user_gen_inst is None: + self._user_gen_inst = UserGen(len(self._roles)) + return self._user_gen_inst + + @property + def _roles(self): + if not self._roles_cache: + self._roles_cache = self._get_roles_via_hostcli() + return self._roles_cache + + def _get_roles_via_hostcli(self): + roles = self._hostcli.run('role list all', target=self._target) + role_attr = 'Role-Name' if 'Role-Name' in roles[0] else 'Role' + return [role[role_attr] for role in roles + if role[role_attr] not in [self._basic_member, self._linux_user]]