Add cloudtaf framework
[ta/cloudtaf.git] / libraries / cluster / usermanager.py
1 # Copyright 2019 Nokia
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 from collections import namedtuple
16 from .usergen import UserGen
17
18 INITIAL_PASSWORD = 'intl_AM10srt'
19 PASSWORD = 'um_UM0scrt'
20
21
22 class UserRecord(namedtuple('UserRecord', ['uuid',
23                                            'username',
24                                            'password',
25                                            'roles'])):
26
27     def create_with_password(self, password):
28         d = self._asdict().copy()
29         d.update({'password': password})
30         return UserRecord(**d)
31
32
33 class UserManagerError(Exception):
34     pass
35
36
37 class UserManager(object):
38
39     _target = 'default.um_admin'
40     _all_roles = 'all_roles'
41     _no_roles = 'no_roles'
42     _special_roles = [_all_roles, _no_roles]
43     _basic_member = 'basic_member'
44     _linux_user = 'linux_user'
45
46     def __init__(self, hostcli_factory):
47         self._hostcli_factory = hostcli_factory
48         self._hostcli_inst = None
49         self._roles_users = dict()
50         self._roles_cache = set()
51         self._user_gen_inst = None
52
53     def create_user_with_roles(self, *roles):
54         r = self._get_and_verify_roles(roles)
55
56         key = self._get_roles_users_key(r)
57         if key not in self._roles_users:
58             self._roles_users[key] = self._create_with_roles(r)
59             self._change_password(roles)
60
61         return self._roles_users[key]
62
63     def delete_users(self):
64         """Delete all users created by :meth:`.create_user_with_roles`.
65         """
66         for _, userrecord in self._roles_users.items():
67             self._hostcli.run_raw('user delete {}'.format(userrecord.uuid),
68                                   target=self._target)
69
70     def get_user_and_set_password(self, *roles):
71         """Get user with INITIAL_PASSWORD and set password to PASSWORD.
72
73         .. note::
74
75            This method should be called only by :class:`cluster.envcreator.EnvCreator`.
76         """
77         upd_roles = self._get_roles(roles)
78         key = self._get_roles_users_key(upd_roles)
79         old_userrecord = self._roles_users[key]
80         self._roles_users[key] = old_userrecord.create_with_password(PASSWORD)
81         return old_userrecord
82
83     @staticmethod
84     def _get_roles_users_key(roles):
85         return frozenset(roles)
86
87     @property
88     def _hostcli(self):
89         if self._hostcli_inst is None:
90             self._hostcli_inst = self._hostcli_factory()
91         return self._hostcli_inst
92
93     def _get_uuid(self, username):
94         users = self._hostcli.run('user list', target=self._target)
95         for u in users:
96             if u['User-Name'] == username:
97                 return u['User-ID']
98
99         raise UserManagerError('User {} does not exist in target'.format(username))
100
101     def _get_and_verify_roles(self, roles):
102         self._verify_special_roles(roles)
103         r = self._get_roles(roles)
104         self._verify_roles(r)
105         return r
106
107     def _verify_special_roles(self, roles):
108         if len(roles) == 1:
109             return
110
111         for special_role in self._special_roles:
112             if special_role in roles:
113                 raise UserManagerError(
114                     'Special role {special_role!r} and other roles in {roles}'.format(
115                         special_role=special_role,
116                         roles=roles))
117
118     def _get_roles(self, roles):
119         if set(roles) == set([self._all_roles]):
120             return self._roles
121         if set(roles) == set([self._no_roles]):
122             return []
123         return roles
124
125     def _verify_roles(self, roles):
126         given_roles = set(roles)
127         if len(roles) > len(given_roles):
128             raise UserManagerError('Duplicate roles in {}'.format(roles))
129         target_roles = set(self._roles)
130         notexisting = given_roles - target_roles
131         if notexisting:
132             raise UserManagerError('Roles {} not found'.format(notexisting))
133
134     def _create_with_roles(self, roles):
135         username = self._user_gen.create_username(roles)
136         uuid = self._create_user_from_user(username, roles)
137         return UserRecord(uuid=uuid,
138                           username=username,
139                           password=INITIAL_PASSWORD,
140                           roles=roles)
141
142     def _create_user_from_user(self, username, roles):
143         self._hostcli.run_raw('user create {username} '
144                               '--email user@email.me '
145                               '--password {password}'.format(
146                                   username=username,
147                                   password=INITIAL_PASSWORD), target=self._target)
148         uuid = self._get_uuid(username)
149         for role in roles:
150             self._hostcli.run_raw('user add role {uuid} {role}'.format(uuid=uuid,
151                                                                        role=role),
152                                   target=self._target)
153         return uuid
154
155     def _change_password(self, roles):
156         self._hostcli.run_raw(
157             'user set password --opassword {old} --npassword {new}'.format(
158                 old=INITIAL_PASSWORD,
159                 new=PASSWORD),
160             target='default.set_password:{}'.format(','.join(roles)))
161
162     @property
163     def _user_gen(self):
164         if self._user_gen_inst is None:
165             self._user_gen_inst = UserGen(len(self._roles))
166         return self._user_gen_inst
167
168     @property
169     def _roles(self):
170         if not self._roles_cache:
171             self._roles_cache = self._get_roles_via_hostcli()
172         return self._roles_cache
173
174     def _get_roles_via_hostcli(self):
175         roles = self._hostcli.run('role list all', target=self._target)
176         role_attr = 'Role-Name' if 'Role-Name' in roles[0] else 'Role'
177         return [role[role_attr] for role in roles
178                 if role[role_attr] not in [self._basic_member, self._linux_user]]