3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from collections import namedtuple
16 from .usergen import UserGen
# Password every user is created with: it is passed as ``--password`` to
# ``user create``, stored in the freshly built UserRecord and given as the
# old password to ``user set password``.
INITIAL_PASSWORD = 'intl_AM10srt'
# Password written into a user's record by
# UserManager.get_user_and_set_password.
# NOTE(review): presumably also the new password handed to
# ``user set password`` -- that argument is not visible here; confirm.
PASSWORD = 'um_UM0scrt'
22 class UserRecord(namedtuple('UserRecord', ['uuid',
def create_with_password(self, password):
    """Return a copy of this record that carries *password*.

    The record is an immutable namedtuple, so it is never modified; a new
    record is built instead.

    Args:
        password: value for the ``password`` field of the new record.

    Returns:
        A new record equal to this one in every field except ``password``.
    """
    # namedtuple._replace builds the modified copy directly, so there is no
    # need to round-trip through (and copy) an intermediate dict.
    return self._replace(password=password)
33 class UserManagerError(Exception):
37 class UserManager(object):
# hostcli target given to the ``user list``, ``user create`` and
# ``role list all`` commands issued by this class.
_target = 'default.um_admin'
# Special role names handled separately by _get_roles and
# _verify_special_roles.
# NOTE(review): judging by the error message raised there, a special role
# presumably may not be combined with other roles -- confirm.
_all_roles = 'all_roles'
_no_roles = 'no_roles'
_special_roles = [_all_roles, _no_roles]
# Role names left out of the role list returned by _get_roles_via_hostcli.
_basic_member = 'basic_member'
_linux_user = 'linux_user'
46 def __init__(self, hostcli_factory):
47 self._hostcli_factory = hostcli_factory
48 self._hostcli_inst = None
49 self._roles_users = dict()
50 self._roles_cache = set()
51 self._user_gen_inst = None
53 def create_user_with_roles(self, *roles):
54 r = self._get_and_verify_roles(roles)
56 key = self._get_roles_users_key(r)
57 if key not in self._roles_users:
58 self._roles_users[key] = self._create_with_roles(r)
59 self._change_password(roles)
61 return self._roles_users[key]
63 def delete_users(self):
64 """Delete all users created by :meth:`.create_user_with_roles`.
66 for _, userrecord in self._roles_users.items():
67 self._hostcli.run_raw('user delete {}'.format(userrecord.uuid),
70 def get_user_and_set_password(self, *roles):
71 """Get user with INITIAL_PASSWORD and set password to PASSWORD.
75 This method should be called only by :class:`cluster.envcreator.EnvCreator`.
77 upd_roles = self._get_roles(roles)
78 key = self._get_roles_users_key(upd_roles)
79 old_userrecord = self._roles_users[key]
80 self._roles_users[key] = old_userrecord.create_with_password(PASSWORD)
84 def _get_roles_users_key(roles):
85 return frozenset(roles)
89 if self._hostcli_inst is None:
90 self._hostcli_inst = self._hostcli_factory()
91 return self._hostcli_inst
93 def _get_uuid(self, username):
94 users = self._hostcli.run('user list', target=self._target)
96 if u['User-Name'] == username:
99 raise UserManagerError('User {} does not exist in target'.format(username))
101 def _get_and_verify_roles(self, roles):
102 self._verify_special_roles(roles)
103 r = self._get_roles(roles)
104 self._verify_roles(r)
107 def _verify_special_roles(self, roles):
111 for special_role in self._special_roles:
112 if special_role in roles:
113 raise UserManagerError(
114 'Special role {special_role!r} and other roles in {roles}'.format(
115 special_role=special_role,
118 def _get_roles(self, roles):
119 if set(roles) == set([self._all_roles]):
121 if set(roles) == set([self._no_roles]):
125 def _verify_roles(self, roles):
126 given_roles = set(roles)
127 if len(roles) > len(given_roles):
128 raise UserManagerError('Duplicate roles in {}'.format(roles))
129 target_roles = set(self._roles)
130 notexisting = given_roles - target_roles
132 raise UserManagerError('Roles {} not found'.format(notexisting))
134 def _create_with_roles(self, roles):
135 username = self._user_gen.create_username(roles)
136 uuid = self._create_user_from_user(username, roles)
137 return UserRecord(uuid=uuid,
139 password=INITIAL_PASSWORD,
142 def _create_user_from_user(self, username, roles):
143 self._hostcli.run_raw('user create {username} '
144 '--email user@email.me '
145 '--password {password}'.format(
147 password=INITIAL_PASSWORD), target=self._target)
148 uuid = self._get_uuid(username)
150 self._hostcli.run_raw('user add role {uuid} {role}'.format(uuid=uuid,
155 def _change_password(self, roles):
156 self._hostcli.run_raw(
157 'user set password --opassword {old} --npassword {new}'.format(
158 old=INITIAL_PASSWORD,
160 target='default.set_password:{}'.format(','.join(roles)))
164 if self._user_gen_inst is None:
165 self._user_gen_inst = UserGen(len(self._roles))
166 return self._user_gen_inst
170 if not self._roles_cache:
171 self._roles_cache = self._get_roles_via_hostcli()
172 return self._roles_cache
174 def _get_roles_via_hostcli(self):
175 roles = self._hostcli.run('role list all', target=self._target)
176 role_attr = 'Role-Name' if 'Role-Name' in roles[0] else 'Role'
177 return [role[role_attr] for role in roles
178 if role[role_attr] not in [self._basic_member, self._linux_user]]