Code Review
/
ealt-edge.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Error handling & logging for broker
[ealt-edge.git]
/
mecm
/
mepm
/
applcm
/
broker
/
pkg
/
plugin
/
grpcclient.go
diff --git
a/mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go
b/mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go
index
d55098d
..
0d83530
100644
(file)
--- a/
mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go
+++ b/
mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go
@@
-18,88
+18,78
@@
package plugin
import (
"broker/internal/lcmservice"
import (
"broker/internal/lcmservice"
+ "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
_ "google.golang.org/grpc/encoding/gzip"
"io"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
_ "google.golang.org/grpc/encoding/gzip"
"io"
- "log"
"os"
)
"os"
)
-// ClientGRPC provides the implementation of a file
-// uploader that streams chunks via protobuf-encoded
-// messages.
+// GRPC client to different GRPC supported plugins
type ClientGRPC struct {
conn *grpc.ClientConn
client lcmservice.AppLCMClient
chunkSize int
type ClientGRPC struct {
conn *grpc.ClientConn
client lcmservice.AppLCMClient
chunkSize int
+ logger *logrus.Logger
}
type ClientGRPCConfig struct {
Address string
ChunkSize int
RootCertificate string
}
type ClientGRPCConfig struct {
Address string
ChunkSize int
RootCertificate string
+ Logger *logrus.Logger
}
}
+// Create a GRPC client
func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) {
func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) {
- logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile)
-
var (
grpcOpts = []grpc.DialOption{}
grpcCreds credentials.TransportCredentials
)
var (
grpcOpts = []grpc.DialOption{}
grpcCreds credentials.TransportCredentials
)
- if cfg.Address == "" {
- logger.Fatalf("address must be specified: ", err)
- }
+ c.chunkSize = cfg.ChunkSize
+ c.logger = cfg.Logger
if cfg.RootCertificate != "" {
grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost")
if err != nil {
if cfg.RootCertificate != "" {
grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost")
if err != nil {
- logger.Fatalf("failed to create grpc tls client via root-cert: ", err)
+ c.logger.Errorf("failed to create grpc tls client via provided root-cert ", err)
+ return c, err
}
}
-
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds))
} else {
grpcOpts = append(grpcOpts, grpc.WithInsecure())
}
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds))
} else {
grpcOpts = append(grpcOpts, grpc.WithInsecure())
}
- switch {
- case cfg.ChunkSize == 0:
- logger.Fatalf("ChunkSize must be specified")
- case cfg.ChunkSize > (1 << 22):
- logger.Fatalf("ChunkSize must be < than 4MB")
- default:
- c.chunkSize = cfg.ChunkSize
- }
-
c.conn, err = grpc.Dial(cfg.Address, grpcOpts...)
if err != nil {
c.conn, err = grpc.Dial(cfg.Address, grpcOpts...)
if err != nil {
- logger.Fatalf("failed to start grpc connection with address: ", cfg.Address)
+ c.logger.Errorf("failed to start grpc connection with address: ", cfg.Address)
+ return c, err
}
c.client = lcmservice.NewAppLCMClient(c.conn)
}
c.client = lcmservice.NewAppLCMClient(c.conn)
- return
+ return
c, nil
}
}
-func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (workloadId string, status string, error error) {
+// Instantiate application
+func (c *ClientGRPC) Instantiate(ctx context.Context, deployArtifact string, hostIP string) (workloadId string, status string, error error) {
var (
writing = true
buf []byte
n int
file *os.File
)
var (
writing = true
buf []byte
n int
file *os.File
)
- log.Printf("hostIP: ", hostIP)
- log.Printf("deployArtifact: ", f)
- logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile)
+ c.logger.Info("deployArtifact: ", deployArtifact)
// Get a file handle for the file we
// want to upload
// Get a file handle for the file we
// want to upload
- file, err := os.Open(
f
)
+ file, err := os.Open(
deployArtifact
)
if err != nil {
if err != nil {
- logger.Fatalf("failed to open file: ", err.Error())
+ c.logger.Errorf("failed to open package file: %s. Err: %s", deployArtifact, err.Error())
+ return "","Failure", err
}
defer file.Close()
}
defer file.Close()
@@
-108,21
+98,23
@@
func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (
stream, err := c.client.Instantiate(ctx)
if err != nil {
stream, err := c.client.Instantiate(ctx)
if err != nil {
- logger.Fatalf("failed to create upload stream for file: ", err)
+ c.logger.Errorf("failed to upload stream: %s. Err: %s", deployArtifact, err.Error())
+ return "","Failure", err
}
defer stream.CloseSend()
}
defer stream.CloseSend()
- //send metadata information
+
//send metadata information
req := &lcmservice.InstantiateRequest{
Data: &lcmservice.InstantiateRequest_HostIp{
req := &lcmservice.InstantiateRequest{
Data: &lcmservice.InstantiateRequest_HostIp{
-
HostIp: hostIP,
+ HostIp: hostIP,
},
}
err = stream.Send(req)
if err != nil {
},
}
err = stream.Send(req)
if err != nil {
- logger.Fatalf("failed to send metadata information: ", f)
+ c.logger.Errorf("failed to send metadata information: ", deployArtifact)
+ return "","Failure", err
}
// Allocate a buffer with `chunkSize` as the capacity
}
// Allocate a buffer with `chunkSize` as the capacity
@@
-139,7
+131,8
@@
func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (
err = nil
continue
}
err = nil
continue
}
- logger.Fatalf("errored while copying from file to buf: ", err)
+ c.logger.Errorf("errored while copying from file to buf: ", err)
+ return "","Failure", err
}
req := &lcmservice.InstantiateRequest {
}
req := &lcmservice.InstantiateRequest {
@@
-151,42
+144,50
@@
func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (
err = stream.Send(req)
if err != nil {
err = stream.Send(req)
if err != nil {
- logger.Fatalf("failed to send chunk via stream: ", err)
+ c.logger.Errorf("failed to send chunk via stream: ", err)
+ return "","Failure", err
}
}
res, err := stream.CloseAndRecv()
if err != nil {
}
}
res, err := stream.CloseAndRecv()
if err != nil {
-
logger.Fatal
f("failed to receive upstream status response: ", err)
- return "",
"
", err
+
c.logger.Error
f("failed to receive upstream status response: ", err)
+ return "",
"Failure
", err
}
}
-
log.Printf("response", res
)
+
c.logger.Info("Instantiation Completed"
)
return res.WorkloadId, res.Status, err
}
return res.WorkloadId, res.Status, err
}
-func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string) {
+// Query application
+func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string, error error) {
req := &lcmservice.QueryRequest{
HostIp: hostIP,
WorkloadId: workloadId,
}
req := &lcmservice.QueryRequest{
HostIp: hostIP,
WorkloadId: workloadId,
}
- resp, _ := c.client.Query(ctx, req)
- return resp.Status
+ resp, err := c.client.Query(ctx, req)
+ if err != nil {
+ return "", err
+ }
+ return resp.Status, err
}
}
-func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string) {
+// Terminate application
+func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string, error error) {
req := &lcmservice.TerminateRequest{
HostIp: hostIP,
WorkloadId: workloadId,
}
req := &lcmservice.TerminateRequest{
HostIp: hostIP,
WorkloadId: workloadId,
}
- resp, _ := c.client.Terminate(ctx, req)
- return resp.Status
+ resp, err := c.client.Terminate(ctx, req)
+ if err != nil {
+ return "", err
+ }
+ return resp.Status, err
}
func (c *ClientGRPC) Close() {
if c.conn != nil {
c.conn.Close()
}
}
func (c *ClientGRPC) Close() {
if c.conn != nil {
c.conn.Close()
}
-}
-
+}
\ No newline at end of file