3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from collections import namedtuple
# Role names shared by the fake user-management handlers in this module.
BASIC_MEMBER = 'basic_member'
LINUX_USER = 'linux_user'
# Four generic, numbered role names: role0 .. role3.
ROLES = ['role{}'.format(index) for index in range(4)]
26 class UserTuple(namedtuple('UserTuple', ['uuid',
32 d = self._asdict().copy()
33 d['roles'] = sorted(d['roles'])
34 return hash(repr(sorted(d.items())))
38 def __init__(self, username, email='user@email.me', password=None):
39 self.username = username
41 self._password = password
42 self.added_roles = set([])
44 self._password_history = [self.password]
def set_password(self, password):
    """Make *password* the current password and record it in the history.

    Args:
        password: the new password value; it is appended to the password
            history after it has become the current password.
    """
    self._password = password
    history = self._password_history
    history.append(password)
def password_history(self):
    """Return the recorded passwords in the order they were set.

    The stored list itself is returned, not a copy.
    """
    history = self._password_history
    return history
60 return self._uuid if self._uuid else 'UUID_{}'.format(self.username)
62 def set_uuid(self, uuid):
def set_added_roles(self, roles):
    """Replace the explicitly added roles with the given ones.

    Args:
        roles: iterable of role names; duplicates collapse because the
            roles are stored as a set.
    """
    replacement = set(roles)
    self.added_roles = replacement
68 def add_role(self, role):
69 assert role not in self.roles, 'Role {role} already in {username}'.format(
71 username=self.username)
72 self.added_roles.add(role)
76 return self.added_roles.union(set([BASIC_MEMBER]))
79 return str(self.usertuple) # pragma: no cover
def __eq__(self, other):
    """Compare this object's ``usertuple`` with *other* for equality."""
    own_tuple = self.usertuple
    return own_tuple == other
85 return hash(self.usertuple)
89 return UserTuple(uuid=self.uuid,
90 username=self.username,
92 password=self.password,
96 class HandlerError(Exception):
100 @six.add_metaclass(abc.ABCMeta)
101 class HandlerBase(object):
102 def __init__(self, users):
105 def handle(self, cmd, target):
106 self._handle_target(target)
108 if not cmd.startswith(self._expected_startswith):
111 return self._handle(cmd.split())
114 def _handle_target(self, target):
118 HandlerError: if target cannot be handled by handler
121 @abc.abstractproperty
122 def _expected_startswith(self):
123 """Return the expected start of the command.
127 def _handle(self, args):
128 """Handle and return the command
131 HandlerError: if command cannot be handled by handler
135 class SetPasswordHandler(HandlerBase):
136 """Handler for HostCli command
137 user set password --opassword OPASSWORD --npassword NPASSWORD
139 def __init__(self, users, envcreator):
140 super(SetPasswordHandler, self).__init__(users)
141 self._envcreator = envcreator
def _expected_startswith(self):
    """Return the command prefix the set-password handler responds to."""
    prefix = 'user set password'
    return prefix
148 def _handle_target(self, target):
149 if not target.startswith('default.set_password:'):
151 self._envname = target.split('.')[1]
def _handle(self, args):
    """Change the password of the user behind the targeted environment.

    The old password from the command must match both the ``OS_PASSWORD``
    of the environment created for ``self._envname`` and the current
    password of the user looked up for that environment.

    Args:
        args: the whitespace-split words of the command.

    Returns:
        The confirmation message 'Your password has been changed.'

    Raises:
        AssertionError: if the old password does not match.
    """
    passwords = self._get_passwords(args)
    env = self._envcreator.create(target='default', envname=self._envname)
    assert passwords.old == env['OS_PASSWORD'], (passwords.old,
                                                 env['OS_PASSWORD'])
    target_user = self._get_user_for_openrc_dict(env)
    assert passwords.old == target_user.password
    target_user.set_password(passwords.new)
    return 'Your password has been changed.'
164 def _get_passwords(args):
165 assert args[3] == '--opassword'
166 assert args[5] == '--npassword'
167 p = Passwords(old=args[4], new=args[6])
168 assert p.old != p.new, p
171 def _get_user_for_openrc_dict(self, openrc_dict):
172 username = openrc_dict['OS_USERNAME']
173 for _, user in self._users.items():
174 if user.username == username:
177 raise AssertionError('User {} not found'.format(username)) # pragma: no cover
180 class Passwords(namedtuple('Passwords', ['old', 'new'])):
184 class AdminHandlerBase(HandlerBase): # pylint: disable=abstract-method
185 def _handle_target(self, target):
186 if target != 'default.um_admin':
190 class UserCreateHandler(AdminHandlerBase):
192 def _expected_startswith(self):
195 def _handle(self, args):
196 assert args[3] == '--email'
197 assert args[5] == '--password'
198 user = User(username=args[2],
201 assert user.uuid not in self._users, 'User {} already created'.format(user)
202 self._users[user.uuid] = user
203 return 'User created. The UUID is {uuid}'.format(uuid=user.uuid)
206 class UserListHandler(AdminHandlerBase):
208 def _expected_startswith(self):
211 def _handle(self, args):
212 return [{'Password-Expires': None,
215 'User-Name': u.username} for _, u in self._users.items()]
218 class UserDeleteHandler(AdminHandlerBase):
220 def _expected_startswith(self):
def _handle(self, args):
    """Delete the user whose key is the third command word.

    Args:
        args: the split command words; exactly three are expected and
            ``args[2]`` is the key of the user in ``self._users``.

    Returns:
        The confirmation message 'User deleted.'

    Raises:
        AssertionError: if the command does not have exactly three words.
    """
    word_count = len(args)
    assert word_count == 3, (
        'Wrong number of arguments for delete {}'.format(args))
    user_key = args[2]
    del self._users[user_key]
    return 'User deleted.'
229 class CorruptedUserListHandler(UserListHandler):
230 def _handle(self, args):
234 class UserAddRoleHandler(AdminHandlerBase):
def _expected_startswith(self):
    """Return the command prefix the add-role handler responds to."""
    prefix = 'user add role'
    return prefix
def _handle(self, args):
    """Add a role to one of the stored users.

    Args:
        args: the split command words; ``args[3]`` is the key of the user
            in ``self._users`` and ``args[4]`` is the role to add.

    Returns:
        The confirmation message 'Role has been added to the user.'
    """
    # Look the user up before reading the role, so a missing user is
    # reported ahead of a missing role argument.
    user = self._users[args[3]]
    role = args[4]
    user.add_role(role)
    return 'Role has been added to the user.'
244 class RoleListAllHandler(AdminHandlerBase):
def __init__(self, users, role_attr):
    """Initialise the handler with the users and the role key name.

    Args:
        users: passed through unchanged to the base-class initialiser.
        role_attr: key under which ``_handle`` puts the role name in each
            listed role entry.
    """
    # Explicit two-argument super() keeps the module usable on Python 2.
    super(RoleListAllHandler, self).__init__(users)
    self._role_attr = role_attr
def _expected_startswith(self):
    """Return the command prefix the role-listing handler responds to."""
    prefix = 'role list all'
    return prefix
253 def _handle(self, args):
254 assert len(args) == 3, 'Expected: {expected}, actual {actual}'.format(
255 expected=self._expected_startswith.split(),
257 return [{'Role-Description': 'Role Description {}'.format(role),
259 'Is-Service-Role': True,
260 self._role_attr: role} for role in self._roles]
264 return ROLES + [BASIC_MEMBER, LINUX_USER]
267 class Handlers(object): # pylint: disable=too-few-public-methods
def __init__(self, handlers):
    """Keep the handlers that ``handle`` iterates over.

    Args:
        handlers: iterable of handler objects; it is stored as given,
            not copied.
    """
    self._handlers = handlers
271 def handle(self, cmd, target):
272 for h in self._handlers:
274 return h.handle(cmd=cmd, target=target)
278 assert 0, 'User Command {!r} not found'.format(cmd) # pragma: no cover
281 class FakeHostCliUser(object):
282 def __init__(self, mock_hostcli, role_attr):
283 self._mock_hostcli = mock_hostcli
284 self._role_attr = role_attr
285 self._envcreator = None
287 self._run_raw_handlers = None
288 self._run_handlers = None
def set_envcreator(self, envcreator):
    """Store the environment creator used when building SetPasswordHandler.

    Args:
        envcreator: object kept as ``self._envcreator``; stored as given.
    """
    self._envcreator = envcreator
293 def initialize(self):
294 self._run_raw_handlers = self._create_handlers(UserCreateHandler,
297 self._create_set_password_handler)
298 self._run_handlers = self._create_handlers(UserListHandler,
299 self._create_role_handler)
300 self._set_side_effects()
304 return set([u for _, u in self._users.items()])
def get_user(self, user_id):
    """Return the user stored in ``self._users`` under *user_id*."""
    users = self._users
    return users[user_id]
def set_corrupted_user_list(self):
    """Rebuild the ``run`` handlers with CorruptedUserListHandler.

    The role handler factory is kept alongside it, so only the
    user-listing handler is swapped.
    """
    factories = (CorruptedUserListHandler, self._create_role_handler)
    self._run_handlers = self._create_handlers(*factories)
def _create_role_handler(self, users):
    """Build a RoleListAllHandler for *users* with the stored role key."""
    role_attr = self._role_attr
    return RoleListAllHandler(users, role_attr=role_attr)
def _create_set_password_handler(self, users):
    """Build a SetPasswordHandler for *users* with the stored envcreator."""
    envcreator = self._envcreator
    return SetPasswordHandler(users, envcreator=envcreator)
def _create_handlers(self, *handler_factories):
    """Call every factory with ``self._users`` and wrap the results.

    Args:
        *handler_factories: callables (handler classes or factory
            methods) that take the users container as their only argument.

    Returns:
        A ``Handlers`` object holding the created handlers in the order
        the factories were given.
    """
    created = [factory(self._users) for factory in handler_factories]
    return Handlers(created)
def _set_side_effects(self):
    """Route the mocked hostcli's ``run`` and ``run_raw`` calls to this fake."""
    hostcli = self._mock_hostcli.return_value
    hostcli.run.side_effect = self._run
    hostcli.run_raw.side_effect = self._run_raw
def _run_raw(self, cmd, target):
    """Dispatch *cmd* for *target* to the ``run_raw`` handlers.

    Returns:
        Whatever the matching handler returns.
    """
    handlers = self._run_raw_handlers
    return handlers.handle(cmd, target=target)
def _run(self, cmd, target):
    """Dispatch *cmd* for *target* to the ``run`` handlers.

    Returns:
        Whatever the matching handler returns.
    """
    handlers = self._run_handlers
    return handlers.handle(cmd, target=target)