2 * Copyright 2020 Huawei Technologies Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 "broker/internal/lcmservice"
21 "golang.org/x/net/context"
22 "google.golang.org/grpc"
23 "google.golang.org/grpc/credentials"
24 _ "google.golang.org/grpc/encoding/gzip"
30 // ClientGRPC provides the implementation of a file
31 // uploader that streams chunks via protobuf-encoded
33 type ClientGRPC struct {
35 client lcmservice.AppLCMClient
// ClientGRPCConfig holds the settings NewClientGRPC uses to build a
// ClientGRPC.
//
// NOTE(review): Address and ChunkSize are restored from their uses in
// NewClientGRPC; confirm the ChunkSize type against the original declaration.
type ClientGRPCConfig struct {
	// Address is the target passed to grpc.Dial. It must be non-empty.
	Address string
	// ChunkSize is the per-chunk payload size in bytes. It must be non-zero
	// and no larger than 1 << 22 (4 MiB).
	ChunkSize int
	// RootCertificate is the path of a TLS root certificate file. When it is
	// empty the connection is dialed without transport security.
	RootCertificate string
}
45 func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) {
47 logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile)
50 grpcOpts = []grpc.DialOption{}
51 grpcCreds credentials.TransportCredentials
54 if cfg.Address == "" {
55 logger.Fatalf("address must be specified: ", err)
58 if cfg.RootCertificate != "" {
59 grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost")
61 logger.Fatalf("failed to create grpc tls client via root-cert: ", err)
64 grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds))
66 grpcOpts = append(grpcOpts, grpc.WithInsecure())
70 case cfg.ChunkSize == 0:
71 logger.Fatalf("ChunkSize must be specified")
72 case cfg.ChunkSize > (1 << 22):
73 logger.Fatalf("ChunkSize must be < than 4MB")
75 c.chunkSize = cfg.ChunkSize
78 c.conn, err = grpc.Dial(cfg.Address, grpcOpts...)
80 logger.Fatalf("failed to start grpc connection with address: ", cfg.Address)
83 c.client = lcmservice.NewAppLCMClient(c.conn)
87 func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (workloadId string, status string, error error) {
94 log.Printf("hostIP: ", hostIP)
95 log.Printf("deployArtifact: ", f)
96 logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile)
98 // Get a file handle for the file we
100 file, err := os.Open(f)
102 logger.Fatalf("failed to open file: ", err.Error())
106 // Open a stream-based connection with the
108 stream, err := c.client.Instantiate(ctx)
111 logger.Fatalf("failed to create upload stream for file: ", err)
113 defer stream.CloseSend()
115 //send metadata information
116 req := &lcmservice.InstantiateRequest{
118 Data: &lcmservice.InstantiateRequest_HostIp{
123 err = stream.Send(req)
125 logger.Fatalf("failed to send metadata information: ", f)
128 // Allocate a buffer with `chunkSize` as the capacity
129 // and length (making a 0 array of the size of `chunkSize`)
130 buf = make([]byte, c.chunkSize)
132 // put as many bytes as `chunkSize` into the
134 n, err = file.Read(buf)
136 // ... if `eof` --> `writing=false`...
142 logger.Fatalf("errored while copying from file to buf: ", err)
145 req := &lcmservice.InstantiateRequest {
146 Data: &lcmservice.InstantiateRequest_Package {
151 err = stream.Send(req)
154 logger.Fatalf("failed to send chunk via stream: ", err)
158 res, err := stream.CloseAndRecv()
160 logger.Fatalf("failed to receive upstream status response: ", err)
163 log.Printf("response", res)
164 return res.WorkloadId, res.Status, err
167 func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string) {
169 req := &lcmservice.QueryRequest{
171 WorkloadId: workloadId,
173 resp, _ := c.client.Query(ctx, req)
177 func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string) {
179 req := &lcmservice.TerminateRequest{
181 WorkloadId: workloadId,
183 resp, _ := c.client.Terminate(ctx, req)
187 func (c *ClientGRPC) Close() {