+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package plugin
import (
"bytes"
"context"
+ "io"
"k8shelm/internal/lcmservice"
+ "net"
+ "os"
+ "strconv"
+
+ "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
_ "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"
- "io"
- "log"
- "net"
- "os"
- "strconv"
)
+// GRPC server
type ServerGRPC struct {
server *grpc.Server
port int
certificate string
key string
+ logger *logrus.Logger
}
+// GRPC service configuration used to create GRPC server
type ServerGRPCConfig struct {
Certificate string
Key string
Port int
+ Logger *logrus.Logger
}
-func NewServerGRPC(cfg ServerGRPCConfig) (s ServerGRPC, err error) {
- logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
- if cfg.Port == 0 {
- logger.Fatalf("Port must be specified")
- }
+// Constructor to GRPC server
+func NewServerGRPC(cfg ServerGRPCConfig) (s ServerGRPC) {
+ s.logger = cfg.Logger
s.port = cfg.Port
s.certificate = cfg.Certificate
s.key = cfg.Key
- logger.Println("Binding is successful")
+ s.logger.Infof("Binding is successful")
return
}
+// Start GRPC server and start listening on the port
func (s *ServerGRPC) Listen() (err error) {
- logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
var (
listener net.Listener
grpcOpts = []grpc.ServerOption{}
grpcCreds credentials.TransportCredentials
)
- logger.Println("Listening start")
-
+ // Listen announces on the network address
listener, err = net.Listen("tcp", ":"+strconv.Itoa(s.port))
-
- logger.Println("Listening end")
-
if err != nil {
- logger.Fatalf("failed to listen on port: ", s.port)
+ s.logger.Fatalf("failed to listen on specified port")
}
+ s.logger.Infof("Server started listening on specified port")
+ // Secure connection if asked
if s.certificate != "" && s.key != "" {
grpcCreds, err = credentials.NewServerTLSFromFile(
s.certificate, s.key)
if err != nil {
- logger.Fatalf("failed to create tls grpc server using cert %s and key: ", s.certificate, s.key)
+ s.logger.Fatalf("failed to create tls grpc server using given cert and key")
}
-
grpcOpts = append(grpcOpts, grpc.Creds(grpcCreds))
}
- logger.Println("New server creation")
-
+ // Register server with GRPC
s.server = grpc.NewServer(grpcOpts...)
-
- logger.Println("New server creation success")
-
lcmservice.RegisterAppLCMServer(s.server, s)
- logger.Println("New server registration success")
-
+ s.logger.Infof("Server registered with GRPC")
+ // Server start serving
err = s.server.Serve(listener)
if err != nil {
- logger.Fatalf("errored listening for grpc connections")
+ s.logger.Fatalf("failed to listen for grpc connections. Err: %s", err)
+ return err
}
-
- logger.Println("Server is serving")
-
-
return
}
+// Query HELM chart
func (s *ServerGRPC) Query(ctx context.Context, req *lcmservice.QueryRequest) (resp *lcmservice.QueryResponse, err error) {
- r := queryChart(req.GetWorkloadId(), req.GetHostIp())
+
+ // Input validation
+ if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
+ return nil, s.logError(status.Errorf(codes.InvalidArgument, "HostIP & WorkloadId can't be null", err))
+ }
+
+ // Create HELM Client
+ hc, err := NewHelmClient(req.GetHostIp(), s.logger)
+ if os.IsNotExist(err) {
+ return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to given Edge can't be found. "+
+ "Err: %s", err))
+ }
+
+ // Query Chart
+ r, err := hc.queryChart(req.GetWorkloadId())
+ if err != nil {
+ return nil, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
+ req.GetWorkloadId(), err))
+ }
resp = &lcmservice.QueryResponse{
Status: r,
}
return resp, nil
}
+// Terminate HELM charts
func (s *ServerGRPC) Terminate(ctx context.Context, req *lcmservice.TerminateRequest) (resp *lcmservice.TerminateResponse, err error) {
- logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
- uninstallChart(req.GetWorkloadId(), req.GetHostIp())
- resp = &lcmservice.TerminateResponse{
- Status: "Success",
+ // Input validation
+ if (req.GetHostIp() == "") || (req.GetWorkloadId() == "") {
+ return nil, s.logError(status.Errorf(codes.InvalidArgument, "HostIP & WorkloadId can't be null", err))
+ }
+
+ // Create HELM client
+ hc, err := NewHelmClient(req.GetHostIp(), s.logger)
+ if os.IsNotExist(err) {
+ return nil, s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to given Edge can't be found. "+
+ "Err: %s", err))
+ }
+
+ // Uninstall chart
+ err = hc.uninstallChart(req.GetWorkloadId())
+
+ if err != nil {
+ resp = &lcmservice.TerminateResponse{
+ Status: "Failure",
+ }
+ return resp, s.logError(status.Errorf(codes.NotFound, "Chart not found for workloadId: %s. Err: %s",
+ req.GetWorkloadId(), err))
+ } else {
+ resp = &lcmservice.TerminateResponse{
+ Status: "Success",
+ }
+ return resp, nil
}
- logger.Printf("Termination completed")
- return resp, nil
}
+// Instantiate HELM Chart
func (s *ServerGRPC) Instantiate(stream lcmservice.AppLCM_InstantiateServer) (err error) {
- logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
+ // Recieve metadata which is host ip
req, err := stream.Recv()
if err != nil {
- logger.Fatalf("cannot receive package metadata")
+ s.logger.Errorf("Cannot receive package metadata. Err: %s", err)
+ return
}
hostIP := req.GetHostIp()
- logger.Printf("receive an upload-image request for package ip %s", hostIP)
+ s.logger.Infof("Recieved instantiate request")
- helmPkg := bytes.Buffer{}
+ // Host validation
+ if hostIP == "" {
+ return s.logError(status.Errorf(codes.InvalidArgument, "HostIP & WorkloadId can't be null", err))
+ }
+ // Receive package
+ helmPkg := bytes.Buffer{}
for {
- err := contextError(stream.Context())
+ err := s.contextError(stream.Context())
if err != nil {
return err
}
- logger.Printf("waiting to receive more data")
+ s.logger.Debug("Waiting to receive more data")
req, err := stream.Recv()
if err == io.EOF {
- logger.Printf("no more data")
+ s.logger.Debug("No more data")
break
}
if err != nil {
- return logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
+ return s.logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
}
+ // Receive chunk and write to helm package
chunk := req.GetPackage()
- logger.Printf("received a chunk ")
+ s.logger.Infof("Recieved chunk")
- // write slowly
- // time.Sleep(time.Second)
_, err = helmPkg.Write(chunk)
if err != nil {
- return logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
+ return s.logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
}
}
- relName := installChart(helmPkg, hostIP)
-
- res := &lcmservice.InstantiateResponse{
- WorkloadId: relName,
- Status: "Success",
+ // Create HELM client
+ hc, err := NewHelmClient(req.GetHostIp(), s.logger)
+ if os.IsNotExist(err) {
+ return s.logError(status.Errorf(codes.InvalidArgument, "Kubeconfig corresponding to edge can't be found. "+
+ "Err: %s", err))
}
- err = stream.SendAndClose(res)
- if err != nil {
- return logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
- }
+ relName, err := hc.installChart(helmPkg)
- logger.Printf("Instantation completed")
- return
-}
+ var res lcmservice.InstantiateResponse
+ res.WorkloadId = relName
+ if err != nil {
+ res.Status = "Failure"
+ s.logger.Infof("Instantiation Failed")
+ } else {
+ res.Status = "Success"
+ s.logger.Infof("Successful Instantiation")
+ }
-func (s *ServerGRPC) Close() {
- if s.server != nil {
- s.server.Stop()
+ err = stream.SendAndClose(&res)
+ if err != nil {
+ return s.logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
}
return
}
-func contextError(ctx context.Context) error {
+func (s *ServerGRPC) contextError(ctx context.Context) error {
switch ctx.Err() {
case context.Canceled:
- return logError(status.Error(codes.Canceled, "request is canceled"))
+ return s.logError(status.Error(codes.Canceled, "request is canceled"))
case context.DeadlineExceeded:
- return logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
+ return s.logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
default:
return nil
}
}
-func logError(err error) error {
+func (s *ServerGRPC) logError(err error) error {
if err != nil {
- log.Print(err)
+ s.logger.Errorf("Error Information: ", err)
}
return err
-}
\ No newline at end of file
+}