K8s helm plugin code
[ealt-edge.git] / mecm / mepm / applcm / k8shelm / pkg / plugin / grpcserver.go
1 package plugin
2
3 import (
4         "bytes"
5         "context"
6         "k8shelm/internal/lcmservice"
7         "google.golang.org/grpc"
8         "google.golang.org/grpc/codes"
9         "google.golang.org/grpc/credentials"
10         _ "google.golang.org/grpc/encoding/gzip"
11         "google.golang.org/grpc/status"
12         "io"
13         "log"
14         "net"
15         "os"
16         "strconv"
17 )
18
19 type ServerGRPC struct {
20         server      *grpc.Server
21         port        int
22         certificate string
23         key         string
24 }
25
26 type ServerGRPCConfig struct {
27         Certificate string
28         Key         string
29         Port        int
30 }
31
32 func NewServerGRPC(cfg ServerGRPCConfig) (s ServerGRPC, err error) {
33         logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
34         if cfg.Port == 0 {
35                 logger.Fatalf("Port must be specified")
36         }
37         s.port = cfg.Port
38         s.certificate = cfg.Certificate
39         s.key = cfg.Key
40         logger.Println("Binding is successful")
41         return
42 }
43
44 func (s *ServerGRPC) Listen() (err error) {
45         logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
46         var (
47                 listener  net.Listener
48                 grpcOpts  = []grpc.ServerOption{}
49                 grpcCreds credentials.TransportCredentials
50         )
51
52         logger.Println("Listening start")
53
54         listener, err = net.Listen("tcp", ":"+strconv.Itoa(s.port))
55
56         logger.Println("Listening end")
57
58         if err != nil {
59                 logger.Fatalf("failed to listen on port: ", s.port)
60         }
61
62         if s.certificate != "" && s.key != "" {
63                 grpcCreds, err = credentials.NewServerTLSFromFile(
64                         s.certificate, s.key)
65                 if err != nil {
66                         logger.Fatalf("failed to create tls grpc server using cert %s and key: ", s.certificate, s.key)
67                 }
68
69                 grpcOpts = append(grpcOpts, grpc.Creds(grpcCreds))
70         }
71
72         logger.Println("New server creation")
73
74         s.server = grpc.NewServer(grpcOpts...)
75
76         logger.Println("New server creation success")
77
78         lcmservice.RegisterAppLCMServer(s.server, s)
79
80         logger.Println("New server registration success")
81
82
83         err = s.server.Serve(listener)
84         if err != nil {
85                 logger.Fatalf("errored listening for grpc connections")
86         }
87
88         logger.Println("Server is serving")
89
90
91         return
92 }
93
94 func (s *ServerGRPC) Query(ctx context.Context, req *lcmservice.QueryRequest) (resp *lcmservice.QueryResponse, err error) {
95         r := queryChart(req.GetWorkloadId(), req.GetHostIp())
96         resp = &lcmservice.QueryResponse{
97                 Status: r,
98         }
99         return resp, nil
100 }
101
102 func (s *ServerGRPC) Terminate(ctx context.Context, req *lcmservice.TerminateRequest) (resp *lcmservice.TerminateResponse, err error) {
103         logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
104         uninstallChart(req.GetWorkloadId(), req.GetHostIp())
105         resp = &lcmservice.TerminateResponse{
106                 Status: "Success",
107         }
108         logger.Printf("Termination completed")
109         return resp, nil
110 }
111
112 func (s *ServerGRPC) Instantiate(stream lcmservice.AppLCM_InstantiateServer) (err error) {
113         logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
114
115         req, err := stream.Recv()
116         if err != nil {
117                 logger.Fatalf("cannot receive package metadata")
118         }
119
120         hostIP := req.GetHostIp()
121         logger.Printf("receive an upload-image request for package ip %s", hostIP)
122
123         helmPkg := bytes.Buffer{}
124
125         for {
126                 err := contextError(stream.Context())
127                 if err != nil {
128                         return err
129                 }
130
131                 logger.Printf("waiting to receive more data")
132
133                 req, err := stream.Recv()
134                 if err == io.EOF {
135                         logger.Printf("no more data")
136                         break
137                 }
138                 if err != nil {
139                         return logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
140                 }
141
142                 chunk := req.GetPackage()
143
144                 logger.Printf("received a chunk ")
145
146                 // write slowly
147                 // time.Sleep(time.Second)
148                 _, err = helmPkg.Write(chunk)
149                 if err != nil {
150                         return logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
151                 }
152         }
153
154         relName := installChart(helmPkg, hostIP)
155
156         res := &lcmservice.InstantiateResponse{
157                 WorkloadId: relName,
158                 Status:    "Success",
159         }
160
161         err = stream.SendAndClose(res)
162         if err != nil {
163                 return logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
164         }
165
166         logger.Printf("Instantation completed")
167         return
168 }
169
170
171 func (s *ServerGRPC) Close() {
172         if s.server != nil {
173                 s.server.Stop()
174         }
175         return
176 }
177
178 func contextError(ctx context.Context) error {
179         switch ctx.Err() {
180         case context.Canceled:
181                 return logError(status.Error(codes.Canceled, "request is canceled"))
182         case context.DeadlineExceeded:
183                 return logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
184         default:
185                 return nil
186         }
187 }
188
189 func logError(err error) error {
190         if err != nil {
191                 log.Print(err)
192         }
193         return err
194 }