2 * Copyright 2020 Huawei Technologies Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 "broker/internal/lcmservice"
21 "github.com/sirupsen/logrus"
22 "golang.org/x/net/context"
23 "google.golang.org/grpc"
24 "google.golang.org/grpc/credentials"
25 _ "google.golang.org/grpc/encoding/gzip"
30 // ClientGRPC is a GRPC client to the different GRPC supported plugins.
// NOTE(review): only part of this struct is visible in this excerpt. Code
// elsewhere in the file also uses c.conn, c.chunkSize and c.logger, so those
// fields are presumably declared on lines not shown here.
31 type ClientGRPC struct {
// client is the AppLCM service stub on which the Instantiate, Query and
// Terminate calls are issued.
33 client lcmservice.AppLCMClient
// ClientGRPCConfig carries the settings NewClientGRPC reads when it builds a
// ClientGRPC.
// NOTE(review): only RootCertificate is visible in this excerpt; NewClientGRPC
// also reads cfg.Address and cfg.ChunkSize, so those fields are presumably
// declared on lines not shown here.
38 type ClientGRPCConfig struct {
// RootCertificate is handed to credentials.NewClientTLSFromFile as the
// certificate file. When it is empty NewClientGRPC does not load TLS
// credentials from a file.
41 RootCertificate string
45 // NewClientGRPC creates a GRPC client from cfg.
//
// It copies cfg.ChunkSize onto the client, collects the dial options, dials
// cfg.Address and wraps the resulting connection in an
// lcmservice.AppLCMClient. Results are delivered through the named return
// values c and err.
//
// NOTE(review): several lines of this function (error checks, returns,
// closing braces) are not visible in this excerpt; the comments below only
// describe what the visible lines do.
46 func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) {
// grpcOpts accumulates the dial options; grpcCreds holds the TLS credentials
// loaded when a root certificate is configured.
49 grpcOpts = []grpc.DialOption{}
50 grpcCreds credentials.TransportCredentials
53 c.chunkSize = cfg.ChunkSize
// A non-empty root certificate selects TLS transport credentials.
56 if cfg.RootCertificate != "" {
// NOTE(review): "localhost" is passed as the server name override; confirm
// this is intended when cfg.Address points at a non-local plugin.
57 grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost")
// NOTE(review): the format string has no verb for err, so (assuming c.logger
// formats fmt-style, as logrus does) err is rendered as "%!(EXTRA ...)".
// Presumably "...root-cert: %v" was intended.
59 c.logger.Errorf("failed to create grpc tls client via provided root-cert ", err)
62 grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds))
// Presumably the else branch (the branching line is not visible here):
// dial without transport security.
64 grpcOpts = append(grpcOpts, grpc.WithInsecure())
67 c.conn, err = grpc.Dial(cfg.Address, grpcOpts...)
// NOTE(review): same missing format verb as above, here for cfg.Address;
// err itself is not passed to the logger on this line.
69 c.logger.Errorf("failed to start grpc connection with address: ", cfg.Address)
// Build the AppLCM stub on top of the dialled connection.
73 c.client = lcmservice.NewAppLCMClient(c.conn)
77 // Instantiate uploads the package file at deployArtifact to the plugin over
// a client stream and returns the plugin's reply.
//
// Parameters:
//   - ctx: context passed to c.client.Instantiate when the stream is opened.
//   - deployArtifact: path of the package file; it is opened with os.Open.
//   - hostIP: presumably carried in the first (metadata) request through
//     InstantiateRequest_HostIp; the assignment is not visible here.
//
// On success it returns res.WorkloadId and res.Status from the stream's
// closing response. Every visible failure path returns ("", "Failure", err).
//
// NOTE(review): the third named result is called "error", which shadows the
// builtin error type inside this function.
// NOTE(review): several lines of this function (variable declarations, the
// read loop header, error checks, closing braces) are not visible in this
// excerpt; the comments below only describe what the visible lines do.
78 func (c *ClientGRPC) Instantiate(ctx context.Context, deployArtifact string, hostIP string) (workloadId string, status string, error error) {
85 c.logger.Info("deployArtifact: ", deployArtifact)
87 // Get a file handle for the file we
89 file, err := os.Open(deployArtifact)
91 c.logger.Errorf("failed to open package file: %s. Err: %s", deployArtifact, err.Error())
92 return "","Failure", err
96 // Open a stream-based connection with the
98 stream, err := c.client.Instantiate(ctx)
101 c.logger.Errorf("failed to upload stream: %s. Err: %s", deployArtifact, err.Error())
102 return "","Failure", err
// NOTE(review): the error returned by CloseSend is discarded, and
// CloseAndRecv below presumably closes the send side as well (it does in
// generated grpc-go client-streaming stubs) — confirm this defer is needed.
104 defer stream.CloseSend()
106 //send metadata information
107 req := &lcmservice.InstantiateRequest{
109 Data: &lcmservice.InstantiateRequest_HostIp{
114 err = stream.Send(req)
// NOTE(review): the format string has no verb for deployArtifact, and err is
// not logged on this line.
116 c.logger.Errorf("failed to send metadata information: ", deployArtifact)
117 return "","Failure", err
120 // Allocate a buffer with `chunkSize` as the capacity
121 // and length (making a 0 array of the size of `chunkSize`)
122 buf = make([]byte, c.chunkSize)
124 // put as many bytes as `chunkSize` into the
126 n, err = file.Read(buf)
128 // ... if `eof` --> `writing=false`...
// NOTE(review): the format string has no verb for err.
134 c.logger.Errorf("errored while copying from file to buf: ", err)
135 return "","Failure", err
// Each chunk read from the file is sent as an InstantiateRequest_Package
// message on the same stream.
138 req := &lcmservice.InstantiateRequest {
139 Data: &lcmservice.InstantiateRequest_Package {
144 err = stream.Send(req)
// NOTE(review): the format string has no verb for err.
147 c.logger.Errorf("failed to send chunk via stream: ", err)
148 return "","Failure", err
// Finish the upload and wait for the plugin's single response.
152 res, err := stream.CloseAndRecv()
// NOTE(review): the format string has no verb for err.
154 c.logger.Errorf("failed to receive upstream status response: ", err)
155 return "","Failure", err
157 c.logger.Info("Instantiation Completed")
158 return res.WorkloadId, res.Status, err
// Query sends a QueryRequest carrying workloadId to the plugin through
// c.client.Query and returns resp.Status together with err.
//
// NOTE(review): hostIP is presumably also placed in the request, and the
// error from c.client.Query is presumably checked before resp is read; neither
// line is visible in this excerpt.
// NOTE(review): the second named result is called "error", which shadows the
// builtin error type inside this function.
162 func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string, error error) {
164 req := &lcmservice.QueryRequest{
166 WorkloadId: workloadId,
168 resp, err := c.client.Query(ctx, req)
172 return resp.Status, err
175 // Terminate sends a TerminateRequest carrying workloadId to the plugin
// through c.client.Terminate and returns resp.Status together with err.
//
// NOTE(review): hostIP is presumably also placed in the request, and the
// error from c.client.Terminate is presumably checked before resp is read;
// neither line is visible in this excerpt.
// NOTE(review): the second named result is called "error", which shadows the
// builtin error type inside this function.
176 func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string, error error) {
178 req := &lcmservice.TerminateRequest{
180 WorkloadId: workloadId,
182 resp, err := c.client.Terminate(ctx, req)
186 return resp.Status, err
189 func (c *ClientGRPC) Close() {