3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from collections import namedtuple
17 from crl.remotesession.envcreatorbase import EnvCreatorBase
class EnvCreator(EnvCreatorBase):
    """Build environment dictionaries for different users of the target.

    The base environment is obtained by sourcing the openrc file in the
    target (``self._openrc``).  Depending on the requested *envname* the user
    name, password, tenant and project entries are then replaced with the
    credentials of a user provided by *usermanager* or of the initial user
    read from ``/etc/userconfig/user_config.yaml``.
    """

    def __init__(self, remotesession_factory, usermanager):
        """Store the collaborators and initialize the empty caches.

        Args:
            remotesession_factory: zero-argument callable returning an object
                with ``get_source_update_env_dict`` and
                ``execute_command_in_target`` methods.
            usermanager: object providing ``create_user_with_roles`` and
                ``get_user_and_set_password``; both are called with role
                names as positional arguments and must return a record with
                ``username`` and ``password`` attributes.
        """
        self._remotesession_factory = remotesession_factory
        self._usermanager = usermanager
        # Both caches are filled lazily on first use.
        self._openrc_dict_cache = None
        self._um_admin_creds_cache = None

    def create(self, target, envname):
        """Return the environment dictionary for *envname* in *target*.

        Recognized forms of *envname*:

        - ``'admin'``: the sourced openrc environment, unchanged.
        - ``'um_admin'``: credentials of the initial user from the user
          configuration file of the target.
        - ``'set_password:<role>[,<role>...]'``: credentials returned by
          ``usermanager.get_user_and_set_password(*roles)``.
        - anything else: *envname* is a comma-separated role list passed to
          ``usermanager.create_user_with_roles(*roles)``.

        Returns:
            dict: a copy of the cached openrc environment, updated in place
            for the requested user.
        """
        openrc_dict = self._get_openrc_dict(target)
        if envname.startswith('set_password:'):
            self._set_password_and_update_dict(envname, openrc_dict)
        elif envname == 'um_admin':
            self._update_openrc_dict_um_admin(openrc_dict)
        elif envname != 'admin':
            self._create_user_and_update_dict(envname, openrc_dict=openrc_dict)
        # The updated copy is the whole result of this call; without the
        # return the work above would be silently discarded.
        return openrc_dict

    @property
    def _um_admin_creds(self):
        """Credentials of the initial user, read from the target on first access.

        This has to be a property because callers read
        ``self._um_admin_creds.username`` without calling it.
        """
        if self._um_admin_creds_cache is None:
            self._setup_um_admin_creds_cache()
        return self._um_admin_creds_cache

    def _set_password_and_update_dict(self, envname, openrc_dict):
        """Update *openrc_dict* for the roles of a ``set_password:`` envname."""
        roles = self._get_roles_for_envname(envname)
        userrecord = self._usermanager.get_user_and_set_password(*roles)
        self._update_openrc_dict(openrc_dict, userrecord)

    @staticmethod
    def _get_roles_for_envname(envname):
        """Return the role names from ``'<prefix>:<role>[,<role>...]'``.

        Only the text between the first and the second colon is considered.
        An empty role part yields an empty list; surrounding whitespace of
        each role is stripped.
        """
        role_part = envname.split(':')[1]
        if not role_part:
            return []
        return [role.strip() for role in role_part.split(',')]

    @staticmethod
    def _update_openrc_dict(openrc_dict, userrecord):
        """Write the credentials of *userrecord* into *openrc_dict* in place.

        *userrecord* only needs ``username`` and ``password`` attributes.
        Tenant and project are always set to ``'infrastructure'``.
        """
        openrc_dict.update({'OS_USERNAME': userrecord.username,
                            'OS_PASSWORD': userrecord.password,
                            'OS_TENANT_NAME': 'infrastructure',
                            'OS_PROJECT_NAME': 'infrastructure'})

    def _update_openrc_dict_um_admin(self, openrc_dict):
        """Write the initial user's credentials into *openrc_dict* in place."""
        # The cached credentials expose username/password just like a user
        # record, so the shared updater is reused instead of duplicating the
        # key list.
        self._update_openrc_dict(openrc_dict, self._um_admin_creds)

    def _create_user_and_update_dict(self, envname, openrc_dict):
        """Create a user with the comma-separated roles in *envname*.

        The credentials of the created user are written into *openrc_dict*.
        """
        roles = [role.strip() for role in envname.split(',')]
        userrecord = self._usermanager.create_user_with_roles(*roles)
        self._update_openrc_dict(openrc_dict, userrecord)

    def _get_openrc_dict(self, target):
        """Return a fresh copy of the environment sourced from the openrc file.

        The environment is fetched from the target only once and cached; every
        call returns a copy so that callers can modify the result freely.
        """
        if self._openrc_dict_cache is None:
            # Open a remote session only when the cache really has to be
            # filled; a cache hit needs no session at all.
            remotesession = self._remotesession_factory()
            # NOTE(review): self._openrc is not assigned anywhere in this
            # class as visible here; presumably a class attribute or provided
            # by EnvCreatorBase -- confirm.
            self._openrc_dict_cache = remotesession.get_source_update_env_dict(
                self._openrc, target=target)
        # NOTE(review): the cache is not keyed by target, so the environment
        # of the first target requested is returned for every later target.
        return self._openrc_dict_cache.copy()

    def _setup_um_admin_creds_cache(self):
        """Read the initial user's credentials from the target into the cache."""
        session = self._remotesession_factory()
        result = session.execute_command_in_target('cat /etc/userconfig/user_config.yaml')
        # SECURITY NOTE(review): yaml.Loader can construct arbitrary Python
        # objects.  The file comes from the target under test; consider
        # yaml.safe_load if it only contains plain mappings and scalars.
        config = yaml.load(result.stdout, Loader=yaml.Loader)
        # NOTE(review): assumes the credentials live under the top-level
        # 'users' mapping of the configuration file -- confirm.
        users = config['users']
        self._um_admin_creds_cache = Credentials(username=users['initial_user_name'],
                                                 password=users['initial_user_password'])
91 class Credentials(namedtuple('Credentials', ['username', 'password'])):