Add cloudtaf framework
[ta/cloudtaf.git] / libraries / cluster / testutils / fakehostcli.py
1 # Copyright 2019 Nokia
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import abc
16 from collections import namedtuple
17 import six
18
19 BASIC_MEMBER = 'basic_member'
20
21 LINUX_USER = 'linux_user'
22
23 ROLES = ['role{}'.format(i) for i in range(4)]
24
25
26 class UserTuple(namedtuple('UserTuple', ['uuid',
27                                          'username',
28                                          'email',
29                                          'password',
30                                          'roles'])):
31     def __hash__(self):
32         d = self._asdict().copy()
33         d['roles'] = sorted(d['roles'])
34         return hash(repr(sorted(d.items())))
35
36
37 class User(object):
38     def __init__(self, username, email='user@email.me', password=None):
39         self.username = username
40         self.email = email
41         self._password = password
42         self.added_roles = set([])
43         self._uuid = None
44         self._password_history = [self.password]
45
46     def set_password(self, password):
47         self._password = password
48         self._password_history.append(password)
49
50     @property
51     def password(self):
52         return self._password
53
54     @property
55     def password_history(self):
56         return self._password_history
57
58     @property
59     def uuid(self):
60         return self._uuid if self._uuid else 'UUID_{}'.format(self.username)
61
62     def set_uuid(self, uuid):
63         self._uuid = uuid
64
65     def set_added_roles(self, roles):
66         self.added_roles = set(roles)
67
68     def add_role(self, role):
69         assert role not in self.roles, 'Role {role} already in {username}'.format(
70             role=role,
71             username=self.username)
72         self.added_roles.add(role)
73
74     @property
75     def roles(self):
76         return self.added_roles.union(set([BASIC_MEMBER]))
77
78     def __repr__(self):
79         return str(self.usertuple)  # pragma: no cover
80
81     def __eq__(self, other):
82         return self.usertuple == other
83
84     def __hash__(self):
85         return hash(self.usertuple)
86
87     @property
88     def usertuple(self):
89         return UserTuple(uuid=self.uuid,
90                          username=self.username,
91                          email=self.email,
92                          password=self.password,
93                          roles=self.roles)
94
95
96 class HandlerError(Exception):
97     pass
98
99
100 @six.add_metaclass(abc.ABCMeta)
101 class HandlerBase(object):
102     def __init__(self, users):
103         self._users = users
104
105     def handle(self, cmd, target):
106         self._handle_target(target)
107
108         if not cmd.startswith(self._expected_startswith):
109             raise HandlerError()
110
111         return self._handle(cmd.split())
112
113     @abc.abstractmethod
114     def _handle_target(self, target):
115         """Handle target
116
117         Raises:
118             HandlerError: if target cannot be handled by handler
119         """
120
121     @abc.abstractproperty
122     def _expected_startswith(self):
123         """Return the expected start of the command.
124         """
125
126     @abc.abstractmethod
127     def _handle(self, args):
128         """Handle and return the command
129
130         Raises:
131            HandlerError: if command cannot be handled by handler
132         """
133
134
135 class SetPasswordHandler(HandlerBase):
136     """Handler for HostCli command
137     user set password --opassword OPASSWORD --npassword NPASSWORD
138     """
139     def __init__(self, users, envcreator):
140         super(SetPasswordHandler, self).__init__(users)
141         self._envcreator = envcreator
142         self._envname = None
143
144     @property
145     def _expected_startswith(self):
146         return 'user set password'
147
148     def _handle_target(self, target):
149         if not target.startswith('default.set_password:'):
150             raise HandlerError()
151         self._envname = target.split('.')[1]
152
153     def _handle(self, args):
154         pwds = self._get_passwords(args)
155         openrc_dict = self._envcreator.create(target='default', envname=self._envname)
156         assert pwds.old == openrc_dict['OS_PASSWORD'], (pwds.old,
157                                                         openrc_dict['OS_PASSWORD'])
158         user = self._get_user_for_openrc_dict(openrc_dict)
159         assert pwds.old == user.password
160         user.set_password(pwds.new)
161         return 'Your password has been changed.'
162
163     @staticmethod
164     def _get_passwords(args):
165         assert args[3] == '--opassword'
166         assert args[5] == '--npassword'
167         p = Passwords(old=args[4], new=args[6])
168         assert p.old != p.new, p
169         return p
170
171     def _get_user_for_openrc_dict(self, openrc_dict):
172         username = openrc_dict['OS_USERNAME']
173         for _, user in self._users.items():
174             if user.username == username:
175                 return user
176
177         raise AssertionError('User {} not found'.format(username))  # pragma: no cover
178
179
180 class Passwords(namedtuple('Passwords', ['old', 'new'])):
181     pass
182
183
184 class AdminHandlerBase(HandlerBase):  # pylint: disable=abstract-method
185     def _handle_target(self, target):
186         if target != 'default.um_admin':
187             raise HandlerError()
188
189
190 class UserCreateHandler(AdminHandlerBase):
191     @property
192     def _expected_startswith(self):
193         return 'user create'
194
195     def _handle(self, args):
196         assert args[3] == '--email'
197         assert args[5] == '--password'
198         user = User(username=args[2],
199                     email=args[4],
200                     password=args[6])
201         assert user.uuid not in self._users, 'User {} already created'.format(user)
202         self._users[user.uuid] = user
203         return 'User created. The UUID is {uuid}'.format(uuid=user.uuid)
204
205
206 class UserListHandler(AdminHandlerBase):
207     @property
208     def _expected_startswith(self):
209         return 'user list'
210
211     def _handle(self, args):
212         return [{'Password-Expires': None,
213                  'User-ID': u.uuid,
214                  'Enabled': True,
215                  'User-Name': u.username} for _, u in self._users.items()]
216
217
218 class UserDeleteHandler(AdminHandlerBase):
219     @property
220     def _expected_startswith(self):
221         return 'user delete'
222
223     def _handle(self, args):
224         assert len(args) == 3, 'Wrong number of arguments for delete {}'.format(args)
225         del self._users[args[2]]
226         return 'User deleted.'
227
228
229 class CorruptedUserListHandler(UserListHandler):
230     def _handle(self, args):
231         return []
232
233
234 class UserAddRoleHandler(AdminHandlerBase):
235     @property
236     def _expected_startswith(self):
237         return 'user add role'
238
239     def _handle(self, args):
240         self._users[args[3]].add_role(args[4])
241         return 'Role has been added to the user.'
242
243
244 class RoleListAllHandler(AdminHandlerBase):
245     def __init__(self, users, role_attr):
246         super(RoleListAllHandler, self).__init__(users)
247         self._role_attr = role_attr
248
249     @property
250     def _expected_startswith(self):
251         return 'role list all'
252
253     def _handle(self, args):
254         assert len(args) == 3, 'Expected: {expected}, actual {actual}'.format(
255             expected=self._expected_startswith.split(),
256             actual=args)
257         return [{'Role-Description': 'Role Description {}'.format(role),
258                  'Chroot': False,
259                  'Is-Service-Role': True,
260                  self._role_attr: role} for role in self._roles]
261
262     @property
263     def _roles(self):
264         return ROLES + [BASIC_MEMBER, LINUX_USER]
265
266
267 class Handlers(object):  # pylint: disable=too-few-public-methods
268     def __init__(self, handlers):
269         self._handlers = handlers
270
271     def handle(self, cmd, target):
272         for h in self._handlers:
273             try:
274                 return h.handle(cmd=cmd, target=target)
275             except HandlerError:
276                 pass
277
278         assert 0, 'User Command {!r} not found'.format(cmd)  # pragma: no cover
279
280
281 class FakeHostCliUser(object):
282     def __init__(self, mock_hostcli, role_attr):
283         self._mock_hostcli = mock_hostcli
284         self._role_attr = role_attr
285         self._envcreator = None
286         self._users = {}
287         self._run_raw_handlers = None
288         self._run_handlers = None
289
290     def set_envcreator(self, envcreator):
291         self._envcreator = envcreator
292
293     def initialize(self):
294         self._run_raw_handlers = self._create_handlers(UserCreateHandler,
295                                                        UserAddRoleHandler,
296                                                        UserDeleteHandler,
297                                                        self._create_set_password_handler)
298         self._run_handlers = self._create_handlers(UserListHandler,
299                                                    self._create_role_handler)
300         self._set_side_effects()
301
302     @property
303     def users(self):
304         return set([u for _, u in self._users.items()])
305
306     def get_user(self, user_id):
307         return self._users[user_id]
308
309     def set_corrupted_user_list(self):
310         self._run_handlers = self._create_handlers(CorruptedUserListHandler,
311                                                    self._create_role_handler)
312
313     def _create_role_handler(self, users):
314         return RoleListAllHandler(users, role_attr=self._role_attr)
315
316     def _create_set_password_handler(self, users):
317         return SetPasswordHandler(users, envcreator=self._envcreator)
318
319     def _create_handlers(self, *handler_factories):
320         return Handlers([h(self._users) for h in handler_factories])
321
322     def _set_side_effects(self):
323         self._mock_hostcli.return_value.run.side_effect = self._run
324         self._mock_hostcli.return_value.run_raw.side_effect = self._run_raw
325
326     def _run_raw(self, cmd, target):
327         return self._run_raw_handlers.handle(cmd, target=target)
328
329     def _run(self, cmd, target):
330         return self._run_handlers.handle(cmd, target=target)