Add cloudtaf framework
[ta/cloudtaf.git] / libraries / cluster / usermanager.py
diff --git a/libraries/cluster/usermanager.py b/libraries/cluster/usermanager.py
new file mode 100644 (file)
index 0000000..903d02b
--- /dev/null
@@ -0,0 +1,178 @@
+# Copyright 2019 Nokia
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections import namedtuple
+from .usergen import UserGen
+
+INITIAL_PASSWORD = 'intl_AM10srt'
+PASSWORD = 'um_UM0scrt'
+
+
+class UserRecord(namedtuple('UserRecord', ['uuid',
+                                           'username',
+                                           'password',
+                                           'roles'])):
+
+    def create_with_password(self, password):
+        d = self._asdict().copy()
+        d.update({'password': password})
+        return UserRecord(**d)
+
+
+class UserManagerError(Exception):
+    pass
+
+
+class UserManager(object):
+
+    _target = 'default.um_admin'
+    _all_roles = 'all_roles'
+    _no_roles = 'no_roles'
+    _special_roles = [_all_roles, _no_roles]
+    _basic_member = 'basic_member'
+    _linux_user = 'linux_user'
+
+    def __init__(self, hostcli_factory):
+        self._hostcli_factory = hostcli_factory
+        self._hostcli_inst = None
+        self._roles_users = dict()
+        self._roles_cache = set()
+        self._user_gen_inst = None
+
+    def create_user_with_roles(self, *roles):
+        r = self._get_and_verify_roles(roles)
+
+        key = self._get_roles_users_key(r)
+        if key not in self._roles_users:
+            self._roles_users[key] = self._create_with_roles(r)
+            self._change_password(roles)
+
+        return self._roles_users[key]
+
+    def delete_users(self):
+        """Delete all users created by :meth:`.create_user_with_roles`.
+        """
+        for _, userrecord in self._roles_users.items():
+            self._hostcli.run_raw('user delete {}'.format(userrecord.uuid),
+                                  target=self._target)
+
+    def get_user_and_set_password(self, *roles):
+        """Get user with INITIAL_PASSWORD and set password to PASSWORD.
+
+        .. note::
+
+           This method should be called only by :class:`cluster.envcreator.EnvCreator`.
+        """
+        upd_roles = self._get_roles(roles)
+        key = self._get_roles_users_key(upd_roles)
+        old_userrecord = self._roles_users[key]
+        self._roles_users[key] = old_userrecord.create_with_password(PASSWORD)
+        return old_userrecord
+
+    @staticmethod
+    def _get_roles_users_key(roles):
+        return frozenset(roles)
+
+    @property
+    def _hostcli(self):
+        if self._hostcli_inst is None:
+            self._hostcli_inst = self._hostcli_factory()
+        return self._hostcli_inst
+
+    def _get_uuid(self, username):
+        users = self._hostcli.run('user list', target=self._target)
+        for u in users:
+            if u['User-Name'] == username:
+                return u['User-ID']
+
+        raise UserManagerError('User {} does not exist in target'.format(username))
+
+    def _get_and_verify_roles(self, roles):
+        self._verify_special_roles(roles)
+        r = self._get_roles(roles)
+        self._verify_roles(r)
+        return r
+
+    def _verify_special_roles(self, roles):
+        if len(roles) == 1:
+            return
+
+        for special_role in self._special_roles:
+            if special_role in roles:
+                raise UserManagerError(
+                    'Special role {special_role!r} and other roles in {roles}'.format(
+                        special_role=special_role,
+                        roles=roles))
+
+    def _get_roles(self, roles):
+        if set(roles) == set([self._all_roles]):
+            return self._roles
+        if set(roles) == set([self._no_roles]):
+            return []
+        return roles
+
+    def _verify_roles(self, roles):
+        given_roles = set(roles)
+        if len(roles) > len(given_roles):
+            raise UserManagerError('Duplicate roles in {}'.format(roles))
+        target_roles = set(self._roles)
+        notexisting = given_roles - target_roles
+        if notexisting:
+            raise UserManagerError('Roles {} not found'.format(notexisting))
+
+    def _create_with_roles(self, roles):
+        username = self._user_gen.create_username(roles)
+        uuid = self._create_user_from_user(username, roles)
+        return UserRecord(uuid=uuid,
+                          username=username,
+                          password=INITIAL_PASSWORD,
+                          roles=roles)
+
+    def _create_user_from_user(self, username, roles):
+        self._hostcli.run_raw('user create {username} '
+                              '--email user@email.me '
+                              '--password {password}'.format(
+                                  username=username,
+                                  password=INITIAL_PASSWORD), target=self._target)
+        uuid = self._get_uuid(username)
+        for role in roles:
+            self._hostcli.run_raw('user add role {uuid} {role}'.format(uuid=uuid,
+                                                                       role=role),
+                                  target=self._target)
+        return uuid
+
+    def _change_password(self, roles):
+        self._hostcli.run_raw(
+            'user set password --opassword {old} --npassword {new}'.format(
+                old=INITIAL_PASSWORD,
+                new=PASSWORD),
+            target='default.set_password:{}'.format(','.join(roles)))
+
+    @property
+    def _user_gen(self):
+        if self._user_gen_inst is None:
+            self._user_gen_inst = UserGen(len(self._roles))
+        return self._user_gen_inst
+
+    @property
+    def _roles(self):
+        if not self._roles_cache:
+            self._roles_cache = self._get_roles_via_hostcli()
+        return self._roles_cache
+
+    def _get_roles_via_hostcli(self):
+        roles = self._hostcli.run('role list all', target=self._target)
+        role_attr = 'Role-Name' if 'Role-Name' in roles[0] else 'Role'
+        return [role[role_attr] for role in roles
+                if role[role_attr] not in [self._basic_member, self._linux_user]]