Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / google.golang.org / grpc / balancer_v1_wrapper.go
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package grpc
20
21 import (
22         "context"
23         "strings"
24         "sync"
25
26         "google.golang.org/grpc/balancer"
27         "google.golang.org/grpc/connectivity"
28         "google.golang.org/grpc/grpclog"
29         "google.golang.org/grpc/resolver"
30 )
31
32 type balancerWrapperBuilder struct {
33         b Balancer // The v1 balancer.
34 }
35
36 func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
37         targetAddr := cc.Target()
38         targetSplitted := strings.Split(targetAddr, ":///")
39         if len(targetSplitted) >= 2 {
40                 targetAddr = targetSplitted[1]
41         }
42
43         bwb.b.Start(targetAddr, BalancerConfig{
44                 DialCreds: opts.DialCreds,
45                 Dialer:    opts.Dialer,
46         })
47         _, pickfirst := bwb.b.(*pickFirst)
48         bw := &balancerWrapper{
49                 balancer:   bwb.b,
50                 pickfirst:  pickfirst,
51                 cc:         cc,
52                 targetAddr: targetAddr,
53                 startCh:    make(chan struct{}),
54                 conns:      make(map[resolver.Address]balancer.SubConn),
55                 connSt:     make(map[balancer.SubConn]*scState),
56                 csEvltr:    &balancer.ConnectivityStateEvaluator{},
57                 state:      connectivity.Idle,
58         }
59         cc.UpdateBalancerState(connectivity.Idle, bw)
60         go bw.lbWatcher()
61         return bw
62 }
63
64 func (bwb *balancerWrapperBuilder) Name() string {
65         return "wrapper"
66 }
67
68 type scState struct {
69         addr Address // The v1 address type.
70         s    connectivity.State
71         down func(error)
72 }
73
74 type balancerWrapper struct {
75         balancer  Balancer // The v1 balancer.
76         pickfirst bool
77
78         cc         balancer.ClientConn
79         targetAddr string // Target without the scheme.
80
81         mu     sync.Mutex
82         conns  map[resolver.Address]balancer.SubConn
83         connSt map[balancer.SubConn]*scState
84         // This channel is closed when handling the first resolver result.
85         // lbWatcher blocks until this is closed, to avoid race between
86         // - NewSubConn is created, cc wants to notify balancer of state changes;
87         // - Build hasn't return, cc doesn't have access to balancer.
88         startCh chan struct{}
89
90         // To aggregate the connectivity state.
91         csEvltr *balancer.ConnectivityStateEvaluator
92         state   connectivity.State
93 }
94
95 // lbWatcher watches the Notify channel of the balancer and manages
96 // connections accordingly.
97 func (bw *balancerWrapper) lbWatcher() {
98         <-bw.startCh
99         notifyCh := bw.balancer.Notify()
100         if notifyCh == nil {
101                 // There's no resolver in the balancer. Connect directly.
102                 a := resolver.Address{
103                         Addr: bw.targetAddr,
104                         Type: resolver.Backend,
105                 }
106                 sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
107                 if err != nil {
108                         grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
109                 } else {
110                         bw.mu.Lock()
111                         bw.conns[a] = sc
112                         bw.connSt[sc] = &scState{
113                                 addr: Address{Addr: bw.targetAddr},
114                                 s:    connectivity.Idle,
115                         }
116                         bw.mu.Unlock()
117                         sc.Connect()
118                 }
119                 return
120         }
121
122         for addrs := range notifyCh {
123                 grpclog.Infof("balancerWrapper: got update addr from Notify: %v\n", addrs)
124                 if bw.pickfirst {
125                         var (
126                                 oldA  resolver.Address
127                                 oldSC balancer.SubConn
128                         )
129                         bw.mu.Lock()
130                         for oldA, oldSC = range bw.conns {
131                                 break
132                         }
133                         bw.mu.Unlock()
134                         if len(addrs) <= 0 {
135                                 if oldSC != nil {
136                                         // Teardown old sc.
137                                         bw.mu.Lock()
138                                         delete(bw.conns, oldA)
139                                         delete(bw.connSt, oldSC)
140                                         bw.mu.Unlock()
141                                         bw.cc.RemoveSubConn(oldSC)
142                                 }
143                                 continue
144                         }
145
146                         var newAddrs []resolver.Address
147                         for _, a := range addrs {
148                                 newAddr := resolver.Address{
149                                         Addr:       a.Addr,
150                                         Type:       resolver.Backend, // All addresses from balancer are all backends.
151                                         ServerName: "",
152                                         Metadata:   a.Metadata,
153                                 }
154                                 newAddrs = append(newAddrs, newAddr)
155                         }
156                         if oldSC == nil {
157                                 // Create new sc.
158                                 sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
159                                 if err != nil {
160                                         grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
161                                 } else {
162                                         bw.mu.Lock()
163                                         // For pickfirst, there should be only one SubConn, so the
164                                         // address doesn't matter. All states updating (up and down)
165                                         // and picking should all happen on that only SubConn.
166                                         bw.conns[resolver.Address{}] = sc
167                                         bw.connSt[sc] = &scState{
168                                                 addr: addrs[0], // Use the first address.
169                                                 s:    connectivity.Idle,
170                                         }
171                                         bw.mu.Unlock()
172                                         sc.Connect()
173                                 }
174                         } else {
175                                 bw.mu.Lock()
176                                 bw.connSt[oldSC].addr = addrs[0]
177                                 bw.mu.Unlock()
178                                 oldSC.UpdateAddresses(newAddrs)
179                         }
180                 } else {
181                         var (
182                                 add []resolver.Address // Addresses need to setup connections.
183                                 del []balancer.SubConn // Connections need to tear down.
184                         )
185                         resAddrs := make(map[resolver.Address]Address)
186                         for _, a := range addrs {
187                                 resAddrs[resolver.Address{
188                                         Addr:       a.Addr,
189                                         Type:       resolver.Backend, // All addresses from balancer are all backends.
190                                         ServerName: "",
191                                         Metadata:   a.Metadata,
192                                 }] = a
193                         }
194                         bw.mu.Lock()
195                         for a := range resAddrs {
196                                 if _, ok := bw.conns[a]; !ok {
197                                         add = append(add, a)
198                                 }
199                         }
200                         for a, c := range bw.conns {
201                                 if _, ok := resAddrs[a]; !ok {
202                                         del = append(del, c)
203                                         delete(bw.conns, a)
204                                         // Keep the state of this sc in bw.connSt until its state becomes Shutdown.
205                                 }
206                         }
207                         bw.mu.Unlock()
208                         for _, a := range add {
209                                 sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
210                                 if err != nil {
211                                         grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
212                                 } else {
213                                         bw.mu.Lock()
214                                         bw.conns[a] = sc
215                                         bw.connSt[sc] = &scState{
216                                                 addr: resAddrs[a],
217                                                 s:    connectivity.Idle,
218                                         }
219                                         bw.mu.Unlock()
220                                         sc.Connect()
221                                 }
222                         }
223                         for _, c := range del {
224                                 bw.cc.RemoveSubConn(c)
225                         }
226                 }
227         }
228 }
229
230 func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
231         bw.mu.Lock()
232         defer bw.mu.Unlock()
233         scSt, ok := bw.connSt[sc]
234         if !ok {
235                 return
236         }
237         if s == connectivity.Idle {
238                 sc.Connect()
239         }
240         oldS := scSt.s
241         scSt.s = s
242         if oldS != connectivity.Ready && s == connectivity.Ready {
243                 scSt.down = bw.balancer.Up(scSt.addr)
244         } else if oldS == connectivity.Ready && s != connectivity.Ready {
245                 if scSt.down != nil {
246                         scSt.down(errConnClosing)
247                 }
248         }
249         sa := bw.csEvltr.RecordTransition(oldS, s)
250         if bw.state != sa {
251                 bw.state = sa
252         }
253         bw.cc.UpdateBalancerState(bw.state, bw)
254         if s == connectivity.Shutdown {
255                 // Remove state for this sc.
256                 delete(bw.connSt, sc)
257         }
258 }
259
260 func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
261         bw.mu.Lock()
262         defer bw.mu.Unlock()
263         select {
264         case <-bw.startCh:
265         default:
266                 close(bw.startCh)
267         }
268         // There should be a resolver inside the balancer.
269         // All updates here, if any, are ignored.
270 }
271
272 func (bw *balancerWrapper) Close() {
273         bw.mu.Lock()
274         defer bw.mu.Unlock()
275         select {
276         case <-bw.startCh:
277         default:
278                 close(bw.startCh)
279         }
280         bw.balancer.Close()
281 }
282
283 // The picker is the balancerWrapper itself.
284 // It either blocks or returns error, consistent with v1 balancer Get().
285 func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (sc balancer.SubConn, done func(balancer.DoneInfo), err error) {
286         failfast := true // Default failfast is true.
287         if ss, ok := rpcInfoFromContext(ctx); ok {
288                 failfast = ss.failfast
289         }
290         a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast})
291         if err != nil {
292                 return nil, nil, err
293         }
294         if p != nil {
295                 done = func(balancer.DoneInfo) { p() }
296                 defer func() {
297                         if err != nil {
298                                 p()
299                         }
300                 }()
301         }
302
303         bw.mu.Lock()
304         defer bw.mu.Unlock()
305         if bw.pickfirst {
306                 // Get the first sc in conns.
307                 for _, sc := range bw.conns {
308                         return sc, done, nil
309                 }
310                 return nil, nil, balancer.ErrNoSubConnAvailable
311         }
312         sc, ok1 := bw.conns[resolver.Address{
313                 Addr:       a.Addr,
314                 Type:       resolver.Backend,
315                 ServerName: "",
316                 Metadata:   a.Metadata,
317         }]
318         s, ok2 := bw.connSt[sc]
319         if !ok1 || !ok2 {
320                 // This can only happen due to a race where Get() returned an address
321                 // that was subsequently removed by Notify.  In this case we should
322                 // retry always.
323                 return nil, nil, balancer.ErrNoSubConnAvailable
324         }
325         switch s.s {
326         case connectivity.Ready, connectivity.Idle:
327                 return sc, done, nil
328         case connectivity.Shutdown, connectivity.TransientFailure:
329                 // If the returned sc has been shut down or is in transient failure,
330                 // return error, and this RPC will fail or wait for another picker (if
331                 // non-failfast).
332                 return nil, nil, balancer.ErrTransientFailure
333         default:
334                 // For other states (connecting or unknown), the v1 balancer would
335                 // traditionally wait until ready and then issue the RPC.  Returning
336                 // ErrNoSubConnAvailable will be a slight improvement in that it will
337                 // allow the balancer to choose another address in case others are
338                 // connected.
339                 return nil, nil, balancer.ErrNoSubConnAvailable
340         }
341 }