2 * Copyright 2020 Huawei Technologies Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
21 "github.com/sirupsen/logrus"
22 "google.golang.org/grpc"
23 "google.golang.org/grpc/codes"
24 "google.golang.org/grpc/credentials"
25 _ "google.golang.org/grpc/encoding/gzip"
26 "google.golang.org/grpc/status"
28 "k8shelm/internal/lcmservice"
// ServerGRPC is the GRPC server type in this file. Its methods read the
// certificate, key, port, logger and server fields, and Listen registers it
// with lcmservice.RegisterAppLCMServer as the AppLCM service implementation.
35 type ServerGRPC struct {
43 // ServerGRPCConfig is the GRPC service configuration passed to NewServerGRPC
// to create a GRPC server; NewServerGRPC reads its Certificate field.
44 type ServerGRPCConfig struct {
51 // NewServerGRPC constructs a ServerGRPC from cfg and returns it by value.
// It copies cfg.Certificate into the server's certificate field and logs an
// informational message through the server's logger.
52 func NewServerGRPC(cfg ServerGRPCConfig) (s ServerGRPC) {
55 s.certificate = cfg.Certificate
// NOTE(review): the network listener is only opened in Listen (net.Listen),
// so "Binding" here presumably refers to copying the configuration — confirm
// the wording is intended.
57 s.logger.Infof("Binding is successful")
61 // Listen starts the GRPC server: it opens a TCP listener on s.port, adds TLS
// credentials when both s.certificate and s.key are set, registers s as the
// AppLCM service and then serves on the listener.
// NOTE(review): failures are reported with s.logger.Fatalf; if s.logger is a
// logrus logger, Fatalf exits the process, so err would not reach the caller
// on those paths — confirm.
62 func (s *ServerGRPC) Listen() (err error) {
65 grpcOpts = []grpc.ServerOption{}
66 grpcCreds credentials.TransportCredentials
69 // Listen announces on the network address
70 listener, err = net.Listen("tcp", ":"+strconv.Itoa(s.port))
// NOTE(review): s.port is handed to strconv.Itoa above, so it is an int; the
// first %s verb below will not render it as a plain number (%d would).
72 s.logger.Fatalf("failed to listen on port: %s. Err: %s", s.port, err)
// NOTE(review): the format string below has no verb for s.port, so with
// fmt-style formatting the port is reported as an extra argument rather than
// appended to the message.
75 s.logger.Infof("Server started listening on port ", s.port)
77 // Secure connection if asked
78 if s.certificate != "" && s.key != "" {
// Load the server certificate/key pair from files to build TLS credentials.
79 grpcCreds, err = credentials.NewServerTLSFromFile(
82 s.logger.Fatalf("failed to create tls grpc server using cert %s and key: %s. Err: %s", s.certificate, s.key, err)
84 grpcOpts = append(grpcOpts, grpc.Creds(grpcCreds))
87 // Register server with GRPC
88 s.server = grpc.NewServer(grpcOpts...)
89 lcmservice.RegisterAppLCMServer(s.server, s)
91 s.logger.Infof("Server registered with GRPC")
93 // Server start serving
94 err = s.server.Serve(listener)
96 s.logger.Fatalf("failed to listen for grpc connections. Err: %s", err)
// Query handles the AppLCM Query request: it checks that HostIp and WorkloadId
// are non-empty, creates a HELM client for the host, queries the chart for the
// workload and builds a QueryResponse. Failures are returned as GRPC status
// errors passed through s.logError.
103 func (s *ServerGRPC) Query(ctx context.Context, req *lcmservice.QueryRequest) (resp *lcmservice.QueryResponse, err error) {
// Reject requests with an empty HostIp or WorkloadId.
// NOTE(review): err is the named result and no visible line assigns it before
// this check, so the trailing "Err: %s" presumably formats a nil error — confirm.
106 if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
107 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Nil input HostIp: %s or workloadId: %s. " +
108 "Err: %s", req.GetHostIp(), req.GetWorkloadId(), err))
111 // Create HELM Client
112 hc, err := NewHelmClient(req.GetHostIp(), s.logger)
// NOTE(review): only os.IsNotExist errors are checked in the visible lines;
// confirm that other NewHelmClient failures cannot leave hc unusable.
113 if os.IsNotExist(err) {
114 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig for HostIp can't be found: %s. " +
115 "Err: %s", req.GetHostIp(), err))
// Look up the chart for the requested workload.
119 r, err := hc.queryChart(req.GetWorkloadId())
121 return nil, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
122 req.GetWorkloadId(), err))
124 resp = &lcmservice.QueryResponse{
130 // Terminate handles the AppLCM Terminate request: it checks that HostIp and
// WorkloadId are non-empty, creates a HELM client for the host and uninstalls
// the chart for the workload. Failures are returned as GRPC status errors
// passed through s.logError.
131 func (s *ServerGRPC) Terminate(ctx context.Context, req *lcmservice.TerminateRequest) (resp *lcmservice.TerminateResponse, err error) {
// Reject requests with an empty HostIp or WorkloadId.
// NOTE(review): err is the named result and no visible line assigns it before
// this check, so the trailing "Err: %s" presumably formats a nil error — confirm.
133 if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
134 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Nil input HostIp: %s or workloadId: %s. " +
135 "Err: %s", req.GetHostIp(), req.GetWorkloadId(), err))
138 // Create HELM client
139 hc, err := NewHelmClient(req.GetHostIp(), s.logger)
// NOTE(review): only os.IsNotExist errors are checked in the visible lines;
// confirm that other NewHelmClient failures cannot leave hc unusable.
140 if os.IsNotExist(err) {
141 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig for HostIp can't be found: %s. " +
142 "Err: %s", req.GetHostIp(), err))
// Uninstall the chart for the requested workload.
146 err = hc.uninstallChart(req.GetWorkloadId())
149 resp = &lcmservice.TerminateResponse{
// NOTE(review): unlike the earlier error returns, this path returns the
// populated resp together with a non-nil error.
152 return resp, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
153 req.GetWorkloadId(), err))
155 resp = &lcmservice.TerminateResponse{
162 // Instantiate handles the AppLCM Instantiate stream: it receives a first
// message carrying the host IP, then receives package chunks into an in-memory
// buffer, installs the buffered HELM chart through a HELM client for the host
// and replies with an InstantiateResponse containing the release name and a
// "Success"/"Failure" status.
163 func (s *ServerGRPC) Instantiate(stream lcmservice.AppLCM_InstantiateServer) (err error) {
165 // Receive metadata which is host ip
166 req, err := stream.Recv()
168 s.logger.Errorf("Cannot receive package metadata. Err: %s", err)
172 hostIP := req.GetHostIp()
173 s.logger.Info("Recieved instantiate request for host ", hostIP)
// NOTE(review): err here is the result of the first stream.Recv; if that
// error was already handled above, "Err: %s" presumably formats nil — confirm.
177 return s.logError(status.Errorf(codes.InvalidArgument, "Nil input for HostIp: %s Err: %s", req.GetHostIp(), err))
// Accumulates the received package chunks.
// NOTE(review): no size limit on the buffered package is visible — confirm
// one is enforced elsewhere.
181 helmPkg := bytes.Buffer{}
// Check the stream context for cancellation or deadline expiry.
183 err := s.contextError(stream.Context())
188 s.logger.Info("Waiting to receive more data")
// NOTE(review): this := declares new req and err variables that shadow the
// ones declared at the top of the function.
190 req, err := stream.Recv()
192 s.logger.Info("No more data")
196 return s.logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
199 // Receive chunk and write to helm package
200 chunk := req.GetPackage()
202 s.logger.Info("Recieved chunk")
204 _, err = helmPkg.Write(chunk)
206 return s.logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
210 // Create HELM client
211 hc, err := NewHelmClient(req.GetHostIp(), s.logger)
// NOTE(review): only os.IsNotExist errors are checked in the visible lines;
// confirm that other NewHelmClient failures cannot leave hc unusable.
212 if os.IsNotExist(err) {
213 return s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig for HostIp can't be found: %s. " +
214 "Err: %s", req.GetHostIp(), err))
// Install the buffered package; the returned release name becomes the
// WorkloadId of the response.
217 relName, err := hc.installChart(helmPkg)
219 var res lcmservice.InstantiateResponse
220 res.WorkloadId = relName
223 res.Status = "Failure"
225 res.Status = "Success"
// Send the single response and close the stream.
228 err = stream.SendAndClose(&res)
230 return s.logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
232 s.logger.Info("Successful Instantiation")
// contextError translates a context failure into a GRPC status error:
// context.Canceled becomes codes.Canceled and context.DeadlineExceeded becomes
// codes.DeadlineExceeded. Each resulting error is passed through s.logError.
236 func (s *ServerGRPC) contextError(ctx context.Context) error {
238 case context.Canceled:
239 return s.logError(status.Error(codes.Canceled, "request is canceled"))
240 case context.DeadlineExceeded:
241 return s.logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
// logError writes err to the server's logger at error level. Callers in this
// file use its return value as the error they return.
247 func (s *ServerGRPC) logError(err error) error {
// NOTE(review): the format string below has no verb for err, so with
// fmt-style formatting the error is reported as an extra argument rather than
// appended to the message.
249 s.logger.Errorf("Error Information: ", err)