2 * Copyright 2020 Huawei Technologies Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 "broker/pkg/handlers/adapter/dbAdapter"
21 "broker/pkg/handlers/adapter/pluginAdapter"
22 "broker/pkg/handlers/model"
33 "github.com/buger/jsonparser"
34 "github.com/ghodss/yaml"
35 "github.com/google/uuid"
36 "github.com/gorilla/mux"
37 "github.com/sirupsen/logrus"
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/status"
42 // HandlerImpl is the handler of the REST APIs. Its methods serve the
// application-package and application-instance endpoints.
// NOTE(review): the methods also use impl.logger; that field's declaration is
// not visible in this excerpt.
43 type HandlerImpl struct {
// dbAdapter is assigned in newHandlerImpl and is what the handlers call to
// read, insert, update and delete package and instance records.
45 dbAdapter *dbAdapter.DbAdapter
48 // newHandlerImpl creates the handler implementation.
// It builds a DbAdapter from the supplied logger and calls CreateDatabase on
// it.
// NOTE(review): any result of CreateDatabase is discarded here — confirm
// whether it can fail and how such a failure should surface.
49 func newHandlerImpl(logger *logrus.Logger) (impl HandlerImpl) {
51 impl.dbAdapter = dbAdapter.NewDbAdapter(logger)
52 impl.dbAdapter.CreateDatabase()
// UploadPackage handles an application-package upload.
// It reads the multipart form field "file", writes the uploaded bytes to
// PackageFolderPath + <uploaded file name>, unpacks that archive with
// openPackage, decodes Definitions/MainServiceTemplate.yaml with
// decodeApplicationDescriptor, stores the resulting record through the DB
// adapter and replies 201 with the package info as JSON.
57 func (impl *HandlerImpl) UploadPackage(w http.ResponseWriter, r *http.Request) {
59 file, header, err := r.FormFile("file")
62 respondError(w, http.StatusBadRequest, err.Error())
// The whole upload is buffered in memory before it is written to disk.
66 buf := bytes.NewBuffer(nil)
67 if _, err := io.Copy(buf, file); err != nil {
68 respondError(w, http.StatusBadRequest, err.Error())
// NOTE(review): header.Filename is client-supplied and is used below to build
// file-system paths without any visible sanitising — confirm that names
// containing path separators or ".." are rejected.
73 f := strings.Split(header.Filename, ".")
// NOTE(review): packageName is passed as the format string, so a "%" in the
// name would be interpreted by Infof.
77 impl.logger.Infof(packageName)
79 pkgPath := PackageFolderPath + header.Filename
80 newFile, err := os.Create(pkgPath)
82 respondError(w, http.StatusInternalServerError, err.Error())
87 if _, err := newFile.Write(buf.Bytes()); err != nil {
88 respondError(w, http.StatusInternalServerError, err.Error())
92 /* Unzip package to decode appDescriptor */
// NOTE(review): openPackage and decodeApplicationDescriptor report failures
// only by writing an error response to w; neither returns an error, so this
// function cannot tell that they failed before it inserts the record and
// writes its own response.
93 impl.openPackage(w, pkgPath)
95 var yamlFile = PackageFolderPath + packageName + "/Definitions/" + "MainServiceTemplate.yaml"
96 appPkgInfo := impl.decodeApplicationDescriptor(w, yamlFile)
// Complete the decoded record with the archive name and onboarding state.
97 appPkgInfo.AppPackage = header.Filename
98 appPkgInfo.OnboardingState = "ONBOARDED"
100 impl.logger.Infof("Application package info from package")
103 impl.dbAdapter.InsertAppPackageInfo(appPkgInfo)
106 respondJSON(w, http.StatusCreated, appPkgInfo)
// openPackage unpacks the zip archive at packagePath, creating directories and
// files for its entries. Failures while opening or copying an entry are
// reported by writing a 400 error response to w; nothing is returned to the
// caller.
110 func (impl *HandlerImpl) openPackage(w http.ResponseWriter, packagePath string) {
// NOTE(review): the error from zip.OpenReader is discarded. When the archive
// cannot be opened zipReader is nil and the range expression below
// dereferences it. No Close call on zipReader is visible either.
111 zipReader, _ := zip.OpenReader(packagePath)
112 for _, file := range zipReader.Reader.File {
114 zippedFile, err := file.Open()
116 respondError(w, http.StatusBadRequest, err.Error())
// NOTE(review): this defer sits inside the loop, so every entry stays open
// until the function returns rather than until the end of its iteration.
118 defer zippedFile.Close()
120 targetDir := PackageFolderPath + "/"
// NOTE(review): confirm that entry names containing ".." cannot make the
// joined path escape targetDir (zip-slip).
121 extractedFilePath := filepath.Join(
126 if file.FileInfo().IsDir() {
// The result of MkdirAll is not checked.
127 os.MkdirAll(extractedFilePath, file.Mode())
129 outputFile, err := os.OpenFile(
131 os.O_WRONLY|os.O_CREATE|os.O_TRUNC,
135 respondError(w, http.StatusBadRequest, err.Error())
// Deferred inside the loop as well: output files are closed only when the
// function returns.
137 defer outputFile.Close()
139 _, err = io.Copy(outputFile, zippedFile)
141 respondError(w, http.StatusBadRequest, err.Error())
147 // decodeApplicationDescriptor reads the YAML file at serviceTemplate,
// converts it to JSON and builds a model.AppPackageInfo from values found under
// topology_template / node_templates / face_recognition / properties.
// Read and conversion failures are reported by writing a 400 error response
// to w.
148 func (impl *HandlerImpl) decodeApplicationDescriptor(w http.ResponseWriter, serviceTemplate string) model.AppPackageInfo {
150 yamlFile, err := ioutil.ReadFile(serviceTemplate)
152 respondError(w, http.StatusBadRequest, err.Error())
155 jsondata, err := yaml.YAMLToJSON(yamlFile)
157 respondError(w, http.StatusBadRequest, err.Error())
// NOTE(review): the node-template name "face_recognition" is hard-coded in
// every lookup below, and the type, offset and error results of
// jsonparser.Get are all discarded — presumably a missing key ends up as an
// empty string in the record; confirm that is acceptable.
160 appDId, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appDId")
161 appProvider, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appProvider")
162 appInfoName, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appInfoName")
163 appSoftVersion, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appSoftVersion")
164 appDVersion, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appDVersion")
// The deployment type is read from the property named "type".
165 deployType, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "type")
// Copy the raw property bytes into the record as strings.
167 appPkgInfo := model.AppPackageInfo{
169 AppDID: string(appDId),
170 AppProvider: string(appProvider),
171 AppName: string(appInfoName),
172 AppSoftwareVersion: string(appSoftVersion),
173 AppDVersion: string(appDVersion),
174 DeployType: string(deployType),
177 //return appPackageInfo
181 // Query application package information
182 func (impl *HandlerImpl) QueryAppPackageInfo(w http.ResponseWriter, r *http.Request) {
183 params := mux.Vars(r)
184 appPkgId := params["appPkgId"]
185 appPkgInfo := impl.dbAdapter.GetAppPackageInfo(appPkgId)
186 if appPkgInfo.ID == "" {
187 respondJSON(w, http.StatusNotFound, "ID not exist")
190 respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(appPkgInfo))
193 // DeleteAppPackage deletes the application package named by the
// "appPkgId" route variable: it removes the package record through the DB
// adapter and then removes the stored package from disk. An unknown ID is
// answered with 404 "ID not exist".
194 func (impl *HandlerImpl) DeleteAppPackage(w http.ResponseWriter, r *http.Request) {
195 params := mux.Vars(r)
196 appPkgId := params["appPkgId"]
197 appPackageInfo := impl.dbAdapter.GetAppPackageInfo(appPkgId)
198 if appPackageInfo.ID == "" {
199 respondJSON(w, http.StatusNotFound, "ID not exist")
202 impl.dbAdapter.DeleteAppPackageInfo(appPkgId)
// Path of the uploaded archive, as written by the upload handler.
204 deletePackage := PackageFolderPath + appPackageInfo.AppPackage
// The result of os.Remove is not checked.
207 os.Remove(deletePackage)
208 f := strings.Split(appPackageInfo.AppPackage, ".")
// NOTE(review): os.Remove fails on a non-empty directory and its result is
// ignored here. If packageName is meant to be the extracted package directory,
// confirm that it carries the PackageFolderPath prefix and whether
// os.RemoveAll was intended.
212 os.Remove(packageName)
// NOTE(review): json.NewEncoder(w).Encode("") writes its output to w right
// here and returns an error; that error value (nil on success), not an empty
// string, is the payload respondJSON then marshals and sends.
214 respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(""))
217 // CreateAppInstance creates an application-instance record.
// It decodes a model.CreateApplicationReq from the request body, looks up the
// package info for req.AppDID, generates a UUID for the new instance, stores
// a record in state "NOT_INSTANTIATED" through the DB adapter and responds
// with status 201.
218 func (impl *HandlerImpl) CreateAppInstance(w http.ResponseWriter, r *http.Request) {
219 var req model.CreateApplicationReq
220 err := json.NewDecoder(r.Body).Decode(&req)
// A body that cannot be decoded is reported with status 500.
222 respondError(w, http.StatusInternalServerError, err.Error())
226 appPkgInfo := impl.dbAdapter.GetAppPackageInfo(req.AppDID)
227 if appPkgInfo.ID == "" {
228 respondJSON(w, http.StatusNotFound, "ID not exist")
// NOTE(review): Infof is given an argument but the format string has no verb,
// so the value is rendered as a %!(EXTRA ...) suffix; the same applies to the
// Infof call after the insert below.
231 impl.logger.Infof("Query appPkg Info:", appPkgInfo)
233 appInstanceId, err := uuid.NewUUID()
235 respondError(w, http.StatusInternalServerError, err.Error())
// The new record combines request fields with values copied from the package
// info.
238 appInstanceInfo := model.AppInstanceInfo{
240 ID: appInstanceId.String(),
241 AppInstanceName: req.AppInstancename,
242 AppInstanceDescription: req.AppInstanceDescriptor,
244 AppProvider: appPkgInfo.AppProvider,
245 AppName: appPkgInfo.AppName,
246 AppSoftVersion: appPkgInfo.AppSoftwareVersion,
247 AppDVersion: appPkgInfo.AppDVersion,
248 AppPkgID: appPkgInfo.AppDID,
249 InstantiationState: "NOT_INSTANTIATED",
251 impl.dbAdapter.InsertAppInstanceInfo(appInstanceInfo)
252 impl.logger.Infof("CreateAppInstance:", req)
// NOTE(review): json.NewEncoder(w).Encode(appInstanceInfo) writes the record
// to w right here and returns an error; that error value (nil on success),
// not the record, is the payload respondJSON then marshals and sends with the
// 201 status.
254 respondJSON(w, http.StatusCreated, json.NewEncoder(w).Encode(appInstanceInfo))
257 // InstantiateAppInstance instantiates an application instance.
// It decodes a model.InstantiateApplicationReq from the body, loads the
// instance named by the "appInstanceId" route variable together with its
// package info, picks a deployment artifact and plugin address according to
// the package's DeployType, asks the plugin adapter to instantiate the
// artifact on the selected host, and on success marks the instance
// "INSTANTIATED" with the host and workload ID.
258 func (impl *HandlerImpl) InstantiateAppInstance(w http.ResponseWriter, r *http.Request) {
259 var req model.InstantiateApplicationReq
260 err := json.NewDecoder(r.Body).Decode(&req)
// A body that cannot be decoded is reported with status 500.
262 respondError(w, http.StatusInternalServerError, err.Error())
266 params := mux.Vars(r)
267 appInstanceId := params["appInstanceId"]
// The package lookup uses the instance's AppDID and runs before either ID is
// checked; an empty ID on either record yields 404.
269 appInstanceInfo := impl.dbAdapter.GetAppInstanceInfo(appInstanceId)
270 appPackageInfo := impl.dbAdapter.GetAppPackageInfo(appInstanceInfo.AppDID)
271 if appInstanceInfo.ID == "" || appPackageInfo.ID == "" {
272 respondJSON(w, http.StatusNotFound, "ID not exist")
// An instance that is already instantiated is rejected with status 500.
276 if appInstanceInfo.InstantiationState == "INSTANTIATED" {
277 respondError(w, http.StatusInternalServerError, "Application already instantiated")
283 f := strings.Split(appPackageInfo.AppPackage, ".")
// NOTE(review): packageName is passed as the format string, so a "%" in the
// name would be interpreted by Infof.
287 impl.logger.Infof(packageName)
290 var pluginInfo string
// Select the artifact directory and plugin address by deployment type.
292 switch appPackageInfo.DeployType {
294 pkgPath := PackageFolderPath + packageName + PackageArtifactPath + "Charts"
295 artifact = impl.getDeploymentArtifact(pkgPath, ".tar")
297 respondError(w, http.StatusInternalServerError, "artifact not available in application package")
// The plugin address is "<name>:<port>" with the port read from the
// environment.
300 pluginInfo = "helmplugin" + ":" + os.Getenv("HELM_PLUGIN_PORT")
// NOTE(review): Infof gets an argument but no format verb here and in the
// "Artifact to deploy:" call below.
301 impl.logger.Infof("Plugin Info ", pluginInfo)
303 pkgPath := PackageFolderPath + packageName + PackageArtifactPath + "Kubernetes"
// NOTE(review): the extension is passed in glob form ("*.yaml") here, whereas
// the Charts lookup above passes a plain ".tar" — confirm that
// getDeploymentArtifact accepts both spellings.
304 artifact = impl.getDeploymentArtifact(pkgPath, "*.yaml")
306 respondError(w, http.StatusInternalServerError, "artifact not available in application package")
309 pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT")
311 respondError(w, http.StatusInternalServerError, "Deployment type not supported")
314 impl.logger.Infof("Artifact to deploy:", artifact)
316 adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger)
317 workloadId, err, resStatus := adapter.Instantiate(pluginInfo, req.SelectedMECHostInfo.HostID, artifact)
// A gRPC InvalidArgument status is mapped to 400; other errors to 500.
319 st, ok := status.FromError(err)
320 if ok && st.Code() == codes.InvalidArgument {
321 respondError(w, http.StatusBadRequest, err.Error())
324 respondError(w, http.StatusInternalServerError, err.Error())
328 if resStatus == "Failure" {
// NOTE(review): if the error handling above already returned for a non-nil
// err, err is nil at this point and err.Error() would panic — confirm.
329 respondError(w, http.StatusInternalServerError, err.Error())
332 impl.dbAdapter.UpdateAppInstanceInfoInstStatusHostAndWorkloadId(appInstanceId, "INSTANTIATED", req.SelectedMECHostInfo.HostID, workloadId)
// NOTE(review): json.NewEncoder(w).Encode(workloadId) writes to w right here
// and returns an error; that error value (nil on success), not the workload
// ID, is the payload respondJSON then marshals and sends.
333 respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(workloadId))
336 // Gets deployment artifact
337 func (impl *HandlerImpl) getDeploymentArtifact(dir string, ext string) string {
338 d, err := os.Open(dir)
340 impl.logger.Infof("Error: ", err)
345 files, err := d.Readdir(-1)
347 impl.logger.Infof("Error: ", err)
351 impl.logger.Infof("Directory to read " + dir)
353 for _, file := range files {
354 if file.Mode().IsRegular() {
355 if filepath.Ext(file.Name()) == ext || filepath.Ext(file.Name()) == ".gz" {
356 impl.logger.Infof(file.Name())
357 impl.logger.Infof(dir + "/" + file.Name())
358 return dir + "/" + file.Name()
365 // QueryAppInstanceInfo queries application instance information.
// It loads the instance named by the "appInstanceId" route variable together
// with its package info. When the instance is "INSTANTIATED" it also asks
// the plugin selected by the package's DeployType for the workload state and
// stores that in InstantiatedAppState before responding.
366 func (impl *HandlerImpl) QueryAppInstanceInfo(w http.ResponseWriter, r *http.Request) {
368 params := mux.Vars(r)
369 appInstanceId := params["appInstanceId"]
// The package lookup uses the instance's AppDID; an empty ID on either record
// yields 404.
371 appInstanceInfo := impl.dbAdapter.GetAppInstanceInfo(appInstanceId)
372 appPackageInfo := impl.dbAdapter.GetAppPackageInfo(appInstanceInfo.AppDID)
373 if appInstanceInfo.ID == "" || appPackageInfo.ID == "" {
374 respondJSON(w, http.StatusNotFound, "ID not exist")
// Stays empty unless the instance is instantiated and the plugin reports a
// state.
377 var instantiatedAppState string
378 if appInstanceInfo.InstantiationState == "INSTANTIATED" {
380 var pluginInfo string
// The plugin address is "<name>:<port>" with the port read from the
// environment.
382 switch appPackageInfo.DeployType {
384 pluginInfo = "helmplugin" + ":" + os.Getenv("HELM_PLUGIN_PORT")
386 pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT")
388 respondError(w, http.StatusInternalServerError, "Deployment type not supported")
392 adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger)
393 state, err := adapter.Query(pluginInfo, appInstanceInfo.Host, appInstanceInfo.WorkloadID)
395 respondError(w, http.StatusInternalServerError, err.Error())
398 instantiatedAppState = state
400 appInstanceInfo.InstantiatedAppState = instantiatedAppState
// NOTE(review): json.NewEncoder(w).Encode(appInstanceInfo) writes the record
// to w right here and returns an error; that error value (nil on success),
// not the record, is the payload respondJSON then marshals. The status
// passed is 201 although this handler is a query — confirm the intended code.
402 respondJSON(w, http.StatusCreated, json.NewEncoder(w).Encode(appInstanceInfo))
405 // QueryAppLcmOperationStatus decodes the request body into a
// model.QueryApplicationLCMOperStatusReq and writes the decoded value back to
// the client as plain text. No status lookup appears in the visible code.
406 func (impl *HandlerImpl) QueryAppLcmOperationStatus(w http.ResponseWriter, r *http.Request) {
407 var req model.QueryApplicationLCMOperStatusReq
408 err := json.NewDecoder(r.Body).Decode(&req)
// A body that cannot be decoded is reported with status 500.
410 respondError(w, http.StatusInternalServerError, err.Error())
// Echo the request using the %+v verb (field names included).
414 fmt.Fprintf(w, "QueryApplicationLCMOperStatus: %+v", req)
417 // TerminateAppInstance terminates an application instance.
// It loads the instance named by the "appInstanceId" route variable together
// with its package info, asks the plugin selected by the package's DeployType
// to terminate the workload on the instance's host, and then sets the
// instance back to "NOT_INSTANTIATED" with an empty workload ID.
418 func (impl *HandlerImpl) TerminateAppInstance(w http.ResponseWriter, r *http.Request) {
419 impl.logger.Infof("TerminateAppInstance...")
420 params := mux.Vars(r)
421 appInstanceId := params["appInstanceId"]
// The package lookup uses the instance's AppDID; an empty ID on either record
// yields 404.
423 appInstanceInfo := impl.dbAdapter.GetAppInstanceInfo(appInstanceId)
424 appPackageInfo := impl.dbAdapter.GetAppPackageInfo(appInstanceInfo.AppDID)
425 if appInstanceInfo.ID == "" || appPackageInfo.ID == "" {
426 respondJSON(w, http.StatusNotFound, "ID not exist")
// An instance that was never instantiated is rejected with 406.
430 if appInstanceInfo.InstantiationState == "NOT_INSTANTIATED" {
431 respondError(w, http.StatusNotAcceptable, "instantiationState: NOT_INSTANTIATED")
// The plugin address is "<name>:<port>" with the port read from the
// environment.
435 var pluginInfo string
436 switch appPackageInfo.DeployType {
438 pluginInfo = "helmplugin" + ":" + os.Getenv("HELM_PLUGIN_PORT")
440 pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT")
442 respondError(w, http.StatusInternalServerError, "Deployment type not supported")
446 adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger)
// Only the error from Terminate is inspected; its first result is discarded.
447 _, err := adapter.Terminate(pluginInfo, appInstanceInfo.Host, appInstanceInfo.WorkloadID)
449 respondError(w, http.StatusInternalServerError, err.Error())
452 impl.dbAdapter.UpdateAppInstanceInfoInstStatusAndWorkload(appInstanceId, "NOT_INSTANTIATED", "")
// NOTE(review): json.NewEncoder(w).Encode("") writes its output to w right
// here and returns an error; that error value (nil on success), not an empty
// string, is the payload respondJSON then marshals and sends.
454 respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(""))
457 // Deletes application instance identifier
458 func (impl *HandlerImpl) DeleteAppInstanceIdentifier(w http.ResponseWriter, r *http.Request) {
459 impl.logger.Infof("DeleteAppInstanceIdentifier:")
460 params := mux.Vars(r)
461 appInstanceId := params["appInstanceId"]
463 impl.dbAdapter.DeleteAppInstanceInfo(appInstanceId)
464 respondJSON(w, http.StatusOK, json.NewEncoder(w).Encode(""))
// respondJSON writes payload to w as a JSON document with the given status
// and a Content-Type of "application/json".
//
// If payload cannot be marshalled, a 500 response carrying the marshalling
// error text is written instead and status is not used.
func respondJSON(w http.ResponseWriter, status int, payload interface{}) {
	response, err := json.Marshal(payload)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		// A failed write means the client has gone away; there is nobody
		// left to report it to, so the result is deliberately dropped.
		_, _ = w.Write([]byte(err.Error()))
		return
	}

	// Headers must be set before WriteHeader; later changes are ignored.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	_, _ = w.Write(response) // see above: write failures are not actionable
}
480 // respondError writes an error response in JSON format: an object of the
// form {"error": message}, sent through respondJSON with code as the HTTP
// status.
481 func respondError(w http.ResponseWriter, code int, message string) {
482 respondJSON(w, code, map[string]string{"error": message})