# Copyright 2019 Nokia # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import re import os import json import traceback import access_management.db.amdb as amdb import yarf.restfullogger as logger from cmframework.apis import cmclient from keystoneauth1 import session from keystoneauth1 import exceptions from keystoneclient.v3 import client from keystoneauth1.identity import v3 from yarf.restresource import RestResource from access_management.config.amconfigparser import AMConfigParser import access_management.config.defaults as defaults class AMApiBase(RestResource): """ The AMApiBase is the base class that all Access Management REST API endpoints should inherit form. It implements some basic helper methods helping the handling of requests. """ def __init__(self): super(AMApiBase, self).__init__() self.logger = logger.get_logger() configparser = AMConfigParser() self.config = configparser.parse() self.db = amdb.AMDatabase(db_name=self.config["DB"]["name"], db_addr=self.config["DB"]["addr"], db_port=int(self.config["DB"]["port"]), db_user=self.config["DB"]["user"], db_pwd=self.config["DB"]["pwd"], logger=self.logger) if self.get_token() != "": self.keystone = self.auth_keystone() self.token = self.get_token() @staticmethod def error_handler(func): def error_handled_function(*args, **kwargs): try: ret = func(*args, **kwargs) return ret except Exception as err: traceback_info = traceback.format_exc() return AMApiBase.construct_error_response( 255, "Server side error:{0}->{1}\nTraceback:\n{2}".format(err.__class__.__name__, err.message, traceback_info)) return error_handled_function @staticmethod def construct_error_response(code, description): """ Constructs an error response with the given code and message :param code: :param message: :type code: int :type description: str :return: """ return AMApiBase.embed_data({}, code, description) @staticmethod def embed_data(data, code=0, desc=""): """ Embeds the data into the NCIR Restfulframework preferred format. :param data: The data to encapsulate, it should be a dictionary :param code: The error code, it should be 0 on success. (Default: 0) :param desc: The description of the error, if no error happened it can be an empty string. :type data: dict :type code: int :type desc: str :return: The encapsulated data as a dictionary. :rtype: dict """ return {"code": code, "description": desc, "data": data} def parse_args(self): """ Helper function to handle special cases like all or an empty string :return: The parsed arguments with all and empty string replaced with None :rtype: dict """ args = self.parser.parse_args() for key, value in args.iteritems(): if value == "all" or value == "": args[key] = None return args def get_user_from_uuid(self, uuid): self.logger.debug("Start get_user_from_uuid") try: s_user = self.keystone.users.get(uuid) except exceptions.http.NotFound as ex: self.logger.error("{0}".format(ex)) return 'None', defaults.PROJECT_NAME except Exception as ex: self.logger.error("{0}".format(ex)) return 'None', defaults.PROJECT_NAME name = s_user.name try: project = s_user.default_project_id except AttributeError: project = None return name, project def id_validator(self, uuid): if re.match("^[0-9a-f]+$", uuid) is not None: return True else: return False def passwd_validator(self, passwd): if (re.search(r"^(?=.*?[A-Z])(?=.*?[0-9])(?=.*?[][.,:;/(){}<>~\!?@#$%^&*_=+-])[][a-zA-Z0-9.,:;/(){}<>~\!?@#$%^&*_=+-]{8,255}$", passwd) is None): return "The password must have a minimum length of 8 characters (maximum is 255 characters). The allowed characters are lower case letters (a-z), upper case letters (A-Z), digits (0-9), and special characters (][.,:;/(){}<>~\\!?@#$%^&*_=+-). The password must contain at least one upper case letter, one digit and one special character." pwd_dict_check = os.system("echo '{0}' | cracklib-check | grep OK &>/dev/null".format(passwd)) if pwd_dict_check != 0: return "The password is incorrect: It cannot contain a dictionary word." return None def get_role_id(self, role_name): self.logger.debug("Start get_role_id") try: role_list = self.keystone.roles.list() except Exception as ex: self.logger.error("{0}".format(ex)) return False, "{0}".format(ex) for role in role_list: if role.name == role_name: return str(role.id) def get_project_id(self, project_name): self.logger.debug("Start get_project_id") project_id = None try: project_list = self.keystone.projects.list() except Exception: return project_id for project in project_list: if project.name == project_name: return str(project.id) def get_uuid_from_token(self): self.logger.debug("Start get_uuid_from_token") try: token_data = self.keystone.tokens.get_token_data(self.get_token()) except exceptions.http.NotFound as ex: self.logger.error("{0}".format(ex)) return None except Exception as ex: self.logger.error("{0}".format(ex)) return None self.logger.debug({"Token owner": token_data["token"]["user"]["id"]}) return token_data["token"]["user"]["id"] def send_role_request_and_check_response(self, role_id, user_id, method, proj_id): try: if method == "put": self.keystone.roles.grant(role_id, user=user_id, project=proj_id) elif method == "delete": self.keystone.roles.revoke(role_id, user=user_id, project=proj_id) else: return False, "Not allowed method for role modification" except Exception as ex: self.logger.error("{0}".format(ex)) return False, "{0}".format(ex) return True, "OK" def modify_role_in_keystone(self, role_name, user_id, method, project, need_admin_role = True): um_proj_id = self.get_project_id(defaults.PROJECT_NAME) if um_proj_id is None: message = "The "+defaults.PROJECT_NAME+" project not found!" self.logger.error(message) return False, message role_id = self.get_role_id(role_name) if role_id is None: message = "{} user role not found!".format(role_name) self.logger.error(message) return False, message state, message = self.send_role_request_and_check_response(role_id, user_id, method, um_proj_id) if project and project != um_proj_id: state, message = self.send_role_request_and_check_response(role_id, user_id, method, project) if need_admin_role and (role_name == defaults.INF_ADMIN_ROLE_NAME or role_name == defaults.OS_ADMIN_ROLE_NAME): admin_role_id = self.get_role_id(defaults.KS_ADMIN_NAME) if admin_role_id is None: message = "The admin user role not found!" self.logger.error(message) return False, message state, message = self.send_role_request_and_check_response(admin_role_id, user_id, method, um_proj_id) if project and project != um_proj_id: state, message = self.send_role_request_and_check_response(admin_role_id, user_id, method, project) return state, message def _close_db(self): try: self.db.close() except Exception as err: return False, err return True, "DB closed" def _open_db(self): try: self.db.connect() except Exception as err: return False, err return True, "DB opened" def check_chroot_linux_state(self, username, list_name, state): cmc = cmclient.CMClient() user_list = cmc.get_property(list_name) user_list = json.loads(user_list) self.logger.debug("Start the user list check") self.logger.debug("Checked {0} user list : {1}".format(list_name, json.dumps(user_list))) for val in user_list: if val["name"] == username and val["state"] == state: self.logger.debug("{0} checked!".format(username)) return True self.logger.debug("{0} failed to check!".format(username)) return False def auth_keystone(self): auth = v3.Token(auth_url=self.config["Keystone"]["auth_uri"], token=self.get_token()) sess = session.Session(auth=auth) keystone = client.Client(session=sess) return keystone def auth_keystone_with_pass(self, passwd, username=None, uuid=None): if not username and not uuid: return False if username: auth = v3.Password(auth_url=self.config["Keystone"]["auth_uri"], username=username, password=passwd, project_name=defaults.PROJECT_NAME, user_domain_id="default", project_domain_id="default") else: auth = v3.Password(auth_url=self.config["Keystone"]["auth_uri"], user_id=uuid, password=passwd, project_name=defaults.PROJECT_NAME, user_domain_id="default", project_domain_id="default") sess = session.Session(auth=auth) keystone = client.Client(session=sess) return keystone def get_uuid_and_name(self, user): try: u_list = self.keystone.users.list() except Exception as ex: self.logger.error("{0}".format(ex)) return False, "{0}".format(ex) for element in u_list: if user == element.id or user == element.name: name = element.name id = element.id project = element.default_project_id self.logger.debug("{0},{1},{2}".format(name, id, project)) return True, {"name": name, "id": id, "project": project} self.logger.error("{0} user does not exist in the keystone!".format(user)) return False, {"{0} user does not exist in the keystone!".format(user)}