2 * Copyright 2020 Huawei Technologies Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
21 "github.com/sirupsen/logrus"
22 "google.golang.org/grpc"
23 "google.golang.org/grpc/codes"
24 "google.golang.org/grpc/credentials"
25 _ "google.golang.org/grpc/encoding/gzip"
26 "google.golang.org/grpc/status"
28 "k8shelm/internal/lcmservice"
// ServerGRPC is the gRPC server for the lcmservice AppLCM service: Listen
// registers it through lcmservice.RegisterAppLCMServer, and Query, Terminate
// and Instantiate are its request handlers.
// NOTE(review): the field list is not visible in this excerpt; the methods in
// this file read the fields certificate, key, port, logger and server.
35 type ServerGRPC struct {
43 // ServerGRPCConfig is the GRPC service configuration used to create the GRPC server.
// NOTE(review): only the Certificate field is referenced in the visible code
// (by NewServerGRPC); the full field list is not shown in this excerpt.
44 type ServerGRPCConfig struct {
51 // NewServerGRPC constructs a ServerGRPC from the given configuration.
// It copies cfg.Certificate into the server and logs "Binding is successful"
// through the server's logger; the result is the named value s.
// NOTE(review): the remaining field assignments are not visible in this
// excerpt — confirm the logger is initialised before the Infof call below.
52 func NewServerGRPC(cfg ServerGRPCConfig) (s ServerGRPC) {
55 s.certificate = cfg.Certificate
57 s.logger.Infof("Binding is successful")
61 // Listen starts the GRPC server and serves on the configured port.
// It opens a TCP listener on ":<port>", adds TLS transport credentials when
// both a certificate and a key are configured, registers s as the AppLCM
// service implementation and then calls Serve on the listener.
// NOTE(review): every failure is reported through logger.Fatalf. If the logger
// is a logrus logger, Fatalf exits the process, so the err result would never
// reach the caller on those paths — confirm this is intended.
62 func (s *ServerGRPC) Listen() (err error) {
65 grpcOpts = []grpc.ServerOption{}
66 grpcCreds credentials.TransportCredentials
69 // Open a TCP listener on the configured port (no host part in the address)
70 listener, err = net.Listen("tcp", ":"+strconv.Itoa(s.port))
72 s.logger.Fatalf("failed to listen on specified port")
74 s.logger.Infof("Server started listening on specified port")
76 // Secure the connection only when both a certificate and a key are set
77 if s.certificate != "" && s.key != "" {
78 grpcCreds, err = credentials.NewServerTLSFromFile(
81 s.logger.Fatalf("failed to create tls grpc server using given cert and key")
83 grpcOpts = append(grpcOpts, grpc.Creds(grpcCreds))
86 // Create the GRPC server and register s as the AppLCM implementation
87 s.server = grpc.NewServer(grpcOpts...)
88 lcmservice.RegisterAppLCMServer(s.server, s)
90 s.logger.Infof("Server registered with GRPC")
92 // Start serving on the listener
93 err = s.server.Serve(listener)
95 s.logger.Fatalf("failed to listen for grpc connections. Err: %s", err)
// Query looks up the HELM chart for a workload on an edge host.
// The request must carry a non-empty host IP and workload ID, otherwise an
// InvalidArgument status is returned. A HELM client is created for the host
// IP and asked for the chart of the workload ID; a failed lookup is reported
// as a NotFound status. On success a QueryResponse is returned.
// NOTE(review): only os.IsNotExist errors from NewHelmClient are checked in
// the visible code — confirm other construction errors cannot leave hc unusable.
102 func (s *ServerGRPC) Query(ctx context.Context, req *lcmservice.QueryRequest) (resp *lcmservice.QueryResponse, err error) {
105 if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
// The message has no format verbs, so it is passed to status.Error as-is;
// passing err as an extra operand would append "%!(EXTRA ...)" to the text.
106 return nil, s.logError(status.Error(codes.InvalidArgument, "HostIP & WorkloadId can't be null"))
109 // Create HELM Client
110 hc, err := NewHelmClient(req.GetHostIp(), s.logger)
111 if os.IsNotExist(err) {
112 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to given Edge can't be found. " +
117 r, err := hc.queryChart(req.GetWorkloadId())
119 return nil, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
120 req.GetWorkloadId(), err))
122 resp = &lcmservice.QueryResponse{
128 // Terminate uninstalls the HELM chart of a workload on an edge host.
// The request must carry a non-empty host IP and workload ID, otherwise an
// InvalidArgument status is returned. A HELM client is created for the host
// IP and asked to uninstall the chart for the workload ID; an uninstall
// failure is reported as a NotFound status.
// NOTE(review): the failure path returns a non-nil resp together with the
// error; confirm callers do not rely on resp when err is set.
129 func (s *ServerGRPC) Terminate(ctx context.Context, req *lcmservice.TerminateRequest) (resp *lcmservice.TerminateResponse, err error) {
131 if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
// The message has no format verbs, so it is passed to status.Error as-is;
// passing err as an extra operand would append "%!(EXTRA ...)" to the text.
132 return nil, s.logError(status.Error(codes.InvalidArgument, "HostIP & WorkloadId can't be null"))
135 // Create HELM client
136 hc, err := NewHelmClient(req.GetHostIp(), s.logger)
137 if os.IsNotExist(err) {
138 return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to given Edge can't be found. " +
143 err = hc.uninstallChart(req.GetWorkloadId())
146 resp = &lcmservice.TerminateResponse{
149 return resp, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
150 req.GetWorkloadId(), err))
152 resp = &lcmservice.TerminateResponse{
159 // Instantiate installs a HELM chart that the client uploads over a stream.
// The first message received supplies the host IP. Further Recv calls supply
// package chunks, which are appended to an in-memory buffer; the stream's
// context is checked through contextError while receiving. The buffered
// package is then installed through a HELM client, and one
// InstantiateResponse carrying the release name as WorkloadId and a
// "Success"/"Failure" Status is sent with SendAndClose.
160 func (s *ServerGRPC) Instantiate(stream lcmservice.AppLCM_InstantiateServer) (err error) {
162 // Receive metadata which is host ip
163 req, err := stream.Recv()
165 s.logger.Errorf("Cannot receive package metadata. Err: %s", err)
169 hostIP := req.GetHostIp()
170 s.logger.Info("Recieved instantiate request")
// The message has no format verbs, so it is passed to status.Error as-is;
// passing err as an extra operand would append "%!(EXTRA ...)" to the text.
174 return s.logError(status.Error(codes.InvalidArgument, "HostIP & WorkloadId can't be null"))
178 helmPkg := bytes.Buffer{}
180 err := s.contextError(stream.Context())
185 s.logger.Debug("Waiting to receive more data")
187 req, err := stream.Recv()
189 s.logger.Debug("No more data")
193 return s.logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
196 // Receive chunk and write to helm package
197 chunk := req.GetPackage()
199 s.logger.Info("Recieved chunk")
201 _, err = helmPkg.Write(chunk)
203 return s.logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
207 // Create HELM client
208 hc, err := NewHelmClient(req.GetHostIp(), s.logger)
209 if os.IsNotExist(err) {
210 return s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to edge can't be found. " +
214 relName, err := hc.installChart(helmPkg)
216 var res lcmservice.InstantiateResponse
217 res.WorkloadId = relName
220 res.Status = "Failure"
222 res.Status = "Success"
225 err = stream.SendAndClose(&res)
227 return s.logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
229 s.logger.Info("Successful Instantiation")
// contextError translates a context termination cause into a GRPC status
// error: context.Canceled becomes codes.Canceled and context.DeadlineExceeded
// becomes codes.DeadlineExceeded. Both results pass through logError.
// NOTE(review): the switch header and the fall-through branch are not visible
// in this excerpt; presumably the switch is on ctx.Err() and other values
// yield nil — confirm.
233 func (s *ServerGRPC) contextError(ctx context.Context) error {
235 case context.Canceled:
236 return s.logError(status.Error(codes.Canceled, "request is canceled"))
237 case context.DeadlineExceeded:
238 return s.logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
// logError writes err to the server's logger at error level. The handlers in
// this file return its result directly as their error value.
244 func (s *ServerGRPC) logError(err error) error {
// %v is required here: without a verb the error is not formatted and shows
// up in the log line as "%!(EXTRA ...)".
246 s.logger.Errorf("Error Information: %v", err)