GRPC client for broker
[ealt-edge.git] / mecm / mepm / applcm / broker / pkg / plugin / grpcclient.go
1 package plugin
2
3 import (
4         "broker/internal/lcmservice"
5         "golang.org/x/net/context"
6         "google.golang.org/grpc"
7         "google.golang.org/grpc/credentials"
8         _ "google.golang.org/grpc/encoding/gzip"
9         "io"
10         "log"
11         "os"
12 )
13
14 // ClientGRPC provides the implementation of a file
15 // uploader that streams chunks via protobuf-encoded
16 // messages.
17 type ClientGRPC struct {
18         conn      *grpc.ClientConn
19         client    lcmservice.AppLCMClient
20         chunkSize int
21 }
22
23 type ClientGRPCConfig struct {
24         Address         string
25         ChunkSize       int
26         RootCertificate string
27 }
28
29 func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) {
30
31         logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile)
32
33         var (
34                 grpcOpts  = []grpc.DialOption{}
35                 grpcCreds credentials.TransportCredentials
36         )
37
38         if cfg.Address == "" {
39                 logger.Fatalf("address must be specified: ", err)
40         }
41
42         if cfg.RootCertificate != "" {
43                 grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost")
44                 if err != nil {
45                         logger.Fatalf("failed to create grpc tls client via root-cert: ", err)
46                 }
47
48                 grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds))
49         } else {
50                 grpcOpts = append(grpcOpts, grpc.WithInsecure())
51         }
52
53         switch {
54         case cfg.ChunkSize == 0:
55                 logger.Fatalf("ChunkSize must be specified")
56         case cfg.ChunkSize > (1 << 22):
57                 logger.Fatalf("ChunkSize must be < than 4MB")
58         default:
59                 c.chunkSize = cfg.ChunkSize
60         }
61
62         c.conn, err = grpc.Dial(cfg.Address, grpcOpts...)
63         if err != nil {
64                 logger.Fatalf("failed to start grpc connection with address: ", cfg.Address)
65         }
66
67         c.client = lcmservice.NewAppLCMClient(c.conn)
68         return
69 }
70
71 func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (workloadId string, status string) {
72         var (
73                 writing = true
74                 buf     []byte
75                 n       int
76                 file    *os.File
77         )
78         log.Printf("hostIP: ", hostIP)
79         log.Printf("deployArtifact: ", f)
80         logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile)
81
82         // Get a file handle for the file we
83         // want to upload
84         file, err := os.Open(f)
85         if err != nil {
86                 logger.Fatalf("failed to open file: ", err.Error())
87         }
88         defer file.Close()
89
90         // Open a stream-based connection with the
91         // gRPC server
92         stream, err := c.client.Instantiate(ctx)
93
94         if err != nil {
95                 logger.Fatalf("failed to create upload stream for file: ", err)
96         }
97         defer stream.CloseSend()
98
99     //send metadata information
100         req := &lcmservice.InstantiateRequest{
101
102                 Data: &lcmservice.InstantiateRequest_HostIp{
103                                 HostIp:  hostIP,
104                 },
105         }
106
107         err = stream.Send(req)
108         if err != nil {
109                 logger.Fatalf("failed to send metadata information: ", f)
110         }
111
112         // Allocate a buffer with `chunkSize` as the capacity
113         // and length (making a 0 array of the size of `chunkSize`)
114         buf = make([]byte, c.chunkSize)
115         for writing {
116                 // put as many bytes as `chunkSize` into the
117                 // buf array.
118                 n, err = file.Read(buf)
119                 if err != nil {
120                         // ... if `eof` --> `writing=false`...
121                         if err == io.EOF {
122                                 writing = false
123                                 err = nil
124                                 continue
125                         }
126                         logger.Fatalf("errored while copying from file to buf: ", err)
127                 }
128
129                 req := &lcmservice.InstantiateRequest {
130                         Data: &lcmservice.InstantiateRequest_Package {
131                                 Package: buf[:n],
132                         },
133                 }
134
135                 err = stream.Send(req)
136
137                 if err != nil {
138                         logger.Fatalf("failed to send chunk via stream: ", err)
139                 }
140         }
141
142         res, err := stream.CloseAndRecv()
143         if err != nil {
144                 logger.Fatalf("failed to receive upstream status response: ", err)
145         }
146         log.Printf("response", res)
147         return res.WorkloadId, res.Status
148 }
149
150 func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string) {
151
152         req := &lcmservice.QueryRequest{
153                 HostIp: hostIP,
154                 WorkloadId: workloadId,
155         }
156         resp, _ := c.client.Query(ctx, req)
157         return resp.Status
158 }
159
160 func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string) {
161
162         req := &lcmservice.TerminateRequest{
163                 HostIp: hostIP,
164                 WorkloadId: workloadId,
165         }
166         resp, _ := c.client.Terminate(ctx, req)
167         return resp.Status
168 }
169
170 func (c *ClientGRPC) Close() {
171         if c.conn != nil {
172                 c.conn.Close()
173         }
174 }
175