2 * Copyright 2020 Huawei Technologies Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 "broker/internal/lcmservice"
24 "github.com/sirupsen/logrus"
25 "golang.org/x/net/context"
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/credentials"
28 _ "google.golang.org/grpc/encoding/gzip"
31 // ClientGRPC is a gRPC client used to talk to the different gRPC-capable plugins.
32 type ClientGRPC struct {
// client is the AppLCM service client (from the lcmservice package) through
// which the Instantiate, Query and Terminate calls are issued.
34 client lcmservice.AppLCMClient
// ClientGRPCConfig holds the settings read by NewClientGRPC when it builds a
// client.
39 type ClientGRPCConfig struct {
// RootCertificate is the path of a root certificate file. When it is
// non-empty, NewClientGRPC loads TLS transport credentials from it.
42 RootCertificate string
46 // NewClientGRPC creates a gRPC client from cfg.
//
// It copies cfg.ChunkSize into the client, selects transport credentials
// (TLS loaded from cfg.RootCertificate when that path is non-empty, with
// grpc.WithInsecure as the other dial option), dials cfg.Address and wraps
// the resulting connection in an AppLCM service client. Failures to load the
// certificate or to dial are logged and reported through err.
47 func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) {
50 grpcOpts = []grpc.DialOption{}
51 grpcCreds credentials.TransportCredentials
54 c.chunkSize = cfg.ChunkSize
57 if cfg.RootCertificate != "" {
// NOTE(review): the server name override is hard-coded to "localhost";
// confirm that this matches the certificates used in deployment.
58 grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost")
// The format string needs a verb, otherwise the error is rendered as
// "%!(EXTRA ...)" noise instead of a readable message.
60 c.logger.Errorf("failed to create grpc tls client via provided root-cert: %v", err)
63 grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds))
65 grpcOpts = append(grpcOpts, grpc.WithInsecure())
68 c.conn, err = grpc.Dial(cfg.Address, grpcOpts...)
70 c.logger.Errorf("failed to start grpc connection with address: %s", cfg.Address)
74 c.client = lcmservice.NewAppLCMClient(c.conn)
78 // Instantiate uploads the package file at deployArtifact to the AppLCM
// service over a client stream and returns the workload ID and status taken
// from the service's response.
//
// The first stream message carries the host metadata; the file is then read
// into a buffer of c.chunkSize bytes and sent as package messages. On any
// failure (opening the file, opening the stream, sending, reading, or
// receiving the final response) it logs the problem and returns
// "", "Failure" and the error.
//
// NOTE(review): the third result is named `error`, which shadows the builtin
// type inside this function.
79 func (c *ClientGRPC) Instantiate(ctx context.Context, deployArtifact string, hostIP string) (workloadId string, status string, error error) {
86 c.logger.Infof("deployArtifact: %s", deployArtifact)
88 // Get a file handle for the file we
90 file, err := os.Open(deployArtifact)
92 c.logger.Errorf("failed to open package file: %s. Err: %s", deployArtifact, err.Error())
93 return "", "Failure", err
97 // Open a stream-based connection with the
99 stream, err := c.client.Instantiate(ctx)
102 c.logger.Errorf("failed to upload stream: %s. Err: %s", deployArtifact, err.Error())
103 return "", "Failure", err
105 defer stream.CloseSend()
107 //send metadata information
108 req := &lcmservice.InstantiateRequest{
110 Data: &lcmservice.InstantiateRequest_HostIp{
115 err = stream.Send(req)
117 c.logger.Errorf("failed to send metadata information: %s", deployArtifact)
118 return "", "Failure", err
121 // Allocate a buffer with `chunkSize` as the capacity
122 // and length (making a 0 array of the size of `chunkSize`)
123 buf = make([]byte, c.chunkSize)
125 // put as many bytes as `chunkSize` into the
127 n, err = file.Read(buf)
129 // ... if `eof` --> `writing=false`...
135 c.logger.Errorf("errored while copying from file to buf: %v", err)
136 return "", "Failure", err
139 req := &lcmservice.InstantiateRequest{
140 Data: &lcmservice.InstantiateRequest_Package{
145 err = stream.Send(req)
148 c.logger.Errorf("failed to send chunk via stream: %v", err)
149 return "", "Failure", err
// Close the sending side and wait for the service's single response.
153 res, err := stream.CloseAndRecv()
155 c.logger.Errorf("failed to receive upstream status response: %v", err)
156 return "", "Failure", err
// Both values need their own verb; previously the status had none and was
// appended as "%!(EXTRA ...)".
158 c.logger.Infof("Instantiation Completed with workloadId %s and status %s", res.GetWorkloadId(), res.GetStatus())
159 return res.GetWorkloadId(), res.GetStatus(), err
// Query asks the AppLCM service about the workload identified by workloadId
// and returns the Status field of the response together with the call's error.
163 func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string, error error) {
165 req := &lcmservice.QueryRequest{
167 WorkloadId: workloadId,
169 resp, err := c.client.Query(ctx, req)
// NOTE(review): resp.Status is read here; confirm that a non-nil err returns
// before this line so resp is never dereferenced after a failed call.
173 return resp.Status, err
176 // Terminate asks the AppLCM service to terminate the workload identified by
// workloadId and returns the Status field of the response together with the
// call's error.
177 func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string, error error) {
179 req := &lcmservice.TerminateRequest{
181 WorkloadId: workloadId,
183 resp, err := c.client.Terminate(ctx, req)
// NOTE(review): resp.Status is read here; confirm that a non-nil err returns
// before this line so resp is never dereferenced after a failed call.
187 return resp.Status, err
190 func (c *ClientGRPC) Close() {