d55098d4692d766a6053e1686d27b0065b5145bb
[ealt-edge.git] / mecm / mepm / applcm / broker / pkg / plugin / grpcclient.go
1 /*
2  * Copyright 2020 Huawei Technologies Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package plugin
18
19 import (
20         "broker/internal/lcmservice"
21         "golang.org/x/net/context"
22         "google.golang.org/grpc"
23         "google.golang.org/grpc/credentials"
24         _ "google.golang.org/grpc/encoding/gzip"
25         "io"
26         "log"
27         "os"
28 )
29
30 // ClientGRPC provides the implementation of a file
31 // uploader that streams chunks via protobuf-encoded
32 // messages.
33 type ClientGRPC struct {
34         conn      *grpc.ClientConn
35         client    lcmservice.AppLCMClient
36         chunkSize int
37 }
38
39 type ClientGRPCConfig struct {
40         Address         string
41         ChunkSize       int
42         RootCertificate string
43 }
44
45 func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) {
46
47         logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile)
48
49         var (
50                 grpcOpts  = []grpc.DialOption{}
51                 grpcCreds credentials.TransportCredentials
52         )
53
54         if cfg.Address == "" {
55                 logger.Fatalf("address must be specified: ", err)
56         }
57
58         if cfg.RootCertificate != "" {
59                 grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost")
60                 if err != nil {
61                         logger.Fatalf("failed to create grpc tls client via root-cert: ", err)
62                 }
63
64                 grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds))
65         } else {
66                 grpcOpts = append(grpcOpts, grpc.WithInsecure())
67         }
68
69         switch {
70         case cfg.ChunkSize == 0:
71                 logger.Fatalf("ChunkSize must be specified")
72         case cfg.ChunkSize > (1 << 22):
73                 logger.Fatalf("ChunkSize must be < than 4MB")
74         default:
75                 c.chunkSize = cfg.ChunkSize
76         }
77
78         c.conn, err = grpc.Dial(cfg.Address, grpcOpts...)
79         if err != nil {
80                 logger.Fatalf("failed to start grpc connection with address: ", cfg.Address)
81         }
82
83         c.client = lcmservice.NewAppLCMClient(c.conn)
84         return
85 }
86
87 func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (workloadId string, status string, error error) {
88         var (
89                 writing = true
90                 buf     []byte
91                 n       int
92                 file    *os.File
93         )
94         log.Printf("hostIP: ", hostIP)
95         log.Printf("deployArtifact: ", f)
96         logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile)
97
98         // Get a file handle for the file we
99         // want to upload
100         file, err := os.Open(f)
101         if err != nil {
102                 logger.Fatalf("failed to open file: ", err.Error())
103         }
104         defer file.Close()
105
106         // Open a stream-based connection with the
107         // gRPC server
108         stream, err := c.client.Instantiate(ctx)
109
110         if err != nil {
111                 logger.Fatalf("failed to create upload stream for file: ", err)
112         }
113         defer stream.CloseSend()
114
115     //send metadata information
116         req := &lcmservice.InstantiateRequest{
117
118                 Data: &lcmservice.InstantiateRequest_HostIp{
119                                 HostIp:  hostIP,
120                 },
121         }
122
123         err = stream.Send(req)
124         if err != nil {
125                 logger.Fatalf("failed to send metadata information: ", f)
126         }
127
128         // Allocate a buffer with `chunkSize` as the capacity
129         // and length (making a 0 array of the size of `chunkSize`)
130         buf = make([]byte, c.chunkSize)
131         for writing {
132                 // put as many bytes as `chunkSize` into the
133                 // buf array.
134                 n, err = file.Read(buf)
135                 if err != nil {
136                         // ... if `eof` --> `writing=false`...
137                         if err == io.EOF {
138                                 writing = false
139                                 err = nil
140                                 continue
141                         }
142                         logger.Fatalf("errored while copying from file to buf: ", err)
143                 }
144
145                 req := &lcmservice.InstantiateRequest {
146                         Data: &lcmservice.InstantiateRequest_Package {
147                                 Package: buf[:n],
148                         },
149                 }
150
151                 err = stream.Send(req)
152
153                 if err != nil {
154                         logger.Fatalf("failed to send chunk via stream: ", err)
155                 }
156         }
157
158         res, err := stream.CloseAndRecv()
159         if err != nil {
160                 logger.Fatalf("failed to receive upstream status response: ", err)
161                 return "", "", err
162         }
163         log.Printf("response", res)
164         return res.WorkloadId, res.Status, err
165 }
166
167 func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string) {
168
169         req := &lcmservice.QueryRequest{
170                 HostIp: hostIP,
171                 WorkloadId: workloadId,
172         }
173         resp, _ := c.client.Query(ctx, req)
174         return resp.Status
175 }
176
177 func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string) {
178
179         req := &lcmservice.TerminateRequest{
180                 HostIp: hostIP,
181                 WorkloadId: workloadId,
182         }
183         resp, _ := c.client.Terminate(ctx, req)
184         return resp.Status
185 }
186
187 func (c *ClientGRPC) Close() {
188         if c.conn != nil {
189                 c.conn.Close()
190         }
191 }
192