Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / google.golang.org / grpc / server.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package grpc
20
21 import (
22         "context"
23         "errors"
24         "fmt"
25         "io"
26         "math"
27         "net"
28         "net/http"
29         "reflect"
30         "runtime"
31         "strings"
32         "sync"
33         "sync/atomic"
34         "time"
35
36         "golang.org/x/net/trace"
37
38         "google.golang.org/grpc/codes"
39         "google.golang.org/grpc/credentials"
40         "google.golang.org/grpc/encoding"
41         "google.golang.org/grpc/encoding/proto"
42         "google.golang.org/grpc/grpclog"
43         "google.golang.org/grpc/internal/binarylog"
44         "google.golang.org/grpc/internal/channelz"
45         "google.golang.org/grpc/internal/transport"
46         "google.golang.org/grpc/keepalive"
47         "google.golang.org/grpc/metadata"
48         "google.golang.org/grpc/peer"
49         "google.golang.org/grpc/stats"
50         "google.golang.org/grpc/status"
51         "google.golang.org/grpc/tap"
52 )
53
54 const (
55         defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
56         defaultServerMaxSendMessageSize    = math.MaxInt32
57 )
58
59 type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
60
61 // MethodDesc represents an RPC service's method specification.
62 type MethodDesc struct {
63         MethodName string
64         Handler    methodHandler
65 }
66
67 // ServiceDesc represents an RPC service's specification.
68 type ServiceDesc struct {
69         ServiceName string
70         // The pointer to the service interface. Used to check whether the user
71         // provided implementation satisfies the interface requirements.
72         HandlerType interface{}
73         Methods     []MethodDesc
74         Streams     []StreamDesc
75         Metadata    interface{}
76 }
77
78 // service consists of the information of the server serving this service and
79 // the methods in this service.
80 type service struct {
81         server interface{} // the server for service methods
82         md     map[string]*MethodDesc
83         sd     map[string]*StreamDesc
84         mdata  interface{}
85 }
86
87 // Server is a gRPC server to serve RPC requests.
88 type Server struct {
89         opts options
90
91         mu     sync.Mutex // guards following
92         lis    map[net.Listener]bool
93         conns  map[io.Closer]bool
94         serve  bool
95         drain  bool
96         cv     *sync.Cond          // signaled when connections close for GracefulStop
97         m      map[string]*service // service name -> service info
98         events trace.EventLog
99
100         quit               chan struct{}
101         done               chan struct{}
102         quitOnce           sync.Once
103         doneOnce           sync.Once
104         channelzRemoveOnce sync.Once
105         serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop
106
107         channelzID int64 // channelz unique identification number
108         czData     *channelzData
109 }
110
111 type options struct {
112         creds                 credentials.TransportCredentials
113         codec                 baseCodec
114         cp                    Compressor
115         dc                    Decompressor
116         unaryInt              UnaryServerInterceptor
117         streamInt             StreamServerInterceptor
118         inTapHandle           tap.ServerInHandle
119         statsHandler          stats.Handler
120         maxConcurrentStreams  uint32
121         maxReceiveMessageSize int
122         maxSendMessageSize    int
123         unknownStreamDesc     *StreamDesc
124         keepaliveParams       keepalive.ServerParameters
125         keepalivePolicy       keepalive.EnforcementPolicy
126         initialWindowSize     int32
127         initialConnWindowSize int32
128         writeBufferSize       int
129         readBufferSize        int
130         connectionTimeout     time.Duration
131         maxHeaderListSize     *uint32
132 }
133
134 var defaultServerOptions = options{
135         maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
136         maxSendMessageSize:    defaultServerMaxSendMessageSize,
137         connectionTimeout:     120 * time.Second,
138         writeBufferSize:       defaultWriteBufSize,
139         readBufferSize:        defaultReadBufSize,
140 }
141
142 // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
143 type ServerOption func(*options)
144
145 // WriteBufferSize determines how much data can be batched before doing a write on the wire.
146 // The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
147 // The default value for this buffer is 32KB.
148 // Zero will disable the write buffer such that each write will be on underlying connection.
149 // Note: A Send call may not directly translate to a write.
150 func WriteBufferSize(s int) ServerOption {
151         return func(o *options) {
152                 o.writeBufferSize = s
153         }
154 }
155
156 // ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
157 // for one read syscall.
158 // The default value for this buffer is 32KB.
159 // Zero will disable read buffer for a connection so data framer can access the underlying
160 // conn directly.
161 func ReadBufferSize(s int) ServerOption {
162         return func(o *options) {
163                 o.readBufferSize = s
164         }
165 }
166
167 // InitialWindowSize returns a ServerOption that sets window size for stream.
168 // The lower bound for window size is 64K and any value smaller than that will be ignored.
169 func InitialWindowSize(s int32) ServerOption {
170         return func(o *options) {
171                 o.initialWindowSize = s
172         }
173 }
174
175 // InitialConnWindowSize returns a ServerOption that sets window size for a connection.
176 // The lower bound for window size is 64K and any value smaller than that will be ignored.
177 func InitialConnWindowSize(s int32) ServerOption {
178         return func(o *options) {
179                 o.initialConnWindowSize = s
180         }
181 }
182
183 // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
184 func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
185         if kp.Time > 0 && kp.Time < time.Second {
186                 grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
187                 kp.Time = time.Second
188         }
189
190         return func(o *options) {
191                 o.keepaliveParams = kp
192         }
193 }
194
195 // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
196 func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
197         return func(o *options) {
198                 o.keepalivePolicy = kep
199         }
200 }
201
202 // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
203 //
204 // This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
205 func CustomCodec(codec Codec) ServerOption {
206         return func(o *options) {
207                 o.codec = codec
208         }
209 }
210
211 // RPCCompressor returns a ServerOption that sets a compressor for outbound
212 // messages.  For backward compatibility, all outbound messages will be sent
213 // using this compressor, regardless of incoming message compression.  By
214 // default, server messages will be sent using the same compressor with which
215 // request messages were sent.
216 //
217 // Deprecated: use encoding.RegisterCompressor instead.
218 func RPCCompressor(cp Compressor) ServerOption {
219         return func(o *options) {
220                 o.cp = cp
221         }
222 }
223
224 // RPCDecompressor returns a ServerOption that sets a decompressor for inbound
225 // messages.  It has higher priority than decompressors registered via
226 // encoding.RegisterCompressor.
227 //
228 // Deprecated: use encoding.RegisterCompressor instead.
229 func RPCDecompressor(dc Decompressor) ServerOption {
230         return func(o *options) {
231                 o.dc = dc
232         }
233 }
234
235 // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
236 // If this is not set, gRPC uses the default limit.
237 //
238 // Deprecated: use MaxRecvMsgSize instead.
239 func MaxMsgSize(m int) ServerOption {
240         return MaxRecvMsgSize(m)
241 }
242
243 // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
244 // If this is not set, gRPC uses the default 4MB.
245 func MaxRecvMsgSize(m int) ServerOption {
246         return func(o *options) {
247                 o.maxReceiveMessageSize = m
248         }
249 }
250
251 // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
252 // If this is not set, gRPC uses the default `math.MaxInt32`.
253 func MaxSendMsgSize(m int) ServerOption {
254         return func(o *options) {
255                 o.maxSendMessageSize = m
256         }
257 }
258
259 // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
260 // of concurrent streams to each ServerTransport.
261 func MaxConcurrentStreams(n uint32) ServerOption {
262         return func(o *options) {
263                 o.maxConcurrentStreams = n
264         }
265 }
266
267 // Creds returns a ServerOption that sets credentials for server connections.
268 func Creds(c credentials.TransportCredentials) ServerOption {
269         return func(o *options) {
270                 o.creds = c
271         }
272 }
273
274 // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
275 // server. Only one unary interceptor can be installed. The construction of multiple
276 // interceptors (e.g., chaining) can be implemented at the caller.
277 func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
278         return func(o *options) {
279                 if o.unaryInt != nil {
280                         panic("The unary server interceptor was already set and may not be reset.")
281                 }
282                 o.unaryInt = i
283         }
284 }
285
286 // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
287 // server. Only one stream interceptor can be installed.
288 func StreamInterceptor(i StreamServerInterceptor) ServerOption {
289         return func(o *options) {
290                 if o.streamInt != nil {
291                         panic("The stream server interceptor was already set and may not be reset.")
292                 }
293                 o.streamInt = i
294         }
295 }
296
297 // InTapHandle returns a ServerOption that sets the tap handle for all the server
298 // transport to be created. Only one can be installed.
299 func InTapHandle(h tap.ServerInHandle) ServerOption {
300         return func(o *options) {
301                 if o.inTapHandle != nil {
302                         panic("The tap handle was already set and may not be reset.")
303                 }
304                 o.inTapHandle = h
305         }
306 }
307
308 // StatsHandler returns a ServerOption that sets the stats handler for the server.
309 func StatsHandler(h stats.Handler) ServerOption {
310         return func(o *options) {
311                 o.statsHandler = h
312         }
313 }
314
315 // UnknownServiceHandler returns a ServerOption that allows for adding a custom
316 // unknown service handler. The provided method is a bidi-streaming RPC service
317 // handler that will be invoked instead of returning the "unimplemented" gRPC
318 // error whenever a request is received for an unregistered service or method.
319 // The handling function has full access to the Context of the request and the
320 // stream, and the invocation bypasses interceptors.
321 func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
322         return func(o *options) {
323                 o.unknownStreamDesc = &StreamDesc{
324                         StreamName: "unknown_service_handler",
325                         Handler:    streamHandler,
326                         // We need to assume that the users of the streamHandler will want to use both.
327                         ClientStreams: true,
328                         ServerStreams: true,
329                 }
330         }
331 }
332
333 // ConnectionTimeout returns a ServerOption that sets the timeout for
334 // connection establishment (up to and including HTTP/2 handshaking) for all
335 // new connections.  If this is not set, the default is 120 seconds.  A zero or
336 // negative value will result in an immediate timeout.
337 //
338 // This API is EXPERIMENTAL.
339 func ConnectionTimeout(d time.Duration) ServerOption {
340         return func(o *options) {
341                 o.connectionTimeout = d
342         }
343 }
344
345 // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
346 // of header list that the server is prepared to accept.
347 func MaxHeaderListSize(s uint32) ServerOption {
348         return func(o *options) {
349                 o.maxHeaderListSize = &s
350         }
351 }
352
353 // NewServer creates a gRPC server which has no service registered and has not
354 // started to accept requests yet.
355 func NewServer(opt ...ServerOption) *Server {
356         opts := defaultServerOptions
357         for _, o := range opt {
358                 o(&opts)
359         }
360         s := &Server{
361                 lis:    make(map[net.Listener]bool),
362                 opts:   opts,
363                 conns:  make(map[io.Closer]bool),
364                 m:      make(map[string]*service),
365                 quit:   make(chan struct{}),
366                 done:   make(chan struct{}),
367                 czData: new(channelzData),
368         }
369         s.cv = sync.NewCond(&s.mu)
370         if EnableTracing {
371                 _, file, line, _ := runtime.Caller(1)
372                 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
373         }
374
375         if channelz.IsOn() {
376                 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
377         }
378         return s
379 }
380
381 // printf records an event in s's event log, unless s has been stopped.
382 // REQUIRES s.mu is held.
383 func (s *Server) printf(format string, a ...interface{}) {
384         if s.events != nil {
385                 s.events.Printf(format, a...)
386         }
387 }
388
389 // errorf records an error in s's event log, unless s has been stopped.
390 // REQUIRES s.mu is held.
391 func (s *Server) errorf(format string, a ...interface{}) {
392         if s.events != nil {
393                 s.events.Errorf(format, a...)
394         }
395 }
396
397 // RegisterService registers a service and its implementation to the gRPC
398 // server. It is called from the IDL generated code. This must be called before
399 // invoking Serve.
400 func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
401         ht := reflect.TypeOf(sd.HandlerType).Elem()
402         st := reflect.TypeOf(ss)
403         if !st.Implements(ht) {
404                 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
405         }
406         s.register(sd, ss)
407 }
408
409 func (s *Server) register(sd *ServiceDesc, ss interface{}) {
410         s.mu.Lock()
411         defer s.mu.Unlock()
412         s.printf("RegisterService(%q)", sd.ServiceName)
413         if s.serve {
414                 grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
415         }
416         if _, ok := s.m[sd.ServiceName]; ok {
417                 grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
418         }
419         srv := &service{
420                 server: ss,
421                 md:     make(map[string]*MethodDesc),
422                 sd:     make(map[string]*StreamDesc),
423                 mdata:  sd.Metadata,
424         }
425         for i := range sd.Methods {
426                 d := &sd.Methods[i]
427                 srv.md[d.MethodName] = d
428         }
429         for i := range sd.Streams {
430                 d := &sd.Streams[i]
431                 srv.sd[d.StreamName] = d
432         }
433         s.m[sd.ServiceName] = srv
434 }
435
436 // MethodInfo contains the information of an RPC including its method name and type.
437 type MethodInfo struct {
438         // Name is the method name only, without the service name or package name.
439         Name string
440         // IsClientStream indicates whether the RPC is a client streaming RPC.
441         IsClientStream bool
442         // IsServerStream indicates whether the RPC is a server streaming RPC.
443         IsServerStream bool
444 }
445
446 // ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
447 type ServiceInfo struct {
448         Methods []MethodInfo
449         // Metadata is the metadata specified in ServiceDesc when registering service.
450         Metadata interface{}
451 }
452
453 // GetServiceInfo returns a map from service names to ServiceInfo.
454 // Service names include the package names, in the form of <package>.<service>.
455 func (s *Server) GetServiceInfo() map[string]ServiceInfo {
456         ret := make(map[string]ServiceInfo)
457         for n, srv := range s.m {
458                 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
459                 for m := range srv.md {
460                         methods = append(methods, MethodInfo{
461                                 Name:           m,
462                                 IsClientStream: false,
463                                 IsServerStream: false,
464                         })
465                 }
466                 for m, d := range srv.sd {
467                         methods = append(methods, MethodInfo{
468                                 Name:           m,
469                                 IsClientStream: d.ClientStreams,
470                                 IsServerStream: d.ServerStreams,
471                         })
472                 }
473
474                 ret[n] = ServiceInfo{
475                         Methods:  methods,
476                         Metadata: srv.mdata,
477                 }
478         }
479         return ret
480 }
481
482 // ErrServerStopped indicates that the operation is now illegal because of
483 // the server being stopped.
484 var ErrServerStopped = errors.New("grpc: the server has been stopped")
485
486 func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
487         if s.opts.creds == nil {
488                 return rawConn, nil, nil
489         }
490         return s.opts.creds.ServerHandshake(rawConn)
491 }
492
493 type listenSocket struct {
494         net.Listener
495         channelzID int64
496 }
497
498 func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
499         return &channelz.SocketInternalMetric{
500                 SocketOptions: channelz.GetSocketOption(l.Listener),
501                 LocalAddr:     l.Listener.Addr(),
502         }
503 }
504
505 func (l *listenSocket) Close() error {
506         err := l.Listener.Close()
507         if channelz.IsOn() {
508                 channelz.RemoveEntry(l.channelzID)
509         }
510         return err
511 }
512
513 // Serve accepts incoming connections on the listener lis, creating a new
514 // ServerTransport and service goroutine for each. The service goroutines
515 // read gRPC requests and then call the registered handlers to reply to them.
516 // Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
517 // this method returns.
518 // Serve will return a non-nil error unless Stop or GracefulStop is called.
519 func (s *Server) Serve(lis net.Listener) error {
520         s.mu.Lock()
521         s.printf("serving")
522         s.serve = true
523         if s.lis == nil {
524                 // Serve called after Stop or GracefulStop.
525                 s.mu.Unlock()
526                 lis.Close()
527                 return ErrServerStopped
528         }
529
530         s.serveWG.Add(1)
531         defer func() {
532                 s.serveWG.Done()
533                 select {
534                 // Stop or GracefulStop called; block until done and return nil.
535                 case <-s.quit:
536                         <-s.done
537                 default:
538                 }
539         }()
540
541         ls := &listenSocket{Listener: lis}
542         s.lis[ls] = true
543
544         if channelz.IsOn() {
545                 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
546         }
547         s.mu.Unlock()
548
549         defer func() {
550                 s.mu.Lock()
551                 if s.lis != nil && s.lis[ls] {
552                         ls.Close()
553                         delete(s.lis, ls)
554                 }
555                 s.mu.Unlock()
556         }()
557
558         var tempDelay time.Duration // how long to sleep on accept failure
559
560         for {
561                 rawConn, err := lis.Accept()
562                 if err != nil {
563                         if ne, ok := err.(interface {
564                                 Temporary() bool
565                         }); ok && ne.Temporary() {
566                                 if tempDelay == 0 {
567                                         tempDelay = 5 * time.Millisecond
568                                 } else {
569                                         tempDelay *= 2
570                                 }
571                                 if max := 1 * time.Second; tempDelay > max {
572                                         tempDelay = max
573                                 }
574                                 s.mu.Lock()
575                                 s.printf("Accept error: %v; retrying in %v", err, tempDelay)
576                                 s.mu.Unlock()
577                                 timer := time.NewTimer(tempDelay)
578                                 select {
579                                 case <-timer.C:
580                                 case <-s.quit:
581                                         timer.Stop()
582                                         return nil
583                                 }
584                                 continue
585                         }
586                         s.mu.Lock()
587                         s.printf("done serving; Accept = %v", err)
588                         s.mu.Unlock()
589
590                         select {
591                         case <-s.quit:
592                                 return nil
593                         default:
594                         }
595                         return err
596                 }
597                 tempDelay = 0
598                 // Start a new goroutine to deal with rawConn so we don't stall this Accept
599                 // loop goroutine.
600                 //
601                 // Make sure we account for the goroutine so GracefulStop doesn't nil out
602                 // s.conns before this conn can be added.
603                 s.serveWG.Add(1)
604                 go func() {
605                         s.handleRawConn(rawConn)
606                         s.serveWG.Done()
607                 }()
608         }
609 }
610
611 // handleRawConn forks a goroutine to handle a just-accepted connection that
612 // has not had any I/O performed on it yet.
613 func (s *Server) handleRawConn(rawConn net.Conn) {
614         rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
615         conn, authInfo, err := s.useTransportAuthenticator(rawConn)
616         if err != nil {
617                 s.mu.Lock()
618                 s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
619                 s.mu.Unlock()
620                 grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
621                 // If serverHandshake returns ErrConnDispatched, keep rawConn open.
622                 if err != credentials.ErrConnDispatched {
623                         rawConn.Close()
624                 }
625                 rawConn.SetDeadline(time.Time{})
626                 return
627         }
628
629         s.mu.Lock()
630         if s.conns == nil {
631                 s.mu.Unlock()
632                 conn.Close()
633                 return
634         }
635         s.mu.Unlock()
636
637         // Finish handshaking (HTTP2)
638         st := s.newHTTP2Transport(conn, authInfo)
639         if st == nil {
640                 return
641         }
642
643         rawConn.SetDeadline(time.Time{})
644         if !s.addConn(st) {
645                 return
646         }
647         go func() {
648                 s.serveStreams(st)
649                 s.removeConn(st)
650         }()
651 }
652
653 // newHTTP2Transport sets up a http/2 transport (using the
654 // gRPC http2 server transport in transport/http2_server.go).
655 func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
656         config := &transport.ServerConfig{
657                 MaxStreams:            s.opts.maxConcurrentStreams,
658                 AuthInfo:              authInfo,
659                 InTapHandle:           s.opts.inTapHandle,
660                 StatsHandler:          s.opts.statsHandler,
661                 KeepaliveParams:       s.opts.keepaliveParams,
662                 KeepalivePolicy:       s.opts.keepalivePolicy,
663                 InitialWindowSize:     s.opts.initialWindowSize,
664                 InitialConnWindowSize: s.opts.initialConnWindowSize,
665                 WriteBufferSize:       s.opts.writeBufferSize,
666                 ReadBufferSize:        s.opts.readBufferSize,
667                 ChannelzParentID:      s.channelzID,
668                 MaxHeaderListSize:     s.opts.maxHeaderListSize,
669         }
670         st, err := transport.NewServerTransport("http2", c, config)
671         if err != nil {
672                 s.mu.Lock()
673                 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
674                 s.mu.Unlock()
675                 c.Close()
676                 grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
677                 return nil
678         }
679
680         return st
681 }
682
683 func (s *Server) serveStreams(st transport.ServerTransport) {
684         defer st.Close()
685         var wg sync.WaitGroup
686         st.HandleStreams(func(stream *transport.Stream) {
687                 wg.Add(1)
688                 go func() {
689                         defer wg.Done()
690                         s.handleStream(st, stream, s.traceInfo(st, stream))
691                 }()
692         }, func(ctx context.Context, method string) context.Context {
693                 if !EnableTracing {
694                         return ctx
695                 }
696                 tr := trace.New("grpc.Recv."+methodFamily(method), method)
697                 return trace.NewContext(ctx, tr)
698         })
699         wg.Wait()
700 }
701
702 var _ http.Handler = (*Server)(nil)
703
704 // ServeHTTP implements the Go standard library's http.Handler
705 // interface by responding to the gRPC request r, by looking up
706 // the requested gRPC method in the gRPC server s.
707 //
708 // The provided HTTP request must have arrived on an HTTP/2
709 // connection. When using the Go standard library's server,
710 // practically this means that the Request must also have arrived
711 // over TLS.
712 //
713 // To share one port (such as 443 for https) between gRPC and an
714 // existing http.Handler, use a root http.Handler such as:
715 //
716 //   if r.ProtoMajor == 2 && strings.HasPrefix(
717 //      r.Header.Get("Content-Type"), "application/grpc") {
718 //      grpcServer.ServeHTTP(w, r)
719 //   } else {
720 //      yourMux.ServeHTTP(w, r)
721 //   }
722 //
723 // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
724 // separate from grpc-go's HTTP/2 server. Performance and features may vary
725 // between the two paths. ServeHTTP does not support some gRPC features
726 // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
727 // and subject to change.
728 func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
729         st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
730         if err != nil {
731                 http.Error(w, err.Error(), http.StatusInternalServerError)
732                 return
733         }
734         if !s.addConn(st) {
735                 return
736         }
737         defer s.removeConn(st)
738         s.serveStreams(st)
739 }
740
741 // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
742 // If tracing is not enabled, it returns nil.
743 func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
744         tr, ok := trace.FromContext(stream.Context())
745         if !ok {
746                 return nil
747         }
748
749         trInfo = &traceInfo{
750                 tr: tr,
751         }
752         trInfo.firstLine.client = false
753         trInfo.firstLine.remoteAddr = st.RemoteAddr()
754
755         if dl, ok := stream.Context().Deadline(); ok {
756                 trInfo.firstLine.deadline = time.Until(dl)
757         }
758         return trInfo
759 }
760
761 func (s *Server) addConn(c io.Closer) bool {
762         s.mu.Lock()
763         defer s.mu.Unlock()
764         if s.conns == nil {
765                 c.Close()
766                 return false
767         }
768         if s.drain {
769                 // Transport added after we drained our existing conns: drain it
770                 // immediately.
771                 c.(transport.ServerTransport).Drain()
772         }
773         s.conns[c] = true
774         return true
775 }
776
777 func (s *Server) removeConn(c io.Closer) {
778         s.mu.Lock()
779         defer s.mu.Unlock()
780         if s.conns != nil {
781                 delete(s.conns, c)
782                 s.cv.Broadcast()
783         }
784 }
785
786 func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
787         return &channelz.ServerInternalMetric{
788                 CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
789                 CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
790                 CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
791                 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
792         }
793 }
794
795 func (s *Server) incrCallsStarted() {
796         atomic.AddInt64(&s.czData.callsStarted, 1)
797         atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
798 }
799
800 func (s *Server) incrCallsSucceeded() {
801         atomic.AddInt64(&s.czData.callsSucceeded, 1)
802 }
803
804 func (s *Server) incrCallsFailed() {
805         atomic.AddInt64(&s.czData.callsFailed, 1)
806 }
807
808 func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
809         data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
810         if err != nil {
811                 grpclog.Errorln("grpc: server failed to encode response: ", err)
812                 return err
813         }
814         compData, err := compress(data, cp, comp)
815         if err != nil {
816                 grpclog.Errorln("grpc: server failed to compress response: ", err)
817                 return err
818         }
819         hdr, payload := msgHeader(data, compData)
820         // TODO(dfawley): should we be checking len(data) instead?
821         if len(payload) > s.opts.maxSendMessageSize {
822                 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
823         }
824         err = t.Write(stream, hdr, payload, opts)
825         if err == nil && s.opts.statsHandler != nil {
826                 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
827         }
828         return err
829 }
830
831 func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
832         if channelz.IsOn() {
833                 s.incrCallsStarted()
834                 defer func() {
835                         if err != nil && err != io.EOF {
836                                 s.incrCallsFailed()
837                         } else {
838                                 s.incrCallsSucceeded()
839                         }
840                 }()
841         }
842         sh := s.opts.statsHandler
843         if sh != nil {
844                 beginTime := time.Now()
845                 begin := &stats.Begin{
846                         BeginTime: beginTime,
847                 }
848                 sh.HandleRPC(stream.Context(), begin)
849                 defer func() {
850                         end := &stats.End{
851                                 BeginTime: beginTime,
852                                 EndTime:   time.Now(),
853                         }
854                         if err != nil && err != io.EOF {
855                                 end.Error = toRPCErr(err)
856                         }
857                         sh.HandleRPC(stream.Context(), end)
858                 }()
859         }
860         if trInfo != nil {
861                 defer trInfo.tr.Finish()
862                 trInfo.firstLine.client = false
863                 trInfo.tr.LazyLog(&trInfo.firstLine, false)
864                 defer func() {
865                         if err != nil && err != io.EOF {
866                                 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
867                                 trInfo.tr.SetError()
868                         }
869                 }()
870         }
871
872         binlog := binarylog.GetMethodLogger(stream.Method())
873         if binlog != nil {
874                 ctx := stream.Context()
875                 md, _ := metadata.FromIncomingContext(ctx)
876                 logEntry := &binarylog.ClientHeader{
877                         Header:     md,
878                         MethodName: stream.Method(),
879                         PeerAddr:   nil,
880                 }
881                 if deadline, ok := ctx.Deadline(); ok {
882                         logEntry.Timeout = time.Until(deadline)
883                         if logEntry.Timeout < 0 {
884                                 logEntry.Timeout = 0
885                         }
886                 }
887                 if a := md[":authority"]; len(a) > 0 {
888                         logEntry.Authority = a[0]
889                 }
890                 if peer, ok := peer.FromContext(ctx); ok {
891                         logEntry.PeerAddr = peer.Addr
892                 }
893                 binlog.Log(logEntry)
894         }
895
896         // comp and cp are used for compression.  decomp and dc are used for
897         // decompression.  If comp and decomp are both set, they are the same;
898         // however they are kept separate to ensure that at most one of the
899         // compressor/decompressor variable pairs are set for use later.
900         var comp, decomp encoding.Compressor
901         var cp Compressor
902         var dc Decompressor
903
904         // If dc is set and matches the stream's compression, use it.  Otherwise, try
905         // to find a matching registered compressor for decomp.
906         if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
907                 dc = s.opts.dc
908         } else if rc != "" && rc != encoding.Identity {
909                 decomp = encoding.GetCompressor(rc)
910                 if decomp == nil {
911                         st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
912                         t.WriteStatus(stream, st)
913                         return st.Err()
914                 }
915         }
916
917         // If cp is set, use it.  Otherwise, attempt to compress the response using
918         // the incoming message compression method.
919         //
920         // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
921         if s.opts.cp != nil {
922                 cp = s.opts.cp
923                 stream.SetSendCompress(cp.Type())
924         } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
925                 // Legacy compressor not specified; attempt to respond with same encoding.
926                 comp = encoding.GetCompressor(rc)
927                 if comp != nil {
928                         stream.SetSendCompress(rc)
929                 }
930         }
931
932         var payInfo *payloadInfo
933         if sh != nil || binlog != nil {
934                 payInfo = &payloadInfo{}
935         }
936         d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
937         if err != nil {
938                 if st, ok := status.FromError(err); ok {
939                         if e := t.WriteStatus(stream, st); e != nil {
940                                 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
941                         }
942                 }
943                 return err
944         }
945         if channelz.IsOn() {
946                 t.IncrMsgRecv()
947         }
948         df := func(v interface{}) error {
949                 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
950                         return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
951                 }
952                 if sh != nil {
953                         sh.HandleRPC(stream.Context(), &stats.InPayload{
954                                 RecvTime: time.Now(),
955                                 Payload:  v,
956                                 Data:     d,
957                                 Length:   len(d),
958                         })
959                 }
960                 if binlog != nil {
961                         binlog.Log(&binarylog.ClientMessage{
962                                 Message: d,
963                         })
964                 }
965                 if trInfo != nil {
966                         trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
967                 }
968                 return nil
969         }
970         ctx := NewContextWithServerTransportStream(stream.Context(), stream)
971         reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
972         if appErr != nil {
973                 appStatus, ok := status.FromError(appErr)
974                 if !ok {
975                         // Convert appErr if it is not a grpc status error.
976                         appErr = status.Error(codes.Unknown, appErr.Error())
977                         appStatus, _ = status.FromError(appErr)
978                 }
979                 if trInfo != nil {
980                         trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
981                         trInfo.tr.SetError()
982                 }
983                 if e := t.WriteStatus(stream, appStatus); e != nil {
984                         grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
985                 }
986                 if binlog != nil {
987                         if h, _ := stream.Header(); h.Len() > 0 {
988                                 // Only log serverHeader if there was header. Otherwise it can
989                                 // be trailer only.
990                                 binlog.Log(&binarylog.ServerHeader{
991                                         Header: h,
992                                 })
993                         }
994                         binlog.Log(&binarylog.ServerTrailer{
995                                 Trailer: stream.Trailer(),
996                                 Err:     appErr,
997                         })
998                 }
999                 return appErr
1000         }
1001         if trInfo != nil {
1002                 trInfo.tr.LazyLog(stringer("OK"), false)
1003         }
1004         opts := &transport.Options{Last: true}
1005
1006         if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
1007                 if err == io.EOF {
1008                         // The entire stream is done (for unary RPC only).
1009                         return err
1010                 }
1011                 if s, ok := status.FromError(err); ok {
1012                         if e := t.WriteStatus(stream, s); e != nil {
1013                                 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1014                         }
1015                 } else {
1016                         switch st := err.(type) {
1017                         case transport.ConnectionError:
1018                                 // Nothing to do here.
1019                         default:
1020                                 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1021                         }
1022                 }
1023                 if binlog != nil {
1024                         h, _ := stream.Header()
1025                         binlog.Log(&binarylog.ServerHeader{
1026                                 Header: h,
1027                         })
1028                         binlog.Log(&binarylog.ServerTrailer{
1029                                 Trailer: stream.Trailer(),
1030                                 Err:     appErr,
1031                         })
1032                 }
1033                 return err
1034         }
1035         if binlog != nil {
1036                 h, _ := stream.Header()
1037                 binlog.Log(&binarylog.ServerHeader{
1038                         Header: h,
1039                 })
1040                 binlog.Log(&binarylog.ServerMessage{
1041                         Message: reply,
1042                 })
1043         }
1044         if channelz.IsOn() {
1045                 t.IncrMsgSent()
1046         }
1047         if trInfo != nil {
1048                 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1049         }
1050         // TODO: Should we be logging if writing status failed here, like above?
1051         // Should the logging be in WriteStatus?  Should we ignore the WriteStatus
1052         // error or allow the stats handler to see it?
1053         err = t.WriteStatus(stream, status.New(codes.OK, ""))
1054         if binlog != nil {
1055                 binlog.Log(&binarylog.ServerTrailer{
1056                         Trailer: stream.Trailer(),
1057                         Err:     appErr,
1058                 })
1059         }
1060         return err
1061 }
1062
1063 func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
1064         if channelz.IsOn() {
1065                 s.incrCallsStarted()
1066                 defer func() {
1067                         if err != nil && err != io.EOF {
1068                                 s.incrCallsFailed()
1069                         } else {
1070                                 s.incrCallsSucceeded()
1071                         }
1072                 }()
1073         }
1074         sh := s.opts.statsHandler
1075         if sh != nil {
1076                 beginTime := time.Now()
1077                 begin := &stats.Begin{
1078                         BeginTime: beginTime,
1079                 }
1080                 sh.HandleRPC(stream.Context(), begin)
1081                 defer func() {
1082                         end := &stats.End{
1083                                 BeginTime: beginTime,
1084                                 EndTime:   time.Now(),
1085                         }
1086                         if err != nil && err != io.EOF {
1087                                 end.Error = toRPCErr(err)
1088                         }
1089                         sh.HandleRPC(stream.Context(), end)
1090                 }()
1091         }
1092         ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1093         ss := &serverStream{
1094                 ctx:                   ctx,
1095                 t:                     t,
1096                 s:                     stream,
1097                 p:                     &parser{r: stream},
1098                 codec:                 s.getCodec(stream.ContentSubtype()),
1099                 maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1100                 maxSendMessageSize:    s.opts.maxSendMessageSize,
1101                 trInfo:                trInfo,
1102                 statsHandler:          sh,
1103         }
1104
1105         ss.binlog = binarylog.GetMethodLogger(stream.Method())
1106         if ss.binlog != nil {
1107                 md, _ := metadata.FromIncomingContext(ctx)
1108                 logEntry := &binarylog.ClientHeader{
1109                         Header:     md,
1110                         MethodName: stream.Method(),
1111                         PeerAddr:   nil,
1112                 }
1113                 if deadline, ok := ctx.Deadline(); ok {
1114                         logEntry.Timeout = time.Until(deadline)
1115                         if logEntry.Timeout < 0 {
1116                                 logEntry.Timeout = 0
1117                         }
1118                 }
1119                 if a := md[":authority"]; len(a) > 0 {
1120                         logEntry.Authority = a[0]
1121                 }
1122                 if peer, ok := peer.FromContext(ss.Context()); ok {
1123                         logEntry.PeerAddr = peer.Addr
1124                 }
1125                 ss.binlog.Log(logEntry)
1126         }
1127
1128         // If dc is set and matches the stream's compression, use it.  Otherwise, try
1129         // to find a matching registered compressor for decomp.
1130         if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1131                 ss.dc = s.opts.dc
1132         } else if rc != "" && rc != encoding.Identity {
1133                 ss.decomp = encoding.GetCompressor(rc)
1134                 if ss.decomp == nil {
1135                         st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1136                         t.WriteStatus(ss.s, st)
1137                         return st.Err()
1138                 }
1139         }
1140
1141         // If cp is set, use it.  Otherwise, attempt to compress the response using
1142         // the incoming message compression method.
1143         //
1144         // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1145         if s.opts.cp != nil {
1146                 ss.cp = s.opts.cp
1147                 stream.SetSendCompress(s.opts.cp.Type())
1148         } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1149                 // Legacy compressor not specified; attempt to respond with same encoding.
1150                 ss.comp = encoding.GetCompressor(rc)
1151                 if ss.comp != nil {
1152                         stream.SetSendCompress(rc)
1153                 }
1154         }
1155
1156         if trInfo != nil {
1157                 trInfo.tr.LazyLog(&trInfo.firstLine, false)
1158                 defer func() {
1159                         ss.mu.Lock()
1160                         if err != nil && err != io.EOF {
1161                                 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1162                                 ss.trInfo.tr.SetError()
1163                         }
1164                         ss.trInfo.tr.Finish()
1165                         ss.trInfo.tr = nil
1166                         ss.mu.Unlock()
1167                 }()
1168         }
1169         var appErr error
1170         var server interface{}
1171         if srv != nil {
1172                 server = srv.server
1173         }
1174         if s.opts.streamInt == nil {
1175                 appErr = sd.Handler(server, ss)
1176         } else {
1177                 info := &StreamServerInfo{
1178                         FullMethod:     stream.Method(),
1179                         IsClientStream: sd.ClientStreams,
1180                         IsServerStream: sd.ServerStreams,
1181                 }
1182                 appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1183         }
1184         if appErr != nil {
1185                 appStatus, ok := status.FromError(appErr)
1186                 if !ok {
1187                         appStatus = status.New(codes.Unknown, appErr.Error())
1188                         appErr = appStatus.Err()
1189                 }
1190                 if trInfo != nil {
1191                         ss.mu.Lock()
1192                         ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1193                         ss.trInfo.tr.SetError()
1194                         ss.mu.Unlock()
1195                 }
1196                 t.WriteStatus(ss.s, appStatus)
1197                 if ss.binlog != nil {
1198                         ss.binlog.Log(&binarylog.ServerTrailer{
1199                                 Trailer: ss.s.Trailer(),
1200                                 Err:     appErr,
1201                         })
1202                 }
1203                 // TODO: Should we log an error from WriteStatus here and below?
1204                 return appErr
1205         }
1206         if trInfo != nil {
1207                 ss.mu.Lock()
1208                 ss.trInfo.tr.LazyLog(stringer("OK"), false)
1209                 ss.mu.Unlock()
1210         }
1211         err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
1212         if ss.binlog != nil {
1213                 ss.binlog.Log(&binarylog.ServerTrailer{
1214                         Trailer: ss.s.Trailer(),
1215                         Err:     appErr,
1216                 })
1217         }
1218         return err
1219 }
1220
1221 func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1222         sm := stream.Method()
1223         if sm != "" && sm[0] == '/' {
1224                 sm = sm[1:]
1225         }
1226         pos := strings.LastIndex(sm, "/")
1227         if pos == -1 {
1228                 if trInfo != nil {
1229                         trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1230                         trInfo.tr.SetError()
1231                 }
1232                 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1233                 if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
1234                         if trInfo != nil {
1235                                 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1236                                 trInfo.tr.SetError()
1237                         }
1238                         grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1239                 }
1240                 if trInfo != nil {
1241                         trInfo.tr.Finish()
1242                 }
1243                 return
1244         }
1245         service := sm[:pos]
1246         method := sm[pos+1:]
1247
1248         if srv, ok := s.m[service]; ok {
1249                 if md, ok := srv.md[method]; ok {
1250                         s.processUnaryRPC(t, stream, srv, md, trInfo)
1251                         return
1252                 }
1253                 if sd, ok := srv.sd[method]; ok {
1254                         s.processStreamingRPC(t, stream, srv, sd, trInfo)
1255                         return
1256                 }
1257         }
1258         // Unknown service, or known server unknown method.
1259         if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1260                 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1261                 return
1262         }
1263         if trInfo != nil {
1264                 trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
1265                 trInfo.tr.SetError()
1266         }
1267         errDesc := fmt.Sprintf("unknown service %v", service)
1268         if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1269                 if trInfo != nil {
1270                         trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1271                         trInfo.tr.SetError()
1272                 }
1273                 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1274         }
1275         if trInfo != nil {
1276                 trInfo.tr.Finish()
1277         }
1278 }
1279
1280 // The key to save ServerTransportStream in the context.
1281 type streamKey struct{}
1282
1283 // NewContextWithServerTransportStream creates a new context from ctx and
1284 // attaches stream to it.
1285 //
1286 // This API is EXPERIMENTAL.
1287 func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1288         return context.WithValue(ctx, streamKey{}, stream)
1289 }
1290
1291 // ServerTransportStream is a minimal interface that a transport stream must
1292 // implement. This can be used to mock an actual transport stream for tests of
1293 // handler code that use, for example, grpc.SetHeader (which requires some
1294 // stream to be in context).
1295 //
1296 // See also NewContextWithServerTransportStream.
1297 //
1298 // This API is EXPERIMENTAL.
1299 type ServerTransportStream interface {
1300         Method() string
1301         SetHeader(md metadata.MD) error
1302         SendHeader(md metadata.MD) error
1303         SetTrailer(md metadata.MD) error
1304 }
1305
1306 // ServerTransportStreamFromContext returns the ServerTransportStream saved in
1307 // ctx. Returns nil if the given context has no stream associated with it
1308 // (which implies it is not an RPC invocation context).
1309 //
1310 // This API is EXPERIMENTAL.
1311 func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1312         s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1313         return s
1314 }
1315
1316 // Stop stops the gRPC server. It immediately closes all open
1317 // connections and listeners.
1318 // It cancels all active RPCs on the server side and the corresponding
1319 // pending RPCs on the client side will get notified by connection
1320 // errors.
1321 func (s *Server) Stop() {
1322         s.quitOnce.Do(func() {
1323                 close(s.quit)
1324         })
1325
1326         defer func() {
1327                 s.serveWG.Wait()
1328                 s.doneOnce.Do(func() {
1329                         close(s.done)
1330                 })
1331         }()
1332
1333         s.channelzRemoveOnce.Do(func() {
1334                 if channelz.IsOn() {
1335                         channelz.RemoveEntry(s.channelzID)
1336                 }
1337         })
1338
1339         s.mu.Lock()
1340         listeners := s.lis
1341         s.lis = nil
1342         st := s.conns
1343         s.conns = nil
1344         // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1345         s.cv.Broadcast()
1346         s.mu.Unlock()
1347
1348         for lis := range listeners {
1349                 lis.Close()
1350         }
1351         for c := range st {
1352                 c.Close()
1353         }
1354
1355         s.mu.Lock()
1356         if s.events != nil {
1357                 s.events.Finish()
1358                 s.events = nil
1359         }
1360         s.mu.Unlock()
1361 }
1362
1363 // GracefulStop stops the gRPC server gracefully. It stops the server from
1364 // accepting new connections and RPCs and blocks until all the pending RPCs are
1365 // finished.
1366 func (s *Server) GracefulStop() {
1367         s.quitOnce.Do(func() {
1368                 close(s.quit)
1369         })
1370
1371         defer func() {
1372                 s.doneOnce.Do(func() {
1373                         close(s.done)
1374                 })
1375         }()
1376
1377         s.channelzRemoveOnce.Do(func() {
1378                 if channelz.IsOn() {
1379                         channelz.RemoveEntry(s.channelzID)
1380                 }
1381         })
1382         s.mu.Lock()
1383         if s.conns == nil {
1384                 s.mu.Unlock()
1385                 return
1386         }
1387
1388         for lis := range s.lis {
1389                 lis.Close()
1390         }
1391         s.lis = nil
1392         if !s.drain {
1393                 for c := range s.conns {
1394                         c.(transport.ServerTransport).Drain()
1395                 }
1396                 s.drain = true
1397         }
1398
1399         // Wait for serving threads to be ready to exit.  Only then can we be sure no
1400         // new conns will be created.
1401         s.mu.Unlock()
1402         s.serveWG.Wait()
1403         s.mu.Lock()
1404
1405         for len(s.conns) != 0 {
1406                 s.cv.Wait()
1407         }
1408         s.conns = nil
1409         if s.events != nil {
1410                 s.events.Finish()
1411                 s.events = nil
1412         }
1413         s.mu.Unlock()
1414 }
1415
1416 // contentSubtype must be lowercase
1417 // cannot return nil
1418 func (s *Server) getCodec(contentSubtype string) baseCodec {
1419         if s.opts.codec != nil {
1420                 return s.opts.codec
1421         }
1422         if contentSubtype == "" {
1423                 return encoding.GetCodec(proto.Name)
1424         }
1425         codec := encoding.GetCodec(contentSubtype)
1426         if codec == nil {
1427                 return encoding.GetCodec(proto.Name)
1428         }
1429         return codec
1430 }
1431
1432 // SetHeader sets the header metadata.
1433 // When called multiple times, all the provided metadata will be merged.
1434 // All the metadata will be sent out when one of the following happens:
1435 //  - grpc.SendHeader() is called;
1436 //  - The first response is sent out;
1437 //  - An RPC status is sent out (error or success).
1438 func SetHeader(ctx context.Context, md metadata.MD) error {
1439         if md.Len() == 0 {
1440                 return nil
1441         }
1442         stream := ServerTransportStreamFromContext(ctx)
1443         if stream == nil {
1444                 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1445         }
1446         return stream.SetHeader(md)
1447 }
1448
1449 // SendHeader sends header metadata. It may be called at most once.
1450 // The provided md and headers set by SetHeader() will be sent.
1451 func SendHeader(ctx context.Context, md metadata.MD) error {
1452         stream := ServerTransportStreamFromContext(ctx)
1453         if stream == nil {
1454                 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1455         }
1456         if err := stream.SendHeader(md); err != nil {
1457                 return toRPCErr(err)
1458         }
1459         return nil
1460 }
1461
1462 // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1463 // When called more than once, all the provided metadata will be merged.
1464 func SetTrailer(ctx context.Context, md metadata.MD) error {
1465         if md.Len() == 0 {
1466                 return nil
1467         }
1468         stream := ServerTransportStreamFromContext(ctx)
1469         if stream == nil {
1470                 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1471         }
1472         return stream.SetTrailer(md)
1473 }
1474
1475 // Method returns the method string for the server context.  The returned
1476 // string is in the format of "/service/method".
1477 func Method(ctx context.Context) (string, bool) {
1478         s := ServerTransportStreamFromContext(ctx)
1479         if s == nil {
1480                 return "", false
1481         }
1482         return s.Method(), true
1483 }
1484
1485 type channelzServer struct {
1486         s *Server
1487 }
1488
1489 func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
1490         return c.s.channelzMetric()
1491 }