Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / google.golang.org / grpc / picker_wrapper.go
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package grpc
20
21 import (
22         "context"
23         "io"
24         "sync"
25
26         "google.golang.org/grpc/balancer"
27         "google.golang.org/grpc/codes"
28         "google.golang.org/grpc/grpclog"
29         "google.golang.org/grpc/internal/channelz"
30         "google.golang.org/grpc/internal/transport"
31         "google.golang.org/grpc/status"
32 )
33
34 // pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
35 // actions and unblock when there's a picker update.
36 type pickerWrapper struct {
37         mu         sync.Mutex
38         done       bool
39         blockingCh chan struct{}
40         picker     balancer.Picker
41
42         // The latest connection happened.
43         connErrMu sync.Mutex
44         connErr   error
45 }
46
47 func newPickerWrapper() *pickerWrapper {
48         bp := &pickerWrapper{blockingCh: make(chan struct{})}
49         return bp
50 }
51
52 func (bp *pickerWrapper) updateConnectionError(err error) {
53         bp.connErrMu.Lock()
54         bp.connErr = err
55         bp.connErrMu.Unlock()
56 }
57
58 func (bp *pickerWrapper) connectionError() error {
59         bp.connErrMu.Lock()
60         err := bp.connErr
61         bp.connErrMu.Unlock()
62         return err
63 }
64
65 // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
66 func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
67         bp.mu.Lock()
68         if bp.done {
69                 bp.mu.Unlock()
70                 return
71         }
72         bp.picker = p
73         // bp.blockingCh should never be nil.
74         close(bp.blockingCh)
75         bp.blockingCh = make(chan struct{})
76         bp.mu.Unlock()
77 }
78
79 func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
80         acw.mu.Lock()
81         ac := acw.ac
82         acw.mu.Unlock()
83         ac.incrCallsStarted()
84         return func(b balancer.DoneInfo) {
85                 if b.Err != nil && b.Err != io.EOF {
86                         ac.incrCallsFailed()
87                 } else {
88                         ac.incrCallsSucceeded()
89                 }
90                 if done != nil {
91                         done(b)
92                 }
93         }
94 }
95
96 // pick returns the transport that will be used for the RPC.
97 // It may block in the following cases:
98 // - there's no picker
99 // - the current picker returns ErrNoSubConnAvailable
100 // - the current picker returns other errors and failfast is false.
101 // - the subConn returned by the current picker is not READY
102 // When one of these situations happens, pick blocks until the picker gets updated.
103 func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
104         var ch chan struct{}
105
106         for {
107                 bp.mu.Lock()
108                 if bp.done {
109                         bp.mu.Unlock()
110                         return nil, nil, ErrClientConnClosing
111                 }
112
113                 if bp.picker == nil {
114                         ch = bp.blockingCh
115                 }
116                 if ch == bp.blockingCh {
117                         // This could happen when either:
118                         // - bp.picker is nil (the previous if condition), or
119                         // - has called pick on the current picker.
120                         bp.mu.Unlock()
121                         select {
122                         case <-ctx.Done():
123                                 return nil, nil, ctx.Err()
124                         case <-ch:
125                         }
126                         continue
127                 }
128
129                 ch = bp.blockingCh
130                 p := bp.picker
131                 bp.mu.Unlock()
132
133                 subConn, done, err := p.Pick(ctx, opts)
134
135                 if err != nil {
136                         switch err {
137                         case balancer.ErrNoSubConnAvailable:
138                                 continue
139                         case balancer.ErrTransientFailure:
140                                 if !failfast {
141                                         continue
142                                 }
143                                 return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError())
144                         case context.DeadlineExceeded:
145                                 return nil, nil, status.Error(codes.DeadlineExceeded, err.Error())
146                         case context.Canceled:
147                                 return nil, nil, status.Error(codes.Canceled, err.Error())
148                         default:
149                                 if _, ok := status.FromError(err); ok {
150                                         return nil, nil, err
151                                 }
152                                 // err is some other error.
153                                 return nil, nil, status.Error(codes.Unknown, err.Error())
154                         }
155                 }
156
157                 acw, ok := subConn.(*acBalancerWrapper)
158                 if !ok {
159                         grpclog.Error("subconn returned from pick is not *acBalancerWrapper")
160                         continue
161                 }
162                 if t, ok := acw.getAddrConn().getReadyTransport(); ok {
163                         if channelz.IsOn() {
164                                 return t, doneChannelzWrapper(acw, done), nil
165                         }
166                         return t, done, nil
167                 }
168                 grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
169                 // If ok == false, ac.state is not READY.
170                 // A valid picker always returns READY subConn. This means the state of ac
171                 // just changed, and picker will be updated shortly.
172                 // continue back to the beginning of the for loop to repick.
173         }
174 }
175
176 func (bp *pickerWrapper) close() {
177         bp.mu.Lock()
178         defer bp.mu.Unlock()
179         if bp.done {
180                 return
181         }
182         bp.done = true
183         close(bp.blockingCh)
184 }