Uniform formatting and volume as hostpath
[ealt-edge.git] / mecm / mepm / applcm / k8shelm / pkg / plugin / grpcserver.go
1 /*
2  * Copyright 2020 Huawei Technologies Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package plugin
17
18 import (
19         "bytes"
20         "context"
21         "io"
22         "k8shelm/internal/lcmservice"
23         "net"
24         "os"
25         "strconv"
26
27         "github.com/sirupsen/logrus"
28         "google.golang.org/grpc"
29         "google.golang.org/grpc/codes"
30         "google.golang.org/grpc/credentials"
31         _ "google.golang.org/grpc/encoding/gzip"
32         "google.golang.org/grpc/status"
33 )
34
35 // GRPC server
36 type ServerGRPC struct {
37         server      *grpc.Server
38         port        int
39         certificate string
40         key         string
41         logger      *logrus.Logger
42 }
43
44 // GRPC service configuration used to create GRPC server
45 type ServerGRPCConfig struct {
46         Certificate string
47         Key         string
48         Port        int
49         Logger      *logrus.Logger
50 }
51
52 // Constructor to GRPC server
53 func NewServerGRPC(cfg ServerGRPCConfig) (s ServerGRPC) {
54         s.logger = cfg.Logger
55         s.port = cfg.Port
56         s.certificate = cfg.Certificate
57         s.key = cfg.Key
58         s.logger.Infof("Binding is successful")
59         return
60 }
61
62 // Start GRPC server and start listening on the port
63 func (s *ServerGRPC) Listen() (err error) {
64         var (
65                 listener  net.Listener
66                 grpcOpts  = []grpc.ServerOption{}
67                 grpcCreds credentials.TransportCredentials
68         )
69
70         // Listen announces on the network address
71         listener, err = net.Listen("tcp", ":"+strconv.Itoa(s.port))
72         if err != nil {
73                 s.logger.Fatalf("failed to listen on specified port")
74         }
75         s.logger.Infof("Server started listening on specified port")
76
77         // Secure connection if asked
78         if s.certificate != "" && s.key != "" {
79                 grpcCreds, err = credentials.NewServerTLSFromFile(
80                         s.certificate, s.key)
81                 if err != nil {
82                         s.logger.Fatalf("failed to create tls grpc server using given cert and key")
83                 }
84                 grpcOpts = append(grpcOpts, grpc.Creds(grpcCreds))
85         }
86
87         // Register server with GRPC
88         s.server = grpc.NewServer(grpcOpts...)
89         lcmservice.RegisterAppLCMServer(s.server, s)
90
91         s.logger.Infof("Server registered with GRPC")
92
93         // Server start serving
94         err = s.server.Serve(listener)
95         if err != nil {
96                 s.logger.Fatalf("failed to listen for grpc connections. Err: %s", err)
97                 return err
98         }
99         return
100 }
101
102 // Query HELM chart
103 func (s *ServerGRPC) Query(ctx context.Context, req *lcmservice.QueryRequest) (resp *lcmservice.QueryResponse, err error) {
104
105         // Input validation
106         if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
107                 return nil, s.logError(status.Errorf(codes.InvalidArgument, "HostIP & WorkloadId can't be null", err))
108         }
109
110         // Create HELM Client
111         hc, err := NewHelmClient(req.GetHostIp(), s.logger)
112         if os.IsNotExist(err) {
113                 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to given Edge can't be found. "+
114                         "Err: %s", err))
115         }
116
117         // Query Chart
118         r, err := hc.queryChart(req.GetWorkloadId())
119         if err != nil {
120                 return nil, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
121                         req.GetWorkloadId(), err))
122         }
123         resp = &lcmservice.QueryResponse{
124                 Status: r,
125         }
126         return resp, nil
127 }
128
129 // Terminate HELM charts
130 func (s *ServerGRPC) Terminate(ctx context.Context, req *lcmservice.TerminateRequest) (resp *lcmservice.TerminateResponse, err error) {
131         // Input validation
132         if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
133                 return nil, s.logError(status.Errorf(codes.InvalidArgument, "HostIP & WorkloadId can't be null", err))
134         }
135
136         // Create HELM client
137         hc, err := NewHelmClient(req.GetHostIp(), s.logger)
138         if os.IsNotExist(err) {
139                 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to given Edge can't be found. "+
140                         "Err: %s", err))
141         }
142
143         // Uninstall chart
144         err = hc.uninstallChart(req.GetWorkloadId())
145
146         if err != nil {
147                 resp = &lcmservice.TerminateResponse{
148                         Status: "Failure",
149                 }
150                 return resp, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
151                         req.GetWorkloadId(), err))
152         } else {
153                 resp = &lcmservice.TerminateResponse{
154                         Status: "Success",
155                 }
156                 return resp, nil
157         }
158 }
159
160 // Instantiate HELM Chart
161 func (s *ServerGRPC) Instantiate(stream lcmservice.AppLCM_InstantiateServer) (err error) {
162
163         // Recieve metadata which is host ip
164         req, err := stream.Recv()
165         if err != nil {
166                 s.logger.Errorf("Cannot receive package metadata. Err: %s", err)
167                 return
168         }
169
170         hostIP := req.GetHostIp()
171         s.logger.Infof("Recieved instantiate request")
172
173         // Host validation
174         if hostIP == "" {
175                 return s.logError(status.Errorf(codes.InvalidArgument, "HostIP & WorkloadId can't be null", err))
176         }
177
178         // Receive package
179         helmPkg := bytes.Buffer{}
180         for {
181                 err := s.contextError(stream.Context())
182                 if err != nil {
183                         return err
184                 }
185
186                 s.logger.Debug("Waiting to receive more data")
187
188                 req, err := stream.Recv()
189                 if err == io.EOF {
190                         s.logger.Debug("No more data")
191                         break
192                 }
193                 if err != nil {
194                         return s.logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
195                 }
196
197                 // Receive chunk and write to helm package
198                 chunk := req.GetPackage()
199
200                 s.logger.Infof("Recieved chunk")
201
202                 _, err = helmPkg.Write(chunk)
203                 if err != nil {
204                         return s.logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
205                 }
206         }
207
208         // Create HELM client
209         hc, err := NewHelmClient(req.GetHostIp(), s.logger)
210         if os.IsNotExist(err) {
211                 return s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to edge can't be found. "+
212                         "Err: %s", err))
213         }
214
215         relName, err := hc.installChart(helmPkg)
216
217         var res lcmservice.InstantiateResponse
218         res.WorkloadId = relName
219
220         if err != nil {
221                 res.Status = "Failure"
222                 s.logger.Infof("Instantiation Failed")
223         } else {
224                 res.Status = "Success"
225                 s.logger.Infof("Successful Instantiation")
226         }
227
228         err = stream.SendAndClose(&res)
229         if err != nil {
230                 return s.logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
231         }
232         return
233 }
234
235 func (s *ServerGRPC) contextError(ctx context.Context) error {
236         switch ctx.Err() {
237         case context.Canceled:
238                 return s.logError(status.Error(codes.Canceled, "request is canceled"))
239         case context.DeadlineExceeded:
240                 return s.logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
241         default:
242                 return nil
243         }
244 }
245
246 func (s *ServerGRPC) logError(err error) error {
247         if err != nil {
248                 s.logger.Errorf("Error Information: ", err)
249         }
250         return err
251 }