4 "broker/internal/lcmservice"
5 "golang.org/x/net/context"
6 "google.golang.org/grpc"
7 "google.golang.org/grpc/credentials"
8 _ "google.golang.org/grpc/encoding/gzip"
14 // ClientGRPC provides the implementation of a file
15 // uploader that streams chunks via protobuf-encoded
// request messages (the sentence above is cut off in this excerpt; the
// Instantiate method sends lcmservice.InstantiateRequest values on a stream).
//
// NOTE(review): NewClientGRPC also assigns c.conn and c.chunkSize, so the
// struct has more fields than the one visible here.
17 type ClientGRPC struct {
// client is the lcmservice.AppLCMClient created in NewClientGRPC and used by
// the Instantiate, Query and Terminate methods to issue their calls.
19 client lcmservice.AppLCMClient
// ClientGRPCConfig carries the settings consumed by NewClientGRPC.
//
// NOTE(review): NewClientGRPC also reads cfg.Address and cfg.ChunkSize; those
// field declarations are not visible in this excerpt.
23 type ClientGRPCConfig struct {
// RootCertificate, when non-empty, is handed to
// credentials.NewClientTLSFromFile as the certificate file for the dial.
// When it is empty the dial options presumably fall back to
// grpc.WithInsecure - confirm against the full NewClientGRPC body.
26 RootCertificate string
// NewClientGRPC builds a ClientGRPC from cfg. On the visible lines it checks
// cfg.Address and cfg.ChunkSize, assembles the gRPC dial options (TLS
// credentials from cfg.RootCertificate, or grpc.WithInsecure), dials
// cfg.Address and wraps the resulting connection in an AppLCMClient.
//
// NOTE(review): every failure path visible here calls logger.Fatalf, which
// logs and then exits the process, so callers do not get to inspect the
// named err result for these failures.
29 func NewClientGRPC(cfg ClientGRPCConfig) (c ClientGRPC, err error) {
// Local logger writing to stdout with a "broker " prefix.
31 logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile)
34 grpcOpts = []grpc.DialOption{}
35 grpcCreds credentials.TransportCredentials
38 if cfg.Address == "" {
// NOTE(review): the format string has no verb for its argument, and err has
// not been assigned on any visible line before this point (it appears to
// still be nil here).
39 logger.Fatalf("address must be specified: ", err)
// A non-empty RootCertificate selects TLS transport credentials.
42 if cfg.RootCertificate != "" {
// The second argument is the server name override; it is hard-coded to
// "localhost" regardless of cfg.Address.
43 grpcCreds, err = credentials.NewClientTLSFromFile(cfg.RootCertificate, "localhost")
// NOTE(review): no formatting verb for err in the format string.
45 logger.Fatalf("failed to create grpc tls client via root-cert: ", err)
48 grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcCreds))
// Presumably the branch taken when no root certificate is configured
// (the surrounding else is not visible) - confirm.
50 grpcOpts = append(grpcOpts, grpc.WithInsecure())
// Chunk size validation: zero is rejected, as is anything above 1<<22 bytes.
54 case cfg.ChunkSize == 0:
55 logger.Fatalf("ChunkSize must be specified")
// NOTE(review): 1<<22 is 4194304 (4 MiB); the comparison accepts exactly
// that value while the message below says the size must be less than 4MB.
56 case cfg.ChunkSize > (1 << 22):
57 logger.Fatalf("ChunkSize must be < than 4MB")
59 c.chunkSize = cfg.ChunkSize
// Open the connection with the options collected above.
62 c.conn, err = grpc.Dial(cfg.Address, grpcOpts...)
// NOTE(review): no formatting verb for cfg.Address, and the dial error
// itself is not part of this message.
64 logger.Fatalf("failed to start grpc connection with address: ", cfg.Address)
// Wrap the connection in the lcmservice client used by the other methods.
67 c.client = lcmservice.NewAppLCMClient(c.conn)
// Instantiate opens the file at path f, opens a stream via
// c.client.Instantiate, sends a first request carrying host IP data, then
// reads the file into a buffer of c.chunkSize bytes and sends the content as
// InstantiateRequest_Package requests. It finishes with CloseAndRecv and
// returns the WorkloadId and Status fields of the response.
//
// NOTE(review): the failure paths visible here call logger.Fatalf, which
// logs and exits the process; nothing is returned to the caller in those cases.
71 func (c *ClientGRPC) Instantiate(ctx context.Context, f string, hostIP string) (workloadId string, status string) {
// NOTE(review): both Printf calls pass an argument but the format string
// contains no formatting verb.
78 log.Printf("hostIP: ", hostIP)
79 log.Printf("deployArtifact: ", f)
80 logger := log.New(os.Stdout, "broker ", log.LstdFlags|log.Lshortfile)
82 // Get a file handle for the file we
84 file, err := os.Open(f)
// NOTE(review): no formatting verb for the error text.
86 logger.Fatalf("failed to open file: ", err.Error())
90 // Open a stream-based connection with the
92 stream, err := c.client.Instantiate(ctx)
// NOTE(review): no formatting verb for err.
95 logger.Fatalf("failed to create upload stream for file: ", err)
// Ensure the send side of the stream is closed when the function returns.
97 defer stream.CloseSend()
99 //send metadata information
// First message on the stream: a request whose Data is the HostIp variant.
100 req := &lcmservice.InstantiateRequest{
102 Data: &lcmservice.InstantiateRequest_HostIp{
107 err = stream.Send(req)
// NOTE(review): this logs the file path f rather than err, and the format
// string has no verb for it.
109 logger.Fatalf("failed to send metadata information: ", f)
112 // Allocate a buffer with `chunkSize` as the capacity
113 // and length (making a 0 array of the size of `chunkSize`)
// buf is assigned with '=', so it is declared on a line not visible here.
114 buf = make([]byte, c.chunkSize)
116 // put as many bytes as `chunkSize` into the
118 n, err = file.Read(buf)
120 // ... if `eof` --> `writing=false`...
// NOTE(review): no formatting verb for err.
126 logger.Fatalf("errored while copying from file to buf: ", err)
// Per-chunk message: a request whose Data is the Package variant.
// NOTE(review): ':=' declares a new req here; presumably this sits in an
// inner (loop) scope and shadows the metadata req above - confirm.
129 req := &lcmservice.InstantiateRequest {
130 Data: &lcmservice.InstantiateRequest_Package {
135 err = stream.Send(req)
// NOTE(review): no formatting verb for err.
138 logger.Fatalf("failed to send chunk via stream: ", err)
// Close the sending side and wait for the single response from the server.
142 res, err := stream.CloseAndRecv()
// NOTE(review): no formatting verb for err.
144 logger.Fatalf("failed to receive upstream status response: ", err)
// NOTE(review): no formatting verb for res.
146 log.Printf("response", res)
147 return res.WorkloadId, res.Status
// Query builds an lcmservice.QueryRequest carrying workloadId and issues it
// through c.client.Query, reporting the outcome via the named status result.
// hostIP is presumably also set on the request on a line not visible in
// this excerpt - confirm.
150 func (c *ClientGRPC) Query(ctx context.Context, hostIP string, workloadId string) (status string) {
152 req := &lcmservice.QueryRequest{
154 WorkloadId: workloadId,
// NOTE(review): the error returned by c.client.Query is discarded with '_'.
// If the call fails resp may be nil; how resp is used afterwards is not
// visible here - confirm it cannot be dereferenced in that case.
156 resp, _ := c.client.Query(ctx, req)
// Terminate builds an lcmservice.TerminateRequest carrying workloadId and
// issues it through c.client.Terminate, reporting the outcome via the named
// status result. hostIP is presumably also set on the request on a line not
// visible in this excerpt - confirm.
160 func (c *ClientGRPC) Terminate(ctx context.Context, hostIP string, workloadId string) (status string) {
162 req := &lcmservice.TerminateRequest{
164 WorkloadId: workloadId,
// NOTE(review): the error returned by c.client.Terminate is discarded with
// '_'. If the call fails resp may be nil; how resp is used afterwards is
// not visible here - confirm it cannot be dereferenced in that case.
166 resp, _ := c.client.Terminate(ctx, req)
170 func (c *ClientGRPC) Close() {