+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package plugin
import (
"broker/internal/lcmservice"
+ "io"
+ "os"
+
+ "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
_ "google.golang.org/grpc/encoding/gzip"
- "io"
- "log"
- "os"
)
-// ClientGRPC provides the implementation of a file
-// uploader that streams chunks via protobuf-encoded
-// messages.
+// GRPC client to different GRPC supported plugins
type ClientGRPC struct {
conn *grpc.ClientConn
client lcmservice.AppLCMClient
chunkSize int
+ logger *logrus.Logger
}
type ClientGRPCConfig struct {
Address string
ChunkSize int
RootCertificate string
+ Logger *logrus.Logger
}
+// Create a GRPC client
func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) {
- logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile)
-
var (
grpcOpts = []grpc.DialOption{}
grpcCreds credentials.TransportCredentials
)
- if cfg.Address == "" {
- logger.Fatalf("address must be specified: ", err)
- }
+ c.chunkSize = cfg.ChunkSize
+ c.logger = cfg.Logger
if cfg.RootCertificate != "" {
grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost")
if err != nil {
- logger.Fatalf("failed to create grpc tls client via root-cert: ", err)
+ c.logger.Errorf("failed to create grpc tls client via provided root-cert ", err)
+ return c, err
}
-
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds))
} else {
grpcOpts = append(grpcOpts, grpc.WithInsecure())
}
- switch {
- case cfg.ChunkSize == 0:
- logger.Fatalf("ChunkSize must be specified")
- case cfg.ChunkSize > (1 << 22):
- logger.Fatalf("ChunkSize must be < than 4MB")
- default:
- c.chunkSize = cfg.ChunkSize
- }
-
c.conn, err = grpc.Dial(cfg.Address, grpcOpts...)
if err != nil {
- logger.Fatalf("failed to start grpc connection with address: ", cfg.Address)
+ c.logger.Errorf("failed to start grpc connection with address: ", cfg.Address)
+ return c, err
}
c.client = lcmservice.NewAppLCMClient(c.conn)
- return
+ return c, nil
}
-func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (workloadId string, status string, error error) {
+// Instantiate application
+func (c *ClientGRPC) Instantiate(ctx context.Context, deployArtifact string, hostIP string) (workloadId string, status string, error error) {
var (
writing = true
buf []byte
n int
file *os.File
)
- log.Printf("hostIP: ", hostIP)
- log.Printf("deployArtifact: ", f)
- logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile)
+ c.logger.Infof("deployArtifact: ", deployArtifact)
// Get a file handle for the file we
// want to upload
- file, err := os.Open(f)
+ file, err := os.Open(deployArtifact)
if err != nil {
- logger.Fatalf("failed to open file: ", err.Error())
+ c.logger.Errorf("failed to open package file: %s. Err: %s", deployArtifact, err.Error())
+ return "", "Failure", err
}
defer file.Close()
stream, err := c.client.Instantiate(ctx)
if err != nil {
- logger.Fatalf("failed to create upload stream for file: ", err)
+ c.logger.Errorf("failed to upload stream: %s. Err: %s", deployArtifact, err.Error())
+ return "", "Failure", err
}
defer stream.CloseSend()
- //send metadata information
+ //send metadata information
req := &lcmservice.InstantiateRequest{
Data: &lcmservice.InstantiateRequest_HostIp{
- HostIp: hostIP,
+ HostIp: hostIP,
},
}
err = stream.Send(req)
if err != nil {
- logger.Fatalf("failed to send metadata information: ", f)
+ c.logger.Errorf("failed to send metadata information: ", deployArtifact)
+ return "", "Failure", err
}
// Allocate a buffer with `chunkSize` as the capacity
err = nil
continue
}
- logger.Fatalf("errored while copying from file to buf: ", err)
+ c.logger.Errorf("errored while copying from file to buf: ", err)
+ return "", "Failure", err
}
- req := &lcmservice.InstantiateRequest {
- Data: &lcmservice.InstantiateRequest_Package {
+ req := &lcmservice.InstantiateRequest{
+ Data: &lcmservice.InstantiateRequest_Package{
Package: buf[:n],
},
}
err = stream.Send(req)
if err != nil {
- logger.Fatalf("failed to send chunk via stream: ", err)
+ c.logger.Errorf("failed to send chunk via stream: ", err)
+ return "", "Failure", err
}
}
res, err := stream.CloseAndRecv()
if err != nil {
- logger.Fatalf("failed to receive upstream status response: ", err)
- return "", "", err
+ c.logger.Errorf("failed to receive upstream status response: ", err)
+ return "", "Failure", err
}
- log.Printf("response", res)
- return res.WorkloadId, res.Status, err
+ c.logger.Infof("Instantiation Completed with workloadId %s and status", res.GetWorkloadId(), res.GetStatus())
+ return res.GetWorkloadId(), res.GetStatus(), err
}
-func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string) {
+// Query application
+func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string, error error) {
req := &lcmservice.QueryRequest{
- HostIp: hostIP,
+ HostIp: hostIP,
WorkloadId: workloadId,
}
- resp, _ := c.client.Query(ctx, req)
- return resp.Status
+ resp, err := c.client.Query(ctx, req)
+ if err != nil {
+ return "", err
+ }
+ return resp.Status, err
}
-func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string) {
+// Terminate application
+func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string, error error) {
req := &lcmservice.TerminateRequest{
- HostIp: hostIP,
+ HostIp: hostIP,
WorkloadId: workloadId,
}
- resp, _ := c.client.Terminate(ctx, req)
- return resp.Status
+ resp, err := c.client.Terminate(ctx, req)
+ if err != nil {
+ return "", err
+ }
+ return resp.Status, err
}
func (c *ClientGRPC) Close() {
c.conn.Close()
}
}
-