2 * Copyright 2020 Huawei Technologies Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 "broker/pkg/handlers/adapter/dbAdapter"
21 "broker/pkg/handlers/adapter/pluginAdapter"
22 "broker/pkg/handlers/model"
26 "github.com/buger/jsonparser"
27 "github.com/ghodss/yaml"
28 "github.com/google/uuid"
29 "github.com/gorilla/mux"
30 "github.com/jinzhu/gorm"
31 "github.com/sirupsen/logrus"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/status"
43 const DbName = "applcmDB"
45 // Handler of REST APIs
46 type HandlerImpl struct {
51 // Creates handler implementation
52 func newHandlerImpl(logger *logrus.Logger) (impl HandlerImpl) {
54 impl.db = impl.createDatabase()
59 func (impl *HandlerImpl) createDatabase() *gorm.DB {
60 impl.logger.Info("creating Database...")
62 usrpswd := os.Getenv("MYSQL_USER") + ":" + os.Getenv("MYSQL_PASSWORD")
63 host := "@tcp(" + "dbhost" + ":3306)/"
65 db, err := gorm.Open("mysql", usrpswd + host)
67 impl.logger.Fatalf("Database connect error", err.Error())
70 db.Exec("CREATE DATABASE " + DbName)
71 db.Exec("USE applcmDB")
72 gorm.DefaultCallback.Create().Remove("mysql:set_identity_insert")
74 impl.logger.Info("Migrating models...")
75 db.AutoMigrate(&model.AppPackageInfo{})
76 db.AutoMigrate(&model.AppInstanceInfo{})
81 func (impl *HandlerImpl) UploadPackage(w http.ResponseWriter, r *http.Request) {
83 file, header, err := r.FormFile("file")
86 respondError(w, http.StatusBadRequest, err.Error())
90 buf := bytes.NewBuffer(nil)
91 if _, err := io.Copy(buf, file); err != nil {
92 respondError(w, http.StatusBadRequest, err.Error())
97 f := strings.Split(header.Filename, ".")
101 impl.logger.Infof(packageName)
103 pkgPath := PackageFolderPath + header.Filename
104 newFile, err := os.Create(pkgPath)
106 respondError(w, http.StatusInternalServerError, err.Error())
110 defer newFile.Close()
111 if _, err := newFile.Write(buf.Bytes()); err != nil {
112 respondError(w, http.StatusInternalServerError, err.Error())
116 /* Unzip package to decode appDescriptor */
117 impl.openPackage(w, pkgPath)
119 var yamlFile = PackageFolderPath + packageName + "/Definitions/" + "MainServiceTemplate.yaml"
120 appPkgInfo := impl.decodeApplicationDescriptor(w, yamlFile)
121 appPkgInfo.AppPackage = header.Filename
122 appPkgInfo.OnboardingState = "ONBOARDED"
124 impl.logger.Infof("Application package info from package")
127 dbAdapter.InsertAppPackageInfo(impl.db, appPkgInfo)
130 respondJSON(w, http.StatusCreated, appPkgInfo)
134 func (impl *HandlerImpl) openPackage(w http.ResponseWriter, packagePath string) {
135 zipReader, _ := zip.OpenReader(packagePath)
136 for _, file := range zipReader.Reader.File {
138 zippedFile, err := file.Open()
140 respondError(w, http.StatusBadRequest, err.Error())
142 defer zippedFile.Close()
144 targetDir := PackageFolderPath + "/"
145 extractedFilePath := filepath.Join(
150 if file.FileInfo().IsDir() {
151 os.MkdirAll(extractedFilePath, file.Mode())
153 outputFile, err := os.OpenFile(
155 os.O_WRONLY|os.O_CREATE|os.O_TRUNC,
159 respondError(w, http.StatusBadRequest, err.Error())
161 defer outputFile.Close()
163 _, err = io.Copy(outputFile, zippedFile)
165 respondError(w, http.StatusBadRequest, err.Error())
171 // Decodes application descriptor
172 func (impl *HandlerImpl) decodeApplicationDescriptor(w http.ResponseWriter, serviceTemplate string) model.AppPackageInfo {
174 yamlFile, err := ioutil.ReadFile(serviceTemplate)
176 respondError(w, http.StatusBadRequest, err.Error())
179 jsondata, err := yaml.YAMLToJSON(yamlFile)
181 respondError(w, http.StatusBadRequest, err.Error())
184 appDId, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appDId")
185 appProvider, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appProvider")
186 appInfoName, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appInfoName")
187 appSoftVersion, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appSoftVersion")
188 appDVersion, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appDVersion")
189 deployType, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "type")
191 appPkgInfo := model.AppPackageInfo{
193 AppDID: string(appDId),
194 AppProvider: string(appProvider),
195 AppName: string(appInfoName),
196 AppSoftwareVersion: string(appSoftVersion),
197 AppDVersion: string(appDVersion),
198 DeployType: string(deployType),
201 //return appPackageInfo
205 // Query application package information
206 func (impl *HandlerImpl) QueryAppPackageInfo(w http.ResponseWriter, r *http.Request) {
207 params := mux.Vars(r)
208 appPkgId := params["appPkgId"]
209 appPkgInfo := dbAdapter.GetAppPackageInfo(impl.db, appPkgId)
210 if appPkgInfo.ID == "" {
211 respondJSON(w, http.StatusNotFound, "ID not exist")
214 respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(appPkgInfo))
217 // Deletes application package
218 func (impl *HandlerImpl) DeleteAppPackage(w http.ResponseWriter, r *http.Request) {
219 params := mux.Vars(r)
220 appPkgId := params["appPkgId"]
221 appPackageInfo := dbAdapter.GetAppPackageInfo(impl.db, appPkgId)
222 if appPackageInfo.ID == "" {
223 respondJSON(w, http.StatusNotFound, "ID not exist")
226 dbAdapter.DeleteAppPackageInfo(impl.db, appPkgId)
228 deletePackage := PackageFolderPath + appPackageInfo.AppPackage
231 os.Remove(deletePackage)
232 f := strings.Split(appPackageInfo.AppPackage, ".")
236 os.Remove(packageName)
238 respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(""))
241 // Creates application instance
242 func (impl *HandlerImpl) CreateAppInstance(w http.ResponseWriter, r *http.Request) {
243 var req model.CreateApplicationReq
244 err := json.NewDecoder(r.Body).Decode(&req)
246 respondError(w, http.StatusInternalServerError, err.Error())
250 appPkgInfo := dbAdapter.GetAppPackageInfo(impl.db, req.AppDID)
251 if appPkgInfo.ID == "" {
252 respondJSON(w, http.StatusNotFound, "ID not exist")
255 impl.logger.Infof("Query appPkg Info:", appPkgInfo)
257 appInstanceId, err := uuid.NewUUID()
259 respondError(w, http.StatusInternalServerError, err.Error())
262 appInstanceInfo := model.AppInstanceInfo{
264 ID: appInstanceId.String(),
265 AppInstanceName: req.AppInstancename,
266 AppInstanceDescription: req.AppInstanceDescriptor,
268 AppProvider: appPkgInfo.AppProvider,
269 AppName: appPkgInfo.AppName,
270 AppSoftVersion: appPkgInfo.AppSoftwareVersion,
271 AppDVersion: appPkgInfo.AppDVersion,
272 AppPkgID: appPkgInfo.AppDID,
273 InstantiationState: "NOT_INSTANTIATED",
275 dbAdapter.InsertAppInstanceInfo(impl.db, appInstanceInfo)
276 impl.logger.Infof("CreateAppInstance:", req)
278 respondJSON(w, http.StatusCreated, json.NewEncoder(w).Encode(appInstanceInfo))
281 // Instantiates application instance
282 func (impl *HandlerImpl) InstantiateAppInstance(w http.ResponseWriter, r *http.Request) {
283 var req model.InstantiateApplicationReq
284 err := json.NewDecoder(r.Body).Decode(&req)
286 respondError(w, http.StatusInternalServerError, err.Error())
290 params := mux.Vars(r)
291 appInstanceId := params["appInstanceId"]
293 appInstanceInfo := dbAdapter.GetAppInstanceInfo(impl.db, appInstanceId)
294 appPackageInfo := dbAdapter.GetAppPackageInfo(impl.db, appInstanceInfo.AppDID)
295 if appInstanceInfo.ID == "" || appPackageInfo.ID == "" {
296 respondJSON(w, http.StatusNotFound, "ID not exist")
300 if appInstanceInfo.InstantiationState == "INSTANTIATED" {
301 respondError(w, http.StatusInternalServerError, "Application already instantiated")
307 f := strings.Split(appPackageInfo.AppPackage, ".")
311 impl.logger.Infof(packageName)
314 var pluginInfo string
316 switch appPackageInfo.DeployType {
318 pkgPath := PackageFolderPath + packageName + PackageArtifactPath + "Charts"
319 artifact = impl.getDeploymentArtifact(pkgPath, ".tar")
321 respondError(w, http.StatusInternalServerError, "artifact not available in application package")
324 pluginInfo = "helm.plugin" + ":" + os.Getenv("HELM_PLUGIN_PORT")
326 pkgPath := PackageFolderPath + packageName + PackageArtifactPath + "Kubernetes"
327 artifact = impl.getDeploymentArtifact(pkgPath, "*.yaml")
329 respondError(w, http.StatusInternalServerError, "artifact not available in application package")
332 pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT")
334 respondError(w, http.StatusInternalServerError, "Deployment type not supported")
337 impl.logger.Infof("Artifact to deploy:", artifact)
339 adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger)
340 workloadId, err := adapter.Instantiate(pluginInfo, req.SelectedMECHostInfo.HostID, artifact)
342 st, ok := status.FromError(err)
343 if ok && st.Code() == codes.InvalidArgument {
344 respondError(w, http.StatusBadRequest, err.Error())
347 respondError(w, http.StatusInternalServerError, err.Error())
350 dbAdapter.UpdateAppInstanceInfoInstStatusHostAndWorkloadId(impl.db, appInstanceId, "INSTANTIATED", req.SelectedMECHostInfo.HostID, workloadId)
352 respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(""))
355 // Gets deployment artifact
356 func (impl *HandlerImpl) getDeploymentArtifact(dir string, ext string) string {
357 d, err := os.Open(dir)
359 impl.logger.Infof("Error: ", err)
364 files, err := d.Readdir(-1)
366 impl.logger.Infof("Error: ", err)
370 impl.logger.Infof("Directory to read " + dir)
372 for _, file := range files {
373 if file.Mode().IsRegular() {
374 if filepath.Ext(file.Name()) == ext || filepath.Ext(file.Name()) == ".gz" {
375 impl.logger.Infof(file.Name())
376 impl.logger.Infof(dir + "/" + file.Name())
377 return dir + "/" + file.Name()
384 // Queries application instance information
385 func (impl *HandlerImpl) QueryAppInstanceInfo(w http.ResponseWriter, r *http.Request) {
387 params := mux.Vars(r)
388 appInstanceId := params["appInstanceId"]
390 appInstanceInfo := dbAdapter.GetAppInstanceInfo(impl.db, appInstanceId)
391 appPackageInfo := dbAdapter.GetAppPackageInfo(impl.db, appInstanceInfo.AppDID)
392 if appInstanceInfo.ID == "" || appPackageInfo.ID == "" {
393 respondJSON(w, http.StatusNotFound, "ID not exist")
396 var instantiatedAppState string
397 if appInstanceInfo.InstantiationState == "INSTANTIATED" {
399 var pluginInfo string
401 switch appPackageInfo.DeployType {
403 pluginInfo = "helm.plugin" + ":" + os.Getenv("HELM_PLUGIN_PORT")
405 pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT")
407 respondError(w, http.StatusInternalServerError, "Deployment type not supported")
411 adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger)
412 state, err := adapter.Query(pluginInfo, appInstanceInfo.Host, appInstanceInfo.WorkloadID)
414 respondError(w, http.StatusInternalServerError, err.Error())
417 instantiatedAppState = state
419 appInstanceInfo.InstantiatedAppState = instantiatedAppState
421 respondJSON(w, http.StatusCreated, json.NewEncoder(w).Encode(appInstanceInfo))
424 // Queries application lcm operation status
425 func (impl *HandlerImpl) QueryAppLcmOperationStatus(w http.ResponseWriter, r *http.Request) {
426 var req model.QueryApplicationLCMOperStatusReq
427 err := json.NewDecoder(r.Body).Decode(&req)
429 respondError(w, http.StatusInternalServerError, err.Error())
433 fmt.Fprintf(w, "QueryApplicationLCMOperStatus: %+v", req)
436 // Terminates application instance
437 func (impl *HandlerImpl) TerminateAppInstance(w http.ResponseWriter, r *http.Request) {
438 impl.logger.Infof("TerminateAppInstance...")
439 params := mux.Vars(r)
440 appInstanceId := params["appInstanceId"]
442 appInstanceInfo := dbAdapter.GetAppInstanceInfo(impl.db, appInstanceId)
443 appPackageInfo := dbAdapter.GetAppPackageInfo(impl.db, appInstanceInfo.AppDID)
444 if appInstanceInfo.ID == "" || appPackageInfo.ID == "" {
445 respondJSON(w, http.StatusNotFound, "ID not exist")
449 if appInstanceInfo.InstantiationState == "NOT_INSTANTIATED" {
450 respondError(w, http.StatusNotAcceptable, "instantiationState: NOT_INSTANTIATED")
454 var pluginInfo string
455 switch appPackageInfo.DeployType {
457 pluginInfo = "helm.plugin" + ":" + os.Getenv("HELM_PLUGIN_PORT")
459 pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT")
461 respondError(w, http.StatusInternalServerError, "Deployment type not supported")
465 adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger)
466 _, err := adapter.Terminate(pluginInfo, appInstanceInfo.Host, appInstanceInfo.WorkloadID)
468 respondError(w, http.StatusInternalServerError, err.Error())
471 dbAdapter.UpdateAppInstanceInfoInstStatusAndWorkload(impl.db, appInstanceId, "NOT_INSTANTIATED", "")
473 respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(""))
476 // Deletes application instance identifier
477 func (impl *HandlerImpl) DeleteAppInstanceIdentifier(w http.ResponseWriter, r *http.Request) {
478 impl.logger.Infof("DeleteAppInstanceIdentifier:")
479 params := mux.Vars(r)
480 appInstanceId := params["appInstanceId"]
482 dbAdapter.DeleteAppInstanceInfo(impl.db, appInstanceId)
483 respondJSON(w, http.StatusOK, json.NewEncoder(w).Encode(""))
487 func respondJSON(w http.ResponseWriter, status int, payload interface{}) {
488 response, err := json.Marshal(payload)
490 w.WriteHeader(http.StatusInternalServerError)
491 w.Write([]byte(err.Error()))
494 w.Header().Set("Content-Type", "application/json")
495 w.WriteHeader(status)
496 w.Write([]byte(response))
499 // RespondError makes the error response with payload as json format
500 func respondError(w http.ResponseWriter, code int, message string) {
501 respondJSON(w, code, map[string]string{"error": message})