diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r-- | vendor/google.golang.org/grpc/server.go | 120 |
1 files changed, 20 insertions, 100 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index 0a151dee4..968eb598e 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -40,7 +40,6 @@ import ( "google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding/proto" "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/binarylog" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcrand" @@ -57,24 +56,8 @@ import ( const ( defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 defaultServerMaxSendMessageSize = math.MaxInt32 - - // Server transports are tracked in a map which is keyed on listener - // address. For regular gRPC traffic, connections are accepted in Serve() - // through a call to Accept(), and we use the actual listener address as key - // when we add it to the map. But for connections received through - // ServeHTTP(), we do not have a listener and hence use this dummy value. - listenerAddressForServeHTTP = "listenerAddressForServeHTTP" ) -func init() { - internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials { - return srv.opts.creds - } - internal.DrainServerTransports = func(srv *Server, addr string) { - srv.drainServerTransports(addr) - } -} - var statusOK = status.New(codes.OK, "") var logger = grpclog.Component("core") @@ -117,12 +100,9 @@ type serverWorkerData struct { type Server struct { opts serverOptions - mu sync.Mutex // guards following - lis map[net.Listener]bool - // conns contains all active server transports. It is a map keyed on a - // listener address with the value being the set of active transports - // belonging to that listener. - conns map[string]map[transport.ServerTransport]bool + mu sync.Mutex // guards following + lis map[net.Listener]bool + conns map[transport.ServerTransport]bool serve bool drain bool cv *sync.Cond // signaled when connections close for GracefulStop @@ -279,35 +259,6 @@ func CustomCodec(codec Codec) ServerOption { }) } -// ForceServerCodec returns a ServerOption that sets a codec for message -// marshaling and unmarshaling. -// -// This will override any lookups by content-subtype for Codecs registered -// with RegisterCodec. -// -// See Content-Type on -// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for -// more details. Also see the documentation on RegisterCodec and -// CallContentSubtype for more details on the interaction between encoding.Codec -// and content-subtype. -// -// This function is provided for advanced users; prefer to register codecs -// using encoding.RegisterCodec. -// The server will automatically use registered codecs based on the incoming -// requests' headers. See also -// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec. -// Will be supported throughout 1.x. -// -// Experimental -// -// Notice: This API is EXPERIMENTAL and may be changed or removed in a -// later release. -func ForceServerCodec(codec encoding.Codec) ServerOption { - return newFuncServerOption(func(o *serverOptions) { - o.codec = codec - }) -} - // RPCCompressor returns a ServerOption that sets a compressor for outbound // messages. For backward compatibility, all outbound messages will be sent // using this compressor, regardless of incoming message compression. By @@ -418,11 +369,6 @@ func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOptio // InTapHandle returns a ServerOption that sets the tap handle for all the server // transport to be created. Only one can be installed. -// -// Experimental -// -// Notice: This API is EXPERIMENTAL and may be changed or removed in a -// later release. func InTapHandle(h tap.ServerInHandle) ServerOption { return newFuncServerOption(func(o *serverOptions) { if o.inTapHandle != nil { @@ -566,7 +512,7 @@ func NewServer(opt ...ServerOption) *Server { s := &Server{ lis: make(map[net.Listener]bool), opts: opts, - conns: make(map[string]map[transport.ServerTransport]bool), + conns: make(map[transport.ServerTransport]bool), services: make(map[string]*serviceInfo), quit: grpcsync.NewEvent(), done: grpcsync.NewEvent(), @@ -825,7 +771,7 @@ func (s *Server) Serve(lis net.Listener) error { // s.conns before this conn can be added. s.serveWG.Add(1) go func() { - s.handleRawConn(lis.Addr().String(), rawConn) + s.handleRawConn(rawConn) s.serveWG.Done() }() } @@ -833,7 +779,7 @@ func (s *Server) Serve(lis net.Listener) error { // handleRawConn forks a goroutine to handle a just-accepted connection that // has not had any I/O performed on it yet. -func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) { +func (s *Server) handleRawConn(rawConn net.Conn) { if s.quit.HasFired() { rawConn.Close() return @@ -861,24 +807,15 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) { } rawConn.SetDeadline(time.Time{}) - if !s.addConn(lisAddr, st) { + if !s.addConn(st) { return } go func() { s.serveStreams(st) - s.removeConn(lisAddr, st) + s.removeConn(st) }() } -func (s *Server) drainServerTransports(addr string) { - s.mu.Lock() - conns := s.conns[addr] - for st := range conns { - st.Drain() - } - s.mu.Unlock() -} - // newHTTP2Transport sets up a http/2 transport (using the // gRPC http2 server transport in transport/http2_server.go). func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport { @@ -980,10 +917,10 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - if !s.addConn(listenerAddressForServeHTTP, st) { + if !s.addConn(st) { return } - defer s.removeConn(listenerAddressForServeHTTP, st) + defer s.removeConn(st) s.serveStreams(st) } @@ -1011,7 +948,7 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea return trInfo } -func (s *Server) addConn(addr string, st transport.ServerTransport) bool { +func (s *Server) addConn(st transport.ServerTransport) bool { s.mu.Lock() defer s.mu.Unlock() if s.conns == nil { @@ -1023,28 +960,15 @@ func (s *Server) addConn(addr string, st transport.ServerTransport) bool { // immediately. st.Drain() } - - if s.conns[addr] == nil { - // Create a map entry if this is the first connection on this listener. - s.conns[addr] = make(map[transport.ServerTransport]bool) - } - s.conns[addr][st] = true + s.conns[st] = true return true } -func (s *Server) removeConn(addr string, st transport.ServerTransport) { +func (s *Server) removeConn(st transport.ServerTransport) { s.mu.Lock() defer s.mu.Unlock() - - conns := s.conns[addr] - if conns != nil { - delete(conns, st) - if len(conns) == 0 { - // If the last connection for this address is being removed, also - // remove the map entry corresponding to the address. This is used - // in GracefulStop() when waiting for all connections to be closed. - delete(s.conns, addr) - } + if s.conns != nil { + delete(s.conns, st) s.cv.Broadcast() } } @@ -1708,7 +1632,7 @@ func (s *Server) Stop() { s.mu.Lock() listeners := s.lis s.lis = nil - conns := s.conns + st := s.conns s.conns = nil // interrupt GracefulStop if Stop and GracefulStop are called concurrently. s.cv.Broadcast() @@ -1717,10 +1641,8 @@ func (s *Server) Stop() { for lis := range listeners { lis.Close() } - for _, cs := range conns { - for st := range cs { - st.Close() - } + for c := range st { + c.Close() } if s.opts.numServerWorkers > 0 { s.stopServerWorkers() @@ -1757,10 +1679,8 @@ func (s *Server) GracefulStop() { } s.lis = nil if !s.drain { - for _, conns := range s.conns { - for st := range conns { - st.Drain() - } + for st := range s.conns { + st.Drain() } s.drain = true } |