3 * Copyright 2014 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
33 "google.golang.org/grpc/balancer"
34 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
35 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/grpclog"
39 "google.golang.org/grpc/internal/backoff"
40 "google.golang.org/grpc/internal/channelz"
41 "google.golang.org/grpc/internal/envconfig"
42 "google.golang.org/grpc/internal/grpcsync"
43 "google.golang.org/grpc/internal/transport"
44 "google.golang.org/grpc/keepalive"
45 "google.golang.org/grpc/metadata"
46 "google.golang.org/grpc/resolver"
47 _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
48 _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
49 "google.golang.org/grpc/status"
53 // minimum time to give a connection to complete
54 minConnectTimeout = 20 * time.Second
55 // must match grpclbName in grpclb/grpclb.go
60 // ErrClientConnClosing indicates that the operation is illegal because
61 // the ClientConn is closing.
63 // Deprecated: this error should not be relied upon by users; use the status
64 // code of Canceled instead.
65 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
66 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
67 errConnDrain = errors.New("grpc: the connection is drained")
68 // errConnClosing indicates that the connection is closing.
69 errConnClosing = errors.New("grpc: the connection is closing")
70 // errBalancerClosed indicates that the balancer is closed.
71 errBalancerClosed = errors.New("grpc: balancer is closed")
72 // We use an accessor so that minConnectTimeout can be
73 // atomically read and updated while testing.
74 getMinConnectTimeout = func() time.Duration {
75 return minConnectTimeout
79 // The following errors are returned from Dial and DialContext
81 // errNoTransportSecurity indicates that there is no transport security
82 // being set for ClientConn. Users should either set one or explicitly
83 // call WithInsecure DialOption to disable security.
84 errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
85 // errTransportCredsAndBundle indicates that creds bundle is used together
86 // with other individual Transport Credentials.
87 errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
88 // errTransportCredentialsMissing indicates that users want to transmit security
89 // information (e.g., OAuth2 token) which requires secure connection on an insecure
91 errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
92 // errCredentialsConflict indicates that grpc.WithTransportCredentials()
93 // and grpc.WithInsecure() are both called for a connection.
94 errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
98 defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
99 defaultClientMaxSendMessageSize = math.MaxInt32
100 // http2IOBufSize specifies the buffer size for sending frames.
101 defaultWriteBufSize = 32 * 1024
102 defaultReadBufSize = 32 * 1024
105 // Dial creates a client connection to the given target.
106 func Dial(target string, opts ...DialOption) (*ClientConn, error) {
107 return DialContext(context.Background(), target, opts...)
110 // DialContext creates a client connection to the given target. By default, it's
111 // a non-blocking dial (the function won't wait for connections to be
112 // established, and connecting happens in the background). To make it a blocking
113 // dial, use WithBlock() dial option.
115 // In the non-blocking case, the ctx does not act against the connection. It
116 // only controls the setup steps.
118 // In the blocking case, ctx can be used to cancel or expire the pending
119 // connection. Once this function returns, the cancellation and expiration of
120 // ctx will be noop. Users should call ClientConn.Close to terminate all the
121 // pending operations after this function returns.
123 // The target name syntax is defined in
124 // https://github.com/grpc/grpc/blob/master/doc/naming.md.
125 // e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
126 func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
129 csMgr: &connectivityStateManager{},
130 conns: make(map[*addrConn]struct{}),
131 dopts: defaultDialOptions(),
132 blockingpicker: newPickerWrapper(),
133 czData: new(channelzData),
134 firstResolveEvent: grpcsync.NewEvent(),
136 cc.retryThrottler.Store((*retryThrottler)(nil))
137 cc.ctx, cc.cancel = context.WithCancel(context.Background())
139 for _, opt := range opts {
144 if cc.dopts.channelzParentID != 0 {
145 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
146 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
147 Desc: "Channel Created",
148 Severity: channelz.CtINFO,
149 Parent: &channelz.TraceEventDesc{
150 Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
151 Severity: channelz.CtINFO,
155 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
156 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
157 Desc: "Channel Created",
158 Severity: channelz.CtINFO,
161 cc.csMgr.channelzID = cc.channelzID
164 if !cc.dopts.insecure {
165 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
166 return nil, errNoTransportSecurity
168 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
169 return nil, errTransportCredsAndBundle
172 if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
173 return nil, errCredentialsConflict
175 for _, cd := range cc.dopts.copts.PerRPCCredentials {
176 if cd.RequireTransportSecurity() {
177 return nil, errTransportCredentialsMissing
182 cc.mkp = cc.dopts.copts.KeepaliveParams
184 if cc.dopts.copts.Dialer == nil {
185 cc.dopts.copts.Dialer = newProxyDialer(
186 func(ctx context.Context, addr string) (net.Conn, error) {
187 network, addr := parseDialTarget(addr)
188 return (&net.Dialer{}).DialContext(ctx, network, addr)
193 if cc.dopts.copts.UserAgent != "" {
194 cc.dopts.copts.UserAgent += " " + grpcUA
196 cc.dopts.copts.UserAgent = grpcUA
199 if cc.dopts.timeout > 0 {
200 var cancel context.CancelFunc
201 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
208 conn, err = nil, ctx.Err()
218 if cc.dopts.scChan != nil {
219 // Try to get an initial service config.
221 case sc, ok := <-cc.dopts.scChan:
229 if cc.dopts.bs == nil {
230 cc.dopts.bs = backoff.Exponential{
231 MaxDelay: DefaultBackoffConfig.MaxDelay,
234 if cc.dopts.resolverBuilder == nil {
235 // Only try to parse target when resolver builder is not already set.
236 cc.parsedTarget = parseTarget(cc.target)
237 grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
238 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
239 if cc.dopts.resolverBuilder == nil {
240 // If resolver builder is still nil, the parsed target's scheme is
241 // not registered. Fallback to default resolver and set Endpoint to
242 // the original target.
243 grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
244 cc.parsedTarget = resolver.Target{
245 Scheme: resolver.GetDefaultScheme(),
248 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
251 cc.parsedTarget = resolver.Target{Endpoint: target}
253 creds := cc.dopts.copts.TransportCredentials
254 if creds != nil && creds.Info().ServerName != "" {
255 cc.authority = creds.Info().ServerName
256 } else if cc.dopts.insecure && cc.dopts.authority != "" {
257 cc.authority = cc.dopts.authority
259 // Use endpoint from "scheme://authority/endpoint" as the default
260 // authority for ClientConn.
261 cc.authority = cc.parsedTarget.Endpoint
264 if cc.dopts.scChan != nil && !scSet {
265 // Blocking wait for the initial service config.
267 case sc, ok := <-cc.dopts.scChan:
272 return nil, ctx.Err()
275 if cc.dopts.scChan != nil {
279 var credsClone credentials.TransportCredentials
280 if creds := cc.dopts.copts.TransportCredentials; creds != nil {
281 credsClone = creds.Clone()
283 cc.balancerBuildOpts = balancer.BuildOptions{
284 DialCreds: credsClone,
285 CredsBundle: cc.dopts.copts.CredsBundle,
286 Dialer: cc.dopts.copts.Dialer,
287 ChannelzParentID: cc.channelzID,
290 // Build the resolver.
291 rWrapper, err := newCCResolverWrapper(cc)
293 return nil, fmt.Errorf("failed to build resolver: %v", err)
297 cc.resolverWrapper = rWrapper
299 // A blocking dial blocks until the clientConn is ready.
303 if s == connectivity.Ready {
305 } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
306 if err = cc.blockingpicker.connectionError(); err != nil {
307 terr, ok := err.(interface {
310 if ok && !terr.Temporary() {
315 if !cc.WaitForStateChange(ctx, s) {
316 // ctx got timeout or canceled.
317 return nil, ctx.Err()
325 // connectivityStateManager keeps the connectivity.State of ClientConn.
326 // This struct will eventually be exported so the balancers can access it.
327 type connectivityStateManager struct {
329 state connectivity.State
330 notifyChan chan struct{}
334 // updateState updates the connectivity.State of ClientConn.
335 // If there's a change it notifies goroutines waiting on state change to
337 func (csm *connectivityStateManager) updateState(state connectivity.State) {
339 defer csm.mu.Unlock()
340 if csm.state == connectivity.Shutdown {
343 if csm.state == state {
348 channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
349 Desc: fmt.Sprintf("Channel Connectivity change to %v", state),
350 Severity: channelz.CtINFO,
353 if csm.notifyChan != nil {
354 // There are other goroutines waiting on this channel.
355 close(csm.notifyChan)
360 func (csm *connectivityStateManager) getState() connectivity.State {
362 defer csm.mu.Unlock()
366 func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
368 defer csm.mu.Unlock()
369 if csm.notifyChan == nil {
370 csm.notifyChan = make(chan struct{})
372 return csm.notifyChan
375 // ClientConn represents a client connection to an RPC server.
376 type ClientConn struct {
378 cancel context.CancelFunc
381 parsedTarget resolver.Target
384 csMgr *connectivityStateManager
386 balancerBuildOpts balancer.BuildOptions
387 blockingpicker *pickerWrapper
390 resolverWrapper *ccResolverWrapper
393 conns map[*addrConn]struct{}
394 // Keepalive parameter can be updated if a GoAway is received.
395 mkp keepalive.ClientParameters
396 curBalancerName string
397 preBalancerName string // previous balancer name.
398 curAddresses []resolver.Address
399 balancerWrapper *ccBalancerWrapper
400 retryThrottler atomic.Value
402 firstResolveEvent *grpcsync.Event
404 channelzID int64 // channelz unique identification number
408 // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
409 // ctx expires. A true value is returned in former case and false in latter.
410 // This is an EXPERIMENTAL API.
411 func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
412 ch := cc.csMgr.getNotifyChan()
413 if cc.csMgr.getState() != sourceState {
424 // GetState returns the connectivity.State of ClientConn.
425 // This is an EXPERIMENTAL API.
426 func (cc *ClientConn) GetState() connectivity.State {
427 return cc.csMgr.getState()
430 func (cc *ClientConn) scWatcher() {
433 case sc, ok := <-cc.dopts.scChan:
438 // TODO: load balance policy runtime change is ignored.
439 // We may revisit this decision in the future.
443 case <-cc.ctx.Done():
449 // waitForResolvedAddrs blocks until the resolver has provided addresses or the
450 // context expires. Returns nil unless the context expires first; otherwise
451 // returns a status error based on the context.
452 func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
453 // This is on the RPC path, so we use a fast path to avoid the
454 // more-expensive "select" below after the resolver has returned once.
455 if cc.firstResolveEvent.HasFired() {
459 case <-cc.firstResolveEvent.Done():
462 return status.FromContextError(ctx.Err()).Err()
463 case <-cc.ctx.Done():
464 return ErrClientConnClosing
468 func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
476 if reflect.DeepEqual(cc.curAddresses, addrs) {
480 cc.curAddresses = addrs
481 cc.firstResolveEvent.Fire()
483 if cc.dopts.balancerBuilder == nil {
484 // Only look at balancer types and switch balancer if balancer dial
485 // option is not set.
487 for _, a := range addrs {
488 if a.Type == resolver.GRPCLB {
493 var newBalancerName string
495 newBalancerName = grpclbName
497 // Address list doesn't contain grpclb address. Try to pick a
498 // non-grpclb balancer.
499 newBalancerName = cc.curBalancerName
500 // If current balancer is grpclb, switch to the previous one.
501 if newBalancerName == grpclbName {
502 newBalancerName = cc.preBalancerName
504 // The following could be true in two cases:
505 // - the first time handling resolved addresses
506 // (curBalancerName="")
507 // - the first time handling non-grpclb addresses
508 // (curBalancerName="grpclb", preBalancerName="")
509 if newBalancerName == "" {
510 newBalancerName = PickFirstBalancerName
513 cc.switchBalancer(newBalancerName)
514 } else if cc.balancerWrapper == nil {
515 // Balancer dial option was set, and this is the first time handling
516 // resolved addresses. Build a balancer with dopts.balancerBuilder.
517 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
520 cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
523 // switchBalancer starts the switching from current balancer to the balancer
524 // with the given name.
526 // It will NOT send the current address list to the new balancer. If needed,
527 // caller of this function should send address list to the new balancer after
528 // this function returns.
530 // Caller must hold cc.mu.
531 func (cc *ClientConn) switchBalancer(name string) {
536 if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
540 grpclog.Infof("ClientConn switching balancer to %q", name)
541 if cc.dopts.balancerBuilder != nil {
542 grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
545 // TODO(bar switching) change this to two steps: drain and close.
546 // Keep track of sc in wrapper.
547 if cc.balancerWrapper != nil {
548 cc.balancerWrapper.close()
551 builder := balancer.Get(name)
552 // TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
553 // we reuse previous one?
556 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
557 Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
558 Severity: channelz.CtWarning,
561 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
562 Desc: fmt.Sprintf("Channel switches to new LB policy %q", name),
563 Severity: channelz.CtINFO,
568 grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
569 builder = newPickfirstBuilder()
572 cc.preBalancerName = cc.curBalancerName
573 cc.curBalancerName = builder.Name()
574 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
577 func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
583 // TODO(bar switching) send updates to all balancer wrappers when balancer
584 // gracefully switching is supported.
585 cc.balancerWrapper.handleSubConnStateChange(sc, s)
589 // newAddrConn creates an addrConn for addrs and adds it to cc.conns.
591 // Caller needs to make sure len(addrs) > 0.
592 func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
598 czData: new(channelzData),
599 resetBackoff: make(chan struct{}),
601 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
602 // Track ac in cc. This needs to be done before any getTransport(...) is called.
606 return nil, ErrClientConnClosing
609 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
610 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
611 Desc: "Subchannel Created",
612 Severity: channelz.CtINFO,
613 Parent: &channelz.TraceEventDesc{
614 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
615 Severity: channelz.CtINFO,
619 cc.conns[ac] = struct{}{}
624 // removeAddrConn removes the addrConn in the subConn from clientConn.
625 // It also tears down the ac with the given error.
626 func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
637 func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
638 return &channelz.ChannelInternalMetric{
639 State: cc.GetState(),
641 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
642 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
643 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
644 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
648 // Target returns the target string of the ClientConn.
649 // This is an EXPERIMENTAL API.
650 func (cc *ClientConn) Target() string {
654 func (cc *ClientConn) incrCallsStarted() {
655 atomic.AddInt64(&cc.czData.callsStarted, 1)
656 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
659 func (cc *ClientConn) incrCallsSucceeded() {
660 atomic.AddInt64(&cc.czData.callsSucceeded, 1)
663 func (cc *ClientConn) incrCallsFailed() {
664 atomic.AddInt64(&cc.czData.callsFailed, 1)
667 // connect starts creating a transport.
668 // It does nothing if the ac is not IDLE.
669 // TODO(bar) Move this to the addrConn section.
670 func (ac *addrConn) connect() error {
672 if ac.state == connectivity.Shutdown {
674 return errConnClosing
676 if ac.state != connectivity.Idle {
680 ac.updateConnectivityState(connectivity.Connecting)
683 // Start a goroutine connecting to the server asynchronously.
684 go ac.resetTransport()
688 // tryUpdateAddrs tries to update ac.addrs with the new addresses list.
690 // It checks whether current connected address of ac is in the new addrs list.
691 // - If true, it updates ac.addrs and returns true. The ac will keep using
692 // the existing connection.
693 // - If false, it does nothing and returns false.
694 func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
697 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
698 if ac.state == connectivity.Shutdown {
703 // Unless we're busy reconnecting already, let's reconnect from the top of
705 if ac.state != connectivity.Ready {
709 var curAddrFound bool
710 for _, a := range addrs {
711 if reflect.DeepEqual(ac.curAddr, a) {
716 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
724 // GetMethodConfig gets the method config of the input method.
725 // If there's an exact match for input method (i.e. /service/method), we return
726 // the corresponding MethodConfig.
727 // If there isn't an exact match for the input method, we look for the default config
728 // under the service (i.e /service/). If there is a default MethodConfig for
729 // the service, we return it.
730 // Otherwise, we return an empty MethodConfig.
731 func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
732 // TODO: Avoid the locking here.
734 defer cc.mu.RUnlock()
735 m, ok := cc.sc.Methods[method]
737 i := strings.LastIndex(method, "/")
738 m = cc.sc.Methods[method[:i+1]]
743 func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
745 defer cc.mu.RUnlock()
746 return cc.sc.healthCheckConfig
749 func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
750 hdr, _ := metadata.FromOutgoingContext(ctx)
751 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
752 FullMethodName: method,
756 return nil, nil, toRPCErr(err)
761 // handleServiceConfig parses the service config string in JSON format to Go native
762 // struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
763 func (cc *ClientConn) handleServiceConfig(js string) error {
764 if cc.dopts.disableServiceConfig {
771 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
772 // The special formatting of \"%s\" instead of %q is to provide nice printing of service config
773 // for human consumption.
774 Desc: fmt.Sprintf("Channel has a new service config \"%s\"", js),
775 Severity: channelz.CtINFO,
778 sc, err := parseServiceConfig(js)
783 // Check if the ClientConn is already closed. Some fields (e.g.
784 // balancerWrapper) are set to nil when closing the ClientConn, and could
785 // cause nil pointer panic if we don't have this check.
793 if sc.retryThrottling != nil {
794 newThrottler := &retryThrottler{
795 tokens: sc.retryThrottling.MaxTokens,
796 max: sc.retryThrottling.MaxTokens,
797 thresh: sc.retryThrottling.MaxTokens / 2,
798 ratio: sc.retryThrottling.TokenRatio,
800 cc.retryThrottler.Store(newThrottler)
802 cc.retryThrottler.Store((*retryThrottler)(nil))
805 if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
806 if cc.curBalancerName == grpclbName {
807 // If current balancer is grpclb, there's at least one grpclb
808 // balancer address in the resolved list. Don't switch the balancer,
809 // but change the previous balancer name, so if a new resolved
810 // address list doesn't contain grpclb address, balancer will be
811 // switched to *sc.LB.
812 cc.preBalancerName = *sc.LB
814 cc.switchBalancer(*sc.LB)
815 cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
823 func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
825 r := cc.resolverWrapper
833 // ResetConnectBackoff wakes up all subchannels in transient failure and causes
834 // them to attempt another connection immediately. It also resets the backoff
835 // times used for subsequent attempts regardless of the current state.
837 // In general, this function should not be used. Typical service or network
838 // outages result in a reasonable client reconnection strategy by default.
839 // However, if a previously unavailable network becomes available, this may be
840 // used to trigger an immediate reconnect.
842 // This API is EXPERIMENTAL.
843 func (cc *ClientConn) ResetConnectBackoff() {
846 for ac := range cc.conns {
847 ac.resetConnectBackoff()
851 // Close tears down the ClientConn and all underlying connections.
852 func (cc *ClientConn) Close() error {
858 return ErrClientConnClosing
862 cc.csMgr.updateState(connectivity.Shutdown)
864 rWrapper := cc.resolverWrapper
865 cc.resolverWrapper = nil
866 bWrapper := cc.balancerWrapper
867 cc.balancerWrapper = nil
870 cc.blockingpicker.close()
879 for ac := range conns {
880 ac.tearDown(ErrClientConnClosing)
883 ted := &channelz.TraceEventDesc{
884 Desc: "Channel Deleted",
885 Severity: channelz.CtINFO,
887 if cc.dopts.channelzParentID != 0 {
888 ted.Parent = &channelz.TraceEventDesc{
889 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
890 Severity: channelz.CtINFO,
893 channelz.AddTraceEvent(cc.channelzID, ted)
894 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
895 // the entity beng deleted, and thus prevent it from being deleted right away.
896 channelz.RemoveEntry(cc.channelzID)
901 // addrConn is a network connection to a given address.
902 type addrConn struct {
904 cancel context.CancelFunc
908 acbw balancer.SubConn
909 scopts balancer.NewSubConnOptions
911 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel
912 // health checking may require server to report healthy to set ac to READY), and is reset
913 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
914 // is received, transport is closed, ac has been torn down).
915 transport transport.ClientTransport // The current transport.
918 curAddr resolver.Address // The current address.
919 addrs []resolver.Address // All addresses that the resolver resolved to.
921 // Use updateConnectivityState for updating addrConn's connectivity state.
922 state connectivity.State
924 tearDownErr error // The reason this addrConn is torn down.
926 backoffIdx int // Needs to be stateful for resetConnectBackoff.
927 resetBackoff chan struct{}
929 channelzID int64 // channelz unique identification number.
933 // Note: this requires a lock on ac.mu.
934 func (ac *addrConn) updateConnectivityState(s connectivity.State) {
939 updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
942 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
944 Severity: channelz.CtINFO,
947 ac.cc.handleSubConnStateChange(ac.acbw, s)
950 // adjustParams updates parameters used to create transports upon
951 // receiving a GoAway.
952 func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
954 case transport.GoAwayTooManyPings:
955 v := 2 * ac.dopts.copts.KeepaliveParams.Time
957 if v > ac.cc.mkp.Time {
964 func (ac *addrConn) resetTransport() {
966 tryNextAddrFromStart := grpcsync.NewEvent()
970 ac.cc.resolveNow(resolver.ResolveNowOption{})
973 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
975 // This will be the duration that dial gets to finish.
976 dialDuration := getMinConnectTimeout()
977 if dialDuration < backoffFor {
978 // Give dial more time as we keep failing to connect.
979 dialDuration = backoffFor
981 connectDeadline := time.Now().Add(dialDuration)
985 for _, addr := range addrs {
988 if ac.state == connectivity.Shutdown {
992 ac.updateConnectivityState(connectivity.Connecting)
996 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
999 if ac.state == connectivity.Shutdown {
1004 copts := ac.dopts.copts
1005 if ac.scopts.CredsBundle != nil {
1006 copts.CredsBundle = ac.scopts.CredsBundle
1008 hctx, hcancel := context.WithCancel(ac.ctx)
1012 if channelz.IsOn() {
1013 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1014 Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
1015 Severity: channelz.CtINFO,
1019 reconnect := grpcsync.NewEvent()
1020 prefaceReceived := make(chan struct{})
1021 newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
1025 ac.transport = newTr
1028 healthCheckConfig := ac.cc.healthCheckConfig()
1029 // LB channel health checking is only enabled when all the four requirements below are met:
1030 // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
1031 // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
1032 // 3. a service config with non-empty healthCheckConfig field is provided,
1033 // 4. the current load balancer allows it.
1034 healthcheckManagingState := false
1035 if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
1036 if ac.cc.dopts.healthCheckFunc == nil {
1037 // TODO: add a link to the health check doc in the error message.
1038 grpclog.Error("the client side LB channel health check function has not been set.")
1040 // TODO(deklerk) refactor to just return transport
1041 go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
1042 healthcheckManagingState = true
1045 if !healthcheckManagingState {
1047 ac.updateConnectivityState(connectivity.Ready)
1052 if err == errConnClosing {
1056 if tryNextAddrFromStart.HasFired() {
1064 reqHandshake := ac.dopts.reqHandshake
1070 if reqHandshake == envconfig.RequireHandshakeHybrid {
1071 // In RequireHandshakeHybrid mode, we must check to see whether
1072 // server preface has arrived yet to decide whether to start
1073 // reconnecting at the top of the list (server preface received)
1074 // or continue with the next addr in the list as if the
1075 // connection were not successful (server preface not received).
1077 case <-prefaceReceived:
1078 // We received a server preface - huzzah! We consider this
1079 // a success and restart from the top of the addr list.
1085 // Despite having set state to READY, in hybrid mode we
1086 // consider this a failure and continue connecting at the
1087 // next addr in the list.
1089 if ac.state == connectivity.Shutdown {
1094 ac.updateConnectivityState(connectivity.TransientFailure)
1097 if tryNextAddrFromStart.HasFired() {
1102 // In RequireHandshakeOn mode, we would have already waited for
1103 // the server preface, so we consider this a success and restart
1104 // from the top of the addr list. In RequireHandshakeOff mode,
1105 // we don't care to wait for the server preface before
1106 // considering this a success, so we also restart from the top
1107 // of the addr list.
1115 // After exhausting all addresses, or after need to reconnect after a
1116 // READY, the addrConn enters TRANSIENT_FAILURE.
1118 if ac.state == connectivity.Shutdown {
1122 ac.updateConnectivityState(connectivity.TransientFailure)
1125 b := ac.resetBackoff
1126 timer := time.NewTimer(backoffFor)
1137 case <-acctx.Done():
1144 // createTransport creates a connection to addr, dialing with copts and
1145 // bounding the attempt by connectDeadline. On success it returns the new
1146 // transport; otherwise it returns an error explaining why none was created.
// NOTE(review): an earlier version of this comment said createTransport "sets
// ac.transport in the success case"; none of the visible lines assign
// ac.transport — confirm whether the caller does that now.
//
1148 // Handshake handling depends on ac.dopts.reqHandshake:
//   - RequireHandshakeOn: waits until the server preface arrives
//     (prefaceReceived is closed), the transport closes (onCloseCalled is
//     closed), or prefaceTimer fires at connectDeadline. A timeout sets err to
//     "timed out waiting for server handshake"; an early close returns a
//     "connection closed" error.
//   - RequireHandshakeHybrid: watches the same three events, but no error is
//     assigned or returned in the visible lines of that branch.
//
// On failure the error is reported via ac.cc.blockingpicker.updateConnectionError
// and logged; if ac has been shut down in the meantime, errConnClosing is
// returned instead. If ac is Shutdown once a transport has been created,
// errConnClosing is returned as well.
1149 func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
// onCloseCalled is closed when the transport invokes its close callback
// (presumably onClose, whose header is not visible here); the handshake wait
// uses it to detect a transport that closed before sending the preface.
1150 onCloseCalled := make(chan struct{})
1152 target := transport.TargetInfo{
1154 Metadata: addr.Metadata,
1155 Authority: ac.cc.authority,
// prefaceTimer bounds the wait for the server preface by the same
// connectDeadline used for dialing.
1158 prefaceTimer := time.NewTimer(time.Until(connectDeadline))
1160 onGoAway := func(r transport.GoAwayReason) {
1168 close(onCloseCalled)
// onPrefaceReceipt signals the handshake wait that the server preface arrived.
1173 onPrefaceReceipt := func() {
1174 close(prefaceReceived)
// connectCtx limits the dial to connectDeadline; because it is derived from
// ac.ctx it is also canceled when ac's context is canceled.
1178 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1180 if channelz.IsOn() {
1181 copts.ChannelzParentID = ac.channelzID
1184 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1187 if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
1189 case <-prefaceTimer.C:
1190 // We didn't get the preface in time.
1192 err = errors.New("timed out waiting for server handshake")
1193 case <-prefaceReceived:
1194 // We got the preface - huzzah! things are good.
1195 case <-onCloseCalled:
1196 // The transport has already closed - noop.
1197 return nil, errors.New("connection closed")
1199 } else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
1202 case <-prefaceTimer.C:
1203 // We didn't get the preface in time.
1205 case <-prefaceReceived:
1206 // We got the preface just in the nick of time - huzzah!
1207 case <-onCloseCalled:
1208 // The transport has already closed - noop.
1215 // newTr is either nil, or closed.
1216 ac.cc.blockingpicker.updateConnectionError(err)
1218 if ac.state == connectivity.Shutdown {
1219 // ac.tearDown(...) has been invoked.
1222 return nil, errConnClosing
1225 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
1229 // Now there is a viable transport to be used, so set ac.transport to reflect the new viable transport.
1231 if ac.state == connectivity.Shutdown {
1234 return nil, errConnClosing
// NOTE(review): the comment and Shutdown check below repeat the block just
// above verbatim; confirm whether the second check is still needed.
1238 // Now there is a viable transport to be used, so set ac.transport to reflect the new viable transport.
1240 if ac.state == connectivity.Shutdown {
1243 return nil, errConnClosing
// startHealthCheck runs ac.cc.dopts.healthCheckFunc for serviceName over newTr.
// Health streams are opened on the server-streaming
// "/grpc.health.v1.Health/Watch" method, and each report moves ac to Ready or
// TransientFailure (presumably Ready when ok is true; the branch conditions
// are not visible here). Reports are checked against ac.transport so that a
// transport which is no longer current does not drive ac's state.
// If the health check function returns codes.Unimplemented, the check is
// logged (and traced in channelz, when on) as disabled; any other error is
// logged as unexpected.
// NOTE(review): addr is not referenced in the visible lines.
1250 func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
1251 // Set up the health check helper functions
1252 newStream := func() (interface{}, error) {
1253 return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
// reportHealth is invoked by the health check function with the latest result.
1256 reportHealth := func(ok bool) {
1258 defer ac.mu.Unlock()
// Ignore reports for a transport that ac is no longer using.
1259 if ac.transport != newTr {
1267 ac.updateConnectivityState(connectivity.Ready)
1269 ac.updateConnectivityState(connectivity.TransientFailure)
1272 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
// A server without the health service is not treated as unhealthy; the
// check is simply reported as disabled.
1274 if status.Code(err) == codes.Unimplemented {
1275 if channelz.IsOn() {
1276 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1277 Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
1278 Severity: channelz.CtError,
1281 grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
1283 grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
// resetConnectBackoff closes the current ac.resetBackoff channel, waking any
// wait that captured it (for example a pending reconnect backoff), and installs
// a fresh channel for subsequent waits.
// NOTE(review): the locking lines are not visible here; presumably ac.mu guards
// ac.resetBackoff — confirm.
1288 func (ac *addrConn) resetConnectBackoff() {
1290 close(ac.resetBackoff)
// Replace the closed channel so later waits block until the next reset.
1292 ac.resetBackoff = make(chan struct{})
1296 // getReadyTransport returns the transport if ac's state is READY and ac has a
1297 // non-nil transport. Otherwise it returns nil, false.
1298 // If ac's state is IDLE, it will trigger ac to connect.
1299 func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1301 if ac.state == connectivity.Ready && ac.transport != nil {
1307 if ac.state == connectivity.Idle {
1311 // Trigger idle ac to connect.
1318 // tearDown starts to tear down the addrConn.
1319 // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1320 // some edge cases (e.g., the caller opens and closes many addrConn's in a
1322 // tearDown doesn't remove ac from ac.cc.conns.
//
// If ac is already Shutdown it presumably returns early (the early-return line
// is not visible). Otherwise it moves ac to Shutdown first, records err in
// ac.tearDownErr and clears ac.curAddr. When err is errConnDrain and ac has a
// current transport, that transport is closed gracefully. When channelz is on,
// a "Subchannel Deleted" trace event is added, and ac's channelz entry is
// removed via channelz.RemoveEntry.
1323 func (ac *addrConn) tearDown(err error) {
1325 if ac.state == connectivity.Shutdown {
// Snapshot the current transport before ac's fields are reset below.
1329 curTr := ac.transport
1331 // We have to set the state to Shutdown before anything else to prevent races
1332 // between setting the state and logic that waits on context cancelation / etc.
1333 ac.updateConnectivityState(connectivity.Shutdown)
1335 ac.tearDownErr = err
1336 ac.curAddr = resolver.Address{}
1337 if err == errConnDrain && curTr != nil {
1338 // GracefulClose(...) may be executed multiple times when
1339 // i) receiving multiple GoAway frames from the server; or
1340 // ii) there are concurrent name resolver/Balancer triggered
1341 // address removal and GoAway.
1342 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1344 curTr.GracefulClose()
1347 if channelz.IsOn() {
1348 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1349 Desc: "Subchannel Deleted",
1350 Severity: channelz.CtINFO,
1351 Parent: &channelz.TraceEventDesc{
// NOTE(review): "Subchanel" in the trace text below is misspelled; it is
// left unchanged here because it is emitted at runtime.
1352 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1353 Severity: channelz.CtINFO,
1356 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1357 // the entity being deleted, and thus prevent it from being deleted right away.
1358 channelz.RemoveEntry(ac.channelzID)
// getState returns ac's connectivity state (presumably ac.state; the return
// line is not visible), holding ac.mu until it returns.
1363 func (ac *addrConn) getState() connectivity.State {
1365 defer ac.mu.Unlock()
// ChannelzMetric returns a snapshot of ac's channelz metrics: its connectivity
// state, the current address (ac.curAddr.Addr), and the call counters and last
// call-start time, which are read atomically from ac.czData.
1369 func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1371 addr := ac.curAddr.Addr
1373 return &channelz.ChannelInternalMetric{
1374 State: ac.getState(),
1376 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
1377 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
1378 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
// lastCallStartedTime is stored as Unix nanoseconds (see incrCallsStarted).
1379 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
// incrCallsStarted atomically increments the started-call counter and records
// the current time, in Unix nanoseconds, as the last call-start time.
1383 func (ac *addrConn) incrCallsStarted() {
1384 atomic.AddInt64(&ac.czData.callsStarted, 1)
1385 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
// incrCallsSucceeded atomically increments the succeeded-call counter.
1388 func (ac *addrConn) incrCallsSucceeded() {
1389 atomic.AddInt64(&ac.czData.callsSucceeded, 1)
// incrCallsFailed atomically increments the failed-call counter.
1392 func (ac *addrConn) incrCallsFailed() {
1393 atomic.AddInt64(&ac.czData.callsFailed, 1)
// retryThrottler holds the token pool used to decide whether retries are
// throttled. tokens is consumed by throttle and replenished by successfulRPC,
// both under rt.mu.
1396 type retryThrottler struct {
1402 tokens float64 // TODO(dfawley): replace with atomic and remove lock.
1405 // throttle subtracts a retry token from the pool and returns whether a retry
1406 // should be throttled (disallowed) based upon the retry throttling policy in
1407 // the service config.
// It reports true when the remaining tokens are at or below rt.thresh.
1408 func (rt *retryThrottler) throttle() bool {
1413 defer rt.mu.Unlock()
1418 return rt.tokens <= rt.thresh
// successfulRPC returns rt.ratio tokens to the pool after a successful RPC.
// The pool is capped at rt.max (the clamping line itself is not visible here).
1421 func (rt *retryThrottler) successfulRPC() {
1426 defer rt.mu.Unlock()
1427 rt.tokens += rt.ratio
1428 if rt.tokens > rt.max {
// channelzChannel exposes channelz metrics for a client connection held in its
// cc field (cc's declaration is not visible here; presumably a *ClientConn).
1433 type channelzChannel struct {
// ChannelzMetric returns the channelz metrics of the wrapped connection by
// delegating to c.cc.channelzMetric.
1437 func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1438 return c.cc.channelzMetric()
1441 // ErrClientConnTimeout indicates that the ClientConn could not establish its
1442 // underlying connections within the specified timeout.
1444 // Deprecated: This error is never returned by grpc and should not be
1445 // referenced by users.
1446 var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")