2 * Copyright 2020 Huawei Technologies Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 "k8shelm/internal/lcmservice"
27 "github.com/sirupsen/logrus"
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/credentials"
31 _ "google.golang.org/grpc/encoding/gzip"
32 "google.golang.org/grpc/status"
// ServerGRPC is the GRPC server of this package. Its methods in this file
// (Listen, Query, Terminate, Instantiate) use its logger, server, port,
// certificate and key fields, and Listen registers it as the AppLCM service
// implementation via lcmservice.RegisterAppLCMServer.
36 type ServerGRPC struct {
44 // ServerGRPCConfig is the GRPC service configuration used to create a GRPC server.
// NewServerGRPC reads it (for example its Certificate field) when building a
// ServerGRPC.
45 type ServerGRPCConfig struct {
52 // NewServerGRPC is the constructor of the GRPC server.
// It populates a ServerGRPC from cfg (cfg.Certificate becomes s.certificate)
// and returns it by value through the named result s.
53 func NewServerGRPC(cfg ServerGRPCConfig) (s ServerGRPC) {
56 s.certificate = cfg.Certificate
// NOTE(review): this is logged at construction time; the port is only bound
// later, by net.Listen inside Listen — confirm the wording is intended.
58 s.logger.Infof("Binding is successful")
62 // Listen starts the GRPC server and starts listening on the configured port.
// It opens a TCP listener on s.port, enables TLS when both s.certificate and
// s.key are non-empty, registers s as the AppLCM service and then serves on
// the listener.
//
// NOTE(review): every failure path logs through s.logger.Fatalf; if the logger
// is a logrus logger, Fatalf exits the process, so the named err result would
// not reach the caller on those paths — TODO confirm this is intended.
63 func (s *ServerGRPC) Listen() (err error) {
66 grpcOpts = []grpc.ServerOption{}
67 grpcCreds credentials.TransportCredentials
70 // Listen announces on the network address (all interfaces, port s.port)
71 listener, err = net.Listen("tcp", ":"+strconv.Itoa(s.port))
// NOTE(review): the message omits both the port and err, which makes a bind
// failure hard to diagnose from the log alone.
73 s.logger.Fatalf("failed to listen on specified port")
75 s.logger.Infof("Server started listening on specified port")
77 // Secure connection if asked: TLS is used only when both certificate and key are set
78 if s.certificate != "" && s.key != "" {
79 grpcCreds, err = credentials.NewServerTLSFromFile(
// NOTE(review): err is not included in this message either.
82 s.logger.Fatalf("failed to create tls grpc server using given cert and key")
84 grpcOpts = append(grpcOpts, grpc.Creds(grpcCreds))
87 // Register server with GRPC
88 s.server = grpc.NewServer(grpcOpts...)
89 lcmservice.RegisterAppLCMServer(s.server, s)
91 s.logger.Infof("Server registered with GRPC")
93 // Server start serving; Serve accepts incoming connections on listener
94 err = s.server.Serve(listener)
96 s.logger.Fatalf("failed to listen for grpc connections. Err: %s", err)
// Query handles the AppLCM Query RPC. It rejects requests whose HostIp or
// WorkloadId is empty, creates a HELM client for the host with NewHelmClient
// and asks it for the chart identified by the workload ID with queryChart.
// Failures are returned as GRPC status errors passed through s.logError.
103 func (s *ServerGRPC) Query(ctx context.Context, req *lcmservice.QueryRequest) (resp *lcmservice.QueryResponse, err error) {
// Both identifiers are mandatory.
106 if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
// NOTE(review): the format string has no verb for the trailing err argument,
// so fmt appends a "%!(EXTRA ...)" marker to the status message; dropping the
// argument (or using status.Error) would avoid that.
107 return nil, s.logError(status.Errorf(codes.InvalidArgument, "HostIP & WorkloadId can't be null", err))
110 // Create HELM Client
111 hc, err := NewHelmClient(req.GetHostIp(), s.logger)
// A not-exist error from NewHelmClient is reported as InvalidArgument.
// NOTE(review): only os.IsNotExist is visibly checked for this err — confirm
// that other NewHelmClient failures cannot leave hc unusable below.
112 if os.IsNotExist(err) {
113 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to given Edge can't be found. "+
// Look up the chart for the requested workload.
118 r, err := hc.queryChart(req.GetWorkloadId())
// Reported as NotFound; presumably reached when queryChart returns an error —
// TODO confirm.
120 return nil, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
121 req.GetWorkloadId(), err))
// Build the response for the caller.
123 resp = &lcmservice.QueryResponse{
129 // Terminate handles the AppLCM Terminate RPC and terminates HELM charts.
// It rejects requests whose HostIp or WorkloadId is empty, creates a HELM
// client for the host with NewHelmClient and uninstalls the chart identified
// by the workload ID with uninstallChart. Failures are returned as GRPC status
// errors passed through s.logError.
130 func (s *ServerGRPC) Terminate(ctx context.Context, req *lcmservice.TerminateRequest) (resp *lcmservice.TerminateResponse, err error) {
// Both identifiers are mandatory.
132 if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
// NOTE(review): the format string has no verb for the trailing err argument,
// so fmt appends a "%!(EXTRA ...)" marker to the status message; dropping the
// argument (or using status.Error) would avoid that.
133 return nil, s.logError(status.Errorf(codes.InvalidArgument, "HostIP & WorkloadId can't be null", err))
136 // Create HELM client
137 hc, err := NewHelmClient(req.GetHostIp(), s.logger)
// A not-exist error from NewHelmClient is reported as InvalidArgument.
// NOTE(review): only os.IsNotExist is visibly checked for this err — confirm
// that other NewHelmClient failures cannot leave hc unusable below.
138 if os.IsNotExist(err) {
139 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to given Edge can't be found. "+
// Uninstall the chart for the requested workload.
144 err = hc.uninstallChart(req.GetWorkloadId())
// Response paired with the NotFound error below; presumably the failure
// branch — TODO confirm.
147 resp = &lcmservice.TerminateResponse{
// NOTE(review): a non-nil resp is returned together with a non-nil error here,
// unlike the other error returns in this method, which return nil.
150 return resp, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
151 req.GetWorkloadId(), err))
// Response for the remaining (presumably successful) path.
153 resp = &lcmservice.TerminateResponse{
160 // Instantiate handles the AppLCM Instantiate client-streaming RPC and instantiates a HELM chart.
// It first receives a message carrying the host IP, then receives package
// chunks from the stream and accumulates them in an in-memory buffer, creates
// a HELM client for the host, installs the buffered chart with installChart
// and finally answers with SendAndClose. The response carries the release
// name as WorkloadId and a Status of "Success" or "Failure".
161 func (s *ServerGRPC) Instantiate(stream lcmservice.AppLCM_InstantiateServer) (err error) {
163 // Receive metadata which is host ip
164 req, err := stream.Recv()
166 s.logger.Errorf("Cannot receive package metadata. Err: %s", err)
170 hostIP := req.GetHostIp()
171 s.logger.Infof("Recieved instantiate request")
// NOTE(review): the format string has no verb for the trailing err argument,
// so fmt appends a "%!(EXTRA ...)" marker to the status message. The text also
// mentions WorkloadId, although only the host IP is read from the request
// above — confirm the intended wording.
175 return s.logError(status.Errorf(codes.InvalidArgument, "HostIP & WorkloadId can't be null", err))
// Buffer that collects the whole HELM package in memory.
179 helmPkg := bytes.Buffer{}
// Check the stream context for cancellation or an expired deadline before
// receiving more data.
181 err := s.contextError(stream.Context())
186 s.logger.Debug("Waiting to receive more data")
// Declared with := — presumably scoped to the receive loop, so it does not
// overwrite the metadata req read above; TODO confirm.
188 req, err := stream.Recv()
190 s.logger.Debug("No more data")
194 return s.logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
197 // Receive chunk and write to helm package
198 chunk := req.GetPackage()
200 s.logger.Infof("Recieved chunk")
202 _, err = helmPkg.Write(chunk)
204 return s.logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
208 // Create HELM client
// NOTE(review): req.GetHostIp() is read again here although hostIP already
// holds the value taken from the metadata message.
209 hc, err := NewHelmClient(req.GetHostIp(), s.logger)
// A not-exist error from NewHelmClient is reported as InvalidArgument.
// NOTE(review): only os.IsNotExist is visibly checked for this err — confirm
// that other NewHelmClient failures cannot leave hc unusable below.
210 if os.IsNotExist(err) {
211 return s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to edge can't be found. "+
// Install the buffered package; the returned release name becomes the
// workload ID of the response.
215 relName, err := hc.installChart(helmPkg)
217 var res lcmservice.InstantiateResponse
218 res.WorkloadId = relName
// The outcome is reported in res.Status; presumably chosen from the
// installChart error — TODO confirm.
221 res.Status = "Failure"
222 s.logger.Infof("Instantiation Failed")
224 res.Status = "Success"
225 s.logger.Infof("Successful Instantiation")
// Send the single response and close the stream.
228 err = stream.SendAndClose(&res)
230 return s.logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
// contextError maps context termination to a GRPC status error, presumably by
// inspecting ctx.Err() (TODO confirm). A canceled context yields codes.Canceled
// and an expired deadline yields codes.DeadlineExceeded; both errors are passed
// through s.logError before being returned.
235 func (s *ServerGRPC) contextError(ctx context.Context) error {
237 case context.Canceled:
238 return s.logError(status.Error(codes.Canceled, "request is canceled"))
239 case context.DeadlineExceeded:
240 return s.logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
246 func (s *ServerGRPC) logError(err error) error {
248 s.logger.Errorf("Error Information: ", err)