Error handling & logging for broker
[ealt-edge.git] / mecm / mepm / applcm / broker / pkg / handlers / handlersImpl.go
1 /*
2  * Copyright 2020 Huawei Technologies Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package handlers
17
18 import (
19         "archive/zip"
20         "broker/pkg/handlers/adapter/dbAdapter"
21         "broker/pkg/handlers/adapter/pluginAdapter"
22         "broker/pkg/handlers/model"
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "github.com/buger/jsonparser"
27         "github.com/ghodss/yaml"
28         "github.com/google/uuid"
29         "github.com/gorilla/mux"
30         "github.com/jinzhu/gorm"
31         "github.com/sirupsen/logrus"
32         "google.golang.org/grpc/codes"
33         "google.golang.org/grpc/status"
34         "io"
35         "io/ioutil"
36         "net/http"
37         "os"
38         "path/filepath"
39         "strings"
40 )
41
42 // DB name
43 const DbName = "applcmDB"
44
45 // Handler of REST APIs
46 type HandlerImpl struct {
47         logger *logrus.Logger
48         db     *gorm.DB
49 }
50
51 // Creates handler implementation
52 func newHandlerImpl(logger *logrus.Logger) (impl HandlerImpl) {
53         impl.logger = logger
54         impl.db = impl.createDatabase()
55         return
56 }
57
58 // Creates database
59 func (impl *HandlerImpl) createDatabase() *gorm.DB {
60         impl.logger.Info("creating Database...")
61
62         usrpswd := os.Getenv("MYSQL_USER") + ":" + os.Getenv("MYSQL_PASSWORD")
63         host := "@tcp(" + "dbhost" + ":3306)/"
64
65         db, err := gorm.Open("mysql", usrpswd + host)
66         if err != nil {
67                 impl.logger.Fatalf("Database connect error", err.Error())
68         }
69
70         db.Exec("CREATE DATABASE  " + DbName)
71         db.Exec("USE applcmDB")
72         gorm.DefaultCallback.Create().Remove("mysql:set_identity_insert")
73
74         impl.logger.Info("Migrating models...")
75         db.AutoMigrate(&model.AppPackageInfo{})
76         db.AutoMigrate(&model.AppInstanceInfo{})
77         return db
78 }
79
80 // Uploads package
81 func (impl *HandlerImpl) UploadPackage(w http.ResponseWriter, r *http.Request) {
82
83         file, header, err := r.FormFile("file")
84         defer file.Close()
85         if err != nil {
86                 respondError(w, http.StatusBadRequest, err.Error())
87                 return
88         }
89
90         buf := bytes.NewBuffer(nil)
91         if _, err := io.Copy(buf, file); err != nil {
92                 respondError(w, http.StatusBadRequest, err.Error())
93                 return
94         }
95
96         var packageName = ""
97         f := strings.Split(header.Filename, ".")
98         if len(f) > 0 {
99                 packageName = f[0]
100         }
101         impl.logger.Infof(packageName)
102
103         pkgPath := PackageFolderPath + header.Filename
104         newFile, err := os.Create(pkgPath)
105         if err != nil {
106                 respondError(w, http.StatusInternalServerError, err.Error())
107                 return
108         }
109
110         defer newFile.Close()
111         if _, err := newFile.Write(buf.Bytes()); err != nil {
112                 respondError(w, http.StatusInternalServerError, err.Error())
113                 return
114         }
115
116         /* Unzip package to decode appDescriptor */
117         impl.openPackage(w, pkgPath)
118
119         var yamlFile = PackageFolderPath + packageName + "/Definitions/" + "MainServiceTemplate.yaml"
120         appPkgInfo := impl.decodeApplicationDescriptor(w, yamlFile)
121         appPkgInfo.AppPackage = header.Filename
122         appPkgInfo.OnboardingState = "ONBOARDED"
123
124         impl.logger.Infof("Application package info from package")
125         defer r.Body.Close()
126
127         dbAdapter.InsertAppPackageInfo(impl.db, appPkgInfo)
128
129         /*http.StatusOK*/
130         respondJSON(w, http.StatusCreated, appPkgInfo)
131 }
132
133 // Opens package
134 func (impl *HandlerImpl) openPackage(w http.ResponseWriter, packagePath string) {
135         zipReader, _ := zip.OpenReader(packagePath)
136         for _, file := range zipReader.Reader.File {
137
138                 zippedFile, err := file.Open()
139                 if err != nil {
140                         respondError(w, http.StatusBadRequest, err.Error())
141                 }
142                 defer zippedFile.Close()
143
144                 targetDir := PackageFolderPath + "/"
145                 extractedFilePath := filepath.Join(
146                         targetDir,
147                         file.Name,
148                 )
149
150                 if file.FileInfo().IsDir() {
151                         os.MkdirAll(extractedFilePath, file.Mode())
152                 } else {
153                         outputFile, err := os.OpenFile(
154                                 extractedFilePath,
155                                 os.O_WRONLY|os.O_CREATE|os.O_TRUNC,
156                                 file.Mode(),
157                         )
158                         if err != nil {
159                                 respondError(w, http.StatusBadRequest, err.Error())
160                         }
161                         defer outputFile.Close()
162
163                         _, err = io.Copy(outputFile, zippedFile)
164                         if err != nil {
165                                 respondError(w, http.StatusBadRequest, err.Error())
166                         }
167                 }
168         }
169 }
170
171 // Decodes application descriptor
172 func (impl *HandlerImpl) decodeApplicationDescriptor(w http.ResponseWriter, serviceTemplate string) model.AppPackageInfo {
173
174         yamlFile, err := ioutil.ReadFile(serviceTemplate)
175         if err != nil {
176                 respondError(w, http.StatusBadRequest, err.Error())
177         }
178
179         jsondata, err := yaml.YAMLToJSON(yamlFile)
180         if err != nil {
181                 respondError(w, http.StatusBadRequest, err.Error())
182         }
183
184         appDId, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appDId")
185         appProvider, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appProvider")
186         appInfoName, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appInfoName")
187         appSoftVersion, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appSoftVersion")
188         appDVersion, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appDVersion")
189         deployType, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "type")
190
191         appPkgInfo := model.AppPackageInfo{
192                 ID:                 string(appDId),
193                 AppDID:             string(appDId),
194                 AppProvider:        string(appProvider),
195                 AppName:            string(appInfoName),
196                 AppSoftwareVersion: string(appSoftVersion),
197                 AppDVersion:        string(appDVersion),
198                 DeployType:         string(deployType),
199         }
200
201         //return appPackageInfo
202         return appPkgInfo
203 }
204
205 // Query application package information
206 func (impl *HandlerImpl) QueryAppPackageInfo(w http.ResponseWriter, r *http.Request) {
207         params := mux.Vars(r)
208         appPkgId := params["appPkgId"]
209         appPkgInfo := dbAdapter.GetAppPackageInfo(impl.db, appPkgId)
210         if appPkgInfo.ID == "" {
211                 respondJSON(w, http.StatusNotFound, "ID not exist")
212                 return
213         }
214         respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(appPkgInfo))
215 }
216
217 // Deletes application package
218 func (impl *HandlerImpl) DeleteAppPackage(w http.ResponseWriter, r *http.Request) {
219         params := mux.Vars(r)
220         appPkgId := params["appPkgId"]
221         appPackageInfo := dbAdapter.GetAppPackageInfo(impl.db, appPkgId)
222         if appPackageInfo.ID == "" {
223                 respondJSON(w, http.StatusNotFound, "ID not exist")
224                 return
225         }
226         dbAdapter.DeleteAppPackageInfo(impl.db, appPkgId)
227
228         deletePackage := PackageFolderPath + appPackageInfo.AppPackage
229
230         /* Delete ZIP*/
231         os.Remove(deletePackage)
232         f := strings.Split(appPackageInfo.AppPackage, ".")
233         if len(f) > 0 {
234                 packageName := f[0]
235                 /*Delete unzipped*/
236                 os.Remove(packageName)
237         }
238         respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(""))
239 }
240
241 // Creates application instance
242 func (impl *HandlerImpl) CreateAppInstance(w http.ResponseWriter, r *http.Request) {
243         var req model.CreateApplicationReq
244         err := json.NewDecoder(r.Body).Decode(&req)
245         if err != nil {
246                 respondError(w, http.StatusInternalServerError, err.Error())
247                 return
248         }
249
250         appPkgInfo := dbAdapter.GetAppPackageInfo(impl.db, req.AppDID)
251         if appPkgInfo.ID == "" {
252                 respondJSON(w, http.StatusNotFound, "ID not exist")
253                 return
254         }
255         impl.logger.Infof("Query appPkg Info:", appPkgInfo)
256
257         appInstanceId, err := uuid.NewUUID()
258         if err != nil {
259                 respondError(w, http.StatusInternalServerError, err.Error())
260         }
261
262         appInstanceInfo := model.AppInstanceInfo{
263
264                 ID:                     appInstanceId.String(),
265                 AppInstanceName:        req.AppInstancename,
266                 AppInstanceDescription: req.AppInstanceDescriptor,
267                 AppDID:                 req.AppDID,
268                 AppProvider:            appPkgInfo.AppProvider,
269                 AppName:                appPkgInfo.AppName,
270                 AppSoftVersion:         appPkgInfo.AppSoftwareVersion,
271                 AppDVersion:            appPkgInfo.AppDVersion,
272                 AppPkgID:               appPkgInfo.AppDID,
273                 InstantiationState:     "NOT_INSTANTIATED",
274         }
275         dbAdapter.InsertAppInstanceInfo(impl.db, appInstanceInfo)
276         impl.logger.Infof("CreateAppInstance:", req)
277         /*http.StatusOK*/
278         respondJSON(w, http.StatusCreated, json.NewEncoder(w).Encode(appInstanceInfo))
279 }
280
281 // Instantiates application instance
282 func (impl *HandlerImpl) InstantiateAppInstance(w http.ResponseWriter, r *http.Request) {
283         var req model.InstantiateApplicationReq
284         err := json.NewDecoder(r.Body).Decode(&req)
285         if err != nil {
286                 respondError(w, http.StatusInternalServerError, err.Error())
287                 return
288         }
289
290         params := mux.Vars(r)
291         appInstanceId := params["appInstanceId"]
292
293         appInstanceInfo := dbAdapter.GetAppInstanceInfo(impl.db, appInstanceId)
294         appPackageInfo := dbAdapter.GetAppPackageInfo(impl.db, appInstanceInfo.AppDID)
295         if appInstanceInfo.ID == "" || appPackageInfo.ID == "" {
296                 respondJSON(w, http.StatusNotFound, "ID not exist")
297                 return
298         }
299
300         if appInstanceInfo.InstantiationState == "INSTANTIATED" {
301                 respondError(w, http.StatusInternalServerError, "Application already instantiated")
302                 return
303         }
304
305         //remove extension
306         var packageName = ""
307         f := strings.Split(appPackageInfo.AppPackage, ".")
308         if len(f) > 0 {
309                 packageName = f[0]
310         }
311         impl.logger.Infof(packageName)
312
313         var artifact string
314         var pluginInfo string
315
316         switch appPackageInfo.DeployType {
317         case "helm":
318                 pkgPath := PackageFolderPath + packageName + PackageArtifactPath + "Charts"
319                 artifact = impl.getDeploymentArtifact(pkgPath, ".tar")
320                 if artifact == "" {
321                         respondError(w, http.StatusInternalServerError, "artifact not available in application package")
322                         return
323                 }
324                 pluginInfo = "helm.plugin" + ":" + os.Getenv("HELM_PLUGIN_PORT")
325         case "kubernetes":
326                 pkgPath := PackageFolderPath + packageName + PackageArtifactPath + "Kubernetes"
327                 artifact = impl.getDeploymentArtifact(pkgPath, "*.yaml")
328                 if artifact == "" {
329                         respondError(w, http.StatusInternalServerError, "artifact not available in application package")
330                         return
331                 }
332                 pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT")
333         default:
334                 respondError(w, http.StatusInternalServerError, "Deployment type not supported")
335                 return
336         }
337         impl.logger.Infof("Artifact to deploy:", artifact)
338
339         adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger)
340         workloadId, err := adapter.Instantiate(pluginInfo, req.SelectedMECHostInfo.HostID, artifact)
341         if err != nil {
342                 st, ok := status.FromError(err)
343                 if ok && st.Code() == codes.InvalidArgument {
344                         respondError(w, http.StatusBadRequest, err.Error())
345                         return
346                 } else {
347                         respondError(w, http.StatusInternalServerError, err.Error())
348                 }
349         }
350         dbAdapter.UpdateAppInstanceInfoInstStatusHostAndWorkloadId(impl.db, appInstanceId, "INSTANTIATED", req.SelectedMECHostInfo.HostID, workloadId)
351
352         respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(""))
353 }
354
355 // Gets deployment artifact
356 func (impl *HandlerImpl) getDeploymentArtifact(dir string, ext string) string {
357         d, err := os.Open(dir)
358         if err != nil {
359                 impl.logger.Infof("Error: ", err)
360                 return ""
361         }
362         defer d.Close()
363
364         files, err := d.Readdir(-1)
365         if err != nil {
366                 impl.logger.Infof("Error: ", err)
367                 return ""
368         }
369
370         impl.logger.Infof("Directory to read " + dir)
371
372         for _, file := range files {
373                 if file.Mode().IsRegular() {
374                         if filepath.Ext(file.Name()) == ext || filepath.Ext(file.Name()) == ".gz" {
375                                 impl.logger.Infof(file.Name())
376                                 impl.logger.Infof(dir + "/" + file.Name())
377                                 return dir + "/" + file.Name()
378                         }
379                 }
380         }
381         return ""
382 }
383
384 // Queries application instance information
385 func (impl *HandlerImpl) QueryAppInstanceInfo(w http.ResponseWriter, r *http.Request) {
386
387         params := mux.Vars(r)
388         appInstanceId := params["appInstanceId"]
389
390         appInstanceInfo := dbAdapter.GetAppInstanceInfo(impl.db, appInstanceId)
391         appPackageInfo := dbAdapter.GetAppPackageInfo(impl.db, appInstanceInfo.AppDID)
392         if appInstanceInfo.ID == "" || appPackageInfo.ID == "" {
393                 respondJSON(w, http.StatusNotFound, "ID not exist")
394                 return
395         }
396         var instantiatedAppState string
397         if appInstanceInfo.InstantiationState == "INSTANTIATED" {
398
399                 var pluginInfo string
400
401                 switch appPackageInfo.DeployType {
402                 case "helm":
403                         pluginInfo = "helm.plugin" + ":" + os.Getenv("HELM_PLUGIN_PORT")
404                 case "kubernetes":
405                         pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT")
406                 default:
407                         respondError(w, http.StatusInternalServerError, "Deployment type not supported")
408                         return
409                 }
410
411                 adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger)
412                 state, err := adapter.Query(pluginInfo, appInstanceInfo.Host, appInstanceInfo.WorkloadID)
413                 if err != nil {
414                         respondError(w, http.StatusInternalServerError, err.Error())
415                         return
416                 }
417                 instantiatedAppState = state
418         }
419         appInstanceInfo.InstantiatedAppState = instantiatedAppState
420
421         respondJSON(w, http.StatusCreated, json.NewEncoder(w).Encode(appInstanceInfo))
422 }
423
424 // Queries application lcm operation status
425 func (impl *HandlerImpl) QueryAppLcmOperationStatus(w http.ResponseWriter, r *http.Request) {
426         var req model.QueryApplicationLCMOperStatusReq
427         err := json.NewDecoder(r.Body).Decode(&req)
428         if err != nil {
429                 respondError(w, http.StatusInternalServerError, err.Error())
430                 return
431         }
432
433         fmt.Fprintf(w, "QueryApplicationLCMOperStatus: %+v", req)
434 }
435
436 // Terminates application instance
437 func (impl *HandlerImpl) TerminateAppInstance(w http.ResponseWriter, r *http.Request) {
438         impl.logger.Infof("TerminateAppInstance...")
439         params := mux.Vars(r)
440         appInstanceId := params["appInstanceId"]
441
442         appInstanceInfo := dbAdapter.GetAppInstanceInfo(impl.db, appInstanceId)
443         appPackageInfo := dbAdapter.GetAppPackageInfo(impl.db, appInstanceInfo.AppDID)
444         if appInstanceInfo.ID == "" || appPackageInfo.ID == "" {
445                 respondJSON(w, http.StatusNotFound, "ID not exist")
446                 return
447         }
448
449         if appInstanceInfo.InstantiationState == "NOT_INSTANTIATED" {
450                 respondError(w, http.StatusNotAcceptable, "instantiationState: NOT_INSTANTIATED")
451                 return
452         }
453
454         var pluginInfo string
455         switch appPackageInfo.DeployType {
456         case "helm":
457                 pluginInfo = "helm.plugin" + ":" + os.Getenv("HELM_PLUGIN_PORT")
458         case "kubernetes":
459                 pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT")
460         default:
461                 respondError(w, http.StatusInternalServerError, "Deployment type not supported")
462                 return
463         }
464
465         adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger)
466         _, err := adapter.Terminate(pluginInfo, appInstanceInfo.Host, appInstanceInfo.WorkloadID)
467         if err != nil {
468                 respondError(w, http.StatusInternalServerError, err.Error())
469                 return
470         }
471         dbAdapter.UpdateAppInstanceInfoInstStatusAndWorkload(impl.db, appInstanceId, "NOT_INSTANTIATED", "")
472
473         respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(""))
474 }
475
476 // Deletes application instance identifier
477 func (impl *HandlerImpl) DeleteAppInstanceIdentifier(w http.ResponseWriter, r *http.Request) {
478         impl.logger.Infof("DeleteAppInstanceIdentifier:")
479         params := mux.Vars(r)
480         appInstanceId := params["appInstanceId"]
481
482         dbAdapter.DeleteAppInstanceInfo(impl.db, appInstanceId)
483         respondJSON(w, http.StatusOK, json.NewEncoder(w).Encode(""))
484 }
485
486 // It makes the JSON
487 func respondJSON(w http.ResponseWriter, status int, payload interface{}) {
488         response, err := json.Marshal(payload)
489         if err != nil {
490                 w.WriteHeader(http.StatusInternalServerError)
491                 w.Write([]byte(err.Error()))
492                 return
493         }
494         w.Header().Set("Content-Type", "application/json")
495         w.WriteHeader(status)
496         w.Write([]byte(response))
497 }
498
499 // RespondError makes the error response with payload as json format
500 func respondError(w http.ResponseWriter, code int, message string) {
501         respondJSON(w, code, map[string]string{"error": message})
502 }