3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 import access_management.db.amdb as amdb
20 import yarf.restfullogger as logger
21 from cmframework.apis import cmclient
22 from keystoneauth1 import session
23 from keystoneauth1 import exceptions
24 from keystoneclient.v3 import client
25 from keystoneauth1.identity import v3
26 from yarf.restresource import RestResource
27 from access_management.config.amconfigparser import AMConfigParser
28 import access_management.config.defaults as defaults
31 class AMApiBase(RestResource):
The AMApiBase is the base class that all Access Management REST API endpoints should inherit from. It
implements some basic helper methods that help with the handling of requests.
38 super(AMApiBase, self).__init__()
39 self.logger = logger.get_logger()
40 configparser = AMConfigParser()
41 self.config = configparser.parse()
42 self.db = amdb.AMDatabase(db_name=self.config["DB"]["name"], db_addr=self.config["DB"]["addr"],
43 db_port=int(self.config["DB"]["port"]), db_user=self.config["DB"]["user"],
44 db_pwd=self.config["DB"]["pwd"], logger=self.logger)
45 if self.get_token() != "":
46 self.keystone = self.auth_keystone()
47 self.token = self.get_token()
50 def error_handler(func):
51 def error_handled_function(*args, **kwargs):
53 ret = func(*args, **kwargs)
55 except Exception as err:
56 traceback_info = traceback.format_exc()
57 return AMApiBase.construct_error_response(
59 "Server side error:{0}->{1}\nTraceback:\n{2}".format(err.__class__.__name__,
62 return error_handled_function
def construct_error_response(code, description):
    """
    Constructs an error response with the given code and message.

    The response carries an empty dictionary as its data part; the code and
    the description are passed on unchanged to ``AMApiBase.embed_data``.

    :param code: The error code to report.
    :type code: int
    :param description: The description of the error.
    :type description: str
    :return: The value built by ``AMApiBase.embed_data`` for an empty payload.
    """
    return AMApiBase.embed_data({}, code, description)
77 def embed_data(data, code=0, desc=""):
79 Embeds the data into the NCIR Restfulframework preferred format.
80 :param data: The data to encapsulate, it should be a dictionary
81 :param code: The error code, it should be 0 on success. (Default: 0)
82 :param desc: The description of the error, if no error happened it can be an empty string.
86 :return: The encapsulated data as a dictionary.
95 Helper function to handle special cases like all or an empty string
96 :return: The parsed arguments with all and empty string replaced with None
99 args = self.parser.parse_args()
101 for key, value in args.iteritems():
102 if value == "all" or value == "":
106 def get_user_from_uuid(self, uuid):
107 self.logger.debug("Start get_user_from_uuid")
109 s_user = self.keystone.users.get(uuid)
110 except exceptions.http.NotFound as ex:
111 self.logger.error("{0}".format(ex))
112 return 'None', defaults.PROJECT_NAME
113 except Exception as ex:
114 self.logger.error("{0}".format(ex))
115 return 'None', defaults.PROJECT_NAME
119 project = s_user.default_project_id
120 except AttributeError:
def id_validator(self, uuid):
    """
    Checks whether the given identifier consists only of lower case
    hexadecimal digits.

    :param uuid: The identifier to validate.
    :type uuid: str
    :return: True if the identifier is a non-empty string of the characters
             0-9 and a-f, False otherwise.
    :rtype: bool
    """
    # \Z anchors at the very end of the string; "$" would also match just
    # before a trailing newline and let "abc\n" through.
    return re.match(r"^[0-9a-f]+\Z", uuid) is not None
def passwd_validator(self, passwd):
    """
    Validates a password against the format policy and the cracklib
    dictionary check.

    :param passwd: The password to validate.
    :type passwd: str
    :return: None if the password is acceptable, otherwise a message
             describing why it was rejected.
    """
    # Imported locally so that the block does not depend on a module level import.
    import subprocess

    # \Z instead of "$": "$" also matches just before a trailing newline, which
    # would let a password ending in "\n" pass the format check.
    if (re.search(r"^(?=.*?[A-Z])(?=.*?[0-9])(?=.*?[][.,:;/(){}<>~\!?@#$%^&*_=+-])[][a-zA-Z0-9.,:;/(){}<>~\!?@#$%^&*_=+-]{8,255}\Z", passwd) is None):
        return "The password must have a minimum length of 8 characters (maximum is 255 characters). The allowed characters are lower case letters (a-z), upper case letters (A-Z), digits (0-9), and special characters (][.,:;/(){}<>~\\!?@#$%^&*_=+-). The password must contain at least one upper case letter, one digit and one special character."

    dictionary_error = "The password is incorrect: It cannot contain a dictionary word."

    # Run cracklib-check directly and feed the password on stdin. No shell is
    # involved, so the result does not depend on which shell /bin/sh is (the
    # former "&>/dev/null" redirect is bash-only) and the password is never
    # interpreted as shell syntax.
    try:
        checker = subprocess.Popen(["cracklib-check"],
                                   stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        output, _ = checker.communicate((passwd + "\n").encode("utf-8"))
    except (OSError, ValueError):
        # The checker could not be run: reject, as the shell pipeline did
        # when cracklib-check was not available.
        return dictionary_error

    # cracklib-check answers "<password>: <verdict>". Only the verdict may
    # decide; a plain search for "OK" would also hit a password containing
    # "OK". The format check above forbids spaces, so the password itself can
    # never end in ": OK".
    verdict_line = output.decode("utf-8", "replace").strip()
    if not verdict_line.endswith(": OK"):
        return dictionary_error
    return None
def get_role_id(self, role_name):
    """
    Looks up the Keystone identifier of the role with the given name.

    :param role_name: The name of the role to search for.
    :type role_name: str
    :return: The id of the first role whose name matches, None if no role
             has that name, or the tuple (False, <error message>) if the
             role list could not be fetched from Keystone.
    """
    self.logger.debug("Start get_role_id")
    try:
        role_list = self.keystone.roles.list()
    except Exception as ex:
        self.logger.error("{0}".format(ex))
        return False, "{0}".format(ex)

    for role in role_list:
        if role.name == role_name:
            # NOTE(review): returned as str to mirror get_project_id - confirm.
            return str(role.id)
    return None
151 def get_project_id(self, project_name):
152 self.logger.debug("Start get_project_id")
155 project_list = self.keystone.projects.list()
159 for project in project_list:
160 if project.name == project_name:
161 return str(project.id)
163 def get_uuid_from_token(self):
164 self.logger.debug("Start get_uuid_from_token")
166 token_data = self.keystone.tokens.get_token_data(self.get_token())
167 except exceptions.http.NotFound as ex:
168 self.logger.error("{0}".format(ex))
170 except Exception as ex:
171 self.logger.error("{0}".format(ex))
173 self.logger.debug({"Token owner": token_data["token"]["user"]["id"]})
174 return token_data["token"]["user"]["id"]
176 def send_role_request_and_check_response(self, role_id, user_id, method, proj_id):
179 self.keystone.roles.grant(role_id, user=user_id, project=proj_id)
180 elif method == "delete":
181 self.keystone.roles.revoke(role_id, user=user_id, project=proj_id)
183 return False, "Not allowed method for role modification"
184 except Exception as ex:
185 self.logger.error("{0}".format(ex))
186 return False, "{0}".format(ex)
def modify_role_in_keystone(self, role_name, user_id, method, project, need_admin_role = True):
    """
    Applies a role change for a user in Keystone.

    The change is sent for the defaults.PROJECT_NAME project and, if given
    and different, for ``project`` as well. When ``need_admin_role`` is set
    and the role is defaults.INF_ADMIN_ROLE_NAME or
    defaults.OS_ADMIN_ROLE_NAME, the same change is also sent for the
    defaults.KS_ADMIN_NAME role.

    Every request is sent even if an earlier one failed; the first failure
    is what gets reported, so a later success cannot hide it.

    :param role_name: The name of the role to modify.
    :param user_id: The id of the user whose role is modified.
    :param method: The operation, handed over unchanged to
                   send_role_request_and_check_response.
    :param project: An additional project id, may be empty or None.
    :param need_admin_role: Whether the Keystone admin role should follow
                            the infrastructure/OpenStack admin roles. (Default: True)
    :return: A (state, message) tuple; state is False on the first failure.
    """
    um_proj_id = self.get_project_id(defaults.PROJECT_NAME)
    if um_proj_id is None:
        message = "The "+defaults.PROJECT_NAME+" project not found!"
        self.logger.error(message)
        return False, message

    role_id = self.get_role_id(role_name)
    if isinstance(role_id, tuple):
        # get_role_id reports a failed Keystone query as (False, message);
        # hand that on instead of sending the tuple to Keystone as a role id.
        return role_id
    if role_id is None:
        message = "{} user role not found!".format(role_name)
        self.logger.error(message)
        return False, message

    target_projects = [um_proj_id]
    if project and project != um_proj_id:
        target_projects.append(project)

    results = [self.send_role_request_and_check_response(role_id, user_id, method, proj_id)
               for proj_id in target_projects]

    if need_admin_role and (role_name == defaults.INF_ADMIN_ROLE_NAME or role_name == defaults.OS_ADMIN_ROLE_NAME):
        admin_role_id = self.get_role_id(defaults.KS_ADMIN_NAME)
        if isinstance(admin_role_id, tuple):
            return admin_role_id
        if admin_role_id is None:
            message = "The admin user role not found!"
            self.logger.error(message)
            return False, message
        results.extend(
            self.send_role_request_and_check_response(admin_role_id, user_id, method, proj_id)
            for proj_id in target_projects)

    for state, message in results:
        if not state:
            return state, message
    return results[-1]
221 except Exception as err:
223 return True, "DB closed"
228 except Exception as err:
230 return True, "DB opened"
def check_chroot_linux_state(self, username, list_name, state):
    """
    Checks whether a user is present with the expected state in a user list
    stored as a configuration management property.

    The property named ``list_name`` is read through a CMClient and parsed
    as JSON; its entries are indexed with the keys "name" and "state".

    :param username: The user name to look for.
    :param list_name: The name of the property holding the user list.
    :param state: The state the user entry is expected to have.
    :return: True if an entry matches both the name and the state, False otherwise.
    :rtype: bool
    """
    cmc = cmclient.CMClient()
    user_list = json.loads(cmc.get_property(list_name))
    self.logger.debug("Start the user list check")
    self.logger.debug("Checked {0} user list : {1}".format(list_name, json.dumps(user_list)))
    for val in user_list:
        if val["name"] == username and val["state"] == state:
            self.logger.debug("{0} checked!".format(username))
            return True
    self.logger.debug("{0} failed to check!".format(username))
    return False
def auth_keystone(self):
    """
    Creates a Keystone v3 client authenticated with the token returned by
    self.get_token().

    The Keystone endpoint is taken from the "auth_uri" entry of the
    "Keystone" section of the parsed configuration.

    :return: The Keystone client bound to the token based session.
    """
    token_auth = v3.Token(auth_url=self.config["Keystone"]["auth_uri"],
                          token=self.get_token())
    sess = session.Session(auth=token_auth)
    return client.Client(session=sess)
252 def auth_keystone_with_pass(self, passwd, username=None, uuid=None):
253 if not username and not uuid:
256 auth = v3.Password(auth_url=self.config["Keystone"]["auth_uri"],
259 project_name=defaults.PROJECT_NAME,
260 user_domain_id="default",
261 project_domain_id="default")
263 auth = v3.Password(auth_url=self.config["Keystone"]["auth_uri"],
266 project_name=defaults.PROJECT_NAME,
267 user_domain_id="default",
268 project_domain_id="default")
269 sess = session.Session(auth=auth)
270 keystone = client.Client(session=sess)
def get_uuid_and_name(self, user):
    """
    Finds a Keystone user by its id or by its name.

    :param user: The id or the name of the user to look for.
    :type user: str
    :return: (True, {"name": ..., "id": ..., "project": ...}) for the first
             user whose id or name matches, otherwise (False, <error message>).
    """
    try:
        u_list = self.keystone.users.list()
    except Exception as ex:
        self.logger.error("{0}".format(ex))
        return False, "{0}".format(ex)

    for element in u_list:
        if user == element.id or user == element.name:
            name = element.name
            user_id = element.id
            # get_user_from_uuid guards this attribute against AttributeError,
            # so a user entry may come without it; report None in that case.
            project = getattr(element, "default_project_id", None)
            self.logger.debug("{0},{1},{2}".format(name, user_id, project))
            return True, {"name": name, "id": user_id, "project": project}

    # Return the message itself; wrapping it in braces built a set, which is
    # inconsistent with the string returned on the error path above.
    message = "{0} user does not exist in the keystone!".format(user)
    self.logger.error(message)
    return False, message