Bug fixes
[ealt-edge.git] / mecm / mepm / applcm / broker / pkg / plugin / grpcclient.go
1 /*
2  * Copyright 2020 Huawei Technologies Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package plugin
18
19 import (
20         "broker/internal/lcmservice"
21         "github.com/sirupsen/logrus"
22         "golang.org/x/net/context"
23         "google.golang.org/grpc"
24         "google.golang.org/grpc/credentials"
25         _ "google.golang.org/grpc/encoding/gzip"
26         "io"
27         "os"
28 )
29
30 // GRPC client to different GRPC supported plugins
31 type ClientGRPC struct {
32         conn      *grpc.ClientConn
33         client    lcmservice.AppLCMClient
34         chunkSize int
35         logger    *logrus.Logger
36 }
37
38 type ClientGRPCConfig struct {
39         Address         string
40         ChunkSize       int
41         RootCertificate string
42         Logger          *logrus.Logger
43 }
44
45 // Create a GRPC client
46 func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) {
47
48         var (
49                 grpcOpts  = []grpc.DialOption{}
50                 grpcCreds credentials.TransportCredentials
51         )
52
53         c.chunkSize = cfg.ChunkSize
54         c.logger = cfg.Logger
55
56         if cfg.RootCertificate != "" {
57                 grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost")
58                 if err != nil {
59                         c.logger.Errorf("failed to create grpc tls client via provided root-cert ", err)
60                         return c, err
61                 }
62                 grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds))
63         } else {
64                 grpcOpts = append(grpcOpts, grpc.WithInsecure())
65         }
66
67         c.conn, err = grpc.Dial(cfg.Address, grpcOpts...)
68         if err != nil {
69                 c.logger.Errorf("failed to start grpc connection with address: ", cfg.Address)
70                 return c, err
71         }
72
73         c.client = lcmservice.NewAppLCMClient(c.conn)
74         return c, nil
75 }
76
77 // Instantiate application
78 func (c *ClientGRPC) Instantiate(ctx context.Context, deployArtifact string, hostIP string) (workloadId string, status string, error error) {
79         var (
80                 writing = true
81                 buf     []byte
82                 n       int
83                 file    *os.File
84         )
85         c.logger.Infof("deployArtifact: ", deployArtifact)
86
87         // Get a file handle for the file we
88         // want to upload
89         file, err := os.Open(deployArtifact)
90         if err != nil {
91                 c.logger.Errorf("failed to open package file: %s. Err: %s", deployArtifact, err.Error())
92                 return "","Failure", err
93         }
94         defer file.Close()
95
96         // Open a stream-based connection with the
97         // gRPC server
98         stream, err := c.client.Instantiate(ctx)
99
100         if err != nil {
101                 c.logger.Errorf("failed to upload stream: %s. Err: %s", deployArtifact, err.Error())
102                 return "","Failure", err
103         }
104         defer stream.CloseSend()
105
106         //send metadata information
107         req := &lcmservice.InstantiateRequest{
108
109                 Data: &lcmservice.InstantiateRequest_HostIp{
110                         HostIp:  hostIP,
111                 },
112         }
113
114         err = stream.Send(req)
115         if err != nil {
116                 c.logger.Errorf("failed to send metadata information: ", deployArtifact)
117                 return "","Failure", err
118         }
119
120         // Allocate a buffer with `chunkSize` as the capacity
121         // and length (making a 0 array of the size of `chunkSize`)
122         buf = make([]byte, c.chunkSize)
123         for writing {
124                 // put as many bytes as `chunkSize` into the
125                 // buf array.
126                 n, err = file.Read(buf)
127                 if err != nil {
128                         // ... if `eof` --> `writing=false`...
129                         if err == io.EOF {
130                                 writing = false
131                                 err = nil
132                                 continue
133                         }
134                         c.logger.Errorf("errored while copying from file to buf: ", err)
135                         return "","Failure", err
136                 }
137
138                 req := &lcmservice.InstantiateRequest {
139                         Data: &lcmservice.InstantiateRequest_Package {
140                                 Package: buf[:n],
141                         },
142                 }
143
144                 err = stream.Send(req)
145
146                 if err != nil {
147                         c.logger.Errorf("failed to send chunk via stream: ", err)
148                         return "","Failure", err
149                 }
150         }
151
152         res, err := stream.CloseAndRecv()
153         if err != nil {
154                 c.logger.Errorf("failed to receive upstream status response: ", err)
155                 return "","Failure", err
156         }
157         c.logger.Infof("Instantiation Completed with workloadId %s and status", res.GetWorkloadId(), res.GetStatus())
158         return res.GetWorkloadId(), res.GetStatus(), err
159 }
160
161 // Query application
162 func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string, error error) {
163
164         req := &lcmservice.QueryRequest{
165                 HostIp: hostIP,
166                 WorkloadId: workloadId,
167         }
168         resp, err := c.client.Query(ctx, req)
169         if err != nil {
170                 return "", err
171         }
172         return resp.Status, err
173 }
174
175 // Terminate application
176 func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string, error error) {
177
178         req := &lcmservice.TerminateRequest{
179                 HostIp: hostIP,
180                 WorkloadId: workloadId,
181         }
182         resp, err := c.client.Terminate(ctx, req)
183         if err != nil {
184                 return "", err
185         }
186         return resp.Status, err
187 }
188
189 func (c *ClientGRPC) Close() {
190         if c.conn != nil {
191                 c.conn.Close()
192         }
193 }