Bug fixes
[ealt-edge.git] / mecm / mepm / applcm / k8shelm / pkg / plugin / grpcserver.go
1 /*
2  * Copyright 2020 Huawei Technologies Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package plugin
17
18 import (
19         "bytes"
20         "context"
21         "github.com/sirupsen/logrus"
22         "google.golang.org/grpc"
23         "google.golang.org/grpc/codes"
24         "google.golang.org/grpc/credentials"
25         _ "google.golang.org/grpc/encoding/gzip"
26         "google.golang.org/grpc/status"
27         "io"
28         "k8shelm/internal/lcmservice"
29         "net"
30         "os"
31         "strconv"
32 )
33
34 // GRPC server
35 type ServerGRPC struct {
36         server      *grpc.Server
37         port        int
38         certificate string
39         key         string
40         logger      *logrus.Logger
41 }
42
43 // GRPC service configuration used to create GRPC server
44 type ServerGRPCConfig struct {
45         Certificate string
46         Key         string
47         Port        int
48         Logger      *logrus.Logger
49 }
50
51 // Constructor to GRPC server
52 func NewServerGRPC(cfg ServerGRPCConfig) (s ServerGRPC) {
53         s.logger = cfg.Logger
54         s.port = cfg.Port
55         s.certificate = cfg.Certificate
56         s.key = cfg.Key
57         s.logger.Infof("Binding is successful")
58         return
59 }
60
61 // Start GRPC server and start listening on the port
62 func (s *ServerGRPC) Listen() (err error) {
63         var (
64                 listener  net.Listener
65                 grpcOpts  = []grpc.ServerOption{}
66                 grpcCreds credentials.TransportCredentials
67         )
68
69         // Listen announces on the network address
70         listener, err = net.Listen("tcp", ":"+strconv.Itoa(s.port))
71         if err != nil {
72                 s.logger.Fatalf("failed to listen on specified port")
73         }
74         s.logger.Infof("Server started listening on specified port")
75
76         // Secure connection if asked
77         if s.certificate != "" && s.key != "" {
78                 grpcCreds, err = credentials.NewServerTLSFromFile(
79                         s.certificate, s.key)
80                 if err != nil {
81                         s.logger.Fatalf("failed to create tls grpc server using given cert and key")
82                 }
83                 grpcOpts = append(grpcOpts, grpc.Creds(grpcCreds))
84         }
85
86         // Register server with GRPC
87         s.server = grpc.NewServer(grpcOpts...)
88         lcmservice.RegisterAppLCMServer(s.server, s)
89
90         s.logger.Infof("Server registered with GRPC")
91
92         // Server start serving
93         err = s.server.Serve(listener)
94         if err != nil {
95                 s.logger.Fatalf("failed to listen for grpc connections. Err: %s", err)
96                 return err
97         }
98         return
99 }
100
101 // Query HELM chart
102 func (s *ServerGRPC) Query(ctx context.Context, req *lcmservice.QueryRequest) (resp *lcmservice.QueryResponse, err error) {
103
104         // Input validation
105         if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
106                 return nil, s.logError(status.Errorf(codes.InvalidArgument, "HostIP & WorkloadId can't be null", err))
107         }
108
109         // Create HELM Client
110         hc, err := NewHelmClient(req.GetHostIp(), s.logger)
111         if os.IsNotExist(err) {
112                 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to given Edge can't be found. " +
113                         "Err: %s", err))
114         }
115
116         // Query Chart
117         r, err := hc.queryChart(req.GetWorkloadId())
118         if (err != nil) {
119                 return nil, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
120                         req.GetWorkloadId(), err))
121         }
122         resp = &lcmservice.QueryResponse{
123                 Status: r,
124         }
125         return resp, nil
126 }
127
128 // Terminate HELM charts
129 func (s *ServerGRPC) Terminate(ctx context.Context, req *lcmservice.TerminateRequest) (resp *lcmservice.TerminateResponse, err error) {
130         // Input validation
131         if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
132                 return nil, s.logError(status.Errorf(codes.InvalidArgument, "HostIP & WorkloadId can't be null", err))
133         }
134
135         // Create HELM client
136         hc, err := NewHelmClient(req.GetHostIp(), s.logger)
137         if os.IsNotExist(err) {
138                 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to given Edge can't be found. " +
139                         "Err: %s", err))
140         }
141
142         // Uninstall chart
143         err = hc.uninstallChart(req.GetWorkloadId())
144
145         if (err != nil) {
146                 resp = &lcmservice.TerminateResponse{
147                         Status: "Failure",
148                 }
149                 return resp, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
150                         req.GetWorkloadId(), err))
151         } else {
152                 resp = &lcmservice.TerminateResponse{
153                         Status: "Success",
154                 }
155                 return resp, nil
156         }
157 }
158
159 // Instantiate HELM Chart
160 func (s *ServerGRPC) Instantiate(stream lcmservice.AppLCM_InstantiateServer) (err error) {
161
162         // Recieve metadata which is host ip
163         req, err := stream.Recv()
164         if err != nil {
165                 s.logger.Errorf("Cannot receive package metadata. Err: %s", err)
166                 return
167         }
168
169         hostIP := req.GetHostIp()
170         s.logger.Infof("Recieved instantiate request")
171
172         // Host validation
173         if (hostIP == "") {
174                 return s.logError(status.Errorf(codes.InvalidArgument, "HostIP & WorkloadId can't be null", err))
175         }
176
177         // Receive package
178         helmPkg := bytes.Buffer{}
179         for {
180                 err := s.contextError(stream.Context())
181                 if err != nil {
182                         return err
183                 }
184
185                 s.logger.Debug("Waiting to receive more data")
186
187                 req, err := stream.Recv()
188                 if err == io.EOF {
189                         s.logger.Debug("No more data")
190                         break
191                 }
192                 if err != nil {
193                         return s.logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
194                 }
195
196                 // Receive chunk and write to helm package
197                 chunk := req.GetPackage()
198
199                 s.logger.Infof("Recieved chunk")
200
201                 _, err = helmPkg.Write(chunk)
202                 if err != nil {
203                         return s.logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
204                 }
205         }
206
207         // Create HELM client
208         hc, err := NewHelmClient(req.GetHostIp(), s.logger)
209         if os.IsNotExist(err) {
210                 return s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to edge can't be found. " +
211                         "Err: %s", err))
212         }
213
214         relName, err := hc.installChart(helmPkg)
215
216         var res lcmservice.InstantiateResponse
217         res.WorkloadId = relName
218
219         if (err != nil) {
220                 res.Status = "Failure"
221                 s.logger.Infof("Instantiation Failed")
222         } else {
223                 res.Status = "Success"
224                 s.logger.Infof("Successful Instantiation")
225         }
226
227         err = stream.SendAndClose(&res)
228         if err != nil {
229                 return s.logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
230         }
231         return
232 }
233
234 func (s *ServerGRPC) contextError(ctx context.Context) error {
235         switch ctx.Err() {
236         case context.Canceled:
237                 return s.logError(status.Error(codes.Canceled, "request is canceled"))
238         case context.DeadlineExceeded:
239                 return s.logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
240         default:
241                 return nil
242         }
243 }
244
245 func (s *ServerGRPC) logError(err error) error {
246         if err != nil {
247                 s.logger.Errorf("Error Information: ", err)
248         }
249         return err
250 }