e7783679b24fd51e066d08c6a3a84b958170c7b3
[ealt-edge.git] / mecm / mepm / applcm / broker / pkg / plugin / grpcclient.go
1 /*
2  * Copyright 2020 Huawei Technologies Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package plugin
18
19 import (
20         "broker/internal/lcmservice"
21         "io"
22         "os"
23
24         "github.com/sirupsen/logrus"
25         "golang.org/x/net/context"
26         "google.golang.org/grpc"
27         "google.golang.org/grpc/credentials"
28         _ "google.golang.org/grpc/encoding/gzip"
29 )
30
31 // GRPC client to different GRPC supported plugins
32 type ClientGRPC struct {
33         conn      *grpc.ClientConn
34         client    lcmservice.AppLCMClient
35         chunkSize int
36         logger    *logrus.Logger
37 }
38
39 type ClientGRPCConfig struct {
40         Address         string
41         ChunkSize       int
42         RootCertificate string
43         Logger          *logrus.Logger
44 }
45
46 // Create a GRPC client
47 func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) {
48
49         var (
50                 grpcOpts  = []grpc.DialOption{}
51                 grpcCreds credentials.TransportCredentials
52         )
53
54         c.chunkSize = cfg.ChunkSize
55         c.logger = cfg.Logger
56
57         if cfg.RootCertificate != "" {
58                 grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost")
59                 if err != nil {
60                         c.logger.Errorf("failed to create grpc tls client via provided root-cert ", err)
61                         return c, err
62                 }
63                 grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds))
64         } else {
65                 grpcOpts = append(grpcOpts, grpc.WithInsecure())
66         }
67
68         c.conn, err = grpc.Dial(cfg.Address, grpcOpts...)
69         if err != nil {
70                 c.logger.Errorf("failed to start grpc connection with address: ", cfg.Address)
71                 return c, err
72         }
73
74         c.client = lcmservice.NewAppLCMClient(c.conn)
75         return c, nil
76 }
77
78 // Instantiate application
79 func (c *ClientGRPC) Instantiate(ctx context.Context, deployArtifact string, hostIP string) (workloadId string, status string, error error) {
80         var (
81                 writing = true
82                 buf     []byte
83                 n       int
84                 file    *os.File
85         )
86         c.logger.Infof("deployArtifact: ", deployArtifact)
87
88         // Get a file handle for the file we
89         // want to upload
90         file, err := os.Open(deployArtifact)
91         if err != nil {
92                 c.logger.Errorf("failed to open package file: %s. Err: %s", deployArtifact, err.Error())
93                 return "", "Failure", err
94         }
95         defer file.Close()
96
97         // Open a stream-based connection with the
98         // gRPC server
99         stream, err := c.client.Instantiate(ctx)
100
101         if err != nil {
102                 c.logger.Errorf("failed to upload stream: %s. Err: %s", deployArtifact, err.Error())
103                 return "", "Failure", err
104         }
105         defer stream.CloseSend()
106
107         //send metadata information
108         req := &lcmservice.InstantiateRequest{
109
110                 Data: &lcmservice.InstantiateRequest_HostIp{
111                         HostIp: hostIP,
112                 },
113         }
114
115         err = stream.Send(req)
116         if err != nil {
117                 c.logger.Errorf("failed to send metadata information: ", deployArtifact)
118                 return "", "Failure", err
119         }
120
121         // Allocate a buffer with `chunkSize` as the capacity
122         // and length (making a 0 array of the size of `chunkSize`)
123         buf = make([]byte, c.chunkSize)
124         for writing {
125                 // put as many bytes as `chunkSize` into the
126                 // buf array.
127                 n, err = file.Read(buf)
128                 if err != nil {
129                         // ... if `eof` --> `writing=false`...
130                         if err == io.EOF {
131                                 writing = false
132                                 err = nil
133                                 continue
134                         }
135                         c.logger.Errorf("errored while copying from file to buf: ", err)
136                         return "", "Failure", err
137                 }
138
139                 req := &lcmservice.InstantiateRequest{
140                         Data: &lcmservice.InstantiateRequest_Package{
141                                 Package: buf[:n],
142                         },
143                 }
144
145                 err = stream.Send(req)
146
147                 if err != nil {
148                         c.logger.Errorf("failed to send chunk via stream: ", err)
149                         return "", "Failure", err
150                 }
151         }
152
153         res, err := stream.CloseAndRecv()
154         if err != nil {
155                 c.logger.Errorf("failed to receive upstream status response: ", err)
156                 return "", "Failure", err
157         }
158         c.logger.Infof("Instantiation Completed with workloadId %s and status", res.GetWorkloadId(), res.GetStatus())
159         return res.GetWorkloadId(), res.GetStatus(), err
160 }
161
162 // Query application
163 func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string, error error) {
164
165         req := &lcmservice.QueryRequest{
166                 HostIp:     hostIP,
167                 WorkloadId: workloadId,
168         }
169         resp, err := c.client.Query(ctx, req)
170         if err != nil {
171                 return "", err
172         }
173         return resp.Status, err
174 }
175
176 // Terminate application
177 func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string, error error) {
178
179         req := &lcmservice.TerminateRequest{
180                 HostIp:     hostIP,
181                 WorkloadId: workloadId,
182         }
183         resp, err := c.client.Terminate(ctx, req)
184         if err != nil {
185                 return "", err
186         }
187         return resp.Status, err
188 }
189
190 func (c *ClientGRPC) Close() {
191         if c.conn != nil {
192                 c.conn.Close()
193         }
194 }