X-Git-Url: https://gerrit.akraino.org/r/gitweb?a=blobdiff_plain;f=mecm%2Fmepm%2Fapplcm%2Fbroker%2Fpkg%2Fplugin%2Fgrpcclient.go;h=0d83530a9969d20b9b0154fde188fe06df17a1a4;hb=3f907eec199eae9c472039ae1903fd5d5b087b15;hp=d55098d4692d766a6053e1686d27b0065b5145bb;hpb=68e4075a8ba4a80b7727824eb8a91216a18c32dc;p=ealt-edge.git diff --git a/mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go b/mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go index d55098d..0d83530 100644 --- a/mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go +++ b/mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go @@ -18,88 +18,78 @@ package plugin import ( "broker/internal/lcmservice" + "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" _ "google.golang.org/grpc/encoding/gzip" "io" - "log" "os" ) -// ClientGRPC provides the implementation of a file -// uploader that streams chunks via protobuf-encoded -// messages. +// GRPC client to different GRPC supported plugins type ClientGRPC struct { conn *grpc.ClientConn client lcmservice.AppLCMClient chunkSize int + logger *logrus.Logger } type ClientGRPCConfig struct { Address string ChunkSize int RootCertificate string + Logger *logrus.Logger } +// Create a GRPC client func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) { - logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile) - var ( grpcOpts = []grpc.DialOption{} grpcCreds credentials.TransportCredentials ) - if cfg.Address == "" { - logger.Fatalf("address must be specified: ", err) - } + c.chunkSize = cfg.ChunkSize + c.logger = cfg.Logger if cfg.RootCertificate != "" { grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost") if err != nil { - logger.Fatalf("failed to create grpc tls client via root-cert: ", err) + c.logger.Errorf("failed to create grpc tls client via provided root-cert ", err) + return c, err } - grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds)) } else { grpcOpts = append(grpcOpts, grpc.WithInsecure()) } - switch { - case cfg.ChunkSize == 0: - logger.Fatalf("ChunkSize must be specified") - case cfg.ChunkSize > (1 << 22): - logger.Fatalf("ChunkSize must be < than 4MB") - default: - c.chunkSize = cfg.ChunkSize - } - c.conn, err = grpc.Dial(cfg.Address, grpcOpts...) if err != nil { - logger.Fatalf("failed to start grpc connection with address: ", cfg.Address) + c.logger.Errorf("failed to start grpc connection with address: ", cfg.Address) + return c, err } c.client = lcmservice.NewAppLCMClient(c.conn) - return + return c, nil } -func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (workloadId string, status string, error error) { +// Instantiate application +func (c *ClientGRPC) Instantiate(ctx context.Context, deployArtifact string, hostIP string) (workloadId string, status string, error error) { var ( writing = true buf []byte n int file *os.File ) - log.Printf("hostIP: ", hostIP) - log.Printf("deployArtifact: ", f) - logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile) + c.logger.Info("deployArtifact: ", deployArtifact) // Get a file handle for the file we // want to upload - file, err := os.Open(f) + file, err := os.Open(deployArtifact) if err != nil { - logger.Fatalf("failed to open file: ", err.Error()) + c.logger.Errorf("failed to open package file: %s. Err: %s", deployArtifact, err.Error()) + return "","Failure", err } defer file.Close() @@ -108,21 +98,23 @@ func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) ( stream, err := c.client.Instantiate(ctx) if err != nil { - logger.Fatalf("failed to create upload stream for file: ", err) + c.logger.Errorf("failed to upload stream: %s. Err: %s", deployArtifact, err.Error()) + return "","Failure", err } defer stream.CloseSend() - //send metadata information + //send metadata information req := &lcmservice.InstantiateRequest{ Data: &lcmservice.InstantiateRequest_HostIp{ - HostIp: hostIP, + HostIp: hostIP, }, } err = stream.Send(req) if err != nil { - logger.Fatalf("failed to send metadata information: ", f) + c.logger.Errorf("failed to send metadata information: ", deployArtifact) + return "","Failure", err } // Allocate a buffer with `chunkSize` as the capacity @@ -139,7 +131,8 @@ func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) ( err = nil continue } - logger.Fatalf("errored while copying from file to buf: ", err) + c.logger.Errorf("errored while copying from file to buf: ", err) + return "","Failure", err } req := &lcmservice.InstantiateRequest { @@ -151,42 +144,50 @@ func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) ( err = stream.Send(req) if err != nil { - logger.Fatalf("failed to send chunk via stream: ", err) + c.logger.Errorf("failed to send chunk via stream: ", err) + return "","Failure", err } } res, err := stream.CloseAndRecv() if err != nil { - logger.Fatalf("failed to receive upstream status response: ", err) - return "", "", err + c.logger.Errorf("failed to receive upstream status response: ", err) + return "","Failure", err } - log.Printf("response", res) + c.logger.Info("Instantiation Completed") return res.WorkloadId, res.Status, err } -func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string) { +// Query application +func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string, error error) { req := &lcmservice.QueryRequest{ HostIp: hostIP, WorkloadId: workloadId, } - resp, _ := c.client.Query(ctx, req) - return resp.Status + resp, err := c.client.Query(ctx, req) + if err != nil { + return "", err + } + return resp.Status, err } -func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string) { +// Terminate application +func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string, error error) { req := &lcmservice.TerminateRequest{ HostIp: hostIP, WorkloadId: workloadId, } - resp, _ := c.client.Terminate(ctx, req) - return resp.Status + resp, err := c.client.Terminate(ctx, req) + if err != nil { + return "", err + } + return resp.Status, err } func (c *ClientGRPC) Close() { if c.conn != nil { c.conn.Close() } -} - +} \ No newline at end of file