2 * Copyright 2020 Huawei Technologies Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 "broker/pkg/handlers/adapter/dbAdapter"
21 "broker/pkg/handlers/adapter/pluginAdapter"
22 "broker/pkg/handlers/model"
26 "github.com/buger/jsonparser"
27 "github.com/ghodss/yaml"
28 "github.com/google/uuid"
29 "github.com/gorilla/mux"
30 "github.com/sirupsen/logrus"
31 "google.golang.org/grpc/codes"
32 "google.golang.org/grpc/status"
41 // Handler of REST APIs
42 type HandlerImpl struct {
44 dbAdapter *dbAdapter.DbAdapter
47 // Creates handler implementation
48 func newHandlerImpl(logger *logrus.Logger) (impl HandlerImpl) {
50 impl.dbAdapter = dbAdapter.NewDbAdapter(logger)
51 impl.dbAdapter.CreateDatabase()
56 func (impl *HandlerImpl) UploadPackage(w http.ResponseWriter, r *http.Request) {
58 file, header, err := r.FormFile("file")
61 respondError(w, http.StatusBadRequest, err.Error())
65 buf := bytes.NewBuffer(nil)
66 if _, err := io.Copy(buf, file); err != nil {
67 respondError(w, http.StatusBadRequest, err.Error())
72 f := strings.Split(header.Filename, ".")
76 impl.logger.Infof(packageName)
78 pkgPath := PackageFolderPath + header.Filename
79 newFile, err := os.Create(pkgPath)
81 respondError(w, http.StatusInternalServerError, err.Error())
86 if _, err := newFile.Write(buf.Bytes()); err != nil {
87 respondError(w, http.StatusInternalServerError, err.Error())
91 /* Unzip package to decode appDescriptor */
92 impl.openPackage(w, pkgPath)
94 var yamlFile = PackageFolderPath + packageName + "/Definitions/" + "MainServiceTemplate.yaml"
95 appPkgInfo := impl.decodeApplicationDescriptor(w, yamlFile)
96 appPkgInfo.AppPackage = header.Filename
97 appPkgInfo.OnboardingState = "ONBOARDED"
99 impl.logger.Infof("Application package info from package")
102 impl.dbAdapter.InsertAppPackageInfo(appPkgInfo)
105 respondJSON(w, http.StatusCreated, appPkgInfo)
109 func (impl *HandlerImpl) openPackage(w http.ResponseWriter, packagePath string) {
110 zipReader, _ := zip.OpenReader(packagePath)
111 for _, file := range zipReader.Reader.File {
113 zippedFile, err := file.Open()
115 respondError(w, http.StatusBadRequest, err.Error())
117 defer zippedFile.Close()
119 targetDir := PackageFolderPath + "/"
120 extractedFilePath := filepath.Join(
125 if file.FileInfo().IsDir() {
126 os.MkdirAll(extractedFilePath, file.Mode())
128 outputFile, err := os.OpenFile(
130 os.O_WRONLY|os.O_CREATE|os.O_TRUNC,
134 respondError(w, http.StatusBadRequest, err.Error())
136 defer outputFile.Close()
138 _, err = io.Copy(outputFile, zippedFile)
140 respondError(w, http.StatusBadRequest, err.Error())
146 // Decodes application descriptor
147 func (impl *HandlerImpl) decodeApplicationDescriptor(w http.ResponseWriter, serviceTemplate string) model.AppPackageInfo {
149 yamlFile, err := ioutil.ReadFile(serviceTemplate)
151 respondError(w, http.StatusBadRequest, err.Error())
154 jsondata, err := yaml.YAMLToJSON(yamlFile)
156 respondError(w, http.StatusBadRequest, err.Error())
159 appDId, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appDId")
160 appProvider, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appProvider")
161 appInfoName, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appInfoName")
162 appSoftVersion, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appSoftVersion")
163 appDVersion, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appDVersion")
164 deployType, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "type")
166 appPkgInfo := model.AppPackageInfo{
168 AppDID: string(appDId),
169 AppProvider: string(appProvider),
170 AppName: string(appInfoName),
171 AppSoftwareVersion: string(appSoftVersion),
172 AppDVersion: string(appDVersion),
173 DeployType: string(deployType),
176 //return appPackageInfo
180 // Query application package information
181 func (impl *HandlerImpl) QueryAppPackageInfo(w http.ResponseWriter, r *http.Request) {
182 params := mux.Vars(r)
183 appPkgId := params["appPkgId"]
184 appPkgInfo := impl.dbAdapter.GetAppPackageInfo(appPkgId)
185 if appPkgInfo.ID == "" {
186 respondJSON(w, http.StatusNotFound, "ID not exist")
189 respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(appPkgInfo))
192 // Deletes application package
193 func (impl *HandlerImpl) DeleteAppPackage(w http.ResponseWriter, r *http.Request) {
194 params := mux.Vars(r)
195 appPkgId := params["appPkgId"]
196 appPackageInfo := impl.dbAdapter.GetAppPackageInfo(appPkgId)
197 if appPackageInfo.ID == "" {
198 respondJSON(w, http.StatusNotFound, "ID not exist")
201 impl.dbAdapter.DeleteAppPackageInfo(appPkgId)
203 deletePackage := PackageFolderPath + appPackageInfo.AppPackage
206 os.Remove(deletePackage)
207 f := strings.Split(appPackageInfo.AppPackage, ".")
211 os.Remove(packageName)
213 respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(""))
216 // Creates application instance
217 func (impl *HandlerImpl) CreateAppInstance(w http.ResponseWriter, r *http.Request) {
218 var req model.CreateApplicationReq
219 err := json.NewDecoder(r.Body).Decode(&req)
221 respondError(w, http.StatusInternalServerError, err.Error())
225 appPkgInfo := impl.dbAdapter.GetAppPackageInfo(req.AppDID)
226 if appPkgInfo.ID == "" {
227 respondJSON(w, http.StatusNotFound, "ID not exist")
230 impl.logger.Infof("Query appPkg Info:", appPkgInfo)
232 appInstanceId, err := uuid.NewUUID()
234 respondError(w, http.StatusInternalServerError, err.Error())
237 appInstanceInfo := model.AppInstanceInfo{
239 ID: appInstanceId.String(),
240 AppInstanceName: req.AppInstancename,
241 AppInstanceDescription: req.AppInstanceDescriptor,
243 AppProvider: appPkgInfo.AppProvider,
244 AppName: appPkgInfo.AppName,
245 AppSoftVersion: appPkgInfo.AppSoftwareVersion,
246 AppDVersion: appPkgInfo.AppDVersion,
247 AppPkgID: appPkgInfo.AppDID,
248 InstantiationState: "NOT_INSTANTIATED",
250 impl.dbAdapter.InsertAppInstanceInfo(appInstanceInfo)
251 impl.logger.Infof("CreateAppInstance:", req)
253 respondJSON(w, http.StatusCreated, json.NewEncoder(w).Encode(appInstanceInfo))
256 // Instantiates application instance
257 func (impl *HandlerImpl) InstantiateAppInstance(w http.ResponseWriter, r *http.Request) {
258 var req model.InstantiateApplicationReq
259 err := json.NewDecoder(r.Body).Decode(&req)
261 respondError(w, http.StatusInternalServerError, err.Error())
265 params := mux.Vars(r)
266 appInstanceId := params["appInstanceId"]
268 appInstanceInfo := impl.dbAdapter.GetAppInstanceInfo(appInstanceId)
269 appPackageInfo := impl.dbAdapter.GetAppPackageInfo(appInstanceInfo.AppDID)
270 if appInstanceInfo.ID == "" || appPackageInfo.ID == "" {
271 respondJSON(w, http.StatusNotFound, "ID not exist")
275 if appInstanceInfo.InstantiationState == "INSTANTIATED" {
276 respondError(w, http.StatusInternalServerError, "Application already instantiated")
282 f := strings.Split(appPackageInfo.AppPackage, ".")
286 impl.logger.Infof(packageName)
289 var pluginInfo string
291 switch appPackageInfo.DeployType {
293 pkgPath := PackageFolderPath + packageName + PackageArtifactPath + "Charts"
294 artifact = impl.getDeploymentArtifact(pkgPath, ".tar")
296 respondError(w, http.StatusInternalServerError, "artifact not available in application package")
299 pluginInfo = "helm.plugin" + ":" + os.Getenv("HELM_PLUGIN_PORT")
300 impl.logger.Infof("Plugin Info ", pluginInfo)
302 pkgPath := PackageFolderPath + packageName + PackageArtifactPath + "Kubernetes"
303 artifact = impl.getDeploymentArtifact(pkgPath, "*.yaml")
305 respondError(w, http.StatusInternalServerError, "artifact not available in application package")
308 pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT")
310 respondError(w, http.StatusInternalServerError, "Deployment type not supported")
313 impl.logger.Infof("Artifact to deploy:", artifact)
315 adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger)
316 workloadId, err, resStatus := adapter.Instantiate(pluginInfo, req.SelectedMECHostInfo.HostID, artifact)
318 st, ok := status.FromError(err)
319 if ok && st.Code() == codes.InvalidArgument {
320 respondError(w, http.StatusBadRequest, err.Error())
323 respondError(w, http.StatusInternalServerError, err.Error())
327 if resStatus == "Failure" {
328 respondError(w, http.StatusInternalServerError, err.Error())
331 impl.dbAdapter.UpdateAppInstanceInfoInstStatusHostAndWorkloadId(appInstanceId, "INSTANTIATED", req.SelectedMECHostInfo.HostID, workloadId)
332 respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(workloadId))
335 // Gets deployment artifact
336 func (impl *HandlerImpl) getDeploymentArtifact(dir string, ext string) string {
337 d, err := os.Open(dir)
339 impl.logger.Infof("Error: ", err)
344 files, err := d.Readdir(-1)
346 impl.logger.Infof("Error: ", err)
350 impl.logger.Infof("Directory to read " + dir)
352 for _, file := range files {
353 if file.Mode().IsRegular() {
354 if filepath.Ext(file.Name()) == ext || filepath.Ext(file.Name()) == ".gz" {
355 impl.logger.Infof(file.Name())
356 impl.logger.Infof(dir + "/" + file.Name())
357 return dir + "/" + file.Name()
364 // Queries application instance information
365 func (impl *HandlerImpl) QueryAppInstanceInfo(w http.ResponseWriter, r *http.Request) {
367 params := mux.Vars(r)
368 appInstanceId := params["appInstanceId"]
370 appInstanceInfo := impl.dbAdapter.GetAppInstanceInfo(appInstanceId)
371 appPackageInfo := impl.dbAdapter.GetAppPackageInfo(appInstanceInfo.AppDID)
372 if appInstanceInfo.ID == "" || appPackageInfo.ID == "" {
373 respondJSON(w, http.StatusNotFound, "ID not exist")
376 var instantiatedAppState string
377 if appInstanceInfo.InstantiationState == "INSTANTIATED" {
379 var pluginInfo string
381 switch appPackageInfo.DeployType {
383 pluginInfo = "helm.plugin" + ":" + os.Getenv("HELM_PLUGIN_PORT")
385 pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT")
387 respondError(w, http.StatusInternalServerError, "Deployment type not supported")
391 adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger)
392 state, err := adapter.Query(pluginInfo, appInstanceInfo.Host, appInstanceInfo.WorkloadID)
394 respondError(w, http.StatusInternalServerError, err.Error())
397 instantiatedAppState = state
399 appInstanceInfo.InstantiatedAppState = instantiatedAppState
401 respondJSON(w, http.StatusCreated, json.NewEncoder(w).Encode(appInstanceInfo))
404 // Queries application lcm operation status
405 func (impl *HandlerImpl) QueryAppLcmOperationStatus(w http.ResponseWriter, r *http.Request) {
406 var req model.QueryApplicationLCMOperStatusReq
407 err := json.NewDecoder(r.Body).Decode(&req)
409 respondError(w, http.StatusInternalServerError, err.Error())
413 fmt.Fprintf(w, "QueryApplicationLCMOperStatus: %+v", req)
416 // Terminates application instance
417 func (impl *HandlerImpl) TerminateAppInstance(w http.ResponseWriter, r *http.Request) {
418 impl.logger.Infof("TerminateAppInstance...")
419 params := mux.Vars(r)
420 appInstanceId := params["appInstanceId"]
422 appInstanceInfo := impl.dbAdapter.GetAppInstanceInfo(appInstanceId)
423 appPackageInfo := impl.dbAdapter.GetAppPackageInfo(appInstanceInfo.AppDID)
424 if appInstanceInfo.ID == "" || appPackageInfo.ID == "" {
425 respondJSON(w, http.StatusNotFound, "ID not exist")
429 if appInstanceInfo.InstantiationState == "NOT_INSTANTIATED" {
430 respondError(w, http.StatusNotAcceptable, "instantiationState: NOT_INSTANTIATED")
434 var pluginInfo string
435 switch appPackageInfo.DeployType {
437 pluginInfo = "helm.plugin" + ":" + os.Getenv("HELM_PLUGIN_PORT")
439 pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT")
441 respondError(w, http.StatusInternalServerError, "Deployment type not supported")
445 adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger)
446 _, err := adapter.Terminate(pluginInfo, appInstanceInfo.Host, appInstanceInfo.WorkloadID)
448 respondError(w, http.StatusInternalServerError, err.Error())
451 impl.dbAdapter.UpdateAppInstanceInfoInstStatusAndWorkload(appInstanceId, "NOT_INSTANTIATED", "")
453 respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(""))
456 // Deletes application instance identifier
457 func (impl *HandlerImpl) DeleteAppInstanceIdentifier(w http.ResponseWriter, r *http.Request) {
458 impl.logger.Infof("DeleteAppInstanceIdentifier:")
459 params := mux.Vars(r)
460 appInstanceId := params["appInstanceId"]
462 impl.dbAdapter.DeleteAppInstanceInfo(appInstanceId)
463 respondJSON(w, http.StatusOK, json.NewEncoder(w).Encode(""))
467 func respondJSON(w http.ResponseWriter, status int, payload interface{}) {
468 response, err := json.Marshal(payload)
470 w.WriteHeader(http.StatusInternalServerError)
471 w.Write([]byte(err.Error()))
474 w.Header().Set("Content-Type", "application/json")
475 w.WriteHeader(status)
476 w.Write([]byte(response))
479 // RespondError makes the error response with payload as json format
480 func respondError(w http.ResponseWriter, code int, message string) {
481 respondJSON(w, code, map[string]string{"error": message})