From 3f907eec199eae9c472039ae1903fd5d5b087b15 Mon Sep 17 00:00:00 2001 From: agrawalgaurav Date: Wed, 13 May 2020 20:45:26 +0530 Subject: [PATCH] Error handling & logging for broker Change-Id: I8881f155b47703841d87a91fabb6883606b63ea6 --- mecm/mepm/applcm/broker/cmd/broker/main.go | 29 ++- mecm/mepm/applcm/broker/go.mod | 6 +- mecm/mepm/applcm/broker/go.sum | 15 ++ .../adapter/pluginAdapter/pluginAdapter.go | 91 ++++---- mecm/mepm/applcm/broker/pkg/handlers/handlers.go | 132 ++++------- .../applcm/broker/pkg/handlers/handlersImpl.go | 252 ++++++++++++++------- mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go | 91 ++++---- .../{handlers/common/common.go => util/logger.go} | 32 ++- mecm/mepm/applcm/go.mod | 3 + 9 files changed, 363 insertions(+), 288 deletions(-) rename mecm/mepm/applcm/broker/pkg/{handlers/common/common.go => util/logger.go} (52%) create mode 100644 mecm/mepm/applcm/go.mod diff --git a/mecm/mepm/applcm/broker/cmd/broker/main.go b/mecm/mepm/applcm/broker/cmd/broker/main.go index 6a96500..450cb49 100644 --- a/mecm/mepm/applcm/broker/cmd/broker/main.go +++ b/mecm/mepm/applcm/broker/cmd/broker/main.go @@ -13,25 +13,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package main import ( "broker/pkg/handlers" - "log" + "broker/pkg/util" + "github.com/sirupsen/logrus" "os" ) -/* -var ( - GcukCertFile = os.Getenv("GCUK_CERT_FILE") - GcukKeyFile = os.Getenv("GCUK_KEY_FILE") - GcukServiceAddr = os.Getenv("GCUK_SERVICE_ADDR") -)*/ + +const ( + //logFile = "/go/release/logfile" + logFile = "/home/root1/code/akraino/ealt-edge/mecm/mepm/applcm/broker/cmd/broker/logfile" + loggerLevel = logrus.InfoLevel + applcmAddress = "0.0.0.0:8081" +) func main() { - logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile) + // Prepare logger + file, err := os.Create(logFile) + if err != nil { + logrus.Fatal(err) + } + defer file.Close() + + var logger = util.GetLogger(logFile, loggerLevel, file) handler := &handlers.Handlers{} handler.Initialize(logger) - //handler.Run("127.0.0.1:8081") - handler.Run("0.0.0.0:8081") + handler.Run(applcmAddress) } diff --git a/mecm/mepm/applcm/broker/go.mod b/mecm/mepm/applcm/broker/go.mod index dd82bd7..9a6f12c 100644 --- a/mecm/mepm/applcm/broker/go.mod +++ b/mecm/mepm/applcm/broker/go.mod @@ -5,16 +5,14 @@ go 1.14 require ( github.com/buger/jsonparser v0.0.0-20200322175846-f7e751efca13 github.com/ghodss/yaml v1.0.0 - github.com/go-sql-driver/mysql v1.5.0 + github.com/go-sql-driver/mysql v1.5.0 // indirect github.com/golang/protobuf v1.4.0 github.com/google/uuid v1.1.1 github.com/gorilla/mux v1.7.4 github.com/jinzhu/gorm v1.9.12 - golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect + github.com/sirupsen/logrus v1.6.0 golang.org/x/net v0.0.0-20200506145744-7e3656a0809f - golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect google.golang.org/grpc v1.29.1 google.golang.org/protobuf v1.22.0 gopkg.in/yaml.v2 v2.2.8 // indirect - honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect ) diff --git a/mecm/mepm/applcm/broker/go.sum b/mecm/mepm/applcm/broker/go.sum index d938512..c7070df 100644 --- a/mecm/mepm/applcm/broker/go.sum +++ b/mecm/mepm/applcm/broker/go.sum @@ -5,11 +5,14 @@ github.com/buger/jsonparser v0.0.0-20200322175846-f7e751efca13/go.mod h1:tgcrVJ8 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd h1:83Wprp6ROGeiHFAP8WJdI2RoxALQYgdllERc3N5N2DM= github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 h1:Yzb9+7DPaBjB8zlTR87/ElzFsnQfuHnVUVqpZZIcV5Y= github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -43,10 +46,21 @@ github.com/jinzhu/gorm v1.9.12 h1:Drgk1clyWT9t9ERbzHza6Mj/8FY/CqMyVzOiHviMo6Q= github.com/jinzhu/gorm v1.9.12/go.mod h1:vhTjlKSJUTWNtcbQtrMBFCxy7eXTzeCAzfL5fBZT/Qs= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.0.1 h1:HjfetcXq097iXP0uoPCdnM4Efp5/9MsM0/M+XOTeR3M= github.com/jinzhu/now v1.0.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= +github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4= github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/mattn/go-sqlite3 v2.0.1+incompatible h1:xQ15muvnzGBHpIpdrNi1DA5x0+TcBZzsIDwmw9uTHzw= github.com/mattn/go-sqlite3 v2.0.1+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= +github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd h1:GGJVjV8waZKRHrgwvtH66z9ZGVurTD1MT0n1Bb+q4aM= @@ -69,6 +83,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= diff --git a/mecm/mepm/applcm/broker/pkg/handlers/adapter/pluginAdapter/pluginAdapter.go b/mecm/mepm/applcm/broker/pkg/handlers/adapter/pluginAdapter/pluginAdapter.go index d3707f0..822d7cc 100644 --- a/mecm/mepm/applcm/broker/pkg/handlers/adapter/pluginAdapter/pluginAdapter.go +++ b/mecm/mepm/applcm/broker/pkg/handlers/adapter/pluginAdapter/pluginAdapter.go @@ -18,81 +18,94 @@ package pluginAdapter import ( "broker/pkg/plugin" "context" - "log" - "os" + "github.com/sirupsen/logrus" "time" ) -func Instantiate(pluginInfo string, host string, deployArtifact string) (workloadId string, error error) { - logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile) - clientConfig := plugin.ClientGRPCConfig{Address: pluginInfo, ChunkSize: 1024, RootCertificate: ""} +const ( + chunkSize = 1024 + rootCertificate = "" +) + +// Plugin adapter which decides a specific client based on plugin info +// TODO PluginInfo to have other information about plugins to find the client and implementation to handle accordingly. +type PluginAdapter struct { + pluginInfo string + logger *logrus.Logger +} + +// Constructor of PluginAdapter +func NewPluginAdapter(pluginInfo string, logger *logrus.Logger) *PluginAdapter { + return &PluginAdapter{pluginInfo: pluginInfo, logger: logger} +} + +// Instantiate application +func (c *PluginAdapter) Instantiate(pluginInfo string, host string, deployArtifact string) (workloadId string, error error) { + c.logger.Info("Instantation started") + clientConfig := plugin.ClientGRPCConfig{Address: pluginInfo, ChunkSize: chunkSize, RootCertificate: rootCertificate} var client, err = plugin.NewClientGRPC(clientConfig) if err != nil { - logger.Fatalf("failed to create client: %v", err) + c.logger.Errorf("failed to create client: %v", err) return "", err } - log.Printf("pluginInfo: ", pluginInfo) - log.Printf("host: ", host) - log.Printf("deployArtifact: ", deployArtifact) + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second) defer cancel() //Instantiate workloadId, status, err := client.Instantiate(ctx, deployArtifact, host) if err != nil { - logger.Println("server failed to respond %s", err.Error()) - } else { - logger.Println(workloadId, status) - return workloadId, nil + c.logger.Errorf("server failed to respond %s", err.Error()) + return "", err } - return "", err + c.logger.Info("Instantiation success with workloadId %s, status: %s ", workloadId, status) + return workloadId, nil } -func Query(pluginInfo string, host string, workloadId string) (status string, error error) { - logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile) - clientConfig := plugin.ClientGRPCConfig{Address: pluginInfo, ChunkSize: 1024, RootCertificate: ""} +// Query application +func (c *PluginAdapter) Query(pluginInfo string, host string, workloadId string) (status string, error error) { + c.logger.Info("Query started") + clientConfig := plugin.ClientGRPCConfig{Address: pluginInfo, ChunkSize: chunkSize, RootCertificate: rootCertificate} var client, err = plugin.NewClientGRPC(clientConfig) if err != nil { - logger.Fatalf("failed to create client: %v", err) + c.logger.Errorf("failed to create client: %v", err) return "", err } - log.Printf("pluginInfo: ", pluginInfo) - log.Printf("host: ", host) - log.Printf("workloadId: ", workloadId) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() //Query - stats := client.Query(ctx, host, workloadId) + status, err = client.Query(ctx, host, workloadId) if err != nil { - logger.Fatalf("failed to instantiate: %v", err) - return stats, err + c.logger.Errorf("failed to query: %v", err) + return "", err } - logger.Println("query status: ", stats) - return stats, nil + c.logger.Info("query status: ", status) + return status, nil } -func Terminate(pluginInfo string, host string, workloadId string) (status string, error error) { - logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile) - clientConfig := plugin.ClientGRPCConfig{Address: pluginInfo, ChunkSize: 1024, RootCertificate: ""} +// Terminate application +func (c *PluginAdapter) Terminate(pluginInfo string, host string, workloadId string) (status string, error error) { + c.logger.Info("Terminate started") + clientConfig := plugin.ClientGRPCConfig{Address: pluginInfo, ChunkSize: chunkSize, RootCertificate: rootCertificate} var client, err = plugin.NewClientGRPC(clientConfig) if err != nil { - logger.Fatalf("failed to create client: %v", err) - return + c.logger.Errorf("failed to create client: %v", err) + return "Failure", err } - log.Printf("pluginInfo: ", pluginInfo) - log.Printf("host: ", host) - log.Printf("workloadId: ", workloadId) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() //Terminate - ts := client.Terminate(ctx, host, workloadId) + status, err = client.Terminate(ctx, host, workloadId) + if err != nil { - logger.Fatalf("failed to instantiate: %v", err) - return ts, err + c.logger.Errorf("failed to instantiate: %v", err) + return "Failure", err } - logger.Println("termination status: ", ts) - return ts, nil + c.logger.Info("termination success with status: ", status) + return status, nil } diff --git a/mecm/mepm/applcm/broker/pkg/handlers/handlers.go b/mecm/mepm/applcm/broker/pkg/handlers/handlers.go index d945649..0cb3adb 100644 --- a/mecm/mepm/applcm/broker/pkg/handlers/handlers.go +++ b/mecm/mepm/applcm/broker/pkg/handlers/handlers.go @@ -16,129 +16,91 @@ package handlers import ( - "broker/pkg/handlers/model" - "fmt" "github.com/gorilla/mux" - "github.com/jinzhu/gorm" - _ "github.com/jinzhu/gorm/dialects/mysql" - "log" + "github.com/sirupsen/logrus" "net/http" - "os" - "time" ) -const CreateAppInstance = "/ealtedge/mepm/app_lcm/v1/app_instances" -const InstantiateAppInstance = "/ealtedge/mepm/app_lcm/v1/app_instances/{appInstanceId}/instantiate" -const QueryAppInstanceInfo = "/ealtedge/mepm/app_lcm/v1/app_instances/{appInstanceId}" -const QueryAppLcmOperationStatus = "/ealtedge/mepm/app_lcm/v1/app_lcm_op_occs" -const TerminateAppIns = "/ealtedge/mepm/app_lcm/v1/app_instances/{appInstanceId}/terminate" -const DeleteAppInstanceIdentifier = "/ealtedge/mepm/app_lcm/v1/app_instances/{appInstanceId}" -const OnboardPackage = "/ealtedge/mepm/app_pkgm/v1/app_packages" -const QueryOnboardPackage = "/ealtedge/mepm/app_pkgm/v1/app_packages/{appPkgId}" +// URLS +const ( + CreateAppInstance = "/ealtedge/mepm/app_lcm/v1/app_instances" + InstantiateAppInstance = "/ealtedge/mepm/app_lcm/v1/app_instances/{appInstanceId}/instantiate" + QueryAppInstanceInfo = "/ealtedge/mepm/app_lcm/v1/app_instances/{appInstanceId}" + QueryAppLcmOperationStatus = "/ealtedge/mepm/app_lcm/v1/app_lcm_op_occs" + TerminateAppIns = "/ealtedge/mepm/app_lcm/v1/app_instances/{appInstanceId}/terminate" + DeleteAppInstanceIdentifier = "/ealtedge/mepm/app_lcm/v1/app_instances/{appInstanceId}" + OnboardPackage = "/ealtedge/mepm/app_pkgm/v1/app_packages" + QueryOnboardPackage = "/ealtedge/mepm/app_pkgm/v1/app_packages/{appPkgId}" +) -const PackageFolderPath = "/go/release/application/packages/" -const PackageArtifactPath = "/Artifacts/Deployment/" +// Package paths, to be created in deployment file (docker-compose/k8s yaml/helm) +const ( + PackageFolderPath = "/go/release/application/packages/" + PackageArtifactPath = "/Artifacts/Deployment/" +) +// Handler of REST APIs type Handlers struct { - Router *mux.Router - logger *log.Logger - db *gorm.DB -} - -const DB_NAME = "applcmDB" - -// Run the app on it's router -func (hdlr *Handlers) Run(host string) { - fmt.Println("Binding to port...: %d", host) - log.Fatal(http.ListenAndServe(host, hdlr.Router)) -} - -func createDatabase() *gorm.DB { - fmt.Println("creating Database...") - - usrpswd := os.Getenv("MYSQL_USER") + ":" + os.Getenv("MYSQL_PASSWORD") - host := "@tcp(" + "dbhost" + ":3306)/" - - db, err := gorm.Open("mysql", usrpswd + host) - if err != nil { - fmt.Println("Database connect error", err.Error()) - } -// db = db.Exec("DROP DATABASE IF EXISTS " + DB_NAME) -// db = db.Exec("CREATE DATABASE "+ DB_NAME) - db.Exec("CREATE DATABASE " + DB_NAME) - db.Exec("USE applcmDB") - - //db.Close() - //db, err = gorm.Open("mysql", usrpswd + host + DB_NAME + "?charset=utf8&parseTime=True") - /*if err != nil { - fmt.Println("Database connect error", err.Error()) - } else { - fmt.Println("Database connected successfully") - }*/ - gorm.DefaultCallback.Create().Remove("mysql:set_identity_insert") - - fmt.Println("Migrating models...") - db.AutoMigrate(&model.AppPackageInfo{}) - db.AutoMigrate(&model.AppInstanceInfo{}) - //db.LogMode(true) - return db + router *mux.Router + logger *logrus.Logger + impl HandlerImpl } -// Initialize initializes the app with predefined configuration -func (hdlr *Handlers) Initialize(logger *log.Logger) { - hdlr.Router = mux.NewRouter() - +// Initialize initializes the handler +func (hdlr *Handlers) Initialize(logger *logrus.Logger) { + hdlr.router = mux.NewRouter() hdlr.logger = logger hdlr.setRouters() - hdlr.db = createDatabase() + hdlr.impl = newHandlerImpl(hdlr.logger) } -func (hdlr *Handlers) Logger(next http.HandlerFunc) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - startTime := time.Now() - defer hdlr.logger.Printf("request processed in %s\n", time.Now().Sub(startTime)) - next(w, r) +// Run on it's router +func (hdlr *Handlers) Run(host string) { + hdlr.logger.Info("Server is running on port %s", host) + err := http.ListenAndServe(host, hdlr.router) + if err != nil { + hdlr.logger.Fatalf("Server couldn't run on port %s", host) } } -// setRouters sets the all required routers +// SetRouters sets the all required routers func (hdlr *Handlers) setRouters() { // Routing for handling the requests - hdlr.Post(OnboardPackage, hdlr.handleRequest(UploadFileHldr)) - hdlr.Get(QueryOnboardPackage, hdlr.handleRequest(QueryAppPackageInfo)) - hdlr.Post(CreateAppInstance, hdlr.handleRequest(CreateAppInstanceHldr)) - hdlr.Delete(QueryOnboardPackage, hdlr.handleRequest(DeleteAppPackage)) - hdlr.Post(InstantiateAppInstance, hdlr.handleRequest(InstantiateAppInstanceHldr)) - hdlr.Get(QueryAppInstanceInfo, hdlr.handleRequest(QueryAppInstanceInfoHldr)) - hdlr.Get(QueryAppLcmOperationStatus, hdlr.handleRequest(QueryAppLcmOperationStatusHldr)) - hdlr.Post(TerminateAppIns, hdlr.handleRequest(TerminateAppInsHldr)) - hdlr.Delete(DeleteAppInstanceIdentifier, hdlr.handleRequest(DeleteAppInstanceIdentifierHldr)) + hdlr.Post(OnboardPackage, hdlr.handleRequest(hdlr.impl.UploadPackage)) + hdlr.Get(QueryOnboardPackage, hdlr.handleRequest(hdlr.impl.QueryAppPackageInfo)) + hdlr.Post(CreateAppInstance, hdlr.handleRequest(hdlr.impl.CreateAppInstance)) + hdlr.Delete(QueryOnboardPackage, hdlr.handleRequest(hdlr.impl.DeleteAppPackage)) + hdlr.Post(InstantiateAppInstance, hdlr.handleRequest(hdlr.impl.InstantiateAppInstance)) + hdlr.Get(QueryAppInstanceInfo, hdlr.handleRequest(hdlr.impl.QueryAppInstanceInfo)) + hdlr.Get(QueryAppLcmOperationStatus, hdlr.handleRequest(hdlr.impl.QueryAppLcmOperationStatus)) + hdlr.Post(TerminateAppIns, hdlr.handleRequest(hdlr.impl.TerminateAppInstance)) + hdlr.Delete(DeleteAppInstanceIdentifier, hdlr.handleRequest(hdlr.impl.DeleteAppInstanceIdentifier)) } // Get wraps the router for GET method func (hdlr *Handlers) Get(path string, f func(w http.ResponseWriter, r *http.Request)) { - hdlr.Router.HandleFunc(path, f).Methods("GET") + hdlr.router.HandleFunc(path, f).Methods("GET") } // Post wraps the router for POST method func (hdlr *Handlers) Post(path string, f func(w http.ResponseWriter, r *http.Request)) { - hdlr.Router.HandleFunc(path, f).Methods("POST") + hdlr.router.HandleFunc(path, f).Methods("POST") } // Put wraps the router for PUT method func (hdlr *Handlers) Put(path string, f func(w http.ResponseWriter, r *http.Request)) { - hdlr.Router.HandleFunc(path, f).Methods("PUT") + hdlr.router.HandleFunc(path, f).Methods("PUT") } // Delete wraps the router for DELETE method func (hdlr *Handlers) Delete(path string, f func(w http.ResponseWriter, r *http.Request)) { - hdlr.Router.HandleFunc(path, f).Methods("DELETE") + hdlr.router.HandleFunc(path, f).Methods("DELETE") } -type RequestHandlerFunction func(db *gorm.DB, w http.ResponseWriter, r *http.Request) +type RequestHandlerFunction func(w http.ResponseWriter, r *http.Request) func (hdlr *Handlers) handleRequest(handler RequestHandlerFunction) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - handler(hdlr.db, w, r) + handler(w, r) } } diff --git a/mecm/mepm/applcm/broker/pkg/handlers/handlersImpl.go b/mecm/mepm/applcm/broker/pkg/handlers/handlersImpl.go index ec9e296..cd66846 100644 --- a/mecm/mepm/applcm/broker/pkg/handlers/handlersImpl.go +++ b/mecm/mepm/applcm/broker/pkg/handlers/handlersImpl.go @@ -19,7 +19,6 @@ import ( "archive/zip" "broker/pkg/handlers/adapter/dbAdapter" "broker/pkg/handlers/adapter/pluginAdapter" - "broker/pkg/handlers/common" "broker/pkg/handlers/model" "bytes" "encoding/json" @@ -29,27 +28,68 @@ import ( "github.com/google/uuid" "github.com/gorilla/mux" "github.com/jinzhu/gorm" + "github.com/sirupsen/logrus" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "io" "io/ioutil" - "log" "net/http" "os" "path/filepath" "strings" ) -func UploadFileHldr(db *gorm.DB, w http.ResponseWriter, r *http.Request) { +// DB name +const DbName = "applcmDB" + +// Handler of REST APIs +type HandlerImpl struct { + logger *logrus.Logger + db *gorm.DB +} + +// Creates handler implementation +func newHandlerImpl(logger *logrus.Logger) (impl HandlerImpl) { + impl.logger = logger + impl.db = impl.createDatabase() + return +} + +// Creates database +func (impl *HandlerImpl) createDatabase() *gorm.DB { + impl.logger.Info("creating Database...") + + usrpswd := os.Getenv("MYSQL_USER") + ":" + os.Getenv("MYSQL_PASSWORD") + host := "@tcp(" + "dbhost" + ":3306)/" + + db, err := gorm.Open("mysql", usrpswd + host) + if err != nil { + impl.logger.Fatalf("Database connect error", err.Error()) + } + + db.Exec("CREATE DATABASE " + DbName) + db.Exec("USE applcmDB") + gorm.DefaultCallback.Create().Remove("mysql:set_identity_insert") + + impl.logger.Info("Migrating models...") + db.AutoMigrate(&model.AppPackageInfo{}) + db.AutoMigrate(&model.AppInstanceInfo{}) + return db +} + +// Uploads package +func (impl *HandlerImpl) UploadPackage(w http.ResponseWriter, r *http.Request) { file, header, err := r.FormFile("file") defer file.Close() if err != nil { - common.RespondError(w, http.StatusBadRequest, err.Error()) + respondError(w, http.StatusBadRequest, err.Error()) return } buf := bytes.NewBuffer(nil) if _, err := io.Copy(buf, file); err != nil { - common.RespondError(w, http.StatusBadRequest, err.Error()) + respondError(w, http.StatusBadRequest, err.Error()) return } @@ -58,45 +98,46 @@ func UploadFileHldr(db *gorm.DB, w http.ResponseWriter, r *http.Request) { if len(f) > 0 { packageName = f[0] } - fmt.Println(packageName) + impl.logger.Infof(packageName) pkgPath := PackageFolderPath + header.Filename newFile, err := os.Create(pkgPath) if err != nil { - common.RespondError(w, http.StatusInternalServerError, err.Error()) + respondError(w, http.StatusInternalServerError, err.Error()) return } defer newFile.Close() if _, err := newFile.Write(buf.Bytes()); err != nil { - common.RespondError(w, http.StatusInternalServerError, err.Error()) + respondError(w, http.StatusInternalServerError, err.Error()) return } /* Unzip package to decode appDescriptor */ - openPackage(w, pkgPath) + impl.openPackage(w, pkgPath) var yamlFile = PackageFolderPath + packageName + "/Definitions/" + "MainServiceTemplate.yaml" - appPkgInfo := decodeApplicationDescriptor(w, yamlFile) + appPkgInfo := impl.decodeApplicationDescriptor(w, yamlFile) appPkgInfo.AppPackage = header.Filename appPkgInfo.OnboardingState = "ONBOARDED" - log.Println("Application package info from package") + impl.logger.Infof("Application package info from package") defer r.Body.Close() - dbAdapter.InsertAppPackageInfo(db, appPkgInfo) + dbAdapter.InsertAppPackageInfo(impl.db, appPkgInfo) /*http.StatusOK*/ - common.RespondJSON(w, http.StatusCreated, appPkgInfo) + respondJSON(w, http.StatusCreated, appPkgInfo) } -func openPackage(w http.ResponseWriter, packagePath string) { +// Opens package +func (impl *HandlerImpl) openPackage(w http.ResponseWriter, packagePath string) { zipReader, _ := zip.OpenReader(packagePath) for _, file := range zipReader.Reader.File { zippedFile, err := file.Open() if err != nil { - common.RespondError(w, http.StatusBadRequest, err.Error()) + respondError(w, http.StatusBadRequest, err.Error()) } defer zippedFile.Close() @@ -115,28 +156,29 @@ func openPackage(w http.ResponseWriter, packagePath string) { file.Mode(), ) if err != nil { - common.RespondError(w, http.StatusBadRequest, err.Error()) + respondError(w, http.StatusBadRequest, err.Error()) } defer outputFile.Close() _, err = io.Copy(outputFile, zippedFile) if err != nil { - common.RespondError(w, http.StatusBadRequest, err.Error()) + respondError(w, http.StatusBadRequest, err.Error()) } } } } -func decodeApplicationDescriptor(w http.ResponseWriter, serviceTemplate string) model.AppPackageInfo { +// Decodes application descriptor +func (impl *HandlerImpl) decodeApplicationDescriptor(w http.ResponseWriter, serviceTemplate string) model.AppPackageInfo { yamlFile, err := ioutil.ReadFile(serviceTemplate) if err != nil { - common.RespondError(w, http.StatusBadRequest, err.Error()) + respondError(w, http.StatusBadRequest, err.Error()) } jsondata, err := yaml.YAMLToJSON(yamlFile) if err != nil { - common.RespondError(w, http.StatusBadRequest, err.Error()) + respondError(w, http.StatusBadRequest, err.Error()) } appDId, _, _, _ := jsonparser.Get(jsondata, "topology_template", "node_templates", "face_recognition", "properties", "appDId") @@ -160,26 +202,28 @@ func decodeApplicationDescriptor(w http.ResponseWriter, serviceTemplate string) return appPkgInfo } -func QueryAppPackageInfo(db *gorm.DB, w http.ResponseWriter, r *http.Request) { +// Query application package information +func (impl *HandlerImpl) QueryAppPackageInfo(w http.ResponseWriter, r *http.Request) { params := mux.Vars(r) appPkgId := params["appPkgId"] - appPkgInfo := dbAdapter.GetAppPackageInfo(db, appPkgId) + appPkgInfo := dbAdapter.GetAppPackageInfo(impl.db, appPkgId) if appPkgInfo.ID == "" { - common.RespondJSON(w, http.StatusNotFound, "ID not exist") + respondJSON(w, http.StatusNotFound, "ID not exist") return } - common.RespondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(appPkgInfo)) + respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode(appPkgInfo)) } -func DeleteAppPackage(db *gorm.DB, w http.ResponseWriter, r *http.Request) { +// Deletes application package +func (impl *HandlerImpl) DeleteAppPackage(w http.ResponseWriter, r *http.Request) { params := mux.Vars(r) appPkgId := params["appPkgId"] - appPackageInfo := dbAdapter.GetAppPackageInfo(db, appPkgId) + appPackageInfo := dbAdapter.GetAppPackageInfo(impl.db, appPkgId) if appPackageInfo.ID == "" { - common.RespondJSON(w, http.StatusNotFound, "ID not exist") + respondJSON(w, http.StatusNotFound, "ID not exist") return } - dbAdapter.DeleteAppPackageInfo(db, appPkgId) + dbAdapter.DeleteAppPackageInfo(impl.db, appPkgId) deletePackage := PackageFolderPath + appPackageInfo.AppPackage @@ -191,27 +235,28 @@ func DeleteAppPackage(db *gorm.DB, w http.ResponseWriter, r *http.Request) { /*Delete unzipped*/ os.Remove(packageName) } - common.RespondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode("")) + respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode("")) } -func CreateAppInstanceHldr(db *gorm.DB, w http.ResponseWriter, r *http.Request) { +// Creates application instance +func (impl *HandlerImpl) CreateAppInstance(w http.ResponseWriter, r *http.Request) { var req model.CreateApplicationReq err := json.NewDecoder(r.Body).Decode(&req) if err != nil { - common.RespondError(w, http.StatusInternalServerError, err.Error()) + respondError(w, http.StatusInternalServerError, err.Error()) return } - appPkgInfo := dbAdapter.GetAppPackageInfo(db, req.AppDID) + appPkgInfo := dbAdapter.GetAppPackageInfo(impl.db, req.AppDID) if appPkgInfo.ID == "" { - common.RespondJSON(w, http.StatusNotFound, "ID not exist") + respondJSON(w, http.StatusNotFound, "ID not exist") return } - fmt.Println("Query appPkg Info:", appPkgInfo) + impl.logger.Infof("Query appPkg Info:", appPkgInfo) appInstanceId, err := uuid.NewUUID() if err != nil { - common.RespondError(w, http.StatusInternalServerError, err.Error()) + respondError(w, http.StatusInternalServerError, err.Error()) } appInstanceInfo := model.AppInstanceInfo{ @@ -227,32 +272,33 @@ func CreateAppInstanceHldr(db *gorm.DB, w http.ResponseWriter, r *http.Request) AppPkgID: appPkgInfo.AppDID, InstantiationState: "NOT_INSTANTIATED", } - dbAdapter.InsertAppInstanceInfo(db, appInstanceInfo) - fmt.Println("CreateAppInstanceHldr:", req) + dbAdapter.InsertAppInstanceInfo(impl.db, appInstanceInfo) + impl.logger.Infof("CreateAppInstance:", req) /*http.StatusOK*/ - common.RespondJSON(w, http.StatusCreated, json.NewEncoder(w).Encode(appInstanceInfo)) + respondJSON(w, http.StatusCreated, json.NewEncoder(w).Encode(appInstanceInfo)) } -func InstantiateAppInstanceHldr(db *gorm.DB, w http.ResponseWriter, r *http.Request) { +// Instantiates application instance +func (impl *HandlerImpl) InstantiateAppInstance(w http.ResponseWriter, r *http.Request) { var req model.InstantiateApplicationReq err := json.NewDecoder(r.Body).Decode(&req) if err != nil { - common.RespondError(w, http.StatusInternalServerError, err.Error()) + respondError(w, http.StatusInternalServerError, err.Error()) return } params := mux.Vars(r) appInstanceId := params["appInstanceId"] - appInstanceInfo := dbAdapter.GetAppInstanceInfo(db, appInstanceId) - appPackageInfo := dbAdapter.GetAppPackageInfo(db, appInstanceInfo.AppDID) + appInstanceInfo := dbAdapter.GetAppInstanceInfo(impl.db, appInstanceId) + appPackageInfo := dbAdapter.GetAppPackageInfo(impl.db, appInstanceInfo.AppDID) if appInstanceInfo.ID == "" || appPackageInfo.ID == "" { - common.RespondJSON(w, http.StatusNotFound, "ID not exist") + respondJSON(w, http.StatusNotFound, "ID not exist") return } if appInstanceInfo.InstantiationState == "INSTANTIATED" { - common.RespondError(w, http.StatusInternalServerError, "Application already instantiated") + respondError(w, http.StatusInternalServerError, "Application already instantiated") return } @@ -262,7 +308,7 @@ func InstantiateAppInstanceHldr(db *gorm.DB, w http.ResponseWriter, r *http.Requ if len(f) > 0 { packageName = f[0] } - fmt.Println(packageName) + impl.logger.Infof(packageName) var artifact string var pluginInfo string @@ -270,57 +316,64 @@ func InstantiateAppInstanceHldr(db *gorm.DB, w http.ResponseWriter, r *http.Requ switch appPackageInfo.DeployType { case "helm": pkgPath := PackageFolderPath + packageName + PackageArtifactPath + "Charts" - artifact = getDeploymentArtifact(pkgPath, ".tar") + artifact = impl.getDeploymentArtifact(pkgPath, ".tar") if artifact == "" { - common.RespondError(w, http.StatusInternalServerError, "artifact not available in application package") + respondError(w, http.StatusInternalServerError, "artifact not available in application package") return } pluginInfo = "helm.plugin" + ":" + os.Getenv("HELM_PLUGIN_PORT") case "kubernetes": pkgPath := PackageFolderPath + packageName + PackageArtifactPath + "Kubernetes" - artifact = getDeploymentArtifact(pkgPath, "*.yaml") + artifact = impl.getDeploymentArtifact(pkgPath, "*.yaml") if artifact == "" { - common.RespondError(w, http.StatusInternalServerError, "artifact not available in application package") + respondError(w, http.StatusInternalServerError, "artifact not available in application package") return } pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT") default: - common.RespondError(w, http.StatusInternalServerError, "Deployment type not supported") + respondError(w, http.StatusInternalServerError, "Deployment type not supported") return } - fmt.Println("Artifact to deploy:", artifact) + impl.logger.Infof("Artifact to deploy:", artifact) - workloadId, err := pluginAdapter.Instantiate(pluginInfo, req.SelectedMECHostInfo.HostID, artifact) + adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger) + workloadId, err := adapter.Instantiate(pluginInfo, req.SelectedMECHostInfo.HostID, artifact) if err != nil { - common.RespondError(w, http.StatusInternalServerError, err.Error()) - return + st, ok := status.FromError(err) + if ok && st.Code() == codes.InvalidArgument { + respondError(w, http.StatusBadRequest, err.Error()) + return + } else { + respondError(w, http.StatusInternalServerError, err.Error()) + } } - dbAdapter.UpdateAppInstanceInfoInstStatusHostAndWorkloadId(db, appInstanceId, "INSTANTIATED", req.SelectedMECHostInfo.HostID, workloadId) + dbAdapter.UpdateAppInstanceInfoInstStatusHostAndWorkloadId(impl.db, appInstanceId, "INSTANTIATED", req.SelectedMECHostInfo.HostID, workloadId) - common.RespondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode("")) + respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode("")) } -func getDeploymentArtifact(dir string, ext string) string { +// Gets deployment artifact +func (impl *HandlerImpl) getDeploymentArtifact(dir string, ext string) string { d, err := os.Open(dir) if err != nil { - fmt.Println(err) + impl.logger.Infof("Error: ", err) return "" } defer d.Close() files, err := d.Readdir(-1) if err != nil { - fmt.Println(err) + impl.logger.Infof("Error: ", err) return "" } - fmt.Println("Directory to read " + dir) + impl.logger.Infof("Directory to read " + dir) for _, file := range files { if file.Mode().IsRegular() { if filepath.Ext(file.Name()) == ext || filepath.Ext(file.Name()) == ".gz" { - fmt.Println(file.Name()) - fmt.Println(dir + "/" + file.Name()) + impl.logger.Infof(file.Name()) + impl.logger.Infof(dir + "/" + file.Name()) return dir + "/" + file.Name() } } @@ -328,15 +381,16 @@ func getDeploymentArtifact(dir string, ext string) string { return "" } -func QueryAppInstanceInfoHldr(db *gorm.DB, w http.ResponseWriter, r *http.Request) { +// Queries application instance information +func (impl *HandlerImpl) QueryAppInstanceInfo(w http.ResponseWriter, r *http.Request) { params := mux.Vars(r) appInstanceId := params["appInstanceId"] - appInstanceInfo := dbAdapter.GetAppInstanceInfo(db, appInstanceId) - appPackageInfo := dbAdapter.GetAppPackageInfo(db, appInstanceInfo.AppDID) + appInstanceInfo := dbAdapter.GetAppInstanceInfo(impl.db, appInstanceId) + appPackageInfo := dbAdapter.GetAppPackageInfo(impl.db, appInstanceInfo.AppDID) if appInstanceInfo.ID == "" || appPackageInfo.ID == "" { - common.RespondJSON(w, http.StatusNotFound, "ID not exist") + respondJSON(w, http.StatusNotFound, "ID not exist") return } var instantiatedAppState string @@ -350,47 +404,50 @@ func QueryAppInstanceInfoHldr(db *gorm.DB, w http.ResponseWriter, r *http.Reques case "kubernetes": pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT") default: - common.RespondError(w, http.StatusInternalServerError, "Deployment type not supported") + respondError(w, http.StatusInternalServerError, "Deployment type not supported") return } - state, err := pluginAdapter.Query(pluginInfo, appInstanceInfo.Host, appInstanceInfo.WorkloadID) + adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger) + state, err := adapter.Query(pluginInfo, appInstanceInfo.Host, appInstanceInfo.WorkloadID) if err != nil { - common.RespondError(w, http.StatusInternalServerError, err.Error()) + respondError(w, http.StatusInternalServerError, err.Error()) return } instantiatedAppState = state } appInstanceInfo.InstantiatedAppState = instantiatedAppState - common.RespondJSON(w, http.StatusCreated, json.NewEncoder(w).Encode(appInstanceInfo)) + respondJSON(w, http.StatusCreated, json.NewEncoder(w).Encode(appInstanceInfo)) } -func QueryAppLcmOperationStatusHldr(db *gorm.DB, w http.ResponseWriter, r *http.Request) { +// Queries application lcm operation status +func (impl *HandlerImpl) QueryAppLcmOperationStatus(w http.ResponseWriter, r *http.Request) { var req model.QueryApplicationLCMOperStatusReq err := json.NewDecoder(r.Body).Decode(&req) if err != nil { - common.RespondError(w, http.StatusInternalServerError, err.Error()) + respondError(w, http.StatusInternalServerError, err.Error()) return } fmt.Fprintf(w, "QueryApplicationLCMOperStatus: %+v", req) } -func TerminateAppInsHldr(db *gorm.DB, w http.ResponseWriter, r *http.Request) { - log.Println("TerminateAppInsHldr...") +// Terminates application instance +func (impl *HandlerImpl) TerminateAppInstance(w http.ResponseWriter, r *http.Request) { + impl.logger.Infof("TerminateAppInstance...") params := mux.Vars(r) appInstanceId := params["appInstanceId"] - appInstanceInfo := dbAdapter.GetAppInstanceInfo(db, appInstanceId) - appPackageInfo := dbAdapter.GetAppPackageInfo(db, appInstanceInfo.AppDID) + appInstanceInfo := dbAdapter.GetAppInstanceInfo(impl.db, appInstanceId) + appPackageInfo := dbAdapter.GetAppPackageInfo(impl.db, appInstanceInfo.AppDID) if appInstanceInfo.ID == "" || appPackageInfo.ID == "" { - common.RespondJSON(w, http.StatusNotFound, "ID not exist") + respondJSON(w, http.StatusNotFound, "ID not exist") return } if appInstanceInfo.InstantiationState == "NOT_INSTANTIATED" { - common.RespondError(w, http.StatusNotAcceptable, "instantiationState: NOT_INSTANTIATED") + respondError(w, http.StatusNotAcceptable, "instantiationState: NOT_INSTANTIATED") return } @@ -401,24 +458,45 @@ func TerminateAppInsHldr(db *gorm.DB, w http.ResponseWriter, r *http.Request) { case "kubernetes": pluginInfo = "kubernetes.plugin" + ":" + os.Getenv("KUBERNETES_PLUGIN_PORT") default: - common.RespondError(w, http.StatusInternalServerError, "Deployment type not supported") + respondError(w, http.StatusInternalServerError, "Deployment type not supported") return } - _, err := pluginAdapter.Terminate(pluginInfo, appInstanceInfo.Host, appInstanceInfo.WorkloadID) + adapter := pluginAdapter.NewPluginAdapter(pluginInfo, impl.logger) + _, err := adapter.Terminate(pluginInfo, appInstanceInfo.Host, appInstanceInfo.WorkloadID) if err != nil { - common.RespondError(w, http.StatusInternalServerError, err.Error()) + respondError(w, http.StatusInternalServerError, err.Error()) return } - dbAdapter.UpdateAppInstanceInfoInstStatusAndWorkload(db, appInstanceId, "NOT_INSTANTIATED", "") + dbAdapter.UpdateAppInstanceInfoInstStatusAndWorkload(impl.db, appInstanceId, "NOT_INSTANTIATED", "") - common.RespondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode("")) + respondJSON(w, http.StatusAccepted, json.NewEncoder(w).Encode("")) } -func DeleteAppInstanceIdentifierHldr(db *gorm.DB, w http.ResponseWriter, r *http.Request) { - fmt.Println("DeleteAppInstanceIdentifierHldr:") + +// Deletes application instance identifier +func (impl *HandlerImpl) DeleteAppInstanceIdentifier(w http.ResponseWriter, r *http.Request) { + impl.logger.Infof("DeleteAppInstanceIdentifier:") params := mux.Vars(r) appInstanceId := params["appInstanceId"] - dbAdapter.DeleteAppInstanceInfo(db, appInstanceId) - common.RespondJSON(w, http.StatusOK, json.NewEncoder(w).Encode("")) + dbAdapter.DeleteAppInstanceInfo(impl.db, appInstanceId) + respondJSON(w, http.StatusOK, json.NewEncoder(w).Encode("")) +} + +// It makes the JSON +func respondJSON(w http.ResponseWriter, status int, payload interface{}) { + response, err := json.Marshal(payload) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + w.Write([]byte(response)) +} + +// RespondError makes the error response with payload as json format +func respondError(w http.ResponseWriter, code int, message string) { + respondJSON(w, code, map[string]string{"error": message}) } diff --git a/mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go b/mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go index d55098d..0d83530 100644 --- a/mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go +++ b/mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go @@ -18,88 +18,78 @@ package plugin import ( "broker/internal/lcmservice" + "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" _ "google.golang.org/grpc/encoding/gzip" "io" - "log" "os" ) -// ClientGRPC provides the implementation of a file -// uploader that streams chunks via protobuf-encoded -// messages. +// GRPC client to different GRPC supported plugins type ClientGRPC struct { conn *grpc.ClientConn client lcmservice.AppLCMClient chunkSize int + logger *logrus.Logger } type ClientGRPCConfig struct { Address string ChunkSize int RootCertificate string + Logger *logrus.Logger } +// Create a GRPC client func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) { - logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile) - var ( grpcOpts = []grpc.DialOption{} grpcCreds credentials.TransportCredentials ) - if cfg.Address == "" { - logger.Fatalf("address must be specified: ", err) - } + c.chunkSize = cfg.ChunkSize + c.logger = cfg.Logger if cfg.RootCertificate != "" { grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost") if err != nil { - logger.Fatalf("failed to create grpc tls client via root-cert: ", err) + c.logger.Errorf("failed to create grpc tls client via provided root-cert ", err) + return c, err } - grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds)) } else { grpcOpts = append(grpcOpts, grpc.WithInsecure()) } - switch { - case cfg.ChunkSize == 0: - logger.Fatalf("ChunkSize must be specified") - case cfg.ChunkSize > (1 << 22): - logger.Fatalf("ChunkSize must be < than 4MB") - default: - c.chunkSize = cfg.ChunkSize - } - c.conn, err = grpc.Dial(cfg.Address, grpcOpts...) if err != nil { - logger.Fatalf("failed to start grpc connection with address: ", cfg.Address) + c.logger.Errorf("failed to start grpc connection with address: ", cfg.Address) + return c, err } c.client = lcmservice.NewAppLCMClient(c.conn) - return + return c, nil } -func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (workloadId string, status string, error error) { +// Instantiate application +func (c *ClientGRPC) Instantiate(ctx context.Context, deployArtifact string, hostIP string) (workloadId string, status string, error error) { var ( writing = true buf []byte n int file *os.File ) - log.Printf("hostIP: ", hostIP) - log.Printf("deployArtifact: ", f) - logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile) + c.logger.Info("deployArtifact: ", deployArtifact) // Get a file handle for the file we // want to upload - file, err := os.Open(f) + file, err := os.Open(deployArtifact) if err != nil { - logger.Fatalf("failed to open file: ", err.Error()) + c.logger.Errorf("failed to open package file: %s. Err: %s", deployArtifact, err.Error()) + return "","Failure", err } defer file.Close() @@ -108,21 +98,23 @@ func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) ( stream, err := c.client.Instantiate(ctx) if err != nil { - logger.Fatalf("failed to create upload stream for file: ", err) + c.logger.Errorf("failed to upload stream: %s. Err: %s", deployArtifact, err.Error()) + return "","Failure", err } defer stream.CloseSend() - //send metadata information + //send metadata information req := &lcmservice.InstantiateRequest{ Data: &lcmservice.InstantiateRequest_HostIp{ - HostIp: hostIP, + HostIp: hostIP, }, } err = stream.Send(req) if err != nil { - logger.Fatalf("failed to send metadata information: ", f) + c.logger.Errorf("failed to send metadata information: ", deployArtifact) + return "","Failure", err } // Allocate a buffer with `chunkSize` as the capacity @@ -139,7 +131,8 @@ func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) ( err = nil continue } - logger.Fatalf("errored while copying from file to buf: ", err) + c.logger.Errorf("errored while copying from file to buf: ", err) + return "","Failure", err } req := &lcmservice.InstantiateRequest { @@ -151,42 +144,50 @@ func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) ( err = stream.Send(req) if err != nil { - logger.Fatalf("failed to send chunk via stream: ", err) + c.logger.Errorf("failed to send chunk via stream: ", err) + return "","Failure", err } } res, err := stream.CloseAndRecv() if err != nil { - logger.Fatalf("failed to receive upstream status response: ", err) - return "", "", err + c.logger.Errorf("failed to receive upstream status response: ", err) + return "","Failure", err } - log.Printf("response", res) + c.logger.Info("Instantiation Completed") return res.WorkloadId, res.Status, err } -func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string) { +// Query application +func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string, error error) { req := &lcmservice.QueryRequest{ HostIp: hostIP, WorkloadId: workloadId, } - resp, _ := c.client.Query(ctx, req) - return resp.Status + resp, err := c.client.Query(ctx, req) + if err != nil { + return "", err + } + return resp.Status, err } -func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string) { +// Terminate application +func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string, error error) { req := &lcmservice.TerminateRequest{ HostIp: hostIP, WorkloadId: workloadId, } - resp, _ := c.client.Terminate(ctx, req) - return resp.Status + resp, err := c.client.Terminate(ctx, req) + if err != nil { + return "", err + } + return resp.Status, err } func (c *ClientGRPC) Close() { if c.conn != nil { c.conn.Close() } -} - +} \ No newline at end of file diff --git a/mecm/mepm/applcm/broker/pkg/handlers/common/common.go b/mecm/mepm/applcm/broker/pkg/util/logger.go similarity index 52% rename from mecm/mepm/applcm/broker/pkg/handlers/common/common.go rename to mecm/mepm/applcm/broker/pkg/util/logger.go index 9b384bb..cfe4d56 100644 --- a/mecm/mepm/applcm/broker/pkg/handlers/common/common.go +++ b/mecm/mepm/applcm/broker/pkg/util/logger.go @@ -13,26 +13,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package common +package util import ( - "encoding/json" - "net/http" + "github.com/sirupsen/logrus" + "os" ) -func RespondJSON(w http.ResponseWriter, status int, payload interface{}) { - response, err := json.Marshal(payload) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) - return - } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(status) - w.Write([]byte(response)) -} +func GetLogger(logFile string, loggerLevel logrus.Level, file *os.File) *logrus.Logger { -// respondError makes the error response with payload as json format -func RespondError(w http.ResponseWriter, code int, message string) { - RespondJSON(w, code, map[string]string{"error": message}) -} + logger := logrus.New() + logger.SetOutput(file) + logger.SetFormatter(&logrus.TextFormatter{ + DisableColors: true, + FullTimestamp: true, + }) + logger.SetLevel(loggerLevel) + logger.Info("logger created") + return logger +} \ No newline at end of file diff --git a/mecm/mepm/applcm/go.mod b/mecm/mepm/applcm/go.mod new file mode 100644 index 0000000..11de177 --- /dev/null +++ b/mecm/mepm/applcm/go.mod @@ -0,0 +1,3 @@ +module applcm + +go 1.14 -- 2.16.6