summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r--vendor/google.golang.org/grpc/server.go1780
1 files changed, 1780 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
new file mode 100644
index 000000000..968eb598e
--- /dev/null
+++ b/vendor/google.golang.org/grpc/server.go
@@ -0,0 +1,1780 @@
+/*
+ *
+ * Copyright 2014 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "math"
+ "net"
+ "net/http"
+ "reflect"
+ "runtime"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "golang.org/x/net/trace"
+
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/encoding"
+ "google.golang.org/grpc/encoding/proto"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal/binarylog"
+ "google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/grpcrand"
+ "google.golang.org/grpc/internal/grpcsync"
+ "google.golang.org/grpc/internal/transport"
+ "google.golang.org/grpc/keepalive"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/peer"
+ "google.golang.org/grpc/stats"
+ "google.golang.org/grpc/status"
+ "google.golang.org/grpc/tap"
+)
+
+const (
+ defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
+ defaultServerMaxSendMessageSize = math.MaxInt32
+)
+
+var statusOK = status.New(codes.OK, "")
+var logger = grpclog.Component("core")
+
+type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
+
+// MethodDesc represents an RPC service's method specification.
+type MethodDesc struct {
+ MethodName string
+ Handler methodHandler
+}
+
+// ServiceDesc represents an RPC service's specification.
+type ServiceDesc struct {
+ ServiceName string
+ // The pointer to the service interface. Used to check whether the user
+ // provided implementation satisfies the interface requirements.
+ HandlerType interface{}
+ Methods []MethodDesc
+ Streams []StreamDesc
+ Metadata interface{}
+}
+
+// serviceInfo wraps information about a service. It is very similar to
+// ServiceDesc and is constructed from it for internal purposes.
+type serviceInfo struct {
+ // Contains the implementation for the methods in this service.
+ serviceImpl interface{}
+ methods map[string]*MethodDesc
+ streams map[string]*StreamDesc
+ mdata interface{}
+}
+
+type serverWorkerData struct {
+ st transport.ServerTransport
+ wg *sync.WaitGroup
+ stream *transport.Stream
+}
+
+// Server is a gRPC server to serve RPC requests.
+type Server struct {
+ opts serverOptions
+
+ mu sync.Mutex // guards following
+ lis map[net.Listener]bool
+ conns map[transport.ServerTransport]bool
+ serve bool
+ drain bool
+ cv *sync.Cond // signaled when connections close for GracefulStop
+ services map[string]*serviceInfo // service name -> service info
+ events trace.EventLog
+
+ quit *grpcsync.Event
+ done *grpcsync.Event
+ channelzRemoveOnce sync.Once
+ serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
+
+ channelzID int64 // channelz unique identification number
+ czData *channelzData
+
+ serverWorkerChannels []chan *serverWorkerData
+}
+
+type serverOptions struct {
+ creds credentials.TransportCredentials
+ codec baseCodec
+ cp Compressor
+ dc Decompressor
+ unaryInt UnaryServerInterceptor
+ streamInt StreamServerInterceptor
+ chainUnaryInts []UnaryServerInterceptor
+ chainStreamInts []StreamServerInterceptor
+ inTapHandle tap.ServerInHandle
+ statsHandler stats.Handler
+ maxConcurrentStreams uint32
+ maxReceiveMessageSize int
+ maxSendMessageSize int
+ unknownStreamDesc *StreamDesc
+ keepaliveParams keepalive.ServerParameters
+ keepalivePolicy keepalive.EnforcementPolicy
+ initialWindowSize int32
+ initialConnWindowSize int32
+ writeBufferSize int
+ readBufferSize int
+ connectionTimeout time.Duration
+ maxHeaderListSize *uint32
+ headerTableSize *uint32
+ numServerWorkers uint32
+}
+
+var defaultServerOptions = serverOptions{
+ maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
+ maxSendMessageSize: defaultServerMaxSendMessageSize,
+ connectionTimeout: 120 * time.Second,
+ writeBufferSize: defaultWriteBufSize,
+ readBufferSize: defaultReadBufSize,
+}
+
+// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
+type ServerOption interface {
+ apply(*serverOptions)
+}
+
+// EmptyServerOption does not alter the server configuration. It can be embedded
+// in another structure to build custom server options.
+//
+// Experimental
+//
+// Notice: This type is EXPERIMENTAL and may be changed or removed in a
+// later release.
+type EmptyServerOption struct{}
+
+func (EmptyServerOption) apply(*serverOptions) {}
+
+// funcServerOption wraps a function that modifies serverOptions into an
+// implementation of the ServerOption interface.
+type funcServerOption struct {
+ f func(*serverOptions)
+}
+
+func (fdo *funcServerOption) apply(do *serverOptions) {
+ fdo.f(do)
+}
+
+func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
+ return &funcServerOption{
+ f: f,
+ }
+}
+
+// WriteBufferSize determines how much data can be batched before doing a write on the wire.
+// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
+// The default value for this buffer is 32KB.
+// Zero will disable the write buffer such that each write will be on underlying connection.
+// Note: A Send call may not directly translate to a write.
+func WriteBufferSize(s int) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.writeBufferSize = s
+ })
+}
+
+// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
+// for one read syscall.
+// The default value for this buffer is 32KB.
+// Zero will disable read buffer for a connection so data framer can access the underlying
+// conn directly.
+func ReadBufferSize(s int) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.readBufferSize = s
+ })
+}
+
+// InitialWindowSize returns a ServerOption that sets window size for stream.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func InitialWindowSize(s int32) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.initialWindowSize = s
+ })
+}
+
+// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func InitialConnWindowSize(s int32) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.initialConnWindowSize = s
+ })
+}
+
+// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
+func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
+ if kp.Time > 0 && kp.Time < time.Second {
+ logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
+ kp.Time = time.Second
+ }
+
+ return newFuncServerOption(func(o *serverOptions) {
+ o.keepaliveParams = kp
+ })
+}
+
+// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
+func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.keepalivePolicy = kep
+ })
+}
+
+// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
+//
+// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
+//
+// Deprecated: register codecs using encoding.RegisterCodec. The server will
+// automatically use registered codecs based on the incoming requests' headers.
+// See also
+// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
+// Will be supported throughout 1.x.
+func CustomCodec(codec Codec) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.codec = codec
+ })
+}
+
+// RPCCompressor returns a ServerOption that sets a compressor for outbound
+// messages. For backward compatibility, all outbound messages will be sent
+// using this compressor, regardless of incoming message compression. By
+// default, server messages will be sent using the same compressor with which
+// request messages were sent.
+//
+// Deprecated: use encoding.RegisterCompressor instead. Will be supported
+// throughout 1.x.
+func RPCCompressor(cp Compressor) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.cp = cp
+ })
+}
+
+// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
+// messages. It has higher priority than decompressors registered via
+// encoding.RegisterCompressor.
+//
+// Deprecated: use encoding.RegisterCompressor instead. Will be supported
+// throughout 1.x.
+func RPCDecompressor(dc Decompressor) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.dc = dc
+ })
+}
+
+// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
+// If this is not set, gRPC uses the default limit.
+//
+// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
+func MaxMsgSize(m int) ServerOption {
+ return MaxRecvMsgSize(m)
+}
+
+// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
+// If this is not set, gRPC uses the default 4MB.
+func MaxRecvMsgSize(m int) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.maxReceiveMessageSize = m
+ })
+}
+
+// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
+// If this is not set, gRPC uses the default `math.MaxInt32`.
+func MaxSendMsgSize(m int) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.maxSendMessageSize = m
+ })
+}
+
+// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
+// of concurrent streams to each ServerTransport.
+func MaxConcurrentStreams(n uint32) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.maxConcurrentStreams = n
+ })
+}
+
+// Creds returns a ServerOption that sets credentials for server connections.
+func Creds(c credentials.TransportCredentials) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.creds = c
+ })
+}
+
+// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
+// server. Only one unary interceptor can be installed. The construction of multiple
+// interceptors (e.g., chaining) can be implemented at the caller.
+func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ if o.unaryInt != nil {
+ panic("The unary server interceptor was already set and may not be reset.")
+ }
+ o.unaryInt = i
+ })
+}
+
+// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
+// for unary RPCs. The first interceptor will be the outer most,
+// while the last interceptor will be the inner most wrapper around the real call.
+// All unary interceptors added by this method will be chained.
+func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
+ })
+}
+
+// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
+// server. Only one stream interceptor can be installed.
+func StreamInterceptor(i StreamServerInterceptor) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ if o.streamInt != nil {
+ panic("The stream server interceptor was already set and may not be reset.")
+ }
+ o.streamInt = i
+ })
+}
+
+// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
+// for streaming RPCs. The first interceptor will be the outer most,
+// while the last interceptor will be the inner most wrapper around the real call.
+// All stream interceptors added by this method will be chained.
+func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.chainStreamInts = append(o.chainStreamInts, interceptors...)
+ })
+}
+
+// InTapHandle returns a ServerOption that sets the tap handle for all the server
+// transport to be created. Only one can be installed.
+func InTapHandle(h tap.ServerInHandle) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ if o.inTapHandle != nil {
+ panic("The tap handle was already set and may not be reset.")
+ }
+ o.inTapHandle = h
+ })
+}
+
+// StatsHandler returns a ServerOption that sets the stats handler for the server.
+func StatsHandler(h stats.Handler) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.statsHandler = h
+ })
+}
+
+// UnknownServiceHandler returns a ServerOption that allows for adding a custom
+// unknown service handler. The provided method is a bidi-streaming RPC service
+// handler that will be invoked instead of returning the "unimplemented" gRPC
+// error whenever a request is received for an unregistered service or method.
+// The handling function and stream interceptor (if set) have full access to
+// the ServerStream, including its Context.
+func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.unknownStreamDesc = &StreamDesc{
+ StreamName: "unknown_service_handler",
+ Handler: streamHandler,
+ // We need to assume that the users of the streamHandler will want to use both.
+ ClientStreams: true,
+ ServerStreams: true,
+ }
+ })
+}
+
+// ConnectionTimeout returns a ServerOption that sets the timeout for
+// connection establishment (up to and including HTTP/2 handshaking) for all
+// new connections. If this is not set, the default is 120 seconds. A zero or
+// negative value will result in an immediate timeout.
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func ConnectionTimeout(d time.Duration) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.connectionTimeout = d
+ })
+}
+
+// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
+// of header list that the server is prepared to accept.
+func MaxHeaderListSize(s uint32) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.maxHeaderListSize = &s
+ })
+}
+
+// HeaderTableSize returns a ServerOption that sets the size of dynamic
+// header table for stream.
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func HeaderTableSize(s uint32) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.headerTableSize = &s
+ })
+}
+
+// NumStreamWorkers returns a ServerOption that sets the number of worker
+// goroutines that should be used to process incoming streams. Setting this to
+// zero (default) will disable workers and spawn a new goroutine for each
+// stream.
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func NumStreamWorkers(numServerWorkers uint32) ServerOption {
+ // TODO: If/when this API gets stabilized (i.e. stream workers become the
+ // only way streams are processed), change the behavior of the zero value to
+ // a sane default. Preliminary experiments suggest that a value equal to the
+ // number of CPUs available is most performant; requires thorough testing.
+ return newFuncServerOption(func(o *serverOptions) {
+ o.numServerWorkers = numServerWorkers
+ })
+}
+
+// serverWorkerResetThreshold defines how often the stack must be reset. Every
+// N requests, by spawning a new goroutine in its place, a worker can reset its
+// stack so that large stacks don't live in memory forever. 2^16 should allow
+// each goroutine stack to live for at least a few seconds in a typical
+// workload (assuming a QPS of a few thousand requests/sec).
+const serverWorkerResetThreshold = 1 << 16
+
+// serverWorkers blocks on a *transport.Stream channel forever and waits for
+// data to be fed by serveStreams. This allows different requests to be
+// processed by the same goroutine, removing the need for expensive stack
+// re-allocations (see the runtime.morestack problem [1]).
+//
+// [1] https://github.com/golang/go/issues/18138
+func (s *Server) serverWorker(ch chan *serverWorkerData) {
+ // To make sure all server workers don't reset at the same time, choose a
+ // random number of iterations before resetting.
+ threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
+ for completed := 0; completed < threshold; completed++ {
+ data, ok := <-ch
+ if !ok {
+ return
+ }
+ s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
+ data.wg.Done()
+ }
+ go s.serverWorker(ch)
+}
+
+// initServerWorkers creates worker goroutines and channels to process incoming
+// connections to reduce the time spent overall on runtime.morestack.
+func (s *Server) initServerWorkers() {
+ s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
+ for i := uint32(0); i < s.opts.numServerWorkers; i++ {
+ s.serverWorkerChannels[i] = make(chan *serverWorkerData)
+ go s.serverWorker(s.serverWorkerChannels[i])
+ }
+}
+
+func (s *Server) stopServerWorkers() {
+ for i := uint32(0); i < s.opts.numServerWorkers; i++ {
+ close(s.serverWorkerChannels[i])
+ }
+}
+
+// NewServer creates a gRPC server which has no service registered and has not
+// started to accept requests yet.
+func NewServer(opt ...ServerOption) *Server {
+ opts := defaultServerOptions
+ for _, o := range opt {
+ o.apply(&opts)
+ }
+ s := &Server{
+ lis: make(map[net.Listener]bool),
+ opts: opts,
+ conns: make(map[transport.ServerTransport]bool),
+ services: make(map[string]*serviceInfo),
+ quit: grpcsync.NewEvent(),
+ done: grpcsync.NewEvent(),
+ czData: new(channelzData),
+ }
+ chainUnaryServerInterceptors(s)
+ chainStreamServerInterceptors(s)
+ s.cv = sync.NewCond(&s.mu)
+ if EnableTracing {
+ _, file, line, _ := runtime.Caller(1)
+ s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
+ }
+
+ if s.opts.numServerWorkers > 0 {
+ s.initServerWorkers()
+ }
+
+ if channelz.IsOn() {
+ s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
+ }
+ return s
+}
+
+// printf records an event in s's event log, unless s has been stopped.
+// REQUIRES s.mu is held.
+func (s *Server) printf(format string, a ...interface{}) {
+ if s.events != nil {
+ s.events.Printf(format, a...)
+ }
+}
+
+// errorf records an error in s's event log, unless s has been stopped.
+// REQUIRES s.mu is held.
+func (s *Server) errorf(format string, a ...interface{}) {
+ if s.events != nil {
+ s.events.Errorf(format, a...)
+ }
+}
+
+// ServiceRegistrar wraps a single method that supports service registration. It
+// enables users to pass concrete types other than grpc.Server to the service
+// registration methods exported by the IDL generated code.
+type ServiceRegistrar interface {
+ // RegisterService registers a service and its implementation to the
+ // concrete type implementing this interface. It may not be called
+ // once the server has started serving.
+ // desc describes the service and its methods and handlers. impl is the
+ // service implementation which is passed to the method handlers.
+ RegisterService(desc *ServiceDesc, impl interface{})
+}
+
+// RegisterService registers a service and its implementation to the gRPC
+// server. It is called from the IDL generated code. This must be called before
+// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
+// ensure it implements sd.HandlerType.
+func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
+ if ss != nil {
+ ht := reflect.TypeOf(sd.HandlerType).Elem()
+ st := reflect.TypeOf(ss)
+ if !st.Implements(ht) {
+ logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
+ }
+ }
+ s.register(sd, ss)
+}
+
+func (s *Server) register(sd *ServiceDesc, ss interface{}) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.printf("RegisterService(%q)", sd.ServiceName)
+ if s.serve {
+ logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
+ }
+ if _, ok := s.services[sd.ServiceName]; ok {
+ logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
+ }
+ info := &serviceInfo{
+ serviceImpl: ss,
+ methods: make(map[string]*MethodDesc),
+ streams: make(map[string]*StreamDesc),
+ mdata: sd.Metadata,
+ }
+ for i := range sd.Methods {
+ d := &sd.Methods[i]
+ info.methods[d.MethodName] = d
+ }
+ for i := range sd.Streams {
+ d := &sd.Streams[i]
+ info.streams[d.StreamName] = d
+ }
+ s.services[sd.ServiceName] = info
+}
+
+// MethodInfo contains the information of an RPC including its method name and type.
+type MethodInfo struct {
+ // Name is the method name only, without the service name or package name.
+ Name string
+ // IsClientStream indicates whether the RPC is a client streaming RPC.
+ IsClientStream bool
+ // IsServerStream indicates whether the RPC is a server streaming RPC.
+ IsServerStream bool
+}
+
+// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
+type ServiceInfo struct {
+ Methods []MethodInfo
+ // Metadata is the metadata specified in ServiceDesc when registering service.
+ Metadata interface{}
+}
+
+// GetServiceInfo returns a map from service names to ServiceInfo.
+// Service names include the package names, in the form of <package>.<service>.
+func (s *Server) GetServiceInfo() map[string]ServiceInfo {
+ ret := make(map[string]ServiceInfo)
+ for n, srv := range s.services {
+ methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
+ for m := range srv.methods {
+ methods = append(methods, MethodInfo{
+ Name: m,
+ IsClientStream: false,
+ IsServerStream: false,
+ })
+ }
+ for m, d := range srv.streams {
+ methods = append(methods, MethodInfo{
+ Name: m,
+ IsClientStream: d.ClientStreams,
+ IsServerStream: d.ServerStreams,
+ })
+ }
+
+ ret[n] = ServiceInfo{
+ Methods: methods,
+ Metadata: srv.mdata,
+ }
+ }
+ return ret
+}
+
+// ErrServerStopped indicates that the operation is now illegal because of
+// the server being stopped.
+var ErrServerStopped = errors.New("grpc: the server has been stopped")
+
+func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
+ if s.opts.creds == nil {
+ return rawConn, nil, nil
+ }
+ return s.opts.creds.ServerHandshake(rawConn)
+}
+
+type listenSocket struct {
+ net.Listener
+ channelzID int64
+}
+
+func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
+ return &channelz.SocketInternalMetric{
+ SocketOptions: channelz.GetSocketOption(l.Listener),
+ LocalAddr: l.Listener.Addr(),
+ }
+}
+
+func (l *listenSocket) Close() error {
+ err := l.Listener.Close()
+ if channelz.IsOn() {
+ channelz.RemoveEntry(l.channelzID)
+ }
+ return err
+}
+
+// Serve accepts incoming connections on the listener lis, creating a new
+// ServerTransport and service goroutine for each. The service goroutines
+// read gRPC requests and then call the registered handlers to reply to them.
+// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
+// this method returns.
+// Serve will return a non-nil error unless Stop or GracefulStop is called.
+func (s *Server) Serve(lis net.Listener) error {
+ s.mu.Lock()
+ s.printf("serving")
+ s.serve = true
+ if s.lis == nil {
+ // Serve called after Stop or GracefulStop.
+ s.mu.Unlock()
+ lis.Close()
+ return ErrServerStopped
+ }
+
+ s.serveWG.Add(1)
+ defer func() {
+ s.serveWG.Done()
+ if s.quit.HasFired() {
+ // Stop or GracefulStop called; block until done and return nil.
+ <-s.done.Done()
+ }
+ }()
+
+ ls := &listenSocket{Listener: lis}
+ s.lis[ls] = true
+
+ if channelz.IsOn() {
+ ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
+ }
+ s.mu.Unlock()
+
+ defer func() {
+ s.mu.Lock()
+ if s.lis != nil && s.lis[ls] {
+ ls.Close()
+ delete(s.lis, ls)
+ }
+ s.mu.Unlock()
+ }()
+
+ var tempDelay time.Duration // how long to sleep on accept failure
+
+ for {
+ rawConn, err := lis.Accept()
+ if err != nil {
+ if ne, ok := err.(interface {
+ Temporary() bool
+ }); ok && ne.Temporary() {
+ if tempDelay == 0 {
+ tempDelay = 5 * time.Millisecond
+ } else {
+ tempDelay *= 2
+ }
+ if max := 1 * time.Second; tempDelay > max {
+ tempDelay = max
+ }
+ s.mu.Lock()
+ s.printf("Accept error: %v; retrying in %v", err, tempDelay)
+ s.mu.Unlock()
+ timer := time.NewTimer(tempDelay)
+ select {
+ case <-timer.C:
+ case <-s.quit.Done():
+ timer.Stop()
+ return nil
+ }
+ continue
+ }
+ s.mu.Lock()
+ s.printf("done serving; Accept = %v", err)
+ s.mu.Unlock()
+
+ if s.quit.HasFired() {
+ return nil
+ }
+ return err
+ }
+ tempDelay = 0
+ // Start a new goroutine to deal with rawConn so we don't stall this Accept
+ // loop goroutine.
+ //
+ // Make sure we account for the goroutine so GracefulStop doesn't nil out
+ // s.conns before this conn can be added.
+ s.serveWG.Add(1)
+ go func() {
+ s.handleRawConn(rawConn)
+ s.serveWG.Done()
+ }()
+ }
+}
+
+// handleRawConn forks a goroutine to handle a just-accepted connection that
+// has not had any I/O performed on it yet.
+func (s *Server) handleRawConn(rawConn net.Conn) {
+ if s.quit.HasFired() {
+ rawConn.Close()
+ return
+ }
+ rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
+ conn, authInfo, err := s.useTransportAuthenticator(rawConn)
+ if err != nil {
+ // ErrConnDispatched means that the connection was dispatched away from
+ // gRPC; those connections should be left open.
+ if err != credentials.ErrConnDispatched {
+ s.mu.Lock()
+ s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
+ s.mu.Unlock()
+ channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
+ rawConn.Close()
+ }
+ rawConn.SetDeadline(time.Time{})
+ return
+ }
+
+ // Finish handshaking (HTTP2)
+ st := s.newHTTP2Transport(conn, authInfo)
+ if st == nil {
+ return
+ }
+
+ rawConn.SetDeadline(time.Time{})
+ if !s.addConn(st) {
+ return
+ }
+ go func() {
+ s.serveStreams(st)
+ s.removeConn(st)
+ }()
+}
+
+// newHTTP2Transport sets up a http/2 transport (using the
+// gRPC http2 server transport in transport/http2_server.go).
+func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
+ config := &transport.ServerConfig{
+ MaxStreams: s.opts.maxConcurrentStreams,
+ AuthInfo: authInfo,
+ InTapHandle: s.opts.inTapHandle,
+ StatsHandler: s.opts.statsHandler,
+ KeepaliveParams: s.opts.keepaliveParams,
+ KeepalivePolicy: s.opts.keepalivePolicy,
+ InitialWindowSize: s.opts.initialWindowSize,
+ InitialConnWindowSize: s.opts.initialConnWindowSize,
+ WriteBufferSize: s.opts.writeBufferSize,
+ ReadBufferSize: s.opts.readBufferSize,
+ ChannelzParentID: s.channelzID,
+ MaxHeaderListSize: s.opts.maxHeaderListSize,
+ HeaderTableSize: s.opts.headerTableSize,
+ }
+ st, err := transport.NewServerTransport("http2", c, config)
+ if err != nil {
+ s.mu.Lock()
+ s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
+ s.mu.Unlock()
+ c.Close()
+ channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
+ return nil
+ }
+
+ return st
+}
+
+func (s *Server) serveStreams(st transport.ServerTransport) {
+ defer st.Close()
+ var wg sync.WaitGroup
+
+ var roundRobinCounter uint32
+ st.HandleStreams(func(stream *transport.Stream) {
+ wg.Add(1)
+ if s.opts.numServerWorkers > 0 {
+ data := &serverWorkerData{st: st, wg: &wg, stream: stream}
+ select {
+ case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
+ default:
+ // If all stream workers are busy, fallback to the default code path.
+ go func() {
+ s.handleStream(st, stream, s.traceInfo(st, stream))
+ wg.Done()
+ }()
+ }
+ } else {
+ go func() {
+ defer wg.Done()
+ s.handleStream(st, stream, s.traceInfo(st, stream))
+ }()
+ }
+ }, func(ctx context.Context, method string) context.Context {
+ if !EnableTracing {
+ return ctx
+ }
+ tr := trace.New("grpc.Recv."+methodFamily(method), method)
+ return trace.NewContext(ctx, tr)
+ })
+ wg.Wait()
+}
+
+var _ http.Handler = (*Server)(nil)
+
+// ServeHTTP implements the Go standard library's http.Handler
+// interface by responding to the gRPC request r, by looking up
+// the requested gRPC method in the gRPC server s.
+//
+// The provided HTTP request must have arrived on an HTTP/2
+// connection. When using the Go standard library's server,
+// practically this means that the Request must also have arrived
+// over TLS.
+//
+// To share one port (such as 443 for https) between gRPC and an
+// existing http.Handler, use a root http.Handler such as:
+//
+// if r.ProtoMajor == 2 && strings.HasPrefix(
+// r.Header.Get("Content-Type"), "application/grpc") {
+// grpcServer.ServeHTTP(w, r)
+// } else {
+// yourMux.ServeHTTP(w, r)
+// }
+//
+// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
+// separate from grpc-go's HTTP/2 server. Performance and features may vary
+// between the two paths. ServeHTTP does not support some gRPC features
+// available through grpc-go's HTTP/2 server.
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ if !s.addConn(st) {
+ return
+ }
+ defer s.removeConn(st)
+ s.serveStreams(st)
+}
+
+// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
+// If tracing is not enabled, it returns nil.
+func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
+ if !EnableTracing {
+ return nil
+ }
+ tr, ok := trace.FromContext(stream.Context())
+ if !ok {
+ return nil
+ }
+
+ trInfo = &traceInfo{
+ tr: tr,
+ firstLine: firstLine{
+ client: false,
+ remoteAddr: st.RemoteAddr(),
+ },
+ }
+ if dl, ok := stream.Context().Deadline(); ok {
+ trInfo.firstLine.deadline = time.Until(dl)
+ }
+ return trInfo
+}
+
+func (s *Server) addConn(st transport.ServerTransport) bool {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.conns == nil {
+ st.Close()
+ return false
+ }
+ if s.drain {
+ // Transport added after we drained our existing conns: drain it
+ // immediately.
+ st.Drain()
+ }
+ s.conns[st] = true
+ return true
+}
+
+func (s *Server) removeConn(st transport.ServerTransport) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.conns != nil {
+ delete(s.conns, st)
+ s.cv.Broadcast()
+ }
+}
+
+func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
+ return &channelz.ServerInternalMetric{
+ CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
+ CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
+ CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
+ LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
+ }
+}
+
+func (s *Server) incrCallsStarted() {
+ atomic.AddInt64(&s.czData.callsStarted, 1)
+ atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
+}
+
+func (s *Server) incrCallsSucceeded() {
+ atomic.AddInt64(&s.czData.callsSucceeded, 1)
+}
+
+func (s *Server) incrCallsFailed() {
+ atomic.AddInt64(&s.czData.callsFailed, 1)
+}
+
+func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
+ data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
+ if err != nil {
+ channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
+ return err
+ }
+ compData, err := compress(data, cp, comp)
+ if err != nil {
+ channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
+ return err
+ }
+ hdr, payload := msgHeader(data, compData)
+ // TODO(dfawley): should we be checking len(data) instead?
+ if len(payload) > s.opts.maxSendMessageSize {
+ return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
+ }
+ err = t.Write(stream, hdr, payload, opts)
+ if err == nil && s.opts.statsHandler != nil {
+ s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
+ }
+ return err
+}
+
+// chainUnaryServerInterceptors chains all unary server interceptors into one.
+func chainUnaryServerInterceptors(s *Server) {
+ // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
+ // be executed before any other chained interceptors.
+ interceptors := s.opts.chainUnaryInts
+ if s.opts.unaryInt != nil {
+ interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
+ }
+
+ var chainedInt UnaryServerInterceptor
+ if len(interceptors) == 0 {
+ chainedInt = nil
+ } else if len(interceptors) == 1 {
+ chainedInt = interceptors[0]
+ } else {
+ chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
+ return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
+ }
+ }
+
+ s.opts.unaryInt = chainedInt
+}
+
+// getChainUnaryHandler recursively generate the chained UnaryHandler
+func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
+ if curr == len(interceptors)-1 {
+ return finalHandler
+ }
+
+ return func(ctx context.Context, req interface{}) (interface{}, error) {
+ return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
+ }
+}
+
+func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
+ sh := s.opts.statsHandler
+ if sh != nil || trInfo != nil || channelz.IsOn() {
+ if channelz.IsOn() {
+ s.incrCallsStarted()
+ }
+ var statsBegin *stats.Begin
+ if sh != nil {
+ beginTime := time.Now()
+ statsBegin = &stats.Begin{
+ BeginTime: beginTime,
+ }
+ sh.HandleRPC(stream.Context(), statsBegin)
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&trInfo.firstLine, false)
+ }
+ // The deferred error handling for tracing, stats handler and channelz are
+ // combined into one function to reduce stack usage -- a defer takes ~56-64
+ // bytes on the stack, so overflowing the stack will require a stack
+ // re-allocation, which is expensive.
+ //
+ // To maintain behavior similar to separate deferred statements, statements
+ // should be executed in the reverse order. That is, tracing first, stats
+ // handler second, and channelz last. Note that panics *within* defers will
+ // lead to different behavior, but that's an acceptable compromise; that
+ // would be undefined behavior territory anyway.
+ defer func() {
+ if trInfo != nil {
+ if err != nil && err != io.EOF {
+ trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ trInfo.tr.SetError()
+ }
+ trInfo.tr.Finish()
+ }
+
+ if sh != nil {
+ end := &stats.End{
+ BeginTime: statsBegin.BeginTime,
+ EndTime: time.Now(),
+ }
+ if err != nil && err != io.EOF {
+ end.Error = toRPCErr(err)
+ }
+ sh.HandleRPC(stream.Context(), end)
+ }
+
+ if channelz.IsOn() {
+ if err != nil && err != io.EOF {
+ s.incrCallsFailed()
+ } else {
+ s.incrCallsSucceeded()
+ }
+ }
+ }()
+ }
+
+ binlog := binarylog.GetMethodLogger(stream.Method())
+ if binlog != nil {
+ ctx := stream.Context()
+ md, _ := metadata.FromIncomingContext(ctx)
+ logEntry := &binarylog.ClientHeader{
+ Header: md,
+ MethodName: stream.Method(),
+ PeerAddr: nil,
+ }
+ if deadline, ok := ctx.Deadline(); ok {
+ logEntry.Timeout = time.Until(deadline)
+ if logEntry.Timeout < 0 {
+ logEntry.Timeout = 0
+ }
+ }
+ if a := md[":authority"]; len(a) > 0 {
+ logEntry.Authority = a[0]
+ }
+ if peer, ok := peer.FromContext(ctx); ok {
+ logEntry.PeerAddr = peer.Addr
+ }
+ binlog.Log(logEntry)
+ }
+
+ // comp and cp are used for compression. decomp and dc are used for
+ // decompression. If comp and decomp are both set, they are the same;
+ // however they are kept separate to ensure that at most one of the
+ // compressor/decompressor variable pairs are set for use later.
+ var comp, decomp encoding.Compressor
+ var cp Compressor
+ var dc Decompressor
+
+ // If dc is set and matches the stream's compression, use it. Otherwise, try
+ // to find a matching registered compressor for decomp.
+ if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
+ dc = s.opts.dc
+ } else if rc != "" && rc != encoding.Identity {
+ decomp = encoding.GetCompressor(rc)
+ if decomp == nil {
+ st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
+ t.WriteStatus(stream, st)
+ return st.Err()
+ }
+ }
+
+ // If cp is set, use it. Otherwise, attempt to compress the response using
+ // the incoming message compression method.
+ //
+ // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
+ if s.opts.cp != nil {
+ cp = s.opts.cp
+ stream.SetSendCompress(cp.Type())
+ } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
+ // Legacy compressor not specified; attempt to respond with same encoding.
+ comp = encoding.GetCompressor(rc)
+ if comp != nil {
+ stream.SetSendCompress(rc)
+ }
+ }
+
+ var payInfo *payloadInfo
+ if sh != nil || binlog != nil {
+ payInfo = &payloadInfo{}
+ }
+ d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
+ if err != nil {
+ if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
+ channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+ return err
+ }
+ if channelz.IsOn() {
+ t.IncrMsgRecv()
+ }
+ df := func(v interface{}) error {
+ if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
+ return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
+ }
+ if sh != nil {
+ sh.HandleRPC(stream.Context(), &stats.InPayload{
+ RecvTime: time.Now(),
+ Payload: v,
+ WireLength: payInfo.wireLength + headerLen,
+ Data: d,
+ Length: len(d),
+ })
+ }
+ if binlog != nil {
+ binlog.Log(&binarylog.ClientMessage{
+ Message: d,
+ })
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
+ }
+ return nil
+ }
+ ctx := NewContextWithServerTransportStream(stream.Context(), stream)
+ reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
+ if appErr != nil {
+ appStatus, ok := status.FromError(appErr)
+ if !ok {
+ // Convert appErr if it is not a grpc status error.
+ appErr = status.Error(codes.Unknown, appErr.Error())
+ appStatus, _ = status.FromError(appErr)
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
+ trInfo.tr.SetError()
+ }
+ if e := t.WriteStatus(stream, appStatus); e != nil {
+ channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
+ }
+ if binlog != nil {
+ if h, _ := stream.Header(); h.Len() > 0 {
+ // Only log serverHeader if there was header. Otherwise it can
+ // be trailer only.
+ binlog.Log(&binarylog.ServerHeader{
+ Header: h,
+ })
+ }
+ binlog.Log(&binarylog.ServerTrailer{
+ Trailer: stream.Trailer(),
+ Err: appErr,
+ })
+ }
+ return appErr
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(stringer("OK"), false)
+ }
+ opts := &transport.Options{Last: true}
+
+ if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
+ if err == io.EOF {
+ // The entire stream is done (for unary RPC only).
+ return err
+ }
+ if sts, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, sts); e != nil {
+ channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
+ }
+ } else {
+ switch st := err.(type) {
+ case transport.ConnectionError:
+ // Nothing to do here.
+ default:
+ panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
+ }
+ }
+ if binlog != nil {
+ h, _ := stream.Header()
+ binlog.Log(&binarylog.ServerHeader{
+ Header: h,
+ })
+ binlog.Log(&binarylog.ServerTrailer{
+ Trailer: stream.Trailer(),
+ Err: appErr,
+ })
+ }
+ return err
+ }
+ if binlog != nil {
+ h, _ := stream.Header()
+ binlog.Log(&binarylog.ServerHeader{
+ Header: h,
+ })
+ binlog.Log(&binarylog.ServerMessage{
+ Message: reply,
+ })
+ }
+ if channelz.IsOn() {
+ t.IncrMsgSent()
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
+ }
+ // TODO: Should we be logging if writing status failed here, like above?
+ // Should the logging be in WriteStatus? Should we ignore the WriteStatus
+ // error or allow the stats handler to see it?
+ err = t.WriteStatus(stream, statusOK)
+ if binlog != nil {
+ binlog.Log(&binarylog.ServerTrailer{
+ Trailer: stream.Trailer(),
+ Err: appErr,
+ })
+ }
+ return err
+}
+
+// chainStreamServerInterceptors chains all stream server interceptors into one.
+func chainStreamServerInterceptors(s *Server) {
+ // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
+ // be executed before any other chained interceptors.
+ interceptors := s.opts.chainStreamInts
+ if s.opts.streamInt != nil {
+ interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
+ }
+
+ var chainedInt StreamServerInterceptor
+ if len(interceptors) == 0 {
+ chainedInt = nil
+ } else if len(interceptors) == 1 {
+ chainedInt = interceptors[0]
+ } else {
+ chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
+ return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
+ }
+ }
+
+ s.opts.streamInt = chainedInt
+}
+
+// getChainStreamHandler recursively generate the chained StreamHandler
+func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
+ if curr == len(interceptors)-1 {
+ return finalHandler
+ }
+
+ return func(srv interface{}, ss ServerStream) error {
+ return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
+ }
+}
+
+func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
+ if channelz.IsOn() {
+ s.incrCallsStarted()
+ }
+ sh := s.opts.statsHandler
+ var statsBegin *stats.Begin
+ if sh != nil {
+ beginTime := time.Now()
+ statsBegin = &stats.Begin{
+ BeginTime: beginTime,
+ }
+ sh.HandleRPC(stream.Context(), statsBegin)
+ }
+ ctx := NewContextWithServerTransportStream(stream.Context(), stream)
+ ss := &serverStream{
+ ctx: ctx,
+ t: t,
+ s: stream,
+ p: &parser{r: stream},
+ codec: s.getCodec(stream.ContentSubtype()),
+ maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
+ maxSendMessageSize: s.opts.maxSendMessageSize,
+ trInfo: trInfo,
+ statsHandler: sh,
+ }
+
+ if sh != nil || trInfo != nil || channelz.IsOn() {
+ // See comment in processUnaryRPC on defers.
+ defer func() {
+ if trInfo != nil {
+ ss.mu.Lock()
+ if err != nil && err != io.EOF {
+ ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ ss.trInfo.tr.SetError()
+ }
+ ss.trInfo.tr.Finish()
+ ss.trInfo.tr = nil
+ ss.mu.Unlock()
+ }
+
+ if sh != nil {
+ end := &stats.End{
+ BeginTime: statsBegin.BeginTime,
+ EndTime: time.Now(),
+ }
+ if err != nil && err != io.EOF {
+ end.Error = toRPCErr(err)
+ }
+ sh.HandleRPC(stream.Context(), end)
+ }
+
+ if channelz.IsOn() {
+ if err != nil && err != io.EOF {
+ s.incrCallsFailed()
+ } else {
+ s.incrCallsSucceeded()
+ }
+ }
+ }()
+ }
+
+ ss.binlog = binarylog.GetMethodLogger(stream.Method())
+ if ss.binlog != nil {
+ md, _ := metadata.FromIncomingContext(ctx)
+ logEntry := &binarylog.ClientHeader{
+ Header: md,
+ MethodName: stream.Method(),
+ PeerAddr: nil,
+ }
+ if deadline, ok := ctx.Deadline(); ok {
+ logEntry.Timeout = time.Until(deadline)
+ if logEntry.Timeout < 0 {
+ logEntry.Timeout = 0
+ }
+ }
+ if a := md[":authority"]; len(a) > 0 {
+ logEntry.Authority = a[0]
+ }
+ if peer, ok := peer.FromContext(ss.Context()); ok {
+ logEntry.PeerAddr = peer.Addr
+ }
+ ss.binlog.Log(logEntry)
+ }
+
+ // If dc is set and matches the stream's compression, use it. Otherwise, try
+ // to find a matching registered compressor for decomp.
+ if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
+ ss.dc = s.opts.dc
+ } else if rc != "" && rc != encoding.Identity {
+ ss.decomp = encoding.GetCompressor(rc)
+ if ss.decomp == nil {
+ st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
+ t.WriteStatus(ss.s, st)
+ return st.Err()
+ }
+ }
+
+ // If cp is set, use it. Otherwise, attempt to compress the response using
+ // the incoming message compression method.
+ //
+ // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
+ if s.opts.cp != nil {
+ ss.cp = s.opts.cp
+ stream.SetSendCompress(s.opts.cp.Type())
+ } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
+ // Legacy compressor not specified; attempt to respond with same encoding.
+ ss.comp = encoding.GetCompressor(rc)
+ if ss.comp != nil {
+ stream.SetSendCompress(rc)
+ }
+ }
+
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&trInfo.firstLine, false)
+ }
+ var appErr error
+ var server interface{}
+ if info != nil {
+ server = info.serviceImpl
+ }
+ if s.opts.streamInt == nil {
+ appErr = sd.Handler(server, ss)
+ } else {
+ info := &StreamServerInfo{
+ FullMethod: stream.Method(),
+ IsClientStream: sd.ClientStreams,
+ IsServerStream: sd.ServerStreams,
+ }
+ appErr = s.opts.streamInt(server, ss, info, sd.Handler)
+ }
+ if appErr != nil {
+ appStatus, ok := status.FromError(appErr)
+ if !ok {
+ appStatus = status.New(codes.Unknown, appErr.Error())
+ appErr = appStatus.Err()
+ }
+ if trInfo != nil {
+ ss.mu.Lock()
+ ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
+ ss.trInfo.tr.SetError()
+ ss.mu.Unlock()
+ }
+ t.WriteStatus(ss.s, appStatus)
+ if ss.binlog != nil {
+ ss.binlog.Log(&binarylog.ServerTrailer{
+ Trailer: ss.s.Trailer(),
+ Err: appErr,
+ })
+ }
+ // TODO: Should we log an error from WriteStatus here and below?
+ return appErr
+ }
+ if trInfo != nil {
+ ss.mu.Lock()
+ ss.trInfo.tr.LazyLog(stringer("OK"), false)
+ ss.mu.Unlock()
+ }
+ err = t.WriteStatus(ss.s, statusOK)
+ if ss.binlog != nil {
+ ss.binlog.Log(&binarylog.ServerTrailer{
+ Trailer: ss.s.Trailer(),
+ Err: appErr,
+ })
+ }
+ return err
+}
+
+func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
+ sm := stream.Method()
+ if sm != "" && sm[0] == '/' {
+ sm = sm[1:]
+ }
+ pos := strings.LastIndex(sm, "/")
+ if pos == -1 {
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
+ trInfo.tr.SetError()
+ }
+ errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
+ if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ trInfo.tr.SetError()
+ }
+ channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
+ }
+ if trInfo != nil {
+ trInfo.tr.Finish()
+ }
+ return
+ }
+ service := sm[:pos]
+ method := sm[pos+1:]
+
+ srv, knownService := s.services[service]
+ if knownService {
+ if md, ok := srv.methods[method]; ok {
+ s.processUnaryRPC(t, stream, srv, md, trInfo)
+ return
+ }
+ if sd, ok := srv.streams[method]; ok {
+ s.processStreamingRPC(t, stream, srv, sd, trInfo)
+ return
+ }
+ }
+ // Unknown service, or known server unknown method.
+ if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
+ s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
+ return
+ }
+ var errDesc string
+ if !knownService {
+ errDesc = fmt.Sprintf("unknown service %v", service)
+ } else {
+ errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyPrintf("%s", errDesc)
+ trInfo.tr.SetError()
+ }
+ if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ trInfo.tr.SetError()
+ }
+ channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
+ }
+ if trInfo != nil {
+ trInfo.tr.Finish()
+ }
+}
+
+// The key to save ServerTransportStream in the context.
+type streamKey struct{}
+
+// NewContextWithServerTransportStream creates a new context from ctx and
+// attaches stream to it.
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
+ return context.WithValue(ctx, streamKey{}, stream)
+}
+
+// ServerTransportStream is a minimal interface that a transport stream must
+// implement. This can be used to mock an actual transport stream for tests of
+// handler code that use, for example, grpc.SetHeader (which requires some
+// stream to be in context).
+//
+// See also NewContextWithServerTransportStream.
+//
+// Experimental
+//
+// Notice: This type is EXPERIMENTAL and may be changed or removed in a
+// later release.
+type ServerTransportStream interface {
+ Method() string
+ SetHeader(md metadata.MD) error
+ SendHeader(md metadata.MD) error
+ SetTrailer(md metadata.MD) error
+}
+
+// ServerTransportStreamFromContext returns the ServerTransportStream saved in
+// ctx. Returns nil if the given context has no stream associated with it
+// (which implies it is not an RPC invocation context).
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
+ s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
+ return s
+}
+
+// Stop stops the gRPC server. It immediately closes all open
+// connections and listeners.
+// It cancels all active RPCs on the server side and the corresponding
+// pending RPCs on the client side will get notified by connection
+// errors.
+func (s *Server) Stop() {
+ s.quit.Fire()
+
+ defer func() {
+ s.serveWG.Wait()
+ s.done.Fire()
+ }()
+
+ s.channelzRemoveOnce.Do(func() {
+ if channelz.IsOn() {
+ channelz.RemoveEntry(s.channelzID)
+ }
+ })
+
+ s.mu.Lock()
+ listeners := s.lis
+ s.lis = nil
+ st := s.conns
+ s.conns = nil
+ // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
+ s.cv.Broadcast()
+ s.mu.Unlock()
+
+ for lis := range listeners {
+ lis.Close()
+ }
+ for c := range st {
+ c.Close()
+ }
+ if s.opts.numServerWorkers > 0 {
+ s.stopServerWorkers()
+ }
+
+ s.mu.Lock()
+ if s.events != nil {
+ s.events.Finish()
+ s.events = nil
+ }
+ s.mu.Unlock()
+}
+
+// GracefulStop stops the gRPC server gracefully. It stops the server from
+// accepting new connections and RPCs and blocks until all the pending RPCs are
+// finished.
+func (s *Server) GracefulStop() {
+ s.quit.Fire()
+ defer s.done.Fire()
+
+ s.channelzRemoveOnce.Do(func() {
+ if channelz.IsOn() {
+ channelz.RemoveEntry(s.channelzID)
+ }
+ })
+ s.mu.Lock()
+ if s.conns == nil {
+ s.mu.Unlock()
+ return
+ }
+
+ for lis := range s.lis {
+ lis.Close()
+ }
+ s.lis = nil
+ if !s.drain {
+ for st := range s.conns {
+ st.Drain()
+ }
+ s.drain = true
+ }
+
+ // Wait for serving threads to be ready to exit. Only then can we be sure no
+ // new conns will be created.
+ s.mu.Unlock()
+ s.serveWG.Wait()
+ s.mu.Lock()
+
+ for len(s.conns) != 0 {
+ s.cv.Wait()
+ }
+ s.conns = nil
+ if s.events != nil {
+ s.events.Finish()
+ s.events = nil
+ }
+ s.mu.Unlock()
+}
+
+// contentSubtype must be lowercase
+// cannot return nil
+func (s *Server) getCodec(contentSubtype string) baseCodec {
+ if s.opts.codec != nil {
+ return s.opts.codec
+ }
+ if contentSubtype == "" {
+ return encoding.GetCodec(proto.Name)
+ }
+ codec := encoding.GetCodec(contentSubtype)
+ if codec == nil {
+ return encoding.GetCodec(proto.Name)
+ }
+ return codec
+}
+
+// SetHeader sets the header metadata.
+// When called multiple times, all the provided metadata will be merged.
+// All the metadata will be sent out when one of the following happens:
+// - grpc.SendHeader() is called;
+// - The first response is sent out;
+// - An RPC status is sent out (error or success).
+func SetHeader(ctx context.Context, md metadata.MD) error {
+ if md.Len() == 0 {
+ return nil
+ }
+ stream := ServerTransportStreamFromContext(ctx)
+ if stream == nil {
+ return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+ }
+ return stream.SetHeader(md)
+}
+
+// SendHeader sends header metadata. It may be called at most once.
+// The provided md and headers set by SetHeader() will be sent.
+func SendHeader(ctx context.Context, md metadata.MD) error {
+ stream := ServerTransportStreamFromContext(ctx)
+ if stream == nil {
+ return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+ }
+ if err := stream.SendHeader(md); err != nil {
+ return toRPCErr(err)
+ }
+ return nil
+}
+
+// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
+// When called more than once, all the provided metadata will be merged.
+func SetTrailer(ctx context.Context, md metadata.MD) error {
+ if md.Len() == 0 {
+ return nil
+ }
+ stream := ServerTransportStreamFromContext(ctx)
+ if stream == nil {
+ return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+ }
+ return stream.SetTrailer(md)
+}
+
+// Method returns the method string for the server context. The returned
+// string is in the format of "/service/method".
+func Method(ctx context.Context) (string, bool) {
+ s := ServerTransportStreamFromContext(ctx)
+ if s == nil {
+ return "", false
+ }
+ return s.Method(), true
+}
+
+type channelzServer struct {
+ s *Server
+}
+
+func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
+ return c.s.channelzMetric()
+}