Add cloudtaf framework
[ta/cloudtaf.git] / libraries / cluster / userverifier.py
1 # Copyright 2019 Nokia
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import itertools
16 import mock
17 import pytest
18 from crl.remotesession.remotesession import RemoteSession
19 from hostcli import HostCli
20 from .usermanager import (
21     UserManager,
22     UserManagerError,
23     PASSWORD,
24     INITIAL_PASSWORD)
25 from .envcreator import EnvCreator
26 from .testutils.fakehostcli import (
27     FakeHostCliUser,
28     User,
29     ROLES)
30 from .usergen import UserGen
31
32
33 class UserVerifier(object):
34     def __init__(self, role_attr):
35         self._mock_hostcli_factory = mock.create_autospec(HostCli)
36         self._mock_remotesession_factory = mock.create_autospec(RemoteSession)
37         self._hostcliuser = FakeHostCliUser(self._mock_hostcli_factory,
38                                             role_attr=role_attr)
39         self._usermanager = UserManager(self._mock_hostcli_factory)
40         self._envcreator = EnvCreator(
41             remotesession_factory=self._mock_remotesession_factory,
42             usermanager=self._usermanager)
43         self._usergen = None
44         self._initialize()
45
46     def _initialize(self):
47         self._setup_remotesession()
48         self._setup_hostcliuser()
49         self._setup_usergen()
50
51     def _setup_remotesession(self):
52         g = self._mock_remotesession_factory.return_value.get_source_update_env_dict
53         g.return_value = {}
54
55     def _setup_hostcliuser(self):
56         self._hostcliuser.set_envcreator(self._envcreator)
57         self._hostcliuser.initialize()
58
59     def _setup_usergen(self):
60         self._usergen = UserGen(len(ROLES))
61
62     def verify_create_user_with_roles(self):
63         for roles in self._roles_gen():
64             actual_user = self._get_actual_user(
65                 self._usermanager.create_user_with_roles(*roles))
66             expected_user = self._get_expected_user(roles)
67             for actual in [actual_user, self._hostcliuser.get_user(expected_user.uuid)]:
68                 assert actual_user == expected_user, (
69                     'expected {expected}, actual {actual}'.format(
70                         expected=expected_user,
71                         actual=actual))
72                 self._assert_password_history(actual_user.uuid)
73
74     def verify_delete_users(self):
75         self.verify_create_user_with_roles()
76         self._usermanager.delete_users()
77         assert not self._hostcliuser.users
78
79     def verify_corrupted_user_list(self):
80         self._hostcliuser.set_corrupted_user_list()
81         with pytest.raises(UserManagerError) as excinfo:
82             self._usermanager.create_user_with_roles(*ROLES)
83
84         assert str(excinfo.value) == 'User all_roles does not exist in target'
85
86     def verify_user_with_roles_notexist(self):
87         notexists = ['notexists']
88         with pytest.raises(UserManagerError) as excinfo:
89             self._usermanager.create_user_with_roles(ROLES[0], *notexists)
90
91         msg = str(excinfo.value)
92         assert msg == 'Roles {} not found'.format(set(notexists)), msg
93
94     def verify_user_roles_duplicates(self):
95         duplicates = (ROLES[0], ROLES[1], ROLES[0])
96         with pytest.raises(UserManagerError) as excinfo:
97             self._usermanager.create_user_with_roles(*duplicates)
98
99         msg = str(excinfo.value)
100         assert msg == 'Duplicate roles in {}'.format(duplicates), msg
101
102     def verify_one_user_per_roles(self):
103         users_list = []
104         for _ in range(2):
105             self._setup_usergen()
106             self.verify_create_user_with_roles()
107             users_list.append(self._hostcliuser.users)
108
109         assert users_list[0] == users_list[1], users_list
110
111     def verify_all_roles(self):
112         userrecord = self._usermanager.create_user_with_roles('all_roles')
113         user = self._hostcliuser.get_user(userrecord.uuid)
114         assert user.username == 'all_roles', user.username
115         assert user.added_roles == set(ROLES)
116
117     def verify_no_roles(self):
118         userrecord = self._usermanager.create_user_with_roles('no_roles')
119         user = self._hostcliuser.get_user(userrecord.uuid)
120         assert user.username == 'no_roles', user.username
121         assert not user.added_roles
122
123     def verify_special_roles_raises(self):
124         for special_role in ['no_roles', 'all_roles']:
125             with pytest.raises(UserManagerError) as excinfo:
126                 roles = (special_role, ROLES[0])
127                 self._usermanager.create_user_with_roles(*roles)
128
129             msg = str(excinfo.value)
130             assert msg == 'Special role {special_role!r} and other roles in {roles}'.format(
131                 special_role=special_role,
132                 roles=roles), msg
133
134     @staticmethod
135     def _get_actual_user(user):
136         u = User(username=user.username, password=user.password)
137         u.set_added_roles(user.roles)
138         u.set_uuid(user.uuid)
139         return u
140
141     def _get_expected_user(self, roles):
142         user = User(username=self._usergen.create_username(roles),
143                     password=PASSWORD)
144         user.set_added_roles(roles)
145         return user
146
147     @staticmethod
148     def _roles_gen():
149         for r in range(len(ROLES) + 1):
150             for roles in itertools.combinations(ROLES, r):
151                 yield roles
152
153     def _assert_password_history(self, user_id):
154         user = self._hostcliuser.get_user(user_id)
155         actual_history = user.password_history
156         expected_history = [INITIAL_PASSWORD, PASSWORD]
157         assert actual_history == expected_history, (
158             'Expected {expected}, actual {actual}'.format(
159                 expected=expected_history,
160                 actual=actual_history))