#!/usr/bin/env python # Copyright 2019 Nokia # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ ambackend module Authorization backend of AM """ from keystoneauth1.identity import v3 from keystoneauth1 import session from keystoneclient.v3 import client from keystoneclient.v3.tokens import TokenManager from keystoneauth1.exceptions.http import Unauthorized, NotFound from access_management.db.amdb import AMDatabase, NotExist import access_management.backend.restlogger as restlog import access_management.config.defaults as defaults class AMBackend(object): """ Authorization backend of AM """ def __init__(self, config): """ Creates an instance of the authorization module Parses config and creates AMDB instance """ self.config = config self.logger = restlog.get_logger(self.config) self.db = AMDatabase(db_name=self.config["DB"]["name"], db_addr=self.config["DB"]["addr"], db_port=int(self.config["DB"]["port"]), db_user=self.config["DB"]["user"], db_pwd=self.config["DB"]["pwd"], logger=self.logger) def is_authorized(self, token, domain="", domain_object="", method="", role_name=""): """ Does the authorization check Validates token and extracts user_id, gets allowed endpoint+method from AMDB :param token: keystone token :param domain: domian part of the endpoint of the request :param domain_object: domain_object part of the endpoint of the request :param method: method of the request :returns: authorization result :rtype: bool """ if domain == "am" and domain_object == "users/ownpasswords": return True, "" tokenmanager = self.make_auth(token) username = "" try: tokeninfo = tokenmanager.validate(token) except Unauthorized as error: self.logger.error("Failed to authenticate with given credentials: {}".format(str(error))) return False, username except NotFound: self.logger.error("Unauthorized token") return False, username except Exception as error: self.logger.error("Failure: {}".format(str(error))) return False, username user_uuid = tokeninfo.user_id username = tokeninfo.username endpoint = {} endpoint["name"] = domain+"/"+domain_object if endpoint["name"] != "/": self.logger.debug("Endpoint checking") try: self.db.connect() except Exception as error: self.logger.error("Failure: {}".format(str(error))) return False, username try: permissions = self.db.get_user_resources(user_uuid) except Exception as error: self.logger.error("Failure: {}".format(str(error))) return False, username finally: try: self.db.close() except Exception as error: self.logger.error("Failure: {}".format(str(error))) return False, username endpoint["splitted"] = endpoint["name"].split("/") endpoint["length"] = len(endpoint["splitted"]) for path in permissions: per_result = self.check_permission(path, endpoint) if per_result: met_result = method in permissions[path] if met_result: self.logger.info("Endpoint authorization successful") return True, username else: self.logger.error("Unauthorized request 1") return False, username else: continue if role_name != "": self.logger.debug("Role checking") try: self.db.connect() except Exception as error: self.logger.error("Failure: {}".format(str(error))) return False, username try: permissions = self.db.get_user_roles(user_uuid) except Exception as error: self.logger.error("Failure: {}".format(str(error))) return False, username finally: try: self.db.close() except Exception as error: self.logger.error("Failure: {}".format(str(error))) return False, username if role_name in permissions: self.logger.info("Role name authorization successful") return True, username self.logger.error("Unauthorized request 2") return False, username def check_permission(self, key, endpoint): """ Checks the permission :param key: permission from the DB :param endpoint: endpoint of the request :returns: checking result :rtype: bool """ key_splitted = key.split("/") key_length = len(key_splitted) if key_length == 1 and endpoint["splitted"][0] == key: return True if endpoint["length"] != key_length: return False for i in range(0, endpoint["length"]): if key_splitted[i][0] == "<": continue if endpoint["splitted"][i] != key_splitted[i]: return False return True def make_auth(self, token): """ Makes a connection to Keystone for token validation :param token: keystone token :returns: instance of keystone's TokenManager :rtype: TokenManager """ auth = v3.Token(auth_url=self.config["Keystone"]["auth_uri"], token=token, project_name=defaults.PROJECT_NAME, project_domain_id="default") sess = session.Session(auth=auth) keystone = client.Client(session=sess) tokenmanager = TokenManager(keystone) return tokenmanager