Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / google.golang.org / grpc / clientconn.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package grpc
20
21 import (
22         "context"
23         "errors"
24         "fmt"
25         "math"
26         "net"
27         "reflect"
28         "strings"
29         "sync"
30         "sync/atomic"
31         "time"
32
33         "google.golang.org/grpc/balancer"
34         _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
35         "google.golang.org/grpc/codes"
36         "google.golang.org/grpc/connectivity"
37         "google.golang.org/grpc/credentials"
38         "google.golang.org/grpc/grpclog"
39         "google.golang.org/grpc/internal/backoff"
40         "google.golang.org/grpc/internal/channelz"
41         "google.golang.org/grpc/internal/envconfig"
42         "google.golang.org/grpc/internal/grpcsync"
43         "google.golang.org/grpc/internal/transport"
44         "google.golang.org/grpc/keepalive"
45         "google.golang.org/grpc/metadata"
46         "google.golang.org/grpc/resolver"
47         _ "google.golang.org/grpc/resolver/dns"         // To register dns resolver.
48         _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
49         "google.golang.org/grpc/status"
50 )
51
// minConnectTimeout is the minimum amount of time a connection attempt is
// given to complete.
const minConnectTimeout = time.Second * 20

// grpclbName is the name of the grpclb balancer. It must match grpclbName in
// grpclb/grpclb.go.
const grpclbName = "grpclb"
58
59 var (
60         // ErrClientConnClosing indicates that the operation is illegal because
61         // the ClientConn is closing.
62         //
63         // Deprecated: this error should not be relied upon by users; use the status
64         // code of Canceled instead.
65         ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
66         // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
67         errConnDrain = errors.New("grpc: the connection is drained")
68         // errConnClosing indicates that the connection is closing.
69         errConnClosing = errors.New("grpc: the connection is closing")
70         // errBalancerClosed indicates that the balancer is closed.
71         errBalancerClosed = errors.New("grpc: balancer is closed")
72         // We use an accessor so that minConnectTimeout can be
73         // atomically read and updated while testing.
74         getMinConnectTimeout = func() time.Duration {
75                 return minConnectTimeout
76         }
77 )
78
// Errors returned by Dial and DialContext when the supplied security-related
// dial options are missing or contradict each other.

// errNoTransportSecurity: no transport security was configured for the
// ClientConn. Callers must either configure one or explicitly opt out with
// the WithInsecure DialOption.
var errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")

// errTransportCredsAndBundle: a credentials bundle was supplied together with
// individual TransportCredentials.
var errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")

// errTransportCredentialsMissing: per-RPC credentials that need a secure
// connection (e.g. an OAuth2 token) were configured on an insecure
// connection.
var errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")

// errCredentialsConflict: both grpc.WithTransportCredentials() and
// grpc.WithInsecure() were used for the same connection.
var errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
96
// Default client-side message size limits.
const (
	// defaultClientMaxReceiveMessageSize is 4 MiB.
	defaultClientMaxReceiveMessageSize = 4 * 1024 * 1024
	// defaultClientMaxSendMessageSize is the largest value an int32 can hold.
	defaultClientMaxSendMessageSize = math.MaxInt32
)

// Default buffer sizes, 32 KiB each.
const (
	// defaultWriteBufSize is the default buffer size for sending frames.
	defaultWriteBufSize = 32 * 1024
	// defaultReadBufSize is the read-side counterpart of defaultWriteBufSize.
	defaultReadBufSize = 32 * 1024
)
104
105 // Dial creates a client connection to the given target.
106 func Dial(target string, opts ...DialOption) (*ClientConn, error) {
107         return DialContext(context.Background(), target, opts...)
108 }
109
110 // DialContext creates a client connection to the given target. By default, it's
111 // a non-blocking dial (the function won't wait for connections to be
112 // established, and connecting happens in the background). To make it a blocking
113 // dial, use WithBlock() dial option.
114 //
115 // In the non-blocking case, the ctx does not act against the connection. It
116 // only controls the setup steps.
117 //
118 // In the blocking case, ctx can be used to cancel or expire the pending
119 // connection. Once this function returns, the cancellation and expiration of
120 // ctx will be noop. Users should call ClientConn.Close to terminate all the
121 // pending operations after this function returns.
122 //
123 // The target name syntax is defined in
124 // https://github.com/grpc/grpc/blob/master/doc/naming.md.
125 // e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
126 func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
127         cc := &ClientConn{
128                 target:            target,
129                 csMgr:             &connectivityStateManager{},
130                 conns:             make(map[*addrConn]struct{}),
131                 dopts:             defaultDialOptions(),
132                 blockingpicker:    newPickerWrapper(),
133                 czData:            new(channelzData),
134                 firstResolveEvent: grpcsync.NewEvent(),
135         }
136         cc.retryThrottler.Store((*retryThrottler)(nil))
137         cc.ctx, cc.cancel = context.WithCancel(context.Background())
138
139         for _, opt := range opts {
140                 opt.apply(&cc.dopts)
141         }
142
143         if channelz.IsOn() {
144                 if cc.dopts.channelzParentID != 0 {
145                         cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
146                         channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
147                                 Desc:     "Channel Created",
148                                 Severity: channelz.CtINFO,
149                                 Parent: &channelz.TraceEventDesc{
150                                         Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
151                                         Severity: channelz.CtINFO,
152                                 },
153                         })
154                 } else {
155                         cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
156                         channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
157                                 Desc:     "Channel Created",
158                                 Severity: channelz.CtINFO,
159                         })
160                 }
161                 cc.csMgr.channelzID = cc.channelzID
162         }
163
164         if !cc.dopts.insecure {
165                 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
166                         return nil, errNoTransportSecurity
167                 }
168                 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
169                         return nil, errTransportCredsAndBundle
170                 }
171         } else {
172                 if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
173                         return nil, errCredentialsConflict
174                 }
175                 for _, cd := range cc.dopts.copts.PerRPCCredentials {
176                         if cd.RequireTransportSecurity() {
177                                 return nil, errTransportCredentialsMissing
178                         }
179                 }
180         }
181
182         cc.mkp = cc.dopts.copts.KeepaliveParams
183
184         if cc.dopts.copts.Dialer == nil {
185                 cc.dopts.copts.Dialer = newProxyDialer(
186                         func(ctx context.Context, addr string) (net.Conn, error) {
187                                 network, addr := parseDialTarget(addr)
188                                 return (&net.Dialer{}).DialContext(ctx, network, addr)
189                         },
190                 )
191         }
192
193         if cc.dopts.copts.UserAgent != "" {
194                 cc.dopts.copts.UserAgent += " " + grpcUA
195         } else {
196                 cc.dopts.copts.UserAgent = grpcUA
197         }
198
199         if cc.dopts.timeout > 0 {
200                 var cancel context.CancelFunc
201                 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
202                 defer cancel()
203         }
204
205         defer func() {
206                 select {
207                 case <-ctx.Done():
208                         conn, err = nil, ctx.Err()
209                 default:
210                 }
211
212                 if err != nil {
213                         cc.Close()
214                 }
215         }()
216
217         scSet := false
218         if cc.dopts.scChan != nil {
219                 // Try to get an initial service config.
220                 select {
221                 case sc, ok := <-cc.dopts.scChan:
222                         if ok {
223                                 cc.sc = sc
224                                 scSet = true
225                         }
226                 default:
227                 }
228         }
229         if cc.dopts.bs == nil {
230                 cc.dopts.bs = backoff.Exponential{
231                         MaxDelay: DefaultBackoffConfig.MaxDelay,
232                 }
233         }
234         if cc.dopts.resolverBuilder == nil {
235                 // Only try to parse target when resolver builder is not already set.
236                 cc.parsedTarget = parseTarget(cc.target)
237                 grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
238                 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
239                 if cc.dopts.resolverBuilder == nil {
240                         // If resolver builder is still nil, the parsed target's scheme is
241                         // not registered. Fallback to default resolver and set Endpoint to
242                         // the original target.
243                         grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
244                         cc.parsedTarget = resolver.Target{
245                                 Scheme:   resolver.GetDefaultScheme(),
246                                 Endpoint: target,
247                         }
248                         cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
249                 }
250         } else {
251                 cc.parsedTarget = resolver.Target{Endpoint: target}
252         }
253         creds := cc.dopts.copts.TransportCredentials
254         if creds != nil && creds.Info().ServerName != "" {
255                 cc.authority = creds.Info().ServerName
256         } else if cc.dopts.insecure && cc.dopts.authority != "" {
257                 cc.authority = cc.dopts.authority
258         } else {
259                 // Use endpoint from "scheme://authority/endpoint" as the default
260                 // authority for ClientConn.
261                 cc.authority = cc.parsedTarget.Endpoint
262         }
263
264         if cc.dopts.scChan != nil && !scSet {
265                 // Blocking wait for the initial service config.
266                 select {
267                 case sc, ok := <-cc.dopts.scChan:
268                         if ok {
269                                 cc.sc = sc
270                         }
271                 case <-ctx.Done():
272                         return nil, ctx.Err()
273                 }
274         }
275         if cc.dopts.scChan != nil {
276                 go cc.scWatcher()
277         }
278
279         var credsClone credentials.TransportCredentials
280         if creds := cc.dopts.copts.TransportCredentials; creds != nil {
281                 credsClone = creds.Clone()
282         }
283         cc.balancerBuildOpts = balancer.BuildOptions{
284                 DialCreds:        credsClone,
285                 CredsBundle:      cc.dopts.copts.CredsBundle,
286                 Dialer:           cc.dopts.copts.Dialer,
287                 ChannelzParentID: cc.channelzID,
288         }
289
290         // Build the resolver.
291         rWrapper, err := newCCResolverWrapper(cc)
292         if err != nil {
293                 return nil, fmt.Errorf("failed to build resolver: %v", err)
294         }
295
296         cc.mu.Lock()
297         cc.resolverWrapper = rWrapper
298         cc.mu.Unlock()
299         // A blocking dial blocks until the clientConn is ready.
300         if cc.dopts.block {
301                 for {
302                         s := cc.GetState()
303                         if s == connectivity.Ready {
304                                 break
305                         } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
306                                 if err = cc.blockingpicker.connectionError(); err != nil {
307                                         terr, ok := err.(interface {
308                                                 Temporary() bool
309                                         })
310                                         if ok && !terr.Temporary() {
311                                                 return nil, err
312                                         }
313                                 }
314                         }
315                         if !cc.WaitForStateChange(ctx, s) {
316                                 // ctx got timeout or canceled.
317                                 return nil, ctx.Err()
318                         }
319                 }
320         }
321
322         return cc, nil
323 }
324
325 // connectivityStateManager keeps the connectivity.State of ClientConn.
326 // This struct will eventually be exported so the balancers can access it.
327 type connectivityStateManager struct {
328         mu         sync.Mutex
329         state      connectivity.State
330         notifyChan chan struct{}
331         channelzID int64
332 }
333
334 // updateState updates the connectivity.State of ClientConn.
335 // If there's a change it notifies goroutines waiting on state change to
336 // happen.
337 func (csm *connectivityStateManager) updateState(state connectivity.State) {
338         csm.mu.Lock()
339         defer csm.mu.Unlock()
340         if csm.state == connectivity.Shutdown {
341                 return
342         }
343         if csm.state == state {
344                 return
345         }
346         csm.state = state
347         if channelz.IsOn() {
348                 channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
349                         Desc:     fmt.Sprintf("Channel Connectivity change to %v", state),
350                         Severity: channelz.CtINFO,
351                 })
352         }
353         if csm.notifyChan != nil {
354                 // There are other goroutines waiting on this channel.
355                 close(csm.notifyChan)
356                 csm.notifyChan = nil
357         }
358 }
359
360 func (csm *connectivityStateManager) getState() connectivity.State {
361         csm.mu.Lock()
362         defer csm.mu.Unlock()
363         return csm.state
364 }
365
366 func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
367         csm.mu.Lock()
368         defer csm.mu.Unlock()
369         if csm.notifyChan == nil {
370                 csm.notifyChan = make(chan struct{})
371         }
372         return csm.notifyChan
373 }
374
375 // ClientConn represents a client connection to an RPC server.
376 type ClientConn struct {
377         ctx    context.Context
378         cancel context.CancelFunc
379
380         target       string
381         parsedTarget resolver.Target
382         authority    string
383         dopts        dialOptions
384         csMgr        *connectivityStateManager
385
386         balancerBuildOpts balancer.BuildOptions
387         blockingpicker    *pickerWrapper
388
389         mu              sync.RWMutex
390         resolverWrapper *ccResolverWrapper
391         sc              ServiceConfig
392         scRaw           string
393         conns           map[*addrConn]struct{}
394         // Keepalive parameter can be updated if a GoAway is received.
395         mkp             keepalive.ClientParameters
396         curBalancerName string
397         preBalancerName string // previous balancer name.
398         curAddresses    []resolver.Address
399         balancerWrapper *ccBalancerWrapper
400         retryThrottler  atomic.Value
401
402         firstResolveEvent *grpcsync.Event
403
404         channelzID int64 // channelz unique identification number
405         czData     *channelzData
406 }
407
408 // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
409 // ctx expires. A true value is returned in former case and false in latter.
410 // This is an EXPERIMENTAL API.
411 func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
412         ch := cc.csMgr.getNotifyChan()
413         if cc.csMgr.getState() != sourceState {
414                 return true
415         }
416         select {
417         case <-ctx.Done():
418                 return false
419         case <-ch:
420                 return true
421         }
422 }
423
424 // GetState returns the connectivity.State of ClientConn.
425 // This is an EXPERIMENTAL API.
426 func (cc *ClientConn) GetState() connectivity.State {
427         return cc.csMgr.getState()
428 }
429
430 func (cc *ClientConn) scWatcher() {
431         for {
432                 select {
433                 case sc, ok := <-cc.dopts.scChan:
434                         if !ok {
435                                 return
436                         }
437                         cc.mu.Lock()
438                         // TODO: load balance policy runtime change is ignored.
439                         // We may revisit this decision in the future.
440                         cc.sc = sc
441                         cc.scRaw = ""
442                         cc.mu.Unlock()
443                 case <-cc.ctx.Done():
444                         return
445                 }
446         }
447 }
448
449 // waitForResolvedAddrs blocks until the resolver has provided addresses or the
450 // context expires.  Returns nil unless the context expires first; otherwise
451 // returns a status error based on the context.
452 func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
453         // This is on the RPC path, so we use a fast path to avoid the
454         // more-expensive "select" below after the resolver has returned once.
455         if cc.firstResolveEvent.HasFired() {
456                 return nil
457         }
458         select {
459         case <-cc.firstResolveEvent.Done():
460                 return nil
461         case <-ctx.Done():
462                 return status.FromContextError(ctx.Err()).Err()
463         case <-cc.ctx.Done():
464                 return ErrClientConnClosing
465         }
466 }
467
468 func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
469         cc.mu.Lock()
470         defer cc.mu.Unlock()
471         if cc.conns == nil {
472                 // cc was closed.
473                 return
474         }
475
476         if reflect.DeepEqual(cc.curAddresses, addrs) {
477                 return
478         }
479
480         cc.curAddresses = addrs
481         cc.firstResolveEvent.Fire()
482
483         if cc.dopts.balancerBuilder == nil {
484                 // Only look at balancer types and switch balancer if balancer dial
485                 // option is not set.
486                 var isGRPCLB bool
487                 for _, a := range addrs {
488                         if a.Type == resolver.GRPCLB {
489                                 isGRPCLB = true
490                                 break
491                         }
492                 }
493                 var newBalancerName string
494                 if isGRPCLB {
495                         newBalancerName = grpclbName
496                 } else {
497                         // Address list doesn't contain grpclb address. Try to pick a
498                         // non-grpclb balancer.
499                         newBalancerName = cc.curBalancerName
500                         // If current balancer is grpclb, switch to the previous one.
501                         if newBalancerName == grpclbName {
502                                 newBalancerName = cc.preBalancerName
503                         }
504                         // The following could be true in two cases:
505                         // - the first time handling resolved addresses
506                         //   (curBalancerName="")
507                         // - the first time handling non-grpclb addresses
508                         //   (curBalancerName="grpclb", preBalancerName="")
509                         if newBalancerName == "" {
510                                 newBalancerName = PickFirstBalancerName
511                         }
512                 }
513                 cc.switchBalancer(newBalancerName)
514         } else if cc.balancerWrapper == nil {
515                 // Balancer dial option was set, and this is the first time handling
516                 // resolved addresses. Build a balancer with dopts.balancerBuilder.
517                 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
518         }
519
520         cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
521 }
522
523 // switchBalancer starts the switching from current balancer to the balancer
524 // with the given name.
525 //
526 // It will NOT send the current address list to the new balancer. If needed,
527 // caller of this function should send address list to the new balancer after
528 // this function returns.
529 //
530 // Caller must hold cc.mu.
531 func (cc *ClientConn) switchBalancer(name string) {
532         if cc.conns == nil {
533                 return
534         }
535
536         if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
537                 return
538         }
539
540         grpclog.Infof("ClientConn switching balancer to %q", name)
541         if cc.dopts.balancerBuilder != nil {
542                 grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
543                 return
544         }
545         // TODO(bar switching) change this to two steps: drain and close.
546         // Keep track of sc in wrapper.
547         if cc.balancerWrapper != nil {
548                 cc.balancerWrapper.close()
549         }
550
551         builder := balancer.Get(name)
552         // TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
553         // we reuse previous one?
554         if channelz.IsOn() {
555                 if builder == nil {
556                         channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
557                                 Desc:     fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
558                                 Severity: channelz.CtWarning,
559                         })
560                 } else {
561                         channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
562                                 Desc:     fmt.Sprintf("Channel switches to new LB policy %q", name),
563                                 Severity: channelz.CtINFO,
564                         })
565                 }
566         }
567         if builder == nil {
568                 grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
569                 builder = newPickfirstBuilder()
570         }
571
572         cc.preBalancerName = cc.curBalancerName
573         cc.curBalancerName = builder.Name()
574         cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
575 }
576
577 func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
578         cc.mu.Lock()
579         if cc.conns == nil {
580                 cc.mu.Unlock()
581                 return
582         }
583         // TODO(bar switching) send updates to all balancer wrappers when balancer
584         // gracefully switching is supported.
585         cc.balancerWrapper.handleSubConnStateChange(sc, s)
586         cc.mu.Unlock()
587 }
588
589 // newAddrConn creates an addrConn for addrs and adds it to cc.conns.
590 //
591 // Caller needs to make sure len(addrs) > 0.
592 func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
593         ac := &addrConn{
594                 cc:           cc,
595                 addrs:        addrs,
596                 scopts:       opts,
597                 dopts:        cc.dopts,
598                 czData:       new(channelzData),
599                 resetBackoff: make(chan struct{}),
600         }
601         ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
602         // Track ac in cc. This needs to be done before any getTransport(...) is called.
603         cc.mu.Lock()
604         if cc.conns == nil {
605                 cc.mu.Unlock()
606                 return nil, ErrClientConnClosing
607         }
608         if channelz.IsOn() {
609                 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
610                 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
611                         Desc:     "Subchannel Created",
612                         Severity: channelz.CtINFO,
613                         Parent: &channelz.TraceEventDesc{
614                                 Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
615                                 Severity: channelz.CtINFO,
616                         },
617                 })
618         }
619         cc.conns[ac] = struct{}{}
620         cc.mu.Unlock()
621         return ac, nil
622 }
623
624 // removeAddrConn removes the addrConn in the subConn from clientConn.
625 // It also tears down the ac with the given error.
626 func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
627         cc.mu.Lock()
628         if cc.conns == nil {
629                 cc.mu.Unlock()
630                 return
631         }
632         delete(cc.conns, ac)
633         cc.mu.Unlock()
634         ac.tearDown(err)
635 }
636
637 func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
638         return &channelz.ChannelInternalMetric{
639                 State:                    cc.GetState(),
640                 Target:                   cc.target,
641                 CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),
642                 CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),
643                 CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),
644                 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
645         }
646 }
647
648 // Target returns the target string of the ClientConn.
649 // This is an EXPERIMENTAL API.
650 func (cc *ClientConn) Target() string {
651         return cc.target
652 }
653
654 func (cc *ClientConn) incrCallsStarted() {
655         atomic.AddInt64(&cc.czData.callsStarted, 1)
656         atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
657 }
658
659 func (cc *ClientConn) incrCallsSucceeded() {
660         atomic.AddInt64(&cc.czData.callsSucceeded, 1)
661 }
662
663 func (cc *ClientConn) incrCallsFailed() {
664         atomic.AddInt64(&cc.czData.callsFailed, 1)
665 }
666
667 // connect starts creating a transport.
668 // It does nothing if the ac is not IDLE.
669 // TODO(bar) Move this to the addrConn section.
670 func (ac *addrConn) connect() error {
671         ac.mu.Lock()
672         if ac.state == connectivity.Shutdown {
673                 ac.mu.Unlock()
674                 return errConnClosing
675         }
676         if ac.state != connectivity.Idle {
677                 ac.mu.Unlock()
678                 return nil
679         }
680         ac.updateConnectivityState(connectivity.Connecting)
681         ac.mu.Unlock()
682
683         // Start a goroutine connecting to the server asynchronously.
684         go ac.resetTransport()
685         return nil
686 }
687
688 // tryUpdateAddrs tries to update ac.addrs with the new addresses list.
689 //
690 // It checks whether current connected address of ac is in the new addrs list.
691 //  - If true, it updates ac.addrs and returns true. The ac will keep using
692 //    the existing connection.
693 //  - If false, it does nothing and returns false.
694 func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
695         ac.mu.Lock()
696         defer ac.mu.Unlock()
697         grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
698         if ac.state == connectivity.Shutdown {
699                 ac.addrs = addrs
700                 return true
701         }
702
703         // Unless we're busy reconnecting already, let's reconnect from the top of
704         // the list.
705         if ac.state != connectivity.Ready {
706                 return false
707         }
708
709         var curAddrFound bool
710         for _, a := range addrs {
711                 if reflect.DeepEqual(ac.curAddr, a) {
712                         curAddrFound = true
713                         break
714                 }
715         }
716         grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
717         if curAddrFound {
718                 ac.addrs = addrs
719         }
720
721         return curAddrFound
722 }
723
724 // GetMethodConfig gets the method config of the input method.
725 // If there's an exact match for input method (i.e. /service/method), we return
726 // the corresponding MethodConfig.
727 // If there isn't an exact match for the input method, we look for the default config
728 // under the service (i.e /service/). If there is a default MethodConfig for
729 // the service, we return it.
730 // Otherwise, we return an empty MethodConfig.
731 func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
732         // TODO: Avoid the locking here.
733         cc.mu.RLock()
734         defer cc.mu.RUnlock()
735         m, ok := cc.sc.Methods[method]
736         if !ok {
737                 i := strings.LastIndex(method, "/")
738                 m = cc.sc.Methods[method[:i+1]]
739         }
740         return m
741 }
742
743 func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
744         cc.mu.RLock()
745         defer cc.mu.RUnlock()
746         return cc.sc.healthCheckConfig
747 }
748
749 func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
750         hdr, _ := metadata.FromOutgoingContext(ctx)
751         t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
752                 FullMethodName: method,
753                 Header:         hdr,
754         })
755         if err != nil {
756                 return nil, nil, toRPCErr(err)
757         }
758         return t, done, nil
759 }
760
761 // handleServiceConfig parses the service config string in JSON format to Go native
762 // struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
763 func (cc *ClientConn) handleServiceConfig(js string) error {
764         if cc.dopts.disableServiceConfig {
765                 return nil
766         }
767         if cc.scRaw == js {
768                 return nil
769         }
770         if channelz.IsOn() {
771                 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
772                         // The special formatting of \"%s\" instead of %q is to provide nice printing of service config
773                         // for human consumption.
774                         Desc:     fmt.Sprintf("Channel has a new service config \"%s\"", js),
775                         Severity: channelz.CtINFO,
776                 })
777         }
778         sc, err := parseServiceConfig(js)
779         if err != nil {
780                 return err
781         }
782         cc.mu.Lock()
783         // Check if the ClientConn is already closed. Some fields (e.g.
784         // balancerWrapper) are set to nil when closing the ClientConn, and could
785         // cause nil pointer panic if we don't have this check.
786         if cc.conns == nil {
787                 cc.mu.Unlock()
788                 return nil
789         }
790         cc.scRaw = js
791         cc.sc = sc
792
793         if sc.retryThrottling != nil {
794                 newThrottler := &retryThrottler{
795                         tokens: sc.retryThrottling.MaxTokens,
796                         max:    sc.retryThrottling.MaxTokens,
797                         thresh: sc.retryThrottling.MaxTokens / 2,
798                         ratio:  sc.retryThrottling.TokenRatio,
799                 }
800                 cc.retryThrottler.Store(newThrottler)
801         } else {
802                 cc.retryThrottler.Store((*retryThrottler)(nil))
803         }
804
805         if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
806                 if cc.curBalancerName == grpclbName {
807                         // If current balancer is grpclb, there's at least one grpclb
808                         // balancer address in the resolved list. Don't switch the balancer,
809                         // but change the previous balancer name, so if a new resolved
810                         // address list doesn't contain grpclb address, balancer will be
811                         // switched to *sc.LB.
812                         cc.preBalancerName = *sc.LB
813                 } else {
814                         cc.switchBalancer(*sc.LB)
815                         cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
816                 }
817         }
818
819         cc.mu.Unlock()
820         return nil
821 }
822
823 func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
824         cc.mu.RLock()
825         r := cc.resolverWrapper
826         cc.mu.RUnlock()
827         if r == nil {
828                 return
829         }
830         go r.resolveNow(o)
831 }
832
833 // ResetConnectBackoff wakes up all subchannels in transient failure and causes
834 // them to attempt another connection immediately.  It also resets the backoff
835 // times used for subsequent attempts regardless of the current state.
836 //
837 // In general, this function should not be used.  Typical service or network
838 // outages result in a reasonable client reconnection strategy by default.
839 // However, if a previously unavailable network becomes available, this may be
840 // used to trigger an immediate reconnect.
841 //
842 // This API is EXPERIMENTAL.
843 func (cc *ClientConn) ResetConnectBackoff() {
844         cc.mu.Lock()
845         defer cc.mu.Unlock()
846         for ac := range cc.conns {
847                 ac.resetConnectBackoff()
848         }
849 }
850
851 // Close tears down the ClientConn and all underlying connections.
852 func (cc *ClientConn) Close() error {
853         defer cc.cancel()
854
855         cc.mu.Lock()
856         if cc.conns == nil {
857                 cc.mu.Unlock()
858                 return ErrClientConnClosing
859         }
860         conns := cc.conns
861         cc.conns = nil
862         cc.csMgr.updateState(connectivity.Shutdown)
863
864         rWrapper := cc.resolverWrapper
865         cc.resolverWrapper = nil
866         bWrapper := cc.balancerWrapper
867         cc.balancerWrapper = nil
868         cc.mu.Unlock()
869
870         cc.blockingpicker.close()
871
872         if rWrapper != nil {
873                 rWrapper.close()
874         }
875         if bWrapper != nil {
876                 bWrapper.close()
877         }
878
879         for ac := range conns {
880                 ac.tearDown(ErrClientConnClosing)
881         }
882         if channelz.IsOn() {
883                 ted := &channelz.TraceEventDesc{
884                         Desc:     "Channel Deleted",
885                         Severity: channelz.CtINFO,
886                 }
887                 if cc.dopts.channelzParentID != 0 {
888                         ted.Parent = &channelz.TraceEventDesc{
889                                 Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
890                                 Severity: channelz.CtINFO,
891                         }
892                 }
893                 channelz.AddTraceEvent(cc.channelzID, ted)
894                 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
895                 // the entity beng deleted, and thus prevent it from being deleted right away.
896                 channelz.RemoveEntry(cc.channelzID)
897         }
898         return nil
899 }
900
// addrConn is a network connection to a given address.
//
// It carries the list of candidate addresses, the address and transport
// currently in use, the connectivity state, and the backoff bookkeeping used
// by the reconnect loop (resetTransport).
type addrConn struct {
	// ctx is the parent of the per-attempt contexts created while connecting;
	// cancel is invoked by tearDown.
	ctx    context.Context
	cancel context.CancelFunc

	cc     *ClientConn              // The ClientConn this addrConn reports state changes to.
	dopts  dialOptions              // Dial options; copts is used when creating transports.
	acbw   balancer.SubConn         // Passed to cc.handleSubConnStateChange on state changes.
	scopts balancer.NewSubConnOptions // Per-SubConn options (CredsBundle, HealthCheckEnabled).

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	// mu is held by the methods in this file while they read or write
	// transport, curAddr, addrs, state, tearDownErr, backoffIdx and
	// resetBackoff.
	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	tearDownErr error // The reason this addrConn is torn down.

	backoffIdx int // Needs to be stateful for resetConnectBackoff.
	// resetBackoff is closed and replaced by resetConnectBackoff to cut a
	// pending backoff wait short.
	resetBackoff chan struct{}

	channelzID int64 // channelz unique identification number.
	czData     *channelzData // Call counters updated atomically by the incrCalls* methods.
}
932
933 // Note: this requires a lock on ac.mu.
934 func (ac *addrConn) updateConnectivityState(s connectivity.State) {
935         if ac.state == s {
936                 return
937         }
938
939         updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
940         ac.state = s
941         if channelz.IsOn() {
942                 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
943                         Desc:     updateMsg,
944                         Severity: channelz.CtINFO,
945                 })
946         }
947         ac.cc.handleSubConnStateChange(ac.acbw, s)
948 }
949
950 // adjustParams updates parameters used to create transports upon
951 // receiving a GoAway.
952 func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
953         switch r {
954         case transport.GoAwayTooManyPings:
955                 v := 2 * ac.dopts.copts.KeepaliveParams.Time
956                 ac.cc.mu.Lock()
957                 if v > ac.cc.mkp.Time {
958                         ac.cc.mkp.Time = v
959                 }
960                 ac.cc.mu.Unlock()
961         }
962 }
963
964 func (ac *addrConn) resetTransport() {
965         for i := 0; ; i++ {
966                 tryNextAddrFromStart := grpcsync.NewEvent()
967
968                 ac.mu.Lock()
969                 if i > 0 {
970                         ac.cc.resolveNow(resolver.ResolveNowOption{})
971                 }
972                 addrs := ac.addrs
973                 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
974
975                 // This will be the duration that dial gets to finish.
976                 dialDuration := getMinConnectTimeout()
977                 if dialDuration < backoffFor {
978                         // Give dial more time as we keep failing to connect.
979                         dialDuration = backoffFor
980                 }
981                 connectDeadline := time.Now().Add(dialDuration)
982                 ac.mu.Unlock()
983
984         addrLoop:
985                 for _, addr := range addrs {
986                         ac.mu.Lock()
987
988                         if ac.state == connectivity.Shutdown {
989                                 ac.mu.Unlock()
990                                 return
991                         }
992                         ac.updateConnectivityState(connectivity.Connecting)
993                         ac.transport = nil
994
995                         ac.cc.mu.RLock()
996                         ac.dopts.copts.KeepaliveParams = ac.cc.mkp
997                         ac.cc.mu.RUnlock()
998
999                         if ac.state == connectivity.Shutdown {
1000                                 ac.mu.Unlock()
1001                                 return
1002                         }
1003
1004                         copts := ac.dopts.copts
1005                         if ac.scopts.CredsBundle != nil {
1006                                 copts.CredsBundle = ac.scopts.CredsBundle
1007                         }
1008                         hctx, hcancel := context.WithCancel(ac.ctx)
1009                         defer hcancel()
1010                         ac.mu.Unlock()
1011
1012                         if channelz.IsOn() {
1013                                 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1014                                         Desc:     fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
1015                                         Severity: channelz.CtINFO,
1016                                 })
1017                         }
1018
1019                         reconnect := grpcsync.NewEvent()
1020                         prefaceReceived := make(chan struct{})
1021                         newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
1022                         if err == nil {
1023                                 ac.mu.Lock()
1024                                 ac.curAddr = addr
1025                                 ac.transport = newTr
1026                                 ac.mu.Unlock()
1027
1028                                 healthCheckConfig := ac.cc.healthCheckConfig()
1029                                 // LB channel health checking is only enabled when all the four requirements below are met:
1030                                 // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
1031                                 // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
1032                                 // 3. a service config with non-empty healthCheckConfig field is provided,
1033                                 // 4. the current load balancer allows it.
1034                                 healthcheckManagingState := false
1035                                 if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
1036                                         if ac.cc.dopts.healthCheckFunc == nil {
1037                                                 // TODO: add a link to the health check doc in the error message.
1038                                                 grpclog.Error("the client side LB channel health check function has not been set.")
1039                                         } else {
1040                                                 // TODO(deklerk) refactor to just return transport
1041                                                 go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
1042                                                 healthcheckManagingState = true
1043                                         }
1044                                 }
1045                                 if !healthcheckManagingState {
1046                                         ac.mu.Lock()
1047                                         ac.updateConnectivityState(connectivity.Ready)
1048                                         ac.mu.Unlock()
1049                                 }
1050                         } else {
1051                                 hcancel()
1052                                 if err == errConnClosing {
1053                                         return
1054                                 }
1055
1056                                 if tryNextAddrFromStart.HasFired() {
1057                                         break addrLoop
1058                                 }
1059                                 continue
1060                         }
1061
1062                         backoffFor = 0
1063                         ac.mu.Lock()
1064                         reqHandshake := ac.dopts.reqHandshake
1065                         ac.mu.Unlock()
1066
1067                         <-reconnect.Done()
1068                         hcancel()
1069
1070                         if reqHandshake == envconfig.RequireHandshakeHybrid {
1071                                 // In RequireHandshakeHybrid mode, we must check to see whether
1072                                 // server preface has arrived yet to decide whether to start
1073                                 // reconnecting at the top of the list (server preface received)
1074                                 // or continue with the next addr in the list as if the
1075                                 // connection were not successful (server preface not received).
1076                                 select {
1077                                 case <-prefaceReceived:
1078                                         // We received a server preface - huzzah! We consider this
1079                                         // a success and restart from the top of the addr list.
1080                                         ac.mu.Lock()
1081                                         ac.backoffIdx = 0
1082                                         ac.mu.Unlock()
1083                                         break addrLoop
1084                                 default:
1085                                         // Despite having set state to READY, in hybrid mode we
1086                                         // consider this a failure and continue connecting at the
1087                                         // next addr in the list.
1088                                         ac.mu.Lock()
1089                                         if ac.state == connectivity.Shutdown {
1090                                                 ac.mu.Unlock()
1091                                                 return
1092                                         }
1093
1094                                         ac.updateConnectivityState(connectivity.TransientFailure)
1095                                         ac.mu.Unlock()
1096
1097                                         if tryNextAddrFromStart.HasFired() {
1098                                                 break addrLoop
1099                                         }
1100                                 }
1101                         } else {
1102                                 // In RequireHandshakeOn mode, we would have already waited for
1103                                 // the server preface, so we consider this a success and restart
1104                                 // from the top of the addr list. In RequireHandshakeOff mode,
1105                                 // we don't care to wait for the server preface before
1106                                 // considering this a success, so we also restart from the top
1107                                 // of the addr list.
1108                                 ac.mu.Lock()
1109                                 ac.backoffIdx = 0
1110                                 ac.mu.Unlock()
1111                                 break addrLoop
1112                         }
1113                 }
1114
1115                 // After exhausting all addresses, or after need to reconnect after a
1116                 // READY, the addrConn enters TRANSIENT_FAILURE.
1117                 ac.mu.Lock()
1118                 if ac.state == connectivity.Shutdown {
1119                         ac.mu.Unlock()
1120                         return
1121                 }
1122                 ac.updateConnectivityState(connectivity.TransientFailure)
1123
1124                 // Backoff.
1125                 b := ac.resetBackoff
1126                 timer := time.NewTimer(backoffFor)
1127                 acctx := ac.ctx
1128                 ac.mu.Unlock()
1129
1130                 select {
1131                 case <-timer.C:
1132                         ac.mu.Lock()
1133                         ac.backoffIdx++
1134                         ac.mu.Unlock()
1135                 case <-b:
1136                         timer.Stop()
1137                 case <-acctx.Done():
1138                         timer.Stop()
1139                         return
1140                 }
1141         }
1142 }
1143
1144 // createTransport creates a connection to one of the backends in addrs. It
1145 // sets ac.transport in the success case, or it returns an error if it was
1146 // unable to successfully create a transport.
1147 //
1148 // If waitForHandshake is enabled, it blocks until server preface arrives.
1149 func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
1150         onCloseCalled := make(chan struct{})
1151
1152         target := transport.TargetInfo{
1153                 Addr:      addr.Addr,
1154                 Metadata:  addr.Metadata,
1155                 Authority: ac.cc.authority,
1156         }
1157
1158         prefaceTimer := time.NewTimer(time.Until(connectDeadline))
1159
1160         onGoAway := func(r transport.GoAwayReason) {
1161                 ac.mu.Lock()
1162                 ac.adjustParams(r)
1163                 ac.mu.Unlock()
1164                 reconnect.Fire()
1165         }
1166
1167         onClose := func() {
1168                 close(onCloseCalled)
1169                 prefaceTimer.Stop()
1170                 reconnect.Fire()
1171         }
1172
1173         onPrefaceReceipt := func() {
1174                 close(prefaceReceived)
1175                 prefaceTimer.Stop()
1176         }
1177
1178         connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1179         defer cancel()
1180         if channelz.IsOn() {
1181                 copts.ChannelzParentID = ac.channelzID
1182         }
1183
1184         newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1185
1186         if err == nil {
1187                 if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
1188                         select {
1189                         case <-prefaceTimer.C:
1190                                 // We didn't get the preface in time.
1191                                 newTr.Close()
1192                                 err = errors.New("timed out waiting for server handshake")
1193                         case <-prefaceReceived:
1194                                 // We got the preface - huzzah! things are good.
1195                         case <-onCloseCalled:
1196                                 // The transport has already closed - noop.
1197                                 return nil, errors.New("connection closed")
1198                         }
1199                 } else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
1200                         go func() {
1201                                 select {
1202                                 case <-prefaceTimer.C:
1203                                         // We didn't get the preface in time.
1204                                         newTr.Close()
1205                                 case <-prefaceReceived:
1206                                         // We got the preface just in the nick of time - huzzah!
1207                                 case <-onCloseCalled:
1208                                         // The transport has already closed - noop.
1209                                 }
1210                         }()
1211                 }
1212         }
1213
1214         if err != nil {
1215                 // newTr is either nil, or closed.
1216                 ac.cc.blockingpicker.updateConnectionError(err)
1217                 ac.mu.Lock()
1218                 if ac.state == connectivity.Shutdown {
1219                         // ac.tearDown(...) has been invoked.
1220                         ac.mu.Unlock()
1221
1222                         return nil, errConnClosing
1223                 }
1224                 ac.mu.Unlock()
1225                 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
1226                 return nil, err
1227         }
1228
1229         // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
1230         ac.mu.Lock()
1231         if ac.state == connectivity.Shutdown {
1232                 ac.mu.Unlock()
1233                 newTr.Close()
1234                 return nil, errConnClosing
1235         }
1236         ac.mu.Unlock()
1237
1238         // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
1239         ac.mu.Lock()
1240         if ac.state == connectivity.Shutdown {
1241                 ac.mu.Unlock()
1242                 newTr.Close()
1243                 return nil, errConnClosing
1244         }
1245         ac.mu.Unlock()
1246
1247         return newTr, nil
1248 }
1249
1250 func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
1251         // Set up the health check helper functions
1252         newStream := func() (interface{}, error) {
1253                 return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
1254         }
1255         firstReady := true
1256         reportHealth := func(ok bool) {
1257                 ac.mu.Lock()
1258                 defer ac.mu.Unlock()
1259                 if ac.transport != newTr {
1260                         return
1261                 }
1262                 if ok {
1263                         if firstReady {
1264                                 firstReady = false
1265                                 ac.curAddr = addr
1266                         }
1267                         ac.updateConnectivityState(connectivity.Ready)
1268                 } else {
1269                         ac.updateConnectivityState(connectivity.TransientFailure)
1270                 }
1271         }
1272         err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
1273         if err != nil {
1274                 if status.Code(err) == codes.Unimplemented {
1275                         if channelz.IsOn() {
1276                                 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1277                                         Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
1278                                         Severity: channelz.CtError,
1279                                 })
1280                         }
1281                         grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
1282                 } else {
1283                         grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
1284                 }
1285         }
1286 }
1287
1288 func (ac *addrConn) resetConnectBackoff() {
1289         ac.mu.Lock()
1290         close(ac.resetBackoff)
1291         ac.backoffIdx = 0
1292         ac.resetBackoff = make(chan struct{})
1293         ac.mu.Unlock()
1294 }
1295
1296 // getReadyTransport returns the transport if ac's state is READY.
1297 // Otherwise it returns nil, false.
1298 // If ac's state is IDLE, it will trigger ac to connect.
1299 func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1300         ac.mu.Lock()
1301         if ac.state == connectivity.Ready && ac.transport != nil {
1302                 t := ac.transport
1303                 ac.mu.Unlock()
1304                 return t, true
1305         }
1306         var idle bool
1307         if ac.state == connectivity.Idle {
1308                 idle = true
1309         }
1310         ac.mu.Unlock()
1311         // Trigger idle ac to connect.
1312         if idle {
1313                 ac.connect()
1314         }
1315         return nil, false
1316 }
1317
1318 // tearDown starts to tear down the addrConn.
1319 // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1320 // some edge cases (e.g., the caller opens and closes many addrConn's in a
1321 // tight loop.
1322 // tearDown doesn't remove ac from ac.cc.conns.
1323 func (ac *addrConn) tearDown(err error) {
1324         ac.mu.Lock()
1325         if ac.state == connectivity.Shutdown {
1326                 ac.mu.Unlock()
1327                 return
1328         }
1329         curTr := ac.transport
1330         ac.transport = nil
1331         // We have to set the state to Shutdown before anything else to prevent races
1332         // between setting the state and logic that waits on context cancelation / etc.
1333         ac.updateConnectivityState(connectivity.Shutdown)
1334         ac.cancel()
1335         ac.tearDownErr = err
1336         ac.curAddr = resolver.Address{}
1337         if err == errConnDrain && curTr != nil {
1338                 // GracefulClose(...) may be executed multiple times when
1339                 // i) receiving multiple GoAway frames from the server; or
1340                 // ii) there are concurrent name resolver/Balancer triggered
1341                 // address removal and GoAway.
1342                 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1343                 ac.mu.Unlock()
1344                 curTr.GracefulClose()
1345                 ac.mu.Lock()
1346         }
1347         if channelz.IsOn() {
1348                 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1349                         Desc:     "Subchannel Deleted",
1350                         Severity: channelz.CtINFO,
1351                         Parent: &channelz.TraceEventDesc{
1352                                 Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1353                                 Severity: channelz.CtINFO,
1354                         },
1355                 })
1356                 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1357                 // the entity beng deleted, and thus prevent it from being deleted right away.
1358                 channelz.RemoveEntry(ac.channelzID)
1359         }
1360         ac.mu.Unlock()
1361 }
1362
1363 func (ac *addrConn) getState() connectivity.State {
1364         ac.mu.Lock()
1365         defer ac.mu.Unlock()
1366         return ac.state
1367 }
1368
1369 func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1370         ac.mu.Lock()
1371         addr := ac.curAddr.Addr
1372         ac.mu.Unlock()
1373         return &channelz.ChannelInternalMetric{
1374                 State:                    ac.getState(),
1375                 Target:                   addr,
1376                 CallsStarted:             atomic.LoadInt64(&ac.czData.callsStarted),
1377                 CallsSucceeded:           atomic.LoadInt64(&ac.czData.callsSucceeded),
1378                 CallsFailed:              atomic.LoadInt64(&ac.czData.callsFailed),
1379                 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1380         }
1381 }
1382
1383 func (ac *addrConn) incrCallsStarted() {
1384         atomic.AddInt64(&ac.czData.callsStarted, 1)
1385         atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1386 }
1387
1388 func (ac *addrConn) incrCallsSucceeded() {
1389         atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1390 }
1391
1392 func (ac *addrConn) incrCallsFailed() {
1393         atomic.AddInt64(&ac.czData.callsFailed, 1)
1394 }
1395
// retryThrottler is a token pool that decides whether retries are allowed.
// Retries drain the pool one token at a time, successful RPCs refill it by
// ratio tokens, and the pool is kept within [0, max]. Retries are throttled
// while the pool is at or below thresh.
type retryThrottler struct {
	max    float64 // Upper bound of the pool.
	thresh float64 // Retries are throttled while tokens <= thresh.
	ratio  float64 // Tokens added per successful RPC.

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config. A nil throttler never throttles.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	// Spend one token, never letting the pool go negative.
	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC credits the pool with ratio tokens, capped at max. It is a
// no-op on a nil throttler.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	refilled := rt.tokens + rt.ratio
	if refilled > rt.max {
		refilled = rt.max
	}
	rt.tokens = refilled
}
1432
1433 type channelzChannel struct {
1434         cc *ClientConn
1435 }
1436
1437 func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1438         return c.cc.channelzMetric()
1439 }
1440
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")