summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r--vendor/google.golang.org/grpc/server.go120
1 files changed, 20 insertions, 100 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 0a151dee4..968eb598e 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -40,7 +40,6 @@ import (
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
@@ -57,24 +56,8 @@ import (
const (
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
defaultServerMaxSendMessageSize = math.MaxInt32
-
- // Server transports are tracked in a map which is keyed on listener
- // address. For regular gRPC traffic, connections are accepted in Serve()
- // through a call to Accept(), and we use the actual listener address as key
- // when we add it to the map. But for connections received through
- // ServeHTTP(), we do not have a listener and hence use this dummy value.
- listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)
-func init() {
- internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
- return srv.opts.creds
- }
- internal.DrainServerTransports = func(srv *Server, addr string) {
- srv.drainServerTransports(addr)
- }
-}
-
var statusOK = status.New(codes.OK, "")
var logger = grpclog.Component("core")
@@ -117,12 +100,9 @@ type serverWorkerData struct {
type Server struct {
opts serverOptions
- mu sync.Mutex // guards following
- lis map[net.Listener]bool
- // conns contains all active server transports. It is a map keyed on a
- // listener address with the value being the set of active transports
- // belonging to that listener.
- conns map[string]map[transport.ServerTransport]bool
+ mu sync.Mutex // guards following
+ lis map[net.Listener]bool
+ conns map[transport.ServerTransport]bool
serve bool
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
@@ -279,35 +259,6 @@ func CustomCodec(codec Codec) ServerOption {
})
}
-// ForceServerCodec returns a ServerOption that sets a codec for message
-// marshaling and unmarshaling.
-//
-// This will override any lookups by content-subtype for Codecs registered
-// with RegisterCodec.
-//
-// See Content-Type on
-// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
-// more details. Also see the documentation on RegisterCodec and
-// CallContentSubtype for more details on the interaction between encoding.Codec
-// and content-subtype.
-//
-// This function is provided for advanced users; prefer to register codecs
-// using encoding.RegisterCodec.
-// The server will automatically use registered codecs based on the incoming
-// requests' headers. See also
-// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
-// Will be supported throughout 1.x.
-//
-// Experimental
-//
-// Notice: This API is EXPERIMENTAL and may be changed or removed in a
-// later release.
-func ForceServerCodec(codec encoding.Codec) ServerOption {
- return newFuncServerOption(func(o *serverOptions) {
- o.codec = codec
- })
-}
-
// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages. For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression. By
@@ -418,11 +369,6 @@ func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOptio
// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transport to be created. Only one can be installed.
-//
-// Experimental
-//
-// Notice: This API is EXPERIMENTAL and may be changed or removed in a
-// later release.
func InTapHandle(h tap.ServerInHandle) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
if o.inTapHandle != nil {
@@ -566,7 +512,7 @@ func NewServer(opt ...ServerOption) *Server {
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
- conns: make(map[string]map[transport.ServerTransport]bool),
+ conns: make(map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
@@ -825,7 +771,7 @@ func (s *Server) Serve(lis net.Listener) error {
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
- s.handleRawConn(lis.Addr().String(), rawConn)
+ s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}
@@ -833,7 +779,7 @@ func (s *Server) Serve(lis net.Listener) error {
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
-func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
+func (s *Server) handleRawConn(rawConn net.Conn) {
if s.quit.HasFired() {
rawConn.Close()
return
@@ -861,24 +807,15 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
}
rawConn.SetDeadline(time.Time{})
- if !s.addConn(lisAddr, st) {
+ if !s.addConn(st) {
return
}
go func() {
s.serveStreams(st)
- s.removeConn(lisAddr, st)
+ s.removeConn(st)
}()
}
-func (s *Server) drainServerTransports(addr string) {
- s.mu.Lock()
- conns := s.conns[addr]
- for st := range conns {
- st.Drain()
- }
- s.mu.Unlock()
-}
-
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
@@ -980,10 +917,10 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
- if !s.addConn(listenerAddressForServeHTTP, st) {
+ if !s.addConn(st) {
return
}
- defer s.removeConn(listenerAddressForServeHTTP, st)
+ defer s.removeConn(st)
s.serveStreams(st)
}
@@ -1011,7 +948,7 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
return trInfo
}
-func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
+func (s *Server) addConn(st transport.ServerTransport) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
@@ -1023,28 +960,15 @@ func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
// immediately.
st.Drain()
}
-
- if s.conns[addr] == nil {
- // Create a map entry if this is the first connection on this listener.
- s.conns[addr] = make(map[transport.ServerTransport]bool)
- }
- s.conns[addr][st] = true
+ s.conns[st] = true
return true
}
-func (s *Server) removeConn(addr string, st transport.ServerTransport) {
+func (s *Server) removeConn(st transport.ServerTransport) {
s.mu.Lock()
defer s.mu.Unlock()
-
- conns := s.conns[addr]
- if conns != nil {
- delete(conns, st)
- if len(conns) == 0 {
- // If the last connection for this address is being removed, also
- // remove the map entry corresponding to the address. This is used
- // in GracefulStop() when waiting for all connections to be closed.
- delete(s.conns, addr)
- }
+ if s.conns != nil {
+ delete(s.conns, st)
s.cv.Broadcast()
}
}
@@ -1708,7 +1632,7 @@ func (s *Server) Stop() {
s.mu.Lock()
listeners := s.lis
s.lis = nil
- conns := s.conns
+ st := s.conns
s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
s.cv.Broadcast()
@@ -1717,10 +1641,8 @@ func (s *Server) Stop() {
for lis := range listeners {
lis.Close()
}
- for _, cs := range conns {
- for st := range cs {
- st.Close()
- }
+ for c := range st {
+ c.Close()
}
if s.opts.numServerWorkers > 0 {
s.stopServerWorkers()
@@ -1757,10 +1679,8 @@ func (s *Server) GracefulStop() {
}
s.lis = nil
if !s.drain {
- for _, conns := range s.conns {
- for st := range conns {
- st.Drain()
- }
+ for st := range s.conns {
+ st.Drain()
}
s.drain = true
}