aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r--vendor/google.golang.org/grpc/server.go120
1 files changed, 100 insertions, 20 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 968eb598e..0a151dee4 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -40,6 +40,7 @@ import (
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
@@ -56,8 +57,24 @@ import (
const (
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
defaultServerMaxSendMessageSize = math.MaxInt32
+
+ // Server transports are tracked in a map which is keyed on listener
+ // address. For regular gRPC traffic, connections are accepted in Serve()
+ // through a call to Accept(), and we use the actual listener address as key
+ // when we add it to the map. But for connections received through
+ // ServeHTTP(), we do not have a listener and hence use this dummy value.
+ listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)
+func init() {
+ internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
+ return srv.opts.creds
+ }
+ internal.DrainServerTransports = func(srv *Server, addr string) {
+ srv.drainServerTransports(addr)
+ }
+}
+
var statusOK = status.New(codes.OK, "")
var logger = grpclog.Component("core")
@@ -100,9 +117,12 @@ type serverWorkerData struct {
type Server struct {
opts serverOptions
- mu sync.Mutex // guards following
- lis map[net.Listener]bool
- conns map[transport.ServerTransport]bool
+ mu sync.Mutex // guards following
+ lis map[net.Listener]bool
+ // conns contains all active server transports. It is a map keyed on a
+ // listener address with the value being the set of active transports
+ // belonging to that listener.
+ conns map[string]map[transport.ServerTransport]bool
serve bool
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
@@ -259,6 +279,35 @@ func CustomCodec(codec Codec) ServerOption {
})
}
+// ForceServerCodec returns a ServerOption that sets a codec for message
+// marshaling and unmarshaling.
+//
+// This will override any lookups by content-subtype for Codecs registered
+// with RegisterCodec.
+//
+// See Content-Type on
+// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
+// more details. Also see the documentation on RegisterCodec and
+// CallContentSubtype for more details on the interaction between encoding.Codec
+// and content-subtype.
+//
+// This function is provided for advanced users; prefer to register codecs
+// using encoding.RegisterCodec.
+// The server will automatically use registered codecs based on the incoming
+// requests' headers. See also
+// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
+// Will be supported throughout 1.x.
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func ForceServerCodec(codec encoding.Codec) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.codec = codec
+ })
+}
+
// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages. For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression. By
@@ -369,6 +418,11 @@ func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOptio
// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transport to be created. Only one can be installed.
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
func InTapHandle(h tap.ServerInHandle) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
if o.inTapHandle != nil {
@@ -512,7 +566,7 @@ func NewServer(opt ...ServerOption) *Server {
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
- conns: make(map[transport.ServerTransport]bool),
+ conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
@@ -771,7 +825,7 @@ func (s *Server) Serve(lis net.Listener) error {
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
- s.handleRawConn(rawConn)
+ s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done()
}()
}
@@ -779,7 +833,7 @@ func (s *Server) Serve(lis net.Listener) error {
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
-func (s *Server) handleRawConn(rawConn net.Conn) {
+func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
if s.quit.HasFired() {
rawConn.Close()
return
@@ -807,15 +861,24 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
}
rawConn.SetDeadline(time.Time{})
- if !s.addConn(st) {
+ if !s.addConn(lisAddr, st) {
return
}
go func() {
s.serveStreams(st)
- s.removeConn(st)
+ s.removeConn(lisAddr, st)
}()
}
+func (s *Server) drainServerTransports(addr string) {
+ s.mu.Lock()
+ conns := s.conns[addr]
+ for st := range conns {
+ st.Drain()
+ }
+ s.mu.Unlock()
+}
+
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
@@ -917,10 +980,10 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
- if !s.addConn(st) {
+ if !s.addConn(listenerAddressForServeHTTP, st) {
return
}
- defer s.removeConn(st)
+ defer s.removeConn(listenerAddressForServeHTTP, st)
s.serveStreams(st)
}
@@ -948,7 +1011,7 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
return trInfo
}
-func (s *Server) addConn(st transport.ServerTransport) bool {
+func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
@@ -960,15 +1023,28 @@ func (s *Server) addConn(st transport.ServerTransport) bool {
// immediately.
st.Drain()
}
- s.conns[st] = true
+
+ if s.conns[addr] == nil {
+ // Create a map entry if this is the first connection on this listener.
+ s.conns[addr] = make(map[transport.ServerTransport]bool)
+ }
+ s.conns[addr][st] = true
return true
}
-func (s *Server) removeConn(st transport.ServerTransport) {
+func (s *Server) removeConn(addr string, st transport.ServerTransport) {
s.mu.Lock()
defer s.mu.Unlock()
- if s.conns != nil {
- delete(s.conns, st)
+
+ conns := s.conns[addr]
+ if conns != nil {
+ delete(conns, st)
+ if len(conns) == 0 {
+ // If the last connection for this address is being removed, also
+ // remove the map entry corresponding to the address. This is used
+ // in GracefulStop() when waiting for all connections to be closed.
+ delete(s.conns, addr)
+ }
s.cv.Broadcast()
}
}
@@ -1632,7 +1708,7 @@ func (s *Server) Stop() {
s.mu.Lock()
listeners := s.lis
s.lis = nil
- st := s.conns
+ conns := s.conns
s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
s.cv.Broadcast()
@@ -1641,8 +1717,10 @@ func (s *Server) Stop() {
for lis := range listeners {
lis.Close()
}
- for c := range st {
- c.Close()
+ for _, cs := range conns {
+ for st := range cs {
+ st.Close()
+ }
}
if s.opts.numServerWorkers > 0 {
s.stopServerWorkers()
@@ -1679,8 +1757,10 @@ func (s *Server) GracefulStop() {
}
s.lis = nil
if !s.drain {
- for st := range s.conns {
- st.Drain()
+ for _, conns := range s.conns {
+ for st := range conns {
+ st.Drain()
+ }
}
s.drain = true
}