Error handling & logging for broker
[ealt-edge.git] / mecm / mepm / applcm / broker / pkg / plugin / grpcclient.go
index d55098d..0d83530 100644 (file)
@@ -18,88 +18,78 @@ package plugin
 
 import (
        "broker/internal/lcmservice"
+       "github.com/sirupsen/logrus"
        "golang.org/x/net/context"
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials"
        _ "google.golang.org/grpc/encoding/gzip"
        "io"
-       "log"
        "os"
 )
 
-// ClientGRPC provides the implementation of a file
-// uploader that streams chunks via protobuf-encoded
-// messages.
+// GRPC client to different GRPC supported plugins
 type ClientGRPC struct {
        conn      *grpc.ClientConn
        client    lcmservice.AppLCMClient
        chunkSize int
+       logger    *logrus.Logger
 }
 
 type ClientGRPCConfig struct {
        Address         string
        ChunkSize       int
        RootCertificate string
+       Logger          *logrus.Logger
 }
 
+// Create a GRPC client
 func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) {
 
-       logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile)
-
        var (
                grpcOpts  = []grpc.DialOption{}
                grpcCreds credentials.TransportCredentials
        )
 
-       if cfg.Address == "" {
-               logger.Fatalf("address must be specified: ", err)
-       }
+       c.chunkSize = cfg.ChunkSize
+       c.logger = cfg.Logger
 
        if cfg.RootCertificate != "" {
                grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost")
                if err != nil {
-                       logger.Fatalf("failed to create grpc tls client via root-cert: ", err)
+                       c.logger.Errorf("failed to create grpc tls client via provided root-cert ", err)
+                       return c, err
                }
-
                grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds))
        } else {
                grpcOpts = append(grpcOpts, grpc.WithInsecure())
        }
 
-       switch {
-       case cfg.ChunkSize == 0:
-               logger.Fatalf("ChunkSize must be specified")
-       case cfg.ChunkSize > (1 << 22):
-               logger.Fatalf("ChunkSize must be < than 4MB")
-       default:
-               c.chunkSize = cfg.ChunkSize
-       }
-
        c.conn, err = grpc.Dial(cfg.Address, grpcOpts...)
        if err != nil {
-               logger.Fatalf("failed to start grpc connection with address: ", cfg.Address)
+               c.logger.Errorf("failed to start grpc connection with address: ", cfg.Address)
+               return c, err
        }
 
        c.client = lcmservice.NewAppLCMClient(c.conn)
-       return
+       return c, nil
 }
 
-func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (workloadId string, status string, error error) {
+// Instantiate application
+func (c *ClientGRPC) Instantiate(ctx context.Context, deployArtifact string, hostIP string) (workloadId string, status string, error error) {
        var (
                writing = true
                buf     []byte
                n       int
                file    *os.File
        )
-       log.Printf("hostIP: ", hostIP)
-       log.Printf("deployArtifact: ", f)
-       logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile)
+       c.logger.Info("deployArtifact: ", deployArtifact)
 
        // Get a file handle for the file we
        // want to upload
-       file, err := os.Open(f)
+       file, err := os.Open(deployArtifact)
        if err != nil {
-               logger.Fatalf("failed to open file: ", err.Error())
+               c.logger.Errorf("failed to open package file: %s. Err: %s", deployArtifact, err.Error())
+               return "","Failure", err
        }
        defer file.Close()
 
@@ -108,21 +98,23 @@ func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (
        stream, err := c.client.Instantiate(ctx)
 
        if err != nil {
-               logger.Fatalf("failed to create upload stream for file: ", err)
+               c.logger.Errorf("failed to upload stream: %s. Err: %s", deployArtifact, err.Error())
+               return "","Failure", err
        }
        defer stream.CloseSend()
 
-    //send metadata information
+       //send metadata information
        req := &lcmservice.InstantiateRequest{
 
                Data: &lcmservice.InstantiateRequest_HostIp{
-                               HostIp:  hostIP,
+                       HostIp:  hostIP,
                },
        }
 
        err = stream.Send(req)
        if err != nil {
-               logger.Fatalf("failed to send metadata information: ", f)
+               c.logger.Errorf("failed to send metadata information: ", deployArtifact)
+               return "","Failure", err
        }
 
        // Allocate a buffer with `chunkSize` as the capacity
@@ -139,7 +131,8 @@ func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (
                                err = nil
                                continue
                        }
-                       logger.Fatalf("errored while copying from file to buf: ", err)
+                       c.logger.Errorf("errored while copying from file to buf: ", err)
+                       return "","Failure", err
                }
 
                req := &lcmservice.InstantiateRequest {
@@ -151,42 +144,50 @@ func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (
                err = stream.Send(req)
 
                if err != nil {
-                       logger.Fatalf("failed to send chunk via stream: ", err)
+                       c.logger.Errorf("failed to send chunk via stream: ", err)
+                       return "","Failure", err
                }
        }
 
        res, err := stream.CloseAndRecv()
        if err != nil {
-               logger.Fatalf("failed to receive upstream status response: ", err)
-               return "", "", err
+               c.logger.Errorf("failed to receive upstream status response: ", err)
+               return "","Failure", err
        }
-       log.Printf("response", res)
+       c.logger.Info("Instantiation Completed")
        return res.WorkloadId, res.Status, err
 }
 
-func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string) {
+// Query application
+func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string, error error) {
 
        req := &lcmservice.QueryRequest{
                HostIp: hostIP,
                WorkloadId: workloadId,
        }
-       resp, _ := c.client.Query(ctx, req)
-       return resp.Status
+       resp, err := c.client.Query(ctx, req)
+       if err != nil {
+               return "", err
+       }
+       return resp.Status, err
 }
 
-func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string) {
+// Terminate application
+func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string, error error) {
 
        req := &lcmservice.TerminateRequest{
                HostIp: hostIP,
                WorkloadId: workloadId,
        }
-       resp, _ := c.client.Terminate(ctx, req)
-       return resp.Status
+       resp, err := c.client.Terminate(ctx, req)
+       if err != nil {
+               return "", err
+       }
+       return resp.Status, err
 }
 
 func (c *ClientGRPC) Close() {
        if c.conn != nil {
                c.conn.Close()
        }
-}
-
+}
\ No newline at end of file