Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / google.golang.org / grpc / balancer_conn_wrappers.go
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package grpc
20
21 import (
22         "fmt"
23         "sync"
24
25         "google.golang.org/grpc/balancer"
26         "google.golang.org/grpc/connectivity"
27         "google.golang.org/grpc/grpclog"
28         "google.golang.org/grpc/resolver"
29 )
30
31 // scStateUpdate contains the subConn and the new state it changed to.
32 type scStateUpdate struct {
33         sc    balancer.SubConn
34         state connectivity.State
35 }
36
37 // scStateUpdateBuffer is an unbounded channel for scStateChangeTuple.
38 // TODO make a general purpose buffer that uses interface{}.
39 type scStateUpdateBuffer struct {
40         c       chan *scStateUpdate
41         mu      sync.Mutex
42         backlog []*scStateUpdate
43 }
44
45 func newSCStateUpdateBuffer() *scStateUpdateBuffer {
46         return &scStateUpdateBuffer{
47                 c: make(chan *scStateUpdate, 1),
48         }
49 }
50
51 func (b *scStateUpdateBuffer) put(t *scStateUpdate) {
52         b.mu.Lock()
53         defer b.mu.Unlock()
54         if len(b.backlog) == 0 {
55                 select {
56                 case b.c <- t:
57                         return
58                 default:
59                 }
60         }
61         b.backlog = append(b.backlog, t)
62 }
63
64 func (b *scStateUpdateBuffer) load() {
65         b.mu.Lock()
66         defer b.mu.Unlock()
67         if len(b.backlog) > 0 {
68                 select {
69                 case b.c <- b.backlog[0]:
70                         b.backlog[0] = nil
71                         b.backlog = b.backlog[1:]
72                 default:
73                 }
74         }
75 }
76
77 // get returns the channel that the scStateUpdate will be sent to.
78 //
79 // Upon receiving, the caller should call load to send another
80 // scStateChangeTuple onto the channel if there is any.
81 func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
82         return b.c
83 }
84
85 // resolverUpdate contains the new resolved addresses or error if there's
86 // any.
87 type resolverUpdate struct {
88         addrs []resolver.Address
89         err   error
90 }
91
92 // ccBalancerWrapper is a wrapper on top of cc for balancers.
93 // It implements balancer.ClientConn interface.
94 type ccBalancerWrapper struct {
95         cc               *ClientConn
96         balancer         balancer.Balancer
97         stateChangeQueue *scStateUpdateBuffer
98         resolverUpdateCh chan *resolverUpdate
99         done             chan struct{}
100
101         mu       sync.Mutex
102         subConns map[*acBalancerWrapper]struct{}
103 }
104
105 func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
106         ccb := &ccBalancerWrapper{
107                 cc:               cc,
108                 stateChangeQueue: newSCStateUpdateBuffer(),
109                 resolverUpdateCh: make(chan *resolverUpdate, 1),
110                 done:             make(chan struct{}),
111                 subConns:         make(map[*acBalancerWrapper]struct{}),
112         }
113         go ccb.watcher()
114         ccb.balancer = b.Build(ccb, bopts)
115         return ccb
116 }
117
118 // watcher balancer functions sequentially, so the balancer can be implemented
119 // lock-free.
120 func (ccb *ccBalancerWrapper) watcher() {
121         for {
122                 select {
123                 case t := <-ccb.stateChangeQueue.get():
124                         ccb.stateChangeQueue.load()
125                         select {
126                         case <-ccb.done:
127                                 ccb.balancer.Close()
128                                 return
129                         default:
130                         }
131                         ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
132                 case t := <-ccb.resolverUpdateCh:
133                         select {
134                         case <-ccb.done:
135                                 ccb.balancer.Close()
136                                 return
137                         default:
138                         }
139                         ccb.balancer.HandleResolvedAddrs(t.addrs, t.err)
140                 case <-ccb.done:
141                 }
142
143                 select {
144                 case <-ccb.done:
145                         ccb.balancer.Close()
146                         ccb.mu.Lock()
147                         scs := ccb.subConns
148                         ccb.subConns = nil
149                         ccb.mu.Unlock()
150                         for acbw := range scs {
151                                 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
152                         }
153                         return
154                 default:
155                 }
156         }
157 }
158
159 func (ccb *ccBalancerWrapper) close() {
160         close(ccb.done)
161 }
162
163 func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
164         // When updating addresses for a SubConn, if the address in use is not in
165         // the new addresses, the old ac will be tearDown() and a new ac will be
166         // created. tearDown() generates a state change with Shutdown state, we
167         // don't want the balancer to receive this state change. So before
168         // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
169         // this function will be called with (nil, Shutdown). We don't need to call
170         // balancer method in this case.
171         if sc == nil {
172                 return
173         }
174         ccb.stateChangeQueue.put(&scStateUpdate{
175                 sc:    sc,
176                 state: s,
177         })
178 }
179
180 func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err error) {
181         if ccb.cc.curBalancerName != grpclbName {
182                 var containsGRPCLB bool
183                 for _, a := range addrs {
184                         if a.Type == resolver.GRPCLB {
185                                 containsGRPCLB = true
186                                 break
187                         }
188                 }
189                 if containsGRPCLB {
190                         // The current balancer is not grpclb, but addresses contain grpclb
191                         // address. This means we failed to switch to grpclb, most likely
192                         // because grpclb is not registered. Filter out all grpclb addresses
193                         // from addrs before sending to balancer.
194                         tempAddrs := make([]resolver.Address, 0, len(addrs))
195                         for _, a := range addrs {
196                                 if a.Type != resolver.GRPCLB {
197                                         tempAddrs = append(tempAddrs, a)
198                                 }
199                         }
200                         addrs = tempAddrs
201                 }
202         }
203         select {
204         case <-ccb.resolverUpdateCh:
205         default:
206         }
207         ccb.resolverUpdateCh <- &resolverUpdate{
208                 addrs: addrs,
209                 err:   err,
210         }
211 }
212
213 func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
214         if len(addrs) <= 0 {
215                 return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
216         }
217         ccb.mu.Lock()
218         defer ccb.mu.Unlock()
219         if ccb.subConns == nil {
220                 return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
221         }
222         ac, err := ccb.cc.newAddrConn(addrs, opts)
223         if err != nil {
224                 return nil, err
225         }
226         acbw := &acBalancerWrapper{ac: ac}
227         acbw.ac.mu.Lock()
228         ac.acbw = acbw
229         acbw.ac.mu.Unlock()
230         ccb.subConns[acbw] = struct{}{}
231         return acbw, nil
232 }
233
234 func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
235         acbw, ok := sc.(*acBalancerWrapper)
236         if !ok {
237                 return
238         }
239         ccb.mu.Lock()
240         defer ccb.mu.Unlock()
241         if ccb.subConns == nil {
242                 return
243         }
244         delete(ccb.subConns, acbw)
245         ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
246 }
247
248 func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
249         ccb.mu.Lock()
250         defer ccb.mu.Unlock()
251         if ccb.subConns == nil {
252                 return
253         }
254         // Update picker before updating state.  Even though the ordering here does
255         // not matter, it can lead to multiple calls of Pick in the common start-up
256         // case where we wait for ready and then perform an RPC.  If the picker is
257         // updated later, we could call the "connecting" picker when the state is
258         // updated, and then call the "ready" picker after the picker gets updated.
259         ccb.cc.blockingpicker.updatePicker(p)
260         ccb.cc.csMgr.updateState(s)
261 }
262
263 func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOption) {
264         ccb.cc.resolveNow(o)
265 }
266
267 func (ccb *ccBalancerWrapper) Target() string {
268         return ccb.cc.target
269 }
270
271 // acBalancerWrapper is a wrapper on top of ac for balancers.
272 // It implements balancer.SubConn interface.
273 type acBalancerWrapper struct {
274         mu sync.Mutex
275         ac *addrConn
276 }
277
278 func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
279         acbw.mu.Lock()
280         defer acbw.mu.Unlock()
281         if len(addrs) <= 0 {
282                 acbw.ac.tearDown(errConnDrain)
283                 return
284         }
285         if !acbw.ac.tryUpdateAddrs(addrs) {
286                 cc := acbw.ac.cc
287                 opts := acbw.ac.scopts
288                 acbw.ac.mu.Lock()
289                 // Set old ac.acbw to nil so the Shutdown state update will be ignored
290                 // by balancer.
291                 //
292                 // TODO(bar) the state transition could be wrong when tearDown() old ac
293                 // and creating new ac, fix the transition.
294                 acbw.ac.acbw = nil
295                 acbw.ac.mu.Unlock()
296                 acState := acbw.ac.getState()
297                 acbw.ac.tearDown(errConnDrain)
298
299                 if acState == connectivity.Shutdown {
300                         return
301                 }
302
303                 ac, err := cc.newAddrConn(addrs, opts)
304                 if err != nil {
305                         grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
306                         return
307                 }
308                 acbw.ac = ac
309                 ac.mu.Lock()
310                 ac.acbw = acbw
311                 ac.mu.Unlock()
312                 if acState != connectivity.Idle {
313                         ac.connect()
314                 }
315         }
316 }
317
318 func (acbw *acBalancerWrapper) Connect() {
319         acbw.mu.Lock()
320         defer acbw.mu.Unlock()
321         acbw.ac.connect()
322 }
323
324 func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
325         acbw.mu.Lock()
326         defer acbw.mu.Unlock()
327         return acbw.ac
328 }