Logging & error handling
[ealt-edge.git] / mecm / mepm / applcm / k8shelm / pkg / plugin / grpcserver.go
index cb7b8b2..6ec356f 100644 (file)
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package plugin
 
 import (
        "bytes"
        "context"
-       "k8shelm/internal/lcmservice"
+       "github.com/sirupsen/logrus"
        "google.golang.org/grpc"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/credentials"
        _ "google.golang.org/grpc/encoding/gzip"
        "google.golang.org/grpc/status"
        "io"
-       "log"
+       "k8shelm/internal/lcmservice"
        "net"
        "os"
        "strconv"
 )
 
+// GRPC server
 type ServerGRPC struct {
        server      *grpc.Server
        port        int
        certificate string
        key         string
+       logger      *logrus.Logger
 }
 
+// GRPC service configuration used to create GRPC server
 type ServerGRPCConfig struct {
        Certificate string
        Key         string
        Port        int
+       Logger      *logrus.Logger
 }
 
-func NewServerGRPC(cfg ServerGRPCConfig) (s ServerGRPC, err error) {
-       logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
-       if cfg.Port == 0 {
-               logger.Fatalf("Port must be specified")
-       }
+// Constructor to GRPC server
+func NewServerGRPC(cfg ServerGRPCConfig) (s ServerGRPC) {
+       s.logger = cfg.Logger
        s.port = cfg.Port
        s.certificate = cfg.Certificate
        s.key = cfg.Key
-       logger.Println("Binding is successful")
+       s.logger.Infof("Binding is successful")
        return
 }
 
+// Start GRPC server and start listening on the port
 func (s *ServerGRPC) Listen() (err error) {
-       logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
        var (
                listener  net.Listener
                grpcOpts  = []grpc.ServerOption{}
                grpcCreds credentials.TransportCredentials
        )
 
-       logger.Println("Listening start")
-
+       // Listen announces on the network address
        listener, err = net.Listen("tcp", ":"+strconv.Itoa(s.port))
-
-       logger.Println("Listening end")
-
        if err != nil {
-               logger.Fatalf("failed to listen on port: ", s.port)
+               s.logger.Fatalf("failed to listen on port: %s. Err: %s", s.port, err)
+               return err
        }
+       s.logger.Infof("Server started listening on port ", s.port)
 
+       // Secure connection if asked
        if s.certificate != "" && s.key != "" {
                grpcCreds, err = credentials.NewServerTLSFromFile(
                        s.certificate, s.key)
                if err != nil {
-                       logger.Fatalf("failed to create tls grpc server using cert %s and key: ", s.certificate, s.key)
+                       s.logger.Fatalf("failed to create tls grpc server using cert %s and key: %s. Err: %s", s.certificate, s.key, err)
                }
-
                grpcOpts = append(grpcOpts, grpc.Creds(grpcCreds))
        }
 
-       logger.Println("New server creation")
-
+       // Register server with GRPC
        s.server = grpc.NewServer(grpcOpts...)
-
-       logger.Println("New server creation success")
-
        lcmservice.RegisterAppLCMServer(s.server, s)
 
-       logger.Println("New server registration success")
-
+       s.logger.Infof("Server registered with GRPC")
 
+       // Server start serving
        err = s.server.Serve(listener)
        if err != nil {
-               logger.Fatalf("errored listening for grpc connections")
+               s.logger.Fatalf("failed to listen for grpc connections. Err: %s", err)
+               return err
        }
-
-       logger.Println("Server is serving")
-
-
        return
 }
 
+// Query HELM chart
 func (s *ServerGRPC) Query(ctx context.Context, req *lcmservice.QueryRequest) (resp *lcmservice.QueryResponse, err error) {
-       r := queryChart(req.GetWorkloadId(), req.GetHostIp())
+
+       // Input validation
+       if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
+               return nil, s.logError(status.Errorf(codes.InvalidArgument, "Nil input HostIp: %s or workloadId: %s. " +
+                       "Err: %s", req.GetHostIp(), req.GetWorkloadId(), err))
+       }
+
+       // Create HELM Client
+       hc, err := NewHelmClient(req.GetHostIp(), s.logger)
+       if os.IsNotExist(err) {
+               return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig for HostIp can't be found: %s. " +
+                       "Err: %s", req.GetHostIp(), err))
+       }
+
+       // Query Chart
+       r, err := hc.queryChart(req.GetWorkloadId())
+       if (err != nil) {
+               return nil, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
+                       req.GetWorkloadId(), err))
+       }
        resp = &lcmservice.QueryResponse{
                Status: r,
        }
        return resp, nil
 }
 
+// Terminate HELM charts
 func (s *ServerGRPC) Terminate(ctx context.Context, req *lcmservice.TerminateRequest) (resp *lcmservice.TerminateResponse, err error) {
-       logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
-       uninstallChart(req.GetWorkloadId(), req.GetHostIp())
-       resp = &lcmservice.TerminateResponse{
-               Status: "Success",
+       // Input validation
+       if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
+               return nil, s.logError(status.Errorf(codes.InvalidArgument, "Nil input HostIp: %s or workloadId: %s. " +
+                       "Err: %s", req.GetHostIp(), req.GetWorkloadId(), err))
+       }
+
+       // Create HELM client
+       hc, err := NewHelmClient(req.GetHostIp(), s.logger)
+       if os.IsNotExist(err) {
+               return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig for HostIp can't be found: %s. " +
+                       "Err: %s", req.GetHostIp(), err))
+       }
+
+       // Uninstall chart
+       err = hc.uninstallChart(req.GetWorkloadId())
+
+       if (err != nil) {
+               resp = &lcmservice.TerminateResponse{
+                       Status: "Failure",
+               }
+               return resp, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
+                       req.GetWorkloadId(), err))
+       } else {
+               resp = &lcmservice.TerminateResponse{
+                       Status: "Success",
+               }
+               return resp, nil
        }
-       logger.Printf("Termination completed")
-       return resp, nil
 }
 
+// Instantiate HELM Chart
 func (s *ServerGRPC) Instantiate(stream lcmservice.AppLCM_InstantiateServer) (err error) {
-       logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
 
+       // Recieve metadata which is host ip
        req, err := stream.Recv()
        if err != nil {
-               logger.Fatalf("cannot receive package metadata")
+               s.logger.Errorf("Cannot receive package metadata. Err: %s", err)
+               return
        }
 
        hostIP := req.GetHostIp()
-       logger.Printf("receive an upload-image request for package ip %s", hostIP)
+       s.logger.Info("Recieved instantiate request for host ", hostIP)
 
-       helmPkg := bytes.Buffer{}
+       // Host validation
+       if (hostIP == "") {
+               return s.logError(status.Errorf(codes.InvalidArgument, "Nil input for HostIp: %s Err: %s", req.GetHostIp(), err))
+       }
 
+       // Receive package
+       helmPkg := bytes.Buffer{}
        for {
-               err := contextError(stream.Context())
+               err := s.contextError(stream.Context())
                if err != nil {
                        return err
                }
 
-               logger.Printf("waiting to receive more data")
+               s.logger.Info("Waiting to receive more data")
 
                req, err := stream.Recv()
                if err == io.EOF {
-                       logger.Printf("no more data")
+                       s.logger.Info("No more data")
                        break
                }
                if err != nil {
-                       return logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
+                       return s.logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
                }
 
+               // Receive chunk and write to helm package
                chunk := req.GetPackage()
 
-               logger.Printf("received a chunk ")
+               s.logger.Info("Recieved chunk")
 
-               // write slowly
-               // time.Sleep(time.Second)
                _, err = helmPkg.Write(chunk)
                if err != nil {
-                       return logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
+                       return s.logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
                }
        }
 
-       relName := installChart(helmPkg, hostIP)
-
-       res := &lcmservice.InstantiateResponse{
-               WorkloadId: relName,
-               Status:    "Success",
+       // Create HELM client
+       hc, err := NewHelmClient(req.GetHostIp(), s.logger)
+       if os.IsNotExist(err) {
+               return s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig for HostIp can't be found: %s. " +
+                       "Err: %s", req.GetHostIp(), err))
        }
 
-       err = stream.SendAndClose(res)
-       if err != nil {
-               return logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
-       }
+       relName, err := hc.installChart(helmPkg)
 
-       logger.Printf("Instantation completed")
-       return
-}
+       var res lcmservice.InstantiateResponse
+       res.WorkloadId = relName
 
+       if (err != nil) {
+               res.Status = "Failure"
+       } else {
+               res.Status = "Success"
+       }
 
-func (s *ServerGRPC) Close() {
-       if s.server != nil {
-               s.server.Stop()
+       err = stream.SendAndClose(&res)
+       if err != nil {
+               return s.logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
        }
+       s.logger.Info("Successful Instantiation")
        return
 }
 
-func contextError(ctx context.Context) error {
+func (s *ServerGRPC) contextError(ctx context.Context) error {
        switch ctx.Err() {
        case context.Canceled:
-               return logError(status.Error(codes.Canceled, "request is canceled"))
+               return s.logError(status.Error(codes.Canceled, "request is canceled"))
        case context.DeadlineExceeded:
-               return logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
+               return s.logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
        default:
                return nil
        }
 }
 
-func logError(err error) error {
+func (s *ServerGRPC) logError(err error) error {
        if err != nil {
-               log.Print(err)
+               s.logger.Errorf("Error Information: ", err)
        }
        return err
 }
\ No newline at end of file