X-Git-Url: https://gerrit.akraino.org/r/gitweb?a=blobdiff_plain;f=mecm%2Fmepm%2Fapplcm%2Fk8shelm%2Fpkg%2Fplugin%2Fgrpcserver.go;h=6ec356f6277136b0ad8aa52429cd8d3bac0503aa;hb=e363ff9fc0e8d060a3da53e44a95f1dbf0d84d3f;hp=cb7b8b246764bd60eb675494b66f7ae9b4bb1805;hpb=d4565738443ee3735d90fe4ae58aa3673bf2a77b;p=ealt-edge.git diff --git a/mecm/mepm/applcm/k8shelm/pkg/plugin/grpcserver.go b/mecm/mepm/applcm/k8shelm/pkg/plugin/grpcserver.go index cb7b8b2..6ec356f 100644 --- a/mecm/mepm/applcm/k8shelm/pkg/plugin/grpcserver.go +++ b/mecm/mepm/applcm/k8shelm/pkg/plugin/grpcserver.go @@ -1,194 +1,252 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package plugin import ( "bytes" "context" - "k8shelm/internal/lcmservice" + "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" _ "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/status" "io" - "log" + "k8shelm/internal/lcmservice" "net" "os" "strconv" ) +// GRPC server type ServerGRPC struct { server *grpc.Server port int certificate string key string + logger *logrus.Logger } +// GRPC service configuration used to create GRPC server type ServerGRPCConfig struct { Certificate string Key string Port int + Logger *logrus.Logger } -func NewServerGRPC(cfg ServerGRPCConfig) (s ServerGRPC, err error) { - logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile) - if cfg.Port == 0 { - logger.Fatalf("Port must be specified") - } +// Constructor to GRPC server +func NewServerGRPC(cfg ServerGRPCConfig) (s ServerGRPC) { + s.logger = cfg.Logger s.port = cfg.Port s.certificate = cfg.Certificate s.key = cfg.Key - logger.Println("Binding is successful") + s.logger.Infof("Binding is successful") return } +// Start GRPC server and start listening on the port func (s *ServerGRPC) Listen() (err error) { - logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile) var ( listener net.Listener grpcOpts = []grpc.ServerOption{} grpcCreds credentials.TransportCredentials ) - logger.Println("Listening start") - + // Listen announces on the network address listener, err = net.Listen("tcp", ":"+strconv.Itoa(s.port)) - - logger.Println("Listening end") - if err != nil { - logger.Fatalf("failed to listen on port: ", s.port) + s.logger.Fatalf("failed to listen on port: %s. Err: %s", s.port, err) + return err } + s.logger.Infof("Server started listening on port ", s.port) + // Secure connection if asked if s.certificate != "" && s.key != "" { grpcCreds, err = credentials.NewServerTLSFromFile( s.certificate, s.key) if err != nil { - logger.Fatalf("failed to create tls grpc server using cert %s and key: ", s.certificate, s.key) + s.logger.Fatalf("failed to create tls grpc server using cert %s and key: %s. Err: %s", s.certificate, s.key, err) } - grpcOpts = append(grpcOpts, grpc.Creds(grpcCreds)) } - logger.Println("New server creation") - + // Register server with GRPC s.server = grpc.NewServer(grpcOpts...) - - logger.Println("New server creation success") - lcmservice.RegisterAppLCMServer(s.server, s) - logger.Println("New server registration success") - + s.logger.Infof("Server registered with GRPC") + // Server start serving err = s.server.Serve(listener) if err != nil { - logger.Fatalf("errored listening for grpc connections") + s.logger.Fatalf("failed to listen for grpc connections. Err: %s", err) + return err } - - logger.Println("Server is serving") - - return } +// Query HELM chart func (s *ServerGRPC) Query(ctx context.Context, req *lcmservice.QueryRequest) (resp *lcmservice.QueryResponse, err error) { - r := queryChart(req.GetWorkloadId(), req.GetHostIp()) + + // Input validation + if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") { + return nil, s.logError(status.Errorf(codes.InvalidArgument, "Nil input HostIp: %s or workloadId: %s. " + + "Err: %s", req.GetHostIp(), req.GetWorkloadId(), err)) + } + + // Create HELM Client + hc, err := NewHelmClient(req.GetHostIp(), s.logger) + if os.IsNotExist(err) { + return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig for HostIp can't be found: %s. " + + "Err: %s", req.GetHostIp(), err)) + } + + // Query Chart + r, err := hc.queryChart(req.GetWorkloadId()) + if (err != nil) { + return nil, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s", + req.GetWorkloadId(), err)) + } resp = &lcmservice.QueryResponse{ Status: r, } return resp, nil } +// Terminate HELM charts func (s *ServerGRPC) Terminate(ctx context.Context, req *lcmservice.TerminateRequest) (resp *lcmservice.TerminateResponse, err error) { - logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile) - uninstallChart(req.GetWorkloadId(), req.GetHostIp()) - resp = &lcmservice.TerminateResponse{ - Status: "Success", + // Input validation + if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") { + return nil, s.logError(status.Errorf(codes.InvalidArgument, "Nil input HostIp: %s or workloadId: %s. " + + "Err: %s", req.GetHostIp(), req.GetWorkloadId(), err)) + } + + // Create HELM client + hc, err := NewHelmClient(req.GetHostIp(), s.logger) + if os.IsNotExist(err) { + return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig for HostIp can't be found: %s. " + + "Err: %s", req.GetHostIp(), err)) + } + + // Uninstall chart + err = hc.uninstallChart(req.GetWorkloadId()) + + if (err != nil) { + resp = &lcmservice.TerminateResponse{ + Status: "Failure", + } + return resp, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s", + req.GetWorkloadId(), err)) + } else { + resp = &lcmservice.TerminateResponse{ + Status: "Success", + } + return resp, nil } - logger.Printf("Termination completed") - return resp, nil } +// Instantiate HELM Chart func (s *ServerGRPC) Instantiate(stream lcmservice.AppLCM_InstantiateServer) (err error) { - logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile) + // Recieve metadata which is host ip req, err := stream.Recv() if err != nil { - logger.Fatalf("cannot receive package metadata") + s.logger.Errorf("Cannot receive package metadata. Err: %s", err) + return } hostIP := req.GetHostIp() - logger.Printf("receive an upload-image request for package ip %s", hostIP) + s.logger.Info("Recieved instantiate request for host ", hostIP) - helmPkg := bytes.Buffer{} + // Host validation + if (hostIP == "") { + return s.logError(status.Errorf(codes.InvalidArgument, "Nil input for HostIp: %s Err: %s", req.GetHostIp(), err)) + } + // Receive package + helmPkg := bytes.Buffer{} for { - err := contextError(stream.Context()) + err := s.contextError(stream.Context()) if err != nil { return err } - logger.Printf("waiting to receive more data") + s.logger.Info("Waiting to receive more data") req, err := stream.Recv() if err == io.EOF { - logger.Printf("no more data") + s.logger.Info("No more data") break } if err != nil { - return logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err)) + return s.logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err)) } + // Receive chunk and write to helm package chunk := req.GetPackage() - logger.Printf("received a chunk ") + s.logger.Info("Recieved chunk") - // write slowly - // time.Sleep(time.Second) _, err = helmPkg.Write(chunk) if err != nil { - return logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err)) + return s.logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err)) } } - relName := installChart(helmPkg, hostIP) - - res := &lcmservice.InstantiateResponse{ - WorkloadId: relName, - Status: "Success", + // Create HELM client + hc, err := NewHelmClient(req.GetHostIp(), s.logger) + if os.IsNotExist(err) { + return s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig for HostIp can't be found: %s. " + + "Err: %s", req.GetHostIp(), err)) } - err = stream.SendAndClose(res) - if err != nil { - return logError(status.Errorf(codes.Unknown, "cannot send response: %v", err)) - } + relName, err := hc.installChart(helmPkg) - logger.Printf("Instantation completed") - return -} + var res lcmservice.InstantiateResponse + res.WorkloadId = relName + if (err != nil) { + res.Status = "Failure" + } else { + res.Status = "Success" + } -func (s *ServerGRPC) Close() { - if s.server != nil { - s.server.Stop() + err = stream.SendAndClose(&res) + if err != nil { + return s.logError(status.Errorf(codes.Unknown, "cannot send response: %v", err)) } + s.logger.Info("Successful Instantiation") return } -func contextError(ctx context.Context) error { +func (s *ServerGRPC) contextError(ctx context.Context) error { switch ctx.Err() { case context.Canceled: - return logError(status.Error(codes.Canceled, "request is canceled")) + return s.logError(status.Error(codes.Canceled, "request is canceled")) case context.DeadlineExceeded: - return logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded")) + return s.logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded")) default: return nil } } -func logError(err error) error { +func (s *ServerGRPC) logError(err error) error { if err != nil { - log.Print(err) + s.logger.Errorf("Error Information: ", err) } return err } \ No newline at end of file