6 "k8shelm/internal/lcmservice"
7 "google.golang.org/grpc"
8 "google.golang.org/grpc/codes"
9 "google.golang.org/grpc/credentials"
10 _ "google.golang.org/grpc/encoding/gzip"
11 "google.golang.org/grpc/status"
// ServerGRPC is the receiver type for the gRPC server methods in this file
// (Listen, Query, Terminate, Instantiate, Close). Those methods read its
// port, certificate, key and server fields; the field declarations
// themselves are not shown here.
19 type ServerGRPC struct {
// ServerGRPCConfig carries the settings consumed by NewServerGRPC, which
// reads at least its Certificate field. Other fields are not shown here.
26 type ServerGRPCConfig struct {
// NewServerGRPC builds a ServerGRPC from cfg, copying cfg.Certificate into
// the server's certificate field, and logs progress to stdout with the
// "helmplugin " prefix.
//
// NOTE(review): the "Port must be specified" failure is reported through
// logger.Fatalf, which logs and then exits the process, so the err result is
// presumably never set on that path — confirm whether callers expect an
// error return instead.
32 func NewServerGRPC(cfg ServerGRPCConfig) (s ServerGRPC, err error) {
33 logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
// Fatalf terminates the program after logging; nothing after it runs.
35 logger.Fatalf("Port must be specified")
38 s.certificate = cfg.Certificate
40 logger.Println("Binding is successful")
// Listen opens a TCP listener on s.port, creates a gRPC server, registers
// this ServerGRPC as the AppLCM service implementation and serves on the
// listener. When both s.certificate and s.key are non-empty, TLS credentials
// are loaded from those files and added as a server option.
//
// Failures are reported through logger.Fatalf, which logs and exits the
// process. Each failure message names the relevant port or files and
// includes the underlying error.
44 func (s *ServerGRPC) Listen() (err error) {
45 logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
48 grpcOpts = []grpc.ServerOption{}
49 grpcCreds credentials.TransportCredentials
52 logger.Println("Listening start")
54 listener, err = net.Listen("tcp", ":"+strconv.Itoa(s.port))
56 logger.Println("Listening end")
// s.port is an int (it is passed to strconv.Itoa above), so it needs %d.
59 logger.Fatalf("failed to listen on port %d: %v", s.port, err)
// TLS is only configured when both a certificate and a key path are set.
62 if s.certificate != "" && s.key != "" {
63 grpcCreds, err = credentials.NewServerTLSFromFile(
66 logger.Fatalf("failed to create tls grpc server using cert %s and key %s: %v", s.certificate, s.key, err)
69 grpcOpts = append(grpcOpts, grpc.Creds(grpcCreds))
72 logger.Println("New server creation")
74 s.server = grpc.NewServer(grpcOpts...)
76 logger.Println("New server creation success")
78 lcmservice.RegisterAppLCMServer(s.server, s)
80 logger.Println("New server registration success")
83 err = s.server.Serve(listener)
85 logger.Fatalf("errored listening for grpc connections: %v", err)
// NOTE(review): Serve blocks while the server is running, so this message
// is presumably only logged once serving has ended — confirm the intent.
88 logger.Println("Server is serving")
// Query handles the AppLCM Query RPC: it passes the request's workload ID
// and host IP to queryChart and assigns a new QueryResponse to resp. The
// response fields are not shown here.
94 func (s *ServerGRPC) Query(ctx context.Context, req *lcmservice.QueryRequest) (resp *lcmservice.QueryResponse, err error) {
95 r := queryChart(req.GetWorkloadId(), req.GetHostIp())
96 resp = &lcmservice.QueryResponse{
// Terminate handles the AppLCM Terminate RPC: it calls uninstallChart with
// the request's workload ID and host IP, assigns a new TerminateResponse to
// resp and logs that termination completed. The response fields are not
// shown here.
//
// NOTE(review): uninstallChart is called as a bare statement, so anything it
// returns is discarded — confirm whether it can report failure.
102 func (s *ServerGRPC) Terminate(ctx context.Context, req *lcmservice.TerminateRequest) (resp *lcmservice.TerminateResponse, err error) {
103 logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
104 uninstallChart(req.GetWorkloadId(), req.GetHostIp())
105 resp = &lcmservice.TerminateResponse{
108 logger.Printf("Termination completed")
// Instantiate handles the client-streaming AppLCM Instantiate RPC. The first
// message supplies the target host IP; further messages carry package chunks
// that are appended to an in-memory buffer. When no more data arrives the
// buffered package and host IP are passed to installChart, and a single
// InstantiateResponse is sent back with SendAndClose.
//
// Failures are returned to the caller as gRPC status errors via logError.
// That includes a failed first Recv: a single broken client stream must not
// exit the whole server process, so it is not reported with logger.Fatalf.
112 func (s *ServerGRPC) Instantiate(stream lcmservice.AppLCM_InstantiateServer) (err error) {
113 logger := log.New(os.Stdout, "helmplugin ", log.LstdFlags|log.Lshortfile)
115 req, err := stream.Recv()
// Report the failure on this stream only, in the same form as the chunk
// receive error below.
117 return logError(status.Errorf(codes.Unknown, "cannot receive package metadata: %v", err))
120 hostIP := req.GetHostIp()
121 logger.Printf("receive an upload-image request for package ip %s", hostIP)
// Accumulates every received chunk until the stream is exhausted.
123 helmPkg := bytes.Buffer{}
// Check for cancellation or deadline expiry before receiving further data.
126 err := contextError(stream.Context())
131 logger.Printf("waiting to receive more data")
133 req, err := stream.Recv()
135 logger.Printf("no more data")
139 return logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
142 chunk := req.GetPackage()
144 logger.Printf("received a chunk ")
147 // time.Sleep(time.Second)
148 _, err = helmPkg.Write(chunk)
150 return logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
154 relName := installChart(helmPkg, hostIP)
156 res := &lcmservice.InstantiateResponse{
161 err = stream.SendAndClose(res)
163 return logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
166 logger.Printf("Instantation completed")
// Close takes no arguments and returns nothing.
// NOTE(review): the body is not visible here; presumably it stops s.server
// if one was created by Listen — confirm.
171 func (s *ServerGRPC) Close() {
// contextError maps a context error to a gRPC status error:
// context.Canceled becomes codes.Canceled and context.DeadlineExceeded
// becomes codes.DeadlineExceeded, each passed through logError before being
// returned. The switch expression and the result for any other value are
// not shown here (presumably ctx.Err() and nil — confirm).
178 func contextError(ctx context.Context) error {
180 case context.Canceled:
181 return logError(status.Error(codes.Canceled, "request is canceled"))
182 case context.DeadlineExceeded:
183 return logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
189 func logError(err error) error {