Added seed code for access-management.
[ta/access-management.git] / src / access_management / backend / ambackend.py
1 #!/usr/bin/env python
2
3 # Copyright 2019 Nokia
4
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 #     http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 """
18 ambackend module
19 Authorization backend of AM
20 """
21 from keystoneauth1.identity import v3
22 from keystoneauth1 import session
23 from keystoneclient.v3 import client
24 from keystoneclient.v3.tokens import TokenManager
25 from keystoneauth1.exceptions.http import Unauthorized, NotFound
26
27 from access_management.db.amdb import AMDatabase, NotExist
28 import access_management.backend.restlogger as restlog
29 import access_management.config.defaults as defaults
30
31
32 class AMBackend(object):
33     """
34     Authorization backend of AM
35     """
36     def __init__(self, config):
37         """
38         Creates an instance of the authorization module
39         Parses config and creates AMDB instance
40         """
41         self.config = config
42         self.logger = restlog.get_logger(self.config)
43
44         self.db = AMDatabase(db_name=self.config["DB"]["name"], db_addr=self.config["DB"]["addr"],
45                              db_port=int(self.config["DB"]["port"]), db_user=self.config["DB"]["user"],
46                              db_pwd=self.config["DB"]["pwd"], logger=self.logger)
47
48     def is_authorized(self, token, domain="", domain_object="", method="", role_name=""):
49         """
50         Does the authorization check
51         Validates token and extracts user_id, gets allowed endpoint+method from AMDB
52
53         :param token: keystone token
54         :param domain: domian part of the endpoint of the request
55         :param domain_object: domain_object part of the endpoint of the request
56         :param method: method of the request
57         :returns: authorization result
58         :rtype: bool
59         """
60
61         if domain == "am" and domain_object == "users/ownpasswords":
62             return True, ""
63
64         tokenmanager = self.make_auth(token)
65         username = ""
66
67         try:
68             tokeninfo = tokenmanager.validate(token)
69         except Unauthorized as error:
70             self.logger.error("Failed to authenticate with given credentials: {}".format(str(error)))
71             return False, username
72         except NotFound:
73             self.logger.error("Unauthorized token")
74             return False, username
75         except Exception as error:
76             self.logger.error("Failure: {}".format(str(error)))
77             return False, username
78
79         user_uuid = tokeninfo.user_id
80         username = tokeninfo.username
81         endpoint = {}
82         endpoint["name"] = domain+"/"+domain_object
83
84         if endpoint["name"] != "/":
85             self.logger.debug("Endpoint checking")
86             try:
87                 self.db.connect()
88             except Exception as error:
89                 self.logger.error("Failure: {}".format(str(error)))
90                 return False, username
91             try:
92                 permissions = self.db.get_user_resources(user_uuid)
93             except Exception as error:
94                 self.logger.error("Failure: {}".format(str(error)))
95                 return False, username
96             finally:
97                 try:
98                     self.db.close()
99                 except Exception as error:
100                     self.logger.error("Failure: {}".format(str(error)))
101                     return False, username
102
103
104             endpoint["splitted"] = endpoint["name"].split("/")
105             endpoint["length"] = len(endpoint["splitted"])
106             for path in permissions:
107                 per_result = self.check_permission(path, endpoint)
108                 if per_result:
109                     met_result = method in permissions[path]
110                     if met_result:
111                         self.logger.info("Endpoint authorization successful")
112                         return True, username
113                     else:
114                         self.logger.error("Unauthorized request 1")
115                         return False, username
116                 else:
117                     continue
118
119         if role_name != "":
120             self.logger.debug("Role checking")
121             try:
122                 self.db.connect()
123             except Exception as error:
124                 self.logger.error("Failure: {}".format(str(error)))
125                 return False, username
126             try:
127                 permissions = self.db.get_user_roles(user_uuid)
128             except Exception as error:
129                 self.logger.error("Failure: {}".format(str(error)))
130                 return False, username
131             finally:
132                 try:
133                     self.db.close()
134                 except Exception as error:
135                     self.logger.error("Failure: {}".format(str(error)))
136                     return False, username
137
138             if role_name in permissions:
139                 self.logger.info("Role name authorization successful")
140                 return True, username
141
142         self.logger.error("Unauthorized request 2")
143         return False, username
144
145     def check_permission(self, key, endpoint):
146         """
147         Checks the permission
148
149         :param key: permission from the DB
150         :param endpoint: endpoint of the request
151         :returns: checking result
152         :rtype: bool
153         """
154         key_splitted = key.split("/")
155         key_length = len(key_splitted)
156         if key_length == 1 and endpoint["splitted"][0] == key:
157             return True
158         if endpoint["length"] != key_length:
159             return False
160         for i in range(0, endpoint["length"]):
161             if key_splitted[i][0] == "<":
162                 continue
163             if endpoint["splitted"][i] != key_splitted[i]:
164                 return False
165         return True
166
167     def make_auth(self, token):
168         """
169         Makes a connection to Keystone for token validation
170
171         :param token: keystone token
172         :returns: instance of keystone's TokenManager
173         :rtype: TokenManager
174         """
175         auth = v3.Token(auth_url=self.config["Keystone"]["auth_uri"], token=token, project_name=defaults.PROJECT_NAME, project_domain_id="default")
176         sess = session.Session(auth=auth)
177         keystone = client.Client(session=sess)
178         tokenmanager = TokenManager(keystone)
179         return tokenmanager