X-Git-Url: https://gerrit.akraino.org/r/gitweb?a=blobdiff_plain;f=mecm%2Fmepm%2Fapplcm%2Fbroker%2Fpkg%2Fplugin%2Fgrpcclient.go;h=2c3bb28dc978f86fbc45136b069078bf12a55bd3;hb=246be669d7a8109bfc7ce3d7101268a248854be4;hp=613af74ecd7bf0aee28242666f83713398b8030e;hpb=973e68c113a84f5041406219dd87c0f7778ba0fe;p=ealt-edge.git diff --git a/mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go b/mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go index 613af74..2c3bb28 100644 --- a/mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go +++ b/mecm/mepm/applcm/broker/pkg/plugin/grpcclient.go @@ -1,89 +1,95 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package plugin import ( "broker/internal/lcmservice" + "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" _ "google.golang.org/grpc/encoding/gzip" "io" - "log" "os" ) -// ClientGRPC provides the implementation of a file -// uploader that streams chunks via protobuf-encoded -// messages. +// GRPC client to different GRPC supported plugins type ClientGRPC struct { conn *grpc.ClientConn client lcmservice.AppLCMClient chunkSize int + logger *logrus.Logger } type ClientGRPCConfig struct { Address string ChunkSize int RootCertificate string + Logger *logrus.Logger } +// Create a GRPC client func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) { - logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile) - var ( grpcOpts = []grpc.DialOption{} grpcCreds credentials.TransportCredentials ) - if cfg.Address == "" { - logger.Fatalf("address must be specified: ", err) - } + c.chunkSize = cfg.ChunkSize + c.logger = cfg.Logger if cfg.RootCertificate != "" { grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost") if err != nil { - logger.Fatalf("failed to create grpc tls client via root-cert: ", err) + c.logger.Errorf("failed to create grpc tls client via provided root-cert ", err) + return c, err } - grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds)) } else { grpcOpts = append(grpcOpts, grpc.WithInsecure()) } - switch { - case cfg.ChunkSize == 0: - logger.Fatalf("ChunkSize must be specified") - case cfg.ChunkSize > (1 << 22): - logger.Fatalf("ChunkSize must be < than 4MB") - default: - c.chunkSize = cfg.ChunkSize - } - c.conn, err = grpc.Dial(cfg.Address, grpcOpts...) if err != nil { - logger.Fatalf("failed to start grpc connection with address: ", cfg.Address) + c.logger.Errorf("failed to start grpc connection with address: ", cfg.Address) + return c, err } c.client = lcmservice.NewAppLCMClient(c.conn) - return + return c, nil } -func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (workloadId string, status string, error error) { +// Instantiate application +func (c *ClientGRPC) Instantiate(ctx context.Context, deployArtifact string, hostIP string) (workloadId string, status string, error error) { var ( writing = true buf []byte n int file *os.File ) - log.Printf("hostIP: ", hostIP) - log.Printf("deployArtifact: ", f) - logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile) + c.logger.Infof("deployArtifact: ", deployArtifact) // Get a file handle for the file we // want to upload - file, err := os.Open(f) + file, err := os.Open(deployArtifact) if err != nil { - logger.Fatalf("failed to open file: ", err.Error()) + c.logger.Errorf("failed to open package file: %s. Err: %s", deployArtifact, err.Error()) + return "","Failure", err } defer file.Close() @@ -92,21 +98,23 @@ func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) ( stream, err := c.client.Instantiate(ctx) if err != nil { - logger.Fatalf("failed to create upload stream for file: ", err) + c.logger.Errorf("failed to upload stream: %s. Err: %s", deployArtifact, err.Error()) + return "","Failure", err } defer stream.CloseSend() - //send metadata information + //send metadata information req := &lcmservice.InstantiateRequest{ Data: &lcmservice.InstantiateRequest_HostIp{ - HostIp: hostIP, + HostIp: hostIP, }, } err = stream.Send(req) if err != nil { - logger.Fatalf("failed to send metadata information: ", f) + c.logger.Errorf("failed to send metadata information: ", deployArtifact) + return "","Failure", err } // Allocate a buffer with `chunkSize` as the capacity @@ -123,7 +131,8 @@ func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) ( err = nil continue } - logger.Fatalf("errored while copying from file to buf: ", err) + c.logger.Errorf("errored while copying from file to buf: ", err) + return "","Failure", err } req := &lcmservice.InstantiateRequest { @@ -135,42 +144,50 @@ func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) ( err = stream.Send(req) if err != nil { - logger.Fatalf("failed to send chunk via stream: ", err) + c.logger.Errorf("failed to send chunk via stream: ", err) + return "","Failure", err } } res, err := stream.CloseAndRecv() if err != nil { - logger.Fatalf("failed to receive upstream status response: ", err) - return "", "", err + c.logger.Errorf("failed to receive upstream status response: ", err) + return "","Failure", err } - log.Printf("response", res) - return res.WorkloadId, res.Status, err + c.logger.Infof("Instantiation Completed with workloadId %s and status", res.GetWorkloadId(), res.GetStatus()) + return res.GetWorkloadId(), res.GetStatus(), err } -func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string) { +// Query application +func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string, error error) { req := &lcmservice.QueryRequest{ HostIp: hostIP, WorkloadId: workloadId, } - resp, _ := c.client.Query(ctx, req) - return resp.Status + resp, err := c.client.Query(ctx, req) + if err != nil { + return "", err + } + return resp.Status, err } -func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string) { +// Terminate application +func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string, error error) { req := &lcmservice.TerminateRequest{ HostIp: hostIP, WorkloadId: workloadId, } - resp, _ := c.client.Terminate(ctx, req) - return resp.Status + resp, err := c.client.Terminate(ctx, req) + if err != nil { + return "", err + } + return resp.Status, err } func (c *ClientGRPC) Close() { if c.conn != nil { c.conn.Close() } -} - +} \ No newline at end of file