6ec356f6277136b0ad8aa52429cd8d3bac0503aa
[ealt-edge.git] / mecm / mepm / applcm / k8shelm / pkg / plugin / grpcserver.go
1 /*
2  * Copyright 2020 Huawei Technologies Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package plugin
17
18 import (
19         "bytes"
20         "context"
21         "github.com/sirupsen/logrus"
22         "google.golang.org/grpc"
23         "google.golang.org/grpc/codes"
24         "google.golang.org/grpc/credentials"
25         _ "google.golang.org/grpc/encoding/gzip"
26         "google.golang.org/grpc/status"
27         "io"
28         "k8shelm/internal/lcmservice"
29         "net"
30         "os"
31         "strconv"
32 )
33
34 // GRPC server
35 type ServerGRPC struct {
36         server      *grpc.Server
37         port        int
38         certificate string
39         key         string
40         logger      *logrus.Logger
41 }
42
43 // GRPC service configuration used to create GRPC server
44 type ServerGRPCConfig struct {
45         Certificate string
46         Key         string
47         Port        int
48         Logger      *logrus.Logger
49 }
50
51 // Constructor to GRPC server
52 func NewServerGRPC(cfg ServerGRPCConfig) (s ServerGRPC) {
53         s.logger = cfg.Logger
54         s.port = cfg.Port
55         s.certificate = cfg.Certificate
56         s.key = cfg.Key
57         s.logger.Infof("Binding is successful")
58         return
59 }
60
61 // Start GRPC server and start listening on the port
62 func (s *ServerGRPC) Listen() (err error) {
63         var (
64                 listener  net.Listener
65                 grpcOpts  = []grpc.ServerOption{}
66                 grpcCreds credentials.TransportCredentials
67         )
68
69         // Listen announces on the network address
70         listener, err = net.Listen("tcp", ":"+strconv.Itoa(s.port))
71         if err != nil {
72                 s.logger.Fatalf("failed to listen on port: %s. Err: %s", s.port, err)
73                 return err
74         }
75         s.logger.Infof("Server started listening on port ", s.port)
76
77         // Secure connection if asked
78         if s.certificate != "" && s.key != "" {
79                 grpcCreds, err = credentials.NewServerTLSFromFile(
80                         s.certificate, s.key)
81                 if err != nil {
82                         s.logger.Fatalf("failed to create tls grpc server using cert %s and key: %s. Err: %s", s.certificate, s.key, err)
83                 }
84                 grpcOpts = append(grpcOpts, grpc.Creds(grpcCreds))
85         }
86
87         // Register server with GRPC
88         s.server = grpc.NewServer(grpcOpts...)
89         lcmservice.RegisterAppLCMServer(s.server, s)
90
91         s.logger.Infof("Server registered with GRPC")
92
93         // Server start serving
94         err = s.server.Serve(listener)
95         if err != nil {
96                 s.logger.Fatalf("failed to listen for grpc connections. Err: %s", err)
97                 return err
98         }
99         return
100 }
101
102 // Query HELM chart
103 func (s *ServerGRPC) Query(ctx context.Context, req *lcmservice.QueryRequest) (resp *lcmservice.QueryResponse, err error) {
104
105         // Input validation
106         if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
107                 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Nil input HostIp: %s or workloadId: %s. " +
108                         "Err: %s", req.GetHostIp(), req.GetWorkloadId(), err))
109         }
110
111         // Create HELM Client
112         hc, err := NewHelmClient(req.GetHostIp(), s.logger)
113         if os.IsNotExist(err) {
114                 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig for HostIp can't be found: %s. " +
115                         "Err: %s", req.GetHostIp(), err))
116         }
117
118         // Query Chart
119         r, err := hc.queryChart(req.GetWorkloadId())
120         if (err != nil) {
121                 return nil, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
122                         req.GetWorkloadId(), err))
123         }
124         resp = &lcmservice.QueryResponse{
125                 Status: r,
126         }
127         return resp, nil
128 }
129
130 // Terminate HELM charts
131 func (s *ServerGRPC) Terminate(ctx context.Context, req *lcmservice.TerminateRequest) (resp *lcmservice.TerminateResponse, err error) {
132         // Input validation
133         if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
134                 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Nil input HostIp: %s or workloadId: %s. " +
135                         "Err: %s", req.GetHostIp(), req.GetWorkloadId(), err))
136         }
137
138         // Create HELM client
139         hc, err := NewHelmClient(req.GetHostIp(), s.logger)
140         if os.IsNotExist(err) {
141                 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig for HostIp can't be found: %s. " +
142                         "Err: %s", req.GetHostIp(), err))
143         }
144
145         // Uninstall chart
146         err = hc.uninstallChart(req.GetWorkloadId())
147
148         if (err != nil) {
149                 resp = &lcmservice.TerminateResponse{
150                         Status: "Failure",
151                 }
152                 return resp, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
153                         req.GetWorkloadId(), err))
154         } else {
155                 resp = &lcmservice.TerminateResponse{
156                         Status: "Success",
157                 }
158                 return resp, nil
159         }
160 }
161
162 // Instantiate HELM Chart
163 func (s *ServerGRPC) Instantiate(stream lcmservice.AppLCM_InstantiateServer) (err error) {
164
165         // Recieve metadata which is host ip
166         req, err := stream.Recv()
167         if err != nil {
168                 s.logger.Errorf("Cannot receive package metadata. Err: %s", err)
169                 return
170         }
171
172         hostIP := req.GetHostIp()
173         s.logger.Info("Recieved instantiate request for host ", hostIP)
174
175         // Host validation
176         if (hostIP == "") {
177                 return s.logError(status.Errorf(codes.InvalidArgument, "Nil input for HostIp: %s Err: %s", req.GetHostIp(), err))
178         }
179
180         // Receive package
181         helmPkg := bytes.Buffer{}
182         for {
183                 err := s.contextError(stream.Context())
184                 if err != nil {
185                         return err
186                 }
187
188                 s.logger.Info("Waiting to receive more data")
189
190                 req, err := stream.Recv()
191                 if err == io.EOF {
192                         s.logger.Info("No more data")
193                         break
194                 }
195                 if err != nil {
196                         return s.logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
197                 }
198
199                 // Receive chunk and write to helm package
200                 chunk := req.GetPackage()
201
202                 s.logger.Info("Recieved chunk")
203
204                 _, err = helmPkg.Write(chunk)
205                 if err != nil {
206                         return s.logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
207                 }
208         }
209
210         // Create HELM client
211         hc, err := NewHelmClient(req.GetHostIp(), s.logger)
212         if os.IsNotExist(err) {
213                 return s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig for HostIp can't be found: %s. " +
214                         "Err: %s", req.GetHostIp(), err))
215         }
216
217         relName, err := hc.installChart(helmPkg)
218
219         var res lcmservice.InstantiateResponse
220         res.WorkloadId = relName
221
222         if (err != nil) {
223                 res.Status = "Failure"
224         } else {
225                 res.Status = "Success"
226         }
227
228         err = stream.SendAndClose(&res)
229         if err != nil {
230                 return s.logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
231         }
232         s.logger.Info("Successful Instantiation")
233         return
234 }
235
236 func (s *ServerGRPC) contextError(ctx context.Context) error {
237         switch ctx.Err() {
238         case context.Canceled:
239                 return s.logError(status.Error(codes.Canceled, "request is canceled"))
240         case context.DeadlineExceeded:
241                 return s.logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
242         default:
243                 return nil
244         }
245 }
246
247 func (s *ServerGRPC) logError(err error) error {
248         if err != nil {
249                 s.logger.Errorf("Error Information: ", err)
250         }
251         return err
252 }