summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc')
-rw-r--r--vendor/google.golang.org/grpc/.travis.yml42
-rw-r--r--vendor/google.golang.org/grpc/MAINTAINERS.md5
-rw-r--r--vendor/google.golang.org/grpc/Makefile2
-rw-r--r--vendor/google.golang.org/grpc/NOTICE.txt13
-rw-r--r--vendor/google.golang.org/grpc/README.md2
-rw-r--r--vendor/google.golang.org/grpc/balancer/balancer.go68
-rw-r--r--vendor/google.golang.org/grpc/balancer/base/balancer.go15
-rw-r--r--vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go4
-rw-r--r--vendor/google.golang.org/grpc/balancer_conn_wrappers.go112
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go347
-rw-r--r--vendor/google.golang.org/grpc/connectivity/connectivity.go35
-rw-r--r--vendor/google.golang.org/grpc/credentials/go12.go30
-rw-r--r--vendor/google.golang.org/grpc/credentials/tls.go3
-rw-r--r--vendor/google.golang.org/grpc/go.mod13
-rw-r--r--vendor/google.golang.org/grpc/go.sum50
-rw-r--r--vendor/google.golang.org/grpc/install_gae.sh6
-rw-r--r--vendor/google.golang.org/grpc/internal/binarylog/sink.go41
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/funcs.go2
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/types_linux.go2
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go5
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/util_linux.go2
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go3
-rw-r--r--vendor/google.golang.org/grpc/internal/credentials/spiffe.go2
-rw-r--r--vendor/google.golang.org/grpc/internal/credentials/spiffe_appengine.go31
-rw-r--r--vendor/google.golang.org/grpc/internal/credentials/syscallconn.go2
-rw-r--r--vendor/google.golang.org/grpc/internal/credentials/syscallconn_appengine.go30
-rw-r--r--vendor/google.golang.org/grpc/internal/credentials/util.go4
-rw-r--r--vendor/google.golang.org/grpc/internal/envconfig/envconfig.go6
-rw-r--r--vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go29
-rw-r--r--vendor/google.golang.org/grpc/internal/resolver/config_selector.go7
-rw-r--r--vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go9
-rw-r--r--vendor/google.golang.org/grpc/internal/resolver/dns/go113.go33
-rw-r--r--vendor/google.golang.org/grpc/internal/serviceconfig/serviceconfig.go4
-rw-r--r--vendor/google.golang.org/grpc/internal/status/status.go14
-rw-r--r--vendor/google.golang.org/grpc/internal/syscall/syscall_linux.go2
-rw-r--r--vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go21
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/controlbuf.go24
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/handler_server.go3
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go212
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_server.go199
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http_util.go224
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/networktype/networktype.go2
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/transport.go12
-rw-r--r--vendor/google.golang.org/grpc/internal/xds/env/env.go95
-rw-r--r--vendor/google.golang.org/grpc/metadata/metadata.go76
-rw-r--r--vendor/google.golang.org/grpc/picker_wrapper.go2
-rw-r--r--vendor/google.golang.org/grpc/pickfirst.go21
-rw-r--r--vendor/google.golang.org/grpc/resolver_conn_wrapper.go10
-rw-r--r--vendor/google.golang.org/grpc/rpc_util.go31
-rw-r--r--vendor/google.golang.org/grpc/server.go105
-rw-r--r--vendor/google.golang.org/grpc/stats/stats.go11
-rw-r--r--vendor/google.golang.org/grpc/stream.go172
-rw-r--r--vendor/google.golang.org/grpc/version.go2
-rw-r--r--vendor/google.golang.org/grpc/vet.sh32
54 files changed, 1233 insertions, 996 deletions
diff --git a/vendor/google.golang.org/grpc/.travis.yml b/vendor/google.golang.org/grpc/.travis.yml
deleted file mode 100644
index 5847d94e5..000000000
--- a/vendor/google.golang.org/grpc/.travis.yml
+++ /dev/null
@@ -1,42 +0,0 @@
-language: go
-
-matrix:
- include:
- - go: 1.14.x
- env: VET=1 GO111MODULE=on
- - go: 1.14.x
- env: RACE=1 GO111MODULE=on
- - go: 1.14.x
- env: RUN386=1
- - go: 1.14.x
- env: GRPC_GO_RETRY=on
- - go: 1.14.x
- env: TESTEXTRAS=1
- - go: 1.13.x
- env: GO111MODULE=on
- - go: 1.12.x
- env: GO111MODULE=on
- - go: 1.11.x # Keep until interop tests no longer require Go1.11
- env: GO111MODULE=on
-
-go_import_path: google.golang.org/grpc
-
-before_install:
- - if [[ "${GO111MODULE}" = "on" ]]; then mkdir "${HOME}/go"; export GOPATH="${HOME}/go"; fi
- - if [[ -n "${RUN386}" ]]; then export GOARCH=386; fi
- - if [[ "${TRAVIS_EVENT_TYPE}" = "cron" && -z "${RUN386}" ]]; then RACE=1; fi
- - if [[ "${TRAVIS_EVENT_TYPE}" != "cron" ]]; then export VET_SKIP_PROTO=1; fi
-
-install:
- - try3() { eval "$*" || eval "$*" || eval "$*"; }
- - try3 'if [[ "${GO111MODULE}" = "on" ]]; then go mod download; else make testdeps; fi'
- - if [[ -n "${GAE}" ]]; then source ./install_gae.sh; make testappenginedeps; fi
- - if [[ -n "${VET}" ]]; then ./vet.sh -install; fi
-
-script:
- - set -e
- - if [[ -n "${TESTEXTRAS}" ]]; then examples/examples_test.sh; security/advancedtls/examples/examples_test.sh; interop/interop_test.sh; make testsubmodule; exit 0; fi
- - if [[ -n "${VET}" ]]; then ./vet.sh; fi
- - if [[ -n "${GAE}" ]]; then make testappengine; exit 0; fi
- - if [[ -n "${RACE}" ]]; then make testrace; exit 0; fi
- - make test
diff --git a/vendor/google.golang.org/grpc/MAINTAINERS.md b/vendor/google.golang.org/grpc/MAINTAINERS.md
index 093c82b3a..c6672c0a3 100644
--- a/vendor/google.golang.org/grpc/MAINTAINERS.md
+++ b/vendor/google.golang.org/grpc/MAINTAINERS.md
@@ -8,17 +8,18 @@ See [CONTRIBUTING.md](https://github.com/grpc/grpc-community/blob/master/CONTRIB
for general contribution guidelines.
## Maintainers (in alphabetical order)
-- [canguler](https://github.com/canguler), Google LLC
+
- [cesarghali](https://github.com/cesarghali), Google LLC
- [dfawley](https://github.com/dfawley), Google LLC
- [easwars](https://github.com/easwars), Google LLC
-- [jadekler](https://github.com/jadekler), Google LLC
- [menghanl](https://github.com/menghanl), Google LLC
- [srini100](https://github.com/srini100), Google LLC
## Emeritus Maintainers (in alphabetical order)
- [adelez](https://github.com/adelez), Google LLC
+- [canguler](https://github.com/canguler), Google LLC
- [iamqizhao](https://github.com/iamqizhao), Google LLC
+- [jadekler](https://github.com/jadekler), Google LLC
- [jtattermusch](https://github.com/jtattermusch), Google LLC
- [lyuxuan](https://github.com/lyuxuan), Google LLC
- [makmukhi](https://github.com/makmukhi), Google LLC
diff --git a/vendor/google.golang.org/grpc/Makefile b/vendor/google.golang.org/grpc/Makefile
index 1f0722f16..1f8960922 100644
--- a/vendor/google.golang.org/grpc/Makefile
+++ b/vendor/google.golang.org/grpc/Makefile
@@ -41,8 +41,6 @@ vetdeps:
clean \
proto \
test \
- testappengine \
- testappenginedeps \
testrace \
vet \
vetdeps
diff --git a/vendor/google.golang.org/grpc/NOTICE.txt b/vendor/google.golang.org/grpc/NOTICE.txt
new file mode 100644
index 000000000..530197749
--- /dev/null
+++ b/vendor/google.golang.org/grpc/NOTICE.txt
@@ -0,0 +1,13 @@
+Copyright 2014 gRPC authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/vendor/google.golang.org/grpc/README.md b/vendor/google.golang.org/grpc/README.md
index 3949a683f..0e6ae69a5 100644
--- a/vendor/google.golang.org/grpc/README.md
+++ b/vendor/google.golang.org/grpc/README.md
@@ -136,6 +136,6 @@ errors.
[Go module]: https://github.com/golang/go/wiki/Modules
[gRPC]: https://grpc.io
[Go gRPC docs]: https://grpc.io/docs/languages/go
-[Performance benchmark]: https://performance-dot-grpc-testing.appspot.com/explore?dashboard=5652536396611584&widget=490377658&container=1286539696
+[Performance benchmark]: https://performance-dot-grpc-testing.appspot.com/explore?dashboard=5180705743044608
[quick start]: https://grpc.io/docs/languages/go/quickstart
[go-releases]: https://golang.org/doc/devel/release.html
diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go
index ab531f4c0..178de0898 100644
--- a/vendor/google.golang.org/grpc/balancer/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/balancer.go
@@ -75,24 +75,26 @@ func Get(name string) Builder {
return nil
}
-// SubConn represents a gRPC sub connection.
-// Each sub connection contains a list of addresses. gRPC will
-// try to connect to them (in sequence), and stop trying the
-// remainder once one connection is successful.
+// A SubConn represents a single connection to a gRPC backend service.
//
-// The reconnect backoff will be applied on the list, not a single address.
-// For example, try_on_all_addresses -> backoff -> try_on_all_addresses.
+// Each SubConn contains a list of addresses.
//
-// All SubConns start in IDLE, and will not try to connect. To trigger
-// the connecting, Balancers must call Connect.
-// When the connection encounters an error, it will reconnect immediately.
-// When the connection becomes IDLE, it will not reconnect unless Connect is
-// called.
+// All SubConns start in IDLE, and will not try to connect. To trigger the
+// connecting, Balancers must call Connect. If a connection re-enters IDLE,
+// Balancers must call Connect again to trigger a new connection attempt.
//
-// This interface is to be implemented by gRPC. Users should not need a
-// brand new implementation of this interface. For the situations like
-// testing, the new implementation should embed this interface. This allows
-// gRPC to add new methods to this interface.
+// gRPC will try to connect to the addresses in sequence, and stop trying the
+// remainder once the first connection is successful. If an attempt to connect
+// to all addresses encounters an error, the SubConn will enter
+// TRANSIENT_FAILURE for a backoff period, and then transition to IDLE.
+//
+// Once established, if a connection is lost, the SubConn will transition
+// directly to IDLE.
+//
+// This interface is to be implemented by gRPC. Users should not need their own
+// implementation of this interface. For situations like testing, any
+// implementations should embed this interface. This allows gRPC to add new
+// methods to this interface.
type SubConn interface {
// UpdateAddresses updates the addresses used in this SubConn.
// gRPC checks if currently-connected address is still in the new list.
@@ -326,6 +328,20 @@ type Balancer interface {
Close()
}
+// ExitIdler is an optional interface for balancers to implement. If
+// implemented, ExitIdle will be called when ClientConn.Connect is called, if
+// the ClientConn is idle. If unimplemented, ClientConn.Connect will cause
+// all SubConns to connect.
+//
+// Notice: it will be required for all balancers to implement this in a future
+// release.
+type ExitIdler interface {
+ // ExitIdle instructs the LB policy to reconnect to backends / exit the
+ // IDLE state, if appropriate and possible. Note that SubConns that enter
+ // the IDLE state will not reconnect until SubConn.Connect is called.
+ ExitIdle()
+}
+
// SubConnState describes the state of a SubConn.
type SubConnState struct {
// ConnectivityState is the connectivity state of the SubConn.
@@ -353,8 +369,10 @@ var ErrBadResolverState = errors.New("bad resolver state")
//
// It's not thread safe.
type ConnectivityStateEvaluator struct {
- numReady uint64 // Number of addrConns in ready state.
- numConnecting uint64 // Number of addrConns in connecting state.
+ numReady uint64 // Number of addrConns in ready state.
+ numConnecting uint64 // Number of addrConns in connecting state.
+ numTransientFailure uint64 // Number of addrConns in transient failure state.
+ numIdle uint64 // Number of addrConns in idle state.
}
// RecordTransition records state change happening in subConn and based on that
@@ -362,9 +380,11 @@ type ConnectivityStateEvaluator struct {
//
// - If at least one SubConn in Ready, the aggregated state is Ready;
// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
-// - Else the aggregated state is TransientFailure.
+// - Else if at least one SubConn is TransientFailure, the aggregated state is Transient Failure;
+// - Else if at least one SubConn is Idle, the aggregated state is Idle;
+// - Else there are no subconns and the aggregated state is Transient Failure
//
-// Idle and Shutdown are not considered.
+// Shutdown is not considered.
func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
// Update counters.
for idx, state := range []connectivity.State{oldState, newState} {
@@ -374,6 +394,10 @@ func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState conne
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
+ case connectivity.TransientFailure:
+ cse.numTransientFailure += updateVal
+ case connectivity.Idle:
+ cse.numIdle += updateVal
}
}
@@ -384,5 +408,11 @@ func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState conne
if cse.numConnecting > 0 {
return connectivity.Connecting
}
+ if cse.numTransientFailure > 0 {
+ return connectivity.TransientFailure
+ }
+ if cse.numIdle > 0 {
+ return connectivity.Idle
+ }
return connectivity.TransientFailure
}
diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go
index c883efa0b..8dd504299 100644
--- a/vendor/google.golang.org/grpc/balancer/base/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go
@@ -133,6 +133,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
}
b.subConns[aNoAttrs] = subConnInfo{subConn: sc, attrs: a.Attributes}
b.scStates[sc] = connectivity.Idle
+ b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
sc.Connect()
} else {
// Always update the subconn's address in case the attributes
@@ -213,10 +214,14 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
}
return
}
- if oldS == connectivity.TransientFailure && s == connectivity.Connecting {
- // Once a subconn enters TRANSIENT_FAILURE, ignore subsequent
+ if oldS == connectivity.TransientFailure &&
+ (s == connectivity.Connecting || s == connectivity.Idle) {
+ // Once a subconn enters TRANSIENT_FAILURE, ignore subsequent IDLE or
// CONNECTING transitions to prevent the aggregated state from being
// always CONNECTING when many backends exist but are all down.
+ if s == connectivity.Idle {
+ sc.Connect()
+ }
return
}
b.scStates[sc] = s
@@ -242,7 +247,6 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
b.state == connectivity.TransientFailure {
b.regeneratePicker()
}
-
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}
@@ -251,6 +255,11 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
func (b *baseBalancer) Close() {
}
+// ExitIdle is a nop because the base balancer attempts to stay connected to
+// all SubConns at all times.
+func (b *baseBalancer) ExitIdle() {
+}
+
// NewErrPicker returns a Picker that always returns err on Pick().
func NewErrPicker(err error) balancer.Picker {
return &errPicker{err: err}
diff --git a/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go b/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go
index 43c2a1537..274eb2f85 100644
--- a/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go
+++ b/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go
@@ -47,11 +47,11 @@ func init() {
type rrPickerBuilder struct{}
func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
- logger.Infof("roundrobinPicker: newPicker called with info: %v", info)
+ logger.Infof("roundrobinPicker: Build called with info: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
- var scs []balancer.SubConn
+ scs := make([]balancer.SubConn, 0, len(info.ReadySCs))
for sc := range info.ReadySCs {
scs = append(scs, sc)
}
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index 4cc7f9159..f4ea61746 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -37,14 +37,20 @@ type scStateUpdate struct {
err error
}
+// exitIdle contains no data and is just a signal sent on the updateCh in
+// ccBalancerWrapper to instruct the balancer to exit idle.
+type exitIdle struct{}
+
// ccBalancerWrapper is a wrapper on top of cc for balancers.
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
- cc *ClientConn
- balancerMu sync.Mutex // synchronizes calls to the balancer
- balancer balancer.Balancer
- scBuffer *buffer.Unbounded
- done *grpcsync.Event
+ cc *ClientConn
+ balancerMu sync.Mutex // synchronizes calls to the balancer
+ balancer balancer.Balancer
+ hasExitIdle bool
+ updateCh *buffer.Unbounded
+ closed *grpcsync.Event
+ done *grpcsync.Event
mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
@@ -53,12 +59,14 @@ type ccBalancerWrapper struct {
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
ccb := &ccBalancerWrapper{
cc: cc,
- scBuffer: buffer.NewUnbounded(),
+ updateCh: buffer.NewUnbounded(),
+ closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
+ _, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler)
return ccb
}
@@ -67,35 +75,72 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
func (ccb *ccBalancerWrapper) watcher() {
for {
select {
- case t := <-ccb.scBuffer.Get():
- ccb.scBuffer.Load()
- if ccb.done.HasFired() {
+ case t := <-ccb.updateCh.Get():
+ ccb.updateCh.Load()
+ if ccb.closed.HasFired() {
break
}
- ccb.balancerMu.Lock()
- su := t.(*scStateUpdate)
- ccb.balancer.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err})
- ccb.balancerMu.Unlock()
- case <-ccb.done.Done():
+ switch u := t.(type) {
+ case *scStateUpdate:
+ ccb.balancerMu.Lock()
+ ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})
+ ccb.balancerMu.Unlock()
+ case *acBalancerWrapper:
+ ccb.mu.Lock()
+ if ccb.subConns != nil {
+ delete(ccb.subConns, u)
+ ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)
+ }
+ ccb.mu.Unlock()
+ case exitIdle:
+ if ccb.cc.GetState() == connectivity.Idle {
+ if ei, ok := ccb.balancer.(balancer.ExitIdler); ok {
+ // We already checked that the balancer implements
+ // ExitIdle before pushing the event to updateCh, but
+ // check conditionally again as defensive programming.
+ ccb.balancerMu.Lock()
+ ei.ExitIdle()
+ ccb.balancerMu.Unlock()
+ }
+ }
+ default:
+ logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
+ }
+ case <-ccb.closed.Done():
}
- if ccb.done.HasFired() {
+ if ccb.closed.HasFired() {
+ ccb.balancerMu.Lock()
ccb.balancer.Close()
+ ccb.balancerMu.Unlock()
ccb.mu.Lock()
scs := ccb.subConns
ccb.subConns = nil
ccb.mu.Unlock()
+ ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})
+ ccb.done.Fire()
+ // Fire done before removing the addr conns. We can safely unblock
+ // ccb.close and allow the removeAddrConns to happen
+ // asynchronously.
for acbw := range scs {
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
- ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})
return
}
}
}
func (ccb *ccBalancerWrapper) close() {
- ccb.done.Fire()
+ ccb.closed.Fire()
+ <-ccb.done.Done()
+}
+
+func (ccb *ccBalancerWrapper) exitIdle() bool {
+ if !ccb.hasExitIdle {
+ return false
+ }
+ ccb.updateCh.Put(exitIdle{})
+ return true
}
func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
@@ -109,7 +154,7 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
if sc == nil {
return
}
- ccb.scBuffer.Put(&scStateUpdate{
+ ccb.updateCh.Put(&scStateUpdate{
sc: sc,
state: s,
err: err,
@@ -124,8 +169,8 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
func (ccb *ccBalancerWrapper) resolverError(err error) {
ccb.balancerMu.Lock()
+ defer ccb.balancerMu.Unlock()
ccb.balancer.ResolverError(err)
- ccb.balancerMu.Unlock()
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
@@ -150,17 +195,10 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
}
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
- acbw, ok := sc.(*acBalancerWrapper)
- if !ok {
- return
- }
- ccb.mu.Lock()
- defer ccb.mu.Unlock()
- if ccb.subConns == nil {
- return
- }
- delete(ccb.subConns, acbw)
- ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
+ // The RemoveSubConn() is handled in the run() goroutine, to avoid deadlock
+ // during switchBalancer() if the old balancer calls RemoveSubConn() in its
+ // Close().
+ ccb.updateCh.Put(sc)
}
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
@@ -226,17 +264,17 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
return
}
- ac, err := cc.newAddrConn(addrs, opts)
+ newAC, err := cc.newAddrConn(addrs, opts)
if err != nil {
channelz.Warningf(logger, acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
return
}
- acbw.ac = ac
- ac.mu.Lock()
- ac.acbw = acbw
- ac.mu.Unlock()
+ acbw.ac = newAC
+ newAC.mu.Lock()
+ newAC.acbw = acbw
+ newAC.mu.Unlock()
if acState != connectivity.Idle {
- ac.connect()
+ go newAC.connect()
}
}
}
@@ -244,7 +282,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
func (acbw *acBalancerWrapper) Connect() {
acbw.mu.Lock()
defer acbw.mu.Unlock()
- acbw.ac.connect()
+ go acbw.ac.connect()
}
func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 24109264f..34cc4c948 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -322,6 +322,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
// A blocking dial blocks until the clientConn is ready.
if cc.dopts.block {
for {
+ cc.Connect()
s := cc.GetState()
if s == connectivity.Ready {
break
@@ -539,12 +540,31 @@ func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connec
//
// Experimental
//
-// Notice: This API is EXPERIMENTAL and may be changed or removed in a
-// later release.
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
+// release.
func (cc *ClientConn) GetState() connectivity.State {
return cc.csMgr.getState()
}
+// Connect causes all subchannels in the ClientConn to attempt to connect if
+// the channel is idle. Does not wait for the connection attempts to begin
+// before returning.
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
+// release.
+func (cc *ClientConn) Connect() {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() {
+ return
+ }
+ for ac := range cc.conns {
+ go ac.connect()
+ }
+}
+
func (cc *ClientConn) scWatcher() {
for {
select {
@@ -711,7 +731,12 @@ func (cc *ClientConn) switchBalancer(name string) {
return
}
if cc.balancerWrapper != nil {
+ // Don't hold cc.mu while closing the balancers. The balancers may call
+ // methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
+ // would cause a deadlock in that case.
+ cc.mu.Unlock()
cc.balancerWrapper.close()
+ cc.mu.Lock()
}
builder := balancer.Get(name)
@@ -840,8 +865,7 @@ func (ac *addrConn) connect() error {
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()
- // Start a goroutine connecting to the server asynchronously.
- go ac.resetTransport()
+ ac.resetTransport()
return nil
}
@@ -878,6 +902,10 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
// ac.state is Ready, try to find the connected address.
var curAddrFound bool
for _, a := range addrs {
+ // a.ServerName takes precedent over ClientConn authority, if present.
+ if a.ServerName == "" {
+ a.ServerName = ac.cc.authority
+ }
if reflect.DeepEqual(ac.curAddr, a) {
curAddrFound = true
break
@@ -1046,12 +1074,12 @@ func (cc *ClientConn) Close() error {
cc.blockingpicker.close()
- if rWrapper != nil {
- rWrapper.close()
- }
if bWrapper != nil {
bWrapper.close()
}
+ if rWrapper != nil {
+ rWrapper.close()
+ }
for ac := range conns {
ac.tearDown(ErrClientConnClosing)
@@ -1130,112 +1158,86 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
}
func (ac *addrConn) resetTransport() {
- for i := 0; ; i++ {
- if i > 0 {
- ac.cc.resolveNow(resolver.ResolveNowOptions{})
- }
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
+ }
+ addrs := ac.addrs
+ backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
+ // This will be the duration that dial gets to finish.
+ dialDuration := minConnectTimeout
+ if ac.dopts.minConnectTimeout != nil {
+ dialDuration = ac.dopts.minConnectTimeout()
+ }
+
+ if dialDuration < backoffFor {
+ // Give dial more time as we keep failing to connect.
+ dialDuration = backoffFor
+ }
+ // We can potentially spend all the time trying the first address, and
+ // if the server accepts the connection and then hangs, the following
+ // addresses will never be tried.
+ //
+ // The spec doesn't mention what should be done for multiple addresses.
+ // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
+ connectDeadline := time.Now().Add(dialDuration)
+
+ ac.updateConnectivityState(connectivity.Connecting, nil)
+ ac.mu.Unlock()
+
+ if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
+ ac.cc.resolveNow(resolver.ResolveNowOptions{})
+ // After exhausting all addresses, the addrConn enters
+ // TRANSIENT_FAILURE.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
+ ac.updateConnectivityState(connectivity.TransientFailure, err)
- addrs := ac.addrs
- backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
- // This will be the duration that dial gets to finish.
- dialDuration := minConnectTimeout
- if ac.dopts.minConnectTimeout != nil {
- dialDuration = ac.dopts.minConnectTimeout()
- }
-
- if dialDuration < backoffFor {
- // Give dial more time as we keep failing to connect.
- dialDuration = backoffFor
- }
- // We can potentially spend all the time trying the first address, and
- // if the server accepts the connection and then hangs, the following
- // addresses will never be tried.
- //
- // The spec doesn't mention what should be done for multiple addresses.
- // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
- connectDeadline := time.Now().Add(dialDuration)
-
- ac.updateConnectivityState(connectivity.Connecting, nil)
- ac.transport = nil
+ // Backoff.
+ b := ac.resetBackoff
ac.mu.Unlock()
- newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
- if err != nil {
- // After exhausting all addresses, the addrConn enters
- // TRANSIENT_FAILURE.
+ timer := time.NewTimer(backoffFor)
+ select {
+ case <-timer.C:
ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
- ac.updateConnectivityState(connectivity.TransientFailure, err)
-
- // Backoff.
- b := ac.resetBackoff
+ ac.backoffIdx++
ac.mu.Unlock()
-
- timer := time.NewTimer(backoffFor)
- select {
- case <-timer.C:
- ac.mu.Lock()
- ac.backoffIdx++
- ac.mu.Unlock()
- case <-b:
- timer.Stop()
- case <-ac.ctx.Done():
- timer.Stop()
- return
- }
- continue
+ case <-b:
+ timer.Stop()
+ case <-ac.ctx.Done():
+ timer.Stop()
+ return
}
ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- newTr.Close(fmt.Errorf("reached connectivity state: SHUTDOWN"))
- return
+ if ac.state != connectivity.Shutdown {
+ ac.updateConnectivityState(connectivity.Idle, err)
}
- ac.curAddr = addr
- ac.transport = newTr
- ac.backoffIdx = 0
-
- hctx, hcancel := context.WithCancel(ac.ctx)
- ac.startHealthCheck(hctx)
ac.mu.Unlock()
-
- // Block until the created transport is down. And when this happens,
- // we restart from the top of the addr list.
- <-reconnect.Done()
- hcancel()
- // restart connecting - the top of the loop will set state to
- // CONNECTING. This is against the current connectivity semantics doc,
- // however it allows for graceful behavior for RPCs not yet dispatched
- // - unfortunate timing would otherwise lead to the RPC failing even
- // though the TRANSIENT_FAILURE state (called for by the doc) would be
- // instantaneous.
- //
- // Ideally we should transition to Idle here and block until there is
- // RPC activity that leads to the balancer requesting a reconnect of
- // the associated SubConn.
+ return
}
+ // Success; reset backoff.
+ ac.mu.Lock()
+ ac.backoffIdx = 0
+ ac.mu.Unlock()
}
-// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
-// first successful one. It returns the transport, the address and a Event in
-// the successful case. The Event fires when the returned transport disconnects.
-func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
+// tryAllAddrs tries to creates a connection to the addresses, and stop when at
+// the first successful one. It returns an error if no address was successfully
+// connected, or updates ac appropriately with the new transport.
+func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error {
var firstConnErr error
for _, addr := range addrs {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
- return nil, resolver.Address{}, nil, errConnClosing
+ return errConnClosing
}
ac.cc.mu.RLock()
@@ -1250,9 +1252,9 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
- newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
+ err := ac.createTransport(addr, copts, connectDeadline)
if err == nil {
- return newTr, addr, reconnect, nil
+ return nil
}
if firstConnErr == nil {
firstConnErr = err
@@ -1261,57 +1263,54 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
}
// Couldn't connect to any address.
- return nil, resolver.Address{}, nil, firstConnErr
+ return firstConnErr
}
-// createTransport creates a connection to addr. It returns the transport and a
-// Event in the successful case. The Event fires when the returned transport
-// disconnects.
-func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
- prefaceReceived := make(chan struct{})
- onCloseCalled := make(chan struct{})
- reconnect := grpcsync.NewEvent()
+// createTransport creates a connection to addr. It returns an error if the
+// address was not successfully connected, or updates ac appropriately with the
+// new transport.
+func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
+ // TODO: Delete prefaceReceived and move the logic to wait for it into the
+ // transport.
+ prefaceReceived := grpcsync.NewEvent()
+ connClosed := grpcsync.NewEvent()
// addr.ServerName takes precedent over ClientConn authority, if present.
if addr.ServerName == "" {
addr.ServerName = ac.cc.authority
}
- once := sync.Once{}
- onGoAway := func(r transport.GoAwayReason) {
- ac.mu.Lock()
- ac.adjustParams(r)
- once.Do(func() {
- if ac.state == connectivity.Ready {
- // Prevent this SubConn from being used for new RPCs by setting its
- // state to Connecting.
- //
- // TODO: this should be Idle when grpc-go properly supports it.
- ac.updateConnectivityState(connectivity.Connecting, nil)
- }
- })
- ac.mu.Unlock()
- reconnect.Fire()
- }
+ hctx, hcancel := context.WithCancel(ac.ctx)
+ hcStarted := false // protected by ac.mu
onClose := func() {
ac.mu.Lock()
- once.Do(func() {
- if ac.state == connectivity.Ready {
- // Prevent this SubConn from being used for new RPCs by setting its
- // state to Connecting.
- //
- // TODO: this should be Idle when grpc-go properly supports it.
- ac.updateConnectivityState(connectivity.Connecting, nil)
- }
- })
- ac.mu.Unlock()
- close(onCloseCalled)
- reconnect.Fire()
+ defer ac.mu.Unlock()
+ defer connClosed.Fire()
+ if !hcStarted || hctx.Err() != nil {
+ // We didn't start the health check or set the state to READY, so
+ // no need to do anything else here.
+ //
+ // OR, we have already cancelled the health check context, meaning
+ // we have already called onClose once for this transport. In this
+ // case it would be dangerous to clear the transport and update the
+ // state, since there may be a new transport in this addrConn.
+ return
+ }
+ hcancel()
+ ac.transport = nil
+ // Refresh the name resolver
+ ac.cc.resolveNow(resolver.ResolveNowOptions{})
+ if ac.state != connectivity.Shutdown {
+ ac.updateConnectivityState(connectivity.Idle, nil)
+ }
}
- onPrefaceReceipt := func() {
- close(prefaceReceived)
+ onGoAway := func(r transport.GoAwayReason) {
+ ac.mu.Lock()
+ ac.adjustParams(r)
+ ac.mu.Unlock()
+ onClose()
}
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
@@ -1320,27 +1319,67 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
copts.ChannelzParentID = ac.channelzID
}
- newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
+ newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)
if err != nil {
// newTr is either nil, or closed.
- channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
- return nil, nil, err
+ channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v", addr, err)
+ return err
}
select {
- case <-time.After(time.Until(connectDeadline)):
+ case <-connectCtx.Done():
// We didn't get the preface in time.
- newTr.Close(fmt.Errorf("failed to receive server preface within timeout"))
- channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
- return nil, nil, errors.New("timed out waiting for server handshake")
- case <-prefaceReceived:
+ // The error we pass to Close() is immaterial since there are no open
+ // streams at this point, so no trailers with error details will be sent
+ // out. We just need to pass a non-nil error.
+ newTr.Close(transport.ErrConnClosing)
+ if connectCtx.Err() == context.DeadlineExceeded {
+ err := errors.New("failed to receive server preface within timeout")
+ channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: %v", addr, err)
+ return err
+ }
+ return nil
+ case <-prefaceReceived.Done():
// We got the preface - huzzah! things are good.
- case <-onCloseCalled:
- // The transport has already closed - noop.
- return nil, nil, errors.New("connection closed")
- // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
+ ac.mu.Lock()
+ defer ac.mu.Unlock()
+ if connClosed.HasFired() {
+ // onClose called first; go idle but do nothing else.
+ if ac.state != connectivity.Shutdown {
+ ac.updateConnectivityState(connectivity.Idle, nil)
+ }
+ return nil
+ }
+ if ac.state == connectivity.Shutdown {
+ // This can happen if the subConn was removed while in `Connecting`
+ // state. tearDown() would have set the state to `Shutdown`, but
+ // would not have closed the transport since ac.transport would not
+ // been set at that point.
+ //
+ // We run this in a goroutine because newTr.Close() calls onClose()
+ // inline, which requires locking ac.mu.
+ //
+ // The error we pass to Close() is immaterial since there are no open
+ // streams at this point, so no trailers with error details will be sent
+ // out. We just need to pass a non-nil error.
+ go newTr.Close(transport.ErrConnClosing)
+ return nil
+ }
+ ac.curAddr = addr
+ ac.transport = newTr
+ hcStarted = true
+ ac.startHealthCheck(hctx) // Will set state to READY if appropriate.
+ return nil
+ case <-connClosed.Done():
+ // The transport has already closed. If we received the preface, too,
+ // this is not an error.
+ select {
+ case <-prefaceReceived.Done():
+ return nil
+ default:
+ return errors.New("connection closed before server preface received")
+ }
}
- return newTr, reconnect, nil
}
// startHealthCheck starts the health checking stream (RPC) to watch the health
@@ -1424,26 +1463,14 @@ func (ac *addrConn) resetConnectBackoff() {
ac.mu.Unlock()
}
-// getReadyTransport returns the transport if ac's state is READY.
-// Otherwise it returns nil, false.
-// If ac's state is IDLE, it will trigger ac to connect.
-func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
+// getReadyTransport returns the transport if ac's state is READY or nil if not.
+func (ac *addrConn) getReadyTransport() transport.ClientTransport {
ac.mu.Lock()
- if ac.state == connectivity.Ready && ac.transport != nil {
- t := ac.transport
- ac.mu.Unlock()
- return t, true
- }
- var idle bool
- if ac.state == connectivity.Idle {
- idle = true
- }
- ac.mu.Unlock()
- // Trigger idle ac to connect.
- if idle {
- ac.connect()
+ defer ac.mu.Unlock()
+ if ac.state == connectivity.Ready {
+ return ac.transport
}
- return nil, false
+ return nil
}
// tearDown starts to tear down the addrConn.
diff --git a/vendor/google.golang.org/grpc/connectivity/connectivity.go b/vendor/google.golang.org/grpc/connectivity/connectivity.go
index 010156261..4a8992642 100644
--- a/vendor/google.golang.org/grpc/connectivity/connectivity.go
+++ b/vendor/google.golang.org/grpc/connectivity/connectivity.go
@@ -18,7 +18,6 @@
// Package connectivity defines connectivity semantics.
// For details, see https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md.
-// All APIs in this package are experimental.
package connectivity
import (
@@ -45,7 +44,7 @@ func (s State) String() string {
return "SHUTDOWN"
default:
logger.Errorf("unknown connectivity state: %d", s)
- return "Invalid-State"
+ return "INVALID_STATE"
}
}
@@ -61,3 +60,35 @@ const (
// Shutdown indicates the ClientConn has started shutting down.
Shutdown
)
+
+// ServingMode indicates the current mode of operation of the server.
+//
+// Only xDS enabled gRPC servers currently report their serving mode.
+type ServingMode int
+
+const (
+ // ServingModeStarting indicates that the server is starting up.
+ ServingModeStarting ServingMode = iota
+ // ServingModeServing indicates that the server contains all required
+ // configuration and is serving RPCs.
+ ServingModeServing
+ // ServingModeNotServing indicates that the server is not accepting new
+ // connections. Existing connections will be closed gracefully, allowing
+ // in-progress RPCs to complete. A server enters this mode when it does not
+ // contain the required configuration to serve RPCs.
+ ServingModeNotServing
+)
+
+func (s ServingMode) String() string {
+ switch s {
+ case ServingModeStarting:
+ return "STARTING"
+ case ServingModeServing:
+ return "SERVING"
+ case ServingModeNotServing:
+ return "NOT_SERVING"
+ default:
+ logger.Errorf("unknown serving mode: %d", s)
+ return "INVALID_MODE"
+ }
+}
diff --git a/vendor/google.golang.org/grpc/credentials/go12.go b/vendor/google.golang.org/grpc/credentials/go12.go
deleted file mode 100644
index ccbf35b33..000000000
--- a/vendor/google.golang.org/grpc/credentials/go12.go
+++ /dev/null
@@ -1,30 +0,0 @@
-// +build go1.12
-
-/*
- *
- * Copyright 2019 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package credentials
-
-import "crypto/tls"
-
-// This init function adds cipher suite constants only defined in Go 1.12.
-func init() {
- cipherSuiteLookup[tls.TLS_AES_128_GCM_SHA256] = "TLS_AES_128_GCM_SHA256"
- cipherSuiteLookup[tls.TLS_AES_256_GCM_SHA384] = "TLS_AES_256_GCM_SHA384"
- cipherSuiteLookup[tls.TLS_CHACHA20_POLY1305_SHA256] = "TLS_CHACHA20_POLY1305_SHA256"
-}
diff --git a/vendor/google.golang.org/grpc/credentials/tls.go b/vendor/google.golang.org/grpc/credentials/tls.go
index 8ee7124f2..784822d05 100644
--- a/vendor/google.golang.org/grpc/credentials/tls.go
+++ b/vendor/google.golang.org/grpc/credentials/tls.go
@@ -230,4 +230,7 @@ var cipherSuiteLookup = map[uint16]string{
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256: "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305: "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305",
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305: "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305",
+ tls.TLS_AES_128_GCM_SHA256: "TLS_AES_128_GCM_SHA256",
+ tls.TLS_AES_256_GCM_SHA384: "TLS_AES_256_GCM_SHA384",
+ tls.TLS_CHACHA20_POLY1305_SHA256: "TLS_CHACHA20_POLY1305_SHA256",
}
diff --git a/vendor/google.golang.org/grpc/go.mod b/vendor/google.golang.org/grpc/go.mod
index b177cfa66..022cc9828 100644
--- a/vendor/google.golang.org/grpc/go.mod
+++ b/vendor/google.golang.org/grpc/go.mod
@@ -1,17 +1,18 @@
module google.golang.org/grpc
-go 1.11
+go 1.14
require (
+ github.com/cespare/xxhash/v2 v2.1.1
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403
- github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d
+ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
- github.com/golang/protobuf v1.4.2
+ github.com/golang/protobuf v1.4.3
github.com/google/go-cmp v0.5.0
github.com/google/uuid v1.1.2
- golang.org/x/net v0.0.0-20190311183353-d8887717615a
- golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
- golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a
+ golang.org/x/net v0.0.0-20200822124328-c89045814202
+ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
+ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
google.golang.org/protobuf v1.25.0
)
diff --git a/vendor/google.golang.org/grpc/go.sum b/vendor/google.golang.org/grpc/go.sum
index 24d2976ab..6e7ae0db2 100644
--- a/vendor/google.golang.org/grpc/go.sum
+++ b/vendor/google.golang.org/grpc/go.sum
@@ -1,32 +1,44 @@
-cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+cloud.google.com/go v0.34.0 h1:eOI3/cP2VTU6uZLDYAoic+eyzzB9YyGmJ7eIjl8rOPg=
+cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
+github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 h1:cqQfy1jclcSy/FwLjemeg3SR1yaINm74aQyupQ0Bl8M=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
+github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158 h1:CevA8fI91PAnP8vpnXuB8ZYAZ5wqY86nAbxfgK8tWO4=
+github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
-github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d h1:QyzYnTnPE15SQyUeqU6qLbWxMkwyAyu+vGksa0b7j00=
-github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
+github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
+github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
+github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021 h1:fP+fF0up6oPY49OrjPrhIJ8yQfdIM85NXMLkMg1EXVs=
+github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
-github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
+github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@@ -35,49 +47,65 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
-golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
-golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA=
+golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
-golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
+google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -90,7 +118,9 @@ google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/vendor/google.golang.org/grpc/install_gae.sh b/vendor/google.golang.org/grpc/install_gae.sh
deleted file mode 100644
index 15ff9facd..000000000
--- a/vendor/google.golang.org/grpc/install_gae.sh
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/bin/bash
-
-TMP=$(mktemp -d /tmp/sdk.XXX) \
-&& curl -o $TMP.zip "https://storage.googleapis.com/appengine-sdks/featured/go_appengine_sdk_linux_amd64-1.9.68.zip" \
-&& unzip -q $TMP.zip -d $TMP \
-&& export PATH="$PATH:$TMP/go_appengine" \ No newline at end of file
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/sink.go b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
index 7d7a3056b..c2fdd58b3 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/sink.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
@@ -69,7 +69,8 @@ type writerSink struct {
func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
b, err := proto.Marshal(e)
if err != nil {
- grpclogLogger.Infof("binary logging: failed to marshal proto message: %v", err)
+ grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
+ return err
}
hdr := make([]byte, 4)
binary.BigEndian.PutUint32(hdr, uint32(len(b)))
@@ -85,24 +86,27 @@ func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
func (ws *writerSink) Close() error { return nil }
type bufferedSink struct {
- mu sync.Mutex
- closer io.Closer
- out Sink // out is built on buf.
- buf *bufio.Writer // buf is kept for flush.
-
- writeStartOnce sync.Once
- writeTicker *time.Ticker
+ mu sync.Mutex
+ closer io.Closer
+ out Sink // out is built on buf.
+ buf *bufio.Writer // buf is kept for flush.
+ flusherStarted bool
+
+ writeTicker *time.Ticker
+ done chan struct{}
}
func (fs *bufferedSink) Write(e *pb.GrpcLogEntry) error {
- // Start the write loop when Write is called.
- fs.writeStartOnce.Do(fs.startFlushGoroutine)
fs.mu.Lock()
+ defer fs.mu.Unlock()
+ if !fs.flusherStarted {
+ // Start the write loop when Write is called.
+ fs.startFlushGoroutine()
+ fs.flusherStarted = true
+ }
if err := fs.out.Write(e); err != nil {
- fs.mu.Unlock()
return err
}
- fs.mu.Unlock()
return nil
}
@@ -113,7 +117,12 @@ const (
func (fs *bufferedSink) startFlushGoroutine() {
fs.writeTicker = time.NewTicker(bufFlushDuration)
go func() {
- for range fs.writeTicker.C {
+ for {
+ select {
+ case <-fs.done:
+ return
+ case <-fs.writeTicker.C:
+ }
fs.mu.Lock()
if err := fs.buf.Flush(); err != nil {
grpclogLogger.Warningf("failed to flush to Sink: %v", err)
@@ -124,10 +133,12 @@ func (fs *bufferedSink) startFlushGoroutine() {
}
func (fs *bufferedSink) Close() error {
+ fs.mu.Lock()
+ defer fs.mu.Unlock()
if fs.writeTicker != nil {
fs.writeTicker.Stop()
}
- fs.mu.Lock()
+ close(fs.done)
if err := fs.buf.Flush(); err != nil {
grpclogLogger.Warningf("failed to flush to Sink: %v", err)
}
@@ -137,7 +148,6 @@ func (fs *bufferedSink) Close() error {
if err := fs.out.Close(); err != nil {
grpclogLogger.Warningf("failed to close the Sink: %v", err)
}
- fs.mu.Unlock()
return nil
}
@@ -155,5 +165,6 @@ func NewBufferedSink(o io.WriteCloser) Sink {
closer: o,
out: newWriterSink(bufW),
buf: bufW,
+ done: make(chan struct{}),
}
}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
index f73141393..6d5760d95 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
@@ -630,7 +630,7 @@ func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64)
if count == 0 {
end = true
}
- var s []*SocketMetric
+ s := make([]*SocketMetric, 0, len(sks))
for _, ns := range sks {
sm := &SocketMetric{}
sm.SocketData = ns.s.ChannelzMetric()
diff --git a/vendor/google.golang.org/grpc/internal/channelz/types_linux.go b/vendor/google.golang.org/grpc/internal/channelz/types_linux.go
index 692dd6181..1b1c4cce3 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/types_linux.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/types_linux.go
@@ -1,5 +1,3 @@
-// +build !appengine
-
/*
*
* Copyright 2018 gRPC authors.
diff --git a/vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go b/vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go
index 19c2fc521..8b06eed1a 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go
@@ -1,4 +1,5 @@
-// +build !linux appengine
+//go:build !linux
+// +build !linux
/*
*
@@ -37,6 +38,6 @@ type SocketOptionData struct {
// Windows OS doesn't support Socket Option
func (s *SocketOptionData) Getsockopt(fd uintptr) {
once.Do(func() {
- logger.Warning("Channelz: socket options are not supported on non-linux os and appengine.")
+ logger.Warning("Channelz: socket options are not supported on non-linux environments")
})
}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/util_linux.go b/vendor/google.golang.org/grpc/internal/channelz/util_linux.go
index fdf409d55..8d194e44e 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/util_linux.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/util_linux.go
@@ -1,5 +1,3 @@
-// +build linux,!appengine
-
/*
*
* Copyright 2018 gRPC authors.
diff --git a/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go b/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go
index 8864a0811..837ddc402 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go
@@ -1,4 +1,5 @@
-// +build !linux appengine
+//go:build !linux
+// +build !linux
/*
*
diff --git a/vendor/google.golang.org/grpc/internal/credentials/spiffe.go b/vendor/google.golang.org/grpc/internal/credentials/spiffe.go
index be70b6cdf..25ade6230 100644
--- a/vendor/google.golang.org/grpc/internal/credentials/spiffe.go
+++ b/vendor/google.golang.org/grpc/internal/credentials/spiffe.go
@@ -1,5 +1,3 @@
-// +build !appengine
-
/*
*
* Copyright 2020 gRPC authors.
diff --git a/vendor/google.golang.org/grpc/internal/credentials/spiffe_appengine.go b/vendor/google.golang.org/grpc/internal/credentials/spiffe_appengine.go
deleted file mode 100644
index af6f57719..000000000
--- a/vendor/google.golang.org/grpc/internal/credentials/spiffe_appengine.go
+++ /dev/null
@@ -1,31 +0,0 @@
-// +build appengine
-
-/*
- *
- * Copyright 2020 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package credentials
-
-import (
- "crypto/tls"
- "net/url"
-)
-
-// SPIFFEIDFromState is a no-op for appengine builds.
-func SPIFFEIDFromState(state tls.ConnectionState) *url.URL {
- return nil
-}
diff --git a/vendor/google.golang.org/grpc/internal/credentials/syscallconn.go b/vendor/google.golang.org/grpc/internal/credentials/syscallconn.go
index f499a614c..2919632d6 100644
--- a/vendor/google.golang.org/grpc/internal/credentials/syscallconn.go
+++ b/vendor/google.golang.org/grpc/internal/credentials/syscallconn.go
@@ -1,5 +1,3 @@
-// +build !appengine
-
/*
*
* Copyright 2018 gRPC authors.
diff --git a/vendor/google.golang.org/grpc/internal/credentials/syscallconn_appengine.go b/vendor/google.golang.org/grpc/internal/credentials/syscallconn_appengine.go
deleted file mode 100644
index a6144cd66..000000000
--- a/vendor/google.golang.org/grpc/internal/credentials/syscallconn_appengine.go
+++ /dev/null
@@ -1,30 +0,0 @@
-// +build appengine
-
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package credentials
-
-import (
- "net"
-)
-
-// WrapSyscallConn returns newConn on appengine.
-func WrapSyscallConn(rawConn, newConn net.Conn) net.Conn {
- return newConn
-}
diff --git a/vendor/google.golang.org/grpc/internal/credentials/util.go b/vendor/google.golang.org/grpc/internal/credentials/util.go
index 55664fa46..f792fd22c 100644
--- a/vendor/google.golang.org/grpc/internal/credentials/util.go
+++ b/vendor/google.golang.org/grpc/internal/credentials/util.go
@@ -18,7 +18,9 @@
package credentials
-import "crypto/tls"
+import (
+ "crypto/tls"
+)
const alpnProtoStrH2 = "h2"
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
index 73931a94b..e766ac04a 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
@@ -22,6 +22,8 @@ package envconfig
import (
"os"
"strings"
+
+ xdsenv "google.golang.org/grpc/internal/xds/env"
)
const (
@@ -31,8 +33,8 @@ const (
)
var (
- // Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
- Retry = strings.EqualFold(os.Getenv(retryStr), "on")
+ // Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on" or if XDS retry support is enabled.
+ Retry = strings.EqualFold(os.Getenv(retryStr), "on") || xdsenv.RetrySupport
// TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false")
)
diff --git a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go
index 200b115ca..740f83c2b 100644
--- a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go
+++ b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go
@@ -31,26 +31,37 @@ var (
mu sync.Mutex
)
+// Int implements rand.Int on the grpcrand global source.
+func Int() int {
+ mu.Lock()
+ defer mu.Unlock()
+ return r.Int()
+}
+
// Int63n implements rand.Int63n on the grpcrand global source.
func Int63n(n int64) int64 {
mu.Lock()
- res := r.Int63n(n)
- mu.Unlock()
- return res
+ defer mu.Unlock()
+ return r.Int63n(n)
}
// Intn implements rand.Intn on the grpcrand global source.
func Intn(n int) int {
mu.Lock()
- res := r.Intn(n)
- mu.Unlock()
- return res
+ defer mu.Unlock()
+ return r.Intn(n)
}
// Float64 implements rand.Float64 on the grpcrand global source.
func Float64() float64 {
mu.Lock()
- res := r.Float64()
- mu.Unlock()
- return res
+ defer mu.Unlock()
+ return r.Float64()
+}
+
+// Uint64 implements rand.Uint64 on the grpcrand global source.
+func Uint64() uint64 {
+ mu.Lock()
+ defer mu.Unlock()
+ return r.Uint64()
}
diff --git a/vendor/google.golang.org/grpc/internal/resolver/config_selector.go b/vendor/google.golang.org/grpc/internal/resolver/config_selector.go
index 5e7f36703..be7e13d58 100644
--- a/vendor/google.golang.org/grpc/internal/resolver/config_selector.go
+++ b/vendor/google.golang.org/grpc/internal/resolver/config_selector.go
@@ -117,9 +117,12 @@ type ClientInterceptor interface {
NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error)
}
-// ServerInterceptor is unimplementable; do not use.
+// ServerInterceptor is an interceptor for incoming RPC's on gRPC server side.
type ServerInterceptor interface {
- notDefined()
+ // AllowRPC checks if an incoming RPC is allowed to proceed based on
+ // information about connection RPC was received on, and HTTP Headers. This
+ // information will be piped into context.
+ AllowRPC(ctx context.Context) error // TODO: Make this a real interceptor for filters such as rate limiting.
}
type csKeyType string
diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
index 03825bbe7..75301c514 100644
--- a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
+++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
@@ -277,18 +277,13 @@ func (d *dnsResolver) lookupSRV() ([]resolver.Address, error) {
return newAddrs, nil
}
-var filterError = func(err error) error {
+func handleDNSError(err error, lookupType string) error {
if dnsErr, ok := err.(*net.DNSError); ok && !dnsErr.IsTimeout && !dnsErr.IsTemporary {
// Timeouts and temporary errors should be communicated to gRPC to
// attempt another DNS query (with backoff). Other errors should be
// suppressed (they may represent the absence of a TXT record).
return nil
}
- return err
-}
-
-func handleDNSError(err error, lookupType string) error {
- err = filterError(err)
if err != nil {
err = fmt.Errorf("dns: %v record lookup error: %v", lookupType, err)
logger.Info(err)
@@ -323,12 +318,12 @@ func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {
}
func (d *dnsResolver) lookupHost() ([]resolver.Address, error) {
- var newAddrs []resolver.Address
addrs, err := d.resolver.LookupHost(d.ctx, d.host)
if err != nil {
err = handleDNSError(err, "A")
return nil, err
}
+ newAddrs := make([]resolver.Address, 0, len(addrs))
for _, a := range addrs {
ip, ok := formatIP(a)
if !ok {
diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/go113.go b/vendor/google.golang.org/grpc/internal/resolver/dns/go113.go
deleted file mode 100644
index 8783a8cf8..000000000
--- a/vendor/google.golang.org/grpc/internal/resolver/dns/go113.go
+++ /dev/null
@@ -1,33 +0,0 @@
-// +build go1.13
-
-/*
- *
- * Copyright 2019 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package dns
-
-import "net"
-
-func init() {
- filterError = func(err error) error {
- if dnsErr, ok := err.(*net.DNSError); ok && dnsErr.IsNotFound {
- // The name does not exist; not an error.
- return nil
- }
- return err
- }
-}
diff --git a/vendor/google.golang.org/grpc/internal/serviceconfig/serviceconfig.go b/vendor/google.golang.org/grpc/internal/serviceconfig/serviceconfig.go
index c0634d152..badbdbf59 100644
--- a/vendor/google.golang.org/grpc/internal/serviceconfig/serviceconfig.go
+++ b/vendor/google.golang.org/grpc/internal/serviceconfig/serviceconfig.go
@@ -78,6 +78,7 @@ func (bc *BalancerConfig) UnmarshalJSON(b []byte) error {
return err
}
+ var names []string
for i, lbcfg := range ir {
if len(lbcfg) != 1 {
return fmt.Errorf("invalid loadBalancingConfig: entry %v does not contain exactly 1 policy/config pair: %q", i, lbcfg)
@@ -92,6 +93,7 @@ func (bc *BalancerConfig) UnmarshalJSON(b []byte) error {
for name, jsonCfg = range lbcfg {
}
+ names = append(names, name)
builder := balancer.Get(name)
if builder == nil {
// If the balancer is not registered, move on to the next config.
@@ -120,7 +122,7 @@ func (bc *BalancerConfig) UnmarshalJSON(b []byte) error {
// return. This means we had a loadBalancingConfig slice but did not
// encounter a registered policy. The config is considered invalid in this
// case.
- return fmt.Errorf("invalid loadBalancingConfig: no supported policies found")
+ return fmt.Errorf("invalid loadBalancingConfig: no supported policies found in %v", names)
}
// MethodConfig defines the configuration recommended by the service providers for a
diff --git a/vendor/google.golang.org/grpc/internal/status/status.go b/vendor/google.golang.org/grpc/internal/status/status.go
index 710223b8d..e5c6513ed 100644
--- a/vendor/google.golang.org/grpc/internal/status/status.go
+++ b/vendor/google.golang.org/grpc/internal/status/status.go
@@ -97,7 +97,7 @@ func (s *Status) Err() error {
if s.Code() == codes.OK {
return nil
}
- return &Error{e: s.Proto()}
+ return &Error{s: s}
}
// WithDetails returns a new status with the provided details messages appended to the status.
@@ -136,19 +136,23 @@ func (s *Status) Details() []interface{} {
return details
}
+func (s *Status) String() string {
+ return fmt.Sprintf("rpc error: code = %s desc = %s", s.Code(), s.Message())
+}
+
// Error wraps a pointer of a status proto. It implements error and Status,
// and a nil *Error should never be returned by this package.
type Error struct {
- e *spb.Status
+ s *Status
}
func (e *Error) Error() string {
- return fmt.Sprintf("rpc error: code = %s desc = %s", codes.Code(e.e.GetCode()), e.e.GetMessage())
+ return e.s.String()
}
// GRPCStatus returns the Status represented by se.
func (e *Error) GRPCStatus() *Status {
- return FromProto(e.e)
+ return e.s
}
// Is implements future error.Is functionality.
@@ -158,5 +162,5 @@ func (e *Error) Is(target error) bool {
if !ok {
return false
}
- return proto.Equal(e.e, tse.e)
+ return proto.Equal(e.s.s, tse.s.s)
}
diff --git a/vendor/google.golang.org/grpc/internal/syscall/syscall_linux.go b/vendor/google.golang.org/grpc/internal/syscall/syscall_linux.go
index 4b2964f2a..b3a72276d 100644
--- a/vendor/google.golang.org/grpc/internal/syscall/syscall_linux.go
+++ b/vendor/google.golang.org/grpc/internal/syscall/syscall_linux.go
@@ -1,5 +1,3 @@
-// +build !appengine
-
/*
*
* Copyright 2018 gRPC authors.
diff --git a/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go b/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go
index 7913ef1db..999f52cd7 100644
--- a/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go
+++ b/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go
@@ -1,4 +1,5 @@
-// +build !linux appengine
+//go:build !linux
+// +build !linux
/*
*
@@ -35,41 +36,41 @@ var logger = grpclog.Component("core")
func log() {
once.Do(func() {
- logger.Info("CPU time info is unavailable on non-linux or appengine environment.")
+ logger.Info("CPU time info is unavailable on non-linux environments.")
})
}
-// GetCPUTime returns the how much CPU time has passed since the start of this process.
-// It always returns 0 under non-linux or appengine environment.
+// GetCPUTime returns the how much CPU time has passed since the start of this
+// process. It always returns 0 under non-linux environments.
func GetCPUTime() int64 {
log()
return 0
}
-// Rusage is an empty struct under non-linux or appengine environment.
+// Rusage is an empty struct under non-linux environments.
type Rusage struct{}
-// GetRusage is a no-op function under non-linux or appengine environment.
+// GetRusage is a no-op function under non-linux environments.
func GetRusage() *Rusage {
log()
return nil
}
// CPUTimeDiff returns the differences of user CPU time and system CPU time used
-// between two Rusage structs. It a no-op function for non-linux or appengine environment.
+// between two Rusage structs. It a no-op function for non-linux environments.
func CPUTimeDiff(first *Rusage, latest *Rusage) (float64, float64) {
log()
return 0, 0
}
-// SetTCPUserTimeout is a no-op function under non-linux or appengine environments
+// SetTCPUserTimeout is a no-op function under non-linux environments.
func SetTCPUserTimeout(conn net.Conn, timeout time.Duration) error {
log()
return nil
}
-// GetTCPUserTimeout is a no-op function under non-linux or appengine environments
-// a negative return value indicates the operation is not supported
+// GetTCPUserTimeout is a no-op function under non-linux environments.
+// A negative return value indicates the operation is not supported
func GetTCPUserTimeout(conn net.Conn) (int, error) {
log()
return -1, nil
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index f63a01376..45532f8ae 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -296,7 +296,7 @@ type controlBuffer struct {
// closed and nilled when transportResponseFrames drops below the
// threshold. Both fields are protected by mu.
transportResponseFrames int
- trfChan atomic.Value // *chan struct{}
+ trfChan atomic.Value // chan struct{}
}
func newControlBuffer(done <-chan struct{}) *controlBuffer {
@@ -310,10 +310,10 @@ func newControlBuffer(done <-chan struct{}) *controlBuffer {
// throttle blocks if there are too many incomingSettings/cleanupStreams in the
// controlbuf.
func (c *controlBuffer) throttle() {
- ch, _ := c.trfChan.Load().(*chan struct{})
+ ch, _ := c.trfChan.Load().(chan struct{})
if ch != nil {
select {
- case <-*ch:
+ case <-ch:
case <-c.done:
}
}
@@ -347,8 +347,7 @@ func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (b
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are adding the frame that puts us over the threshold; create
// a throttling channel.
- ch := make(chan struct{})
- c.trfChan.Store(&ch)
+ c.trfChan.Store(make(chan struct{}))
}
}
c.mu.Unlock()
@@ -389,9 +388,9 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are removing the frame that put us over the
// threshold; close and clear the throttling channel.
- ch := c.trfChan.Load().(*chan struct{})
- close(*ch)
- c.trfChan.Store((*chan struct{})(nil))
+ ch := c.trfChan.Load().(chan struct{})
+ close(ch)
+ c.trfChan.Store((chan struct{})(nil))
}
c.transportResponseFrames--
}
@@ -407,7 +406,6 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
select {
case <-c.ch:
case <-c.done:
- c.finish()
return nil, ErrConnClosing
}
}
@@ -432,6 +430,14 @@ func (c *controlBuffer) finish() {
hdr.onOrphaned(ErrConnClosing)
}
}
+ // In case throttle() is currently in flight, it needs to be unblocked.
+ // Otherwise, the transport may not close, since the transport is closed by
+ // the reader encountering the connection error.
+ ch, _ := c.trfChan.Load().(chan struct{})
+ if ch != nil {
+ close(ch)
+ }
+ c.trfChan.Store((chan struct{})(nil))
c.mu.Unlock()
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index 05d3871e6..1c3459c2b 100644
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -141,9 +141,8 @@ type serverHandlerTransport struct {
stats stats.Handler
}
-func (ht *serverHandlerTransport) Close() error {
+func (ht *serverHandlerTransport) Close() {
ht.closeOnce.Do(ht.closeCloseChanOnce)
- return nil
}
func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 48c5e52ed..755863074 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -24,6 +24,7 @@ import (
"io"
"math"
"net"
+ "net/http"
"strconv"
"strings"
"sync"
@@ -241,7 +242,15 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// and passed to the credential handshaker. This makes it possible for
// address specific arbitrary data to reach the credential handshaker.
connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
- conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
+ rawConn := conn
+ // Pull the deadline from the connectCtx, which will be used for
+ // timeouts in the authentication protocol handshake. Can ignore the
+ // boolean as the deadline will return the zero value, which will make
+ // the conn not timeout on I/O operations.
+ deadline, _ := connectCtx.Deadline()
+ rawConn.SetDeadline(deadline)
+ conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, rawConn)
+ rawConn.SetDeadline(time.Time{})
if err != nil {
return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
}
@@ -399,11 +408,10 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
}
- // If it's a connection error, let reader goroutine handle it
- // since there might be data in the buffers.
- if _, ok := err.(net.Error); !ok {
- t.conn.Close()
- }
+ // Do not close the transport. Let reader goroutine handle it since
+ // there might be data in the buffers.
+ t.conn.Close()
+ t.controlBuf.finish()
close(t.writerDone)
}()
return t, nil
@@ -608,26 +616,35 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
return callAuthData, nil
}
-// PerformedIOError wraps an error to indicate IO may have been performed
-// before the error occurred.
-type PerformedIOError struct {
+// NewStreamError wraps an error and reports additional information. Typically
+// NewStream errors result in transparent retry, as they mean nothing went onto
+// the wire. However, there are two notable exceptions:
+//
+// 1. If the stream headers violate the max header list size allowed by the
+// server. In this case there is no reason to retry at all, as it is
+// assumed the RPC would continue to fail on subsequent attempts.
+// 2. If the credentials errored when requesting their headers. In this case,
+// it's possible a retry can fix the problem, but indefinitely transparently
+// retrying is not appropriate as it is likely the credentials, if they can
+// eventually succeed, would need I/O to do so.
+type NewStreamError struct {
Err error
+
+ DoNotRetry bool
+ DoNotTransparentRetry bool
}
-// Error implements error.
-func (p PerformedIOError) Error() string {
- return p.Err.Error()
+func (e NewStreamError) Error() string {
+ return e.Err.Error()
}
// NewStream creates a stream and registers it into the transport as "active"
-// streams.
+// streams. All non-nil errors returned will be *NewStreamError.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
ctx = peer.NewContext(ctx, t.getPeer())
headerFields, err := t.createHeaderFields(ctx, callHdr)
if err != nil {
- // We may have performed I/O in the per-RPC creds callback, so do not
- // allow transparent retry.
- return nil, PerformedIOError{err}
+ return nil, &NewStreamError{Err: err, DoNotTransparentRetry: true}
}
s := t.newStream(ctx, callHdr)
cleanup := func(err error) {
@@ -727,23 +744,23 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return true
}, hdr)
if err != nil {
- return nil, err
+ return nil, &NewStreamError{Err: err}
}
if success {
break
}
if hdrListSizeErr != nil {
- return nil, hdrListSizeErr
+ return nil, &NewStreamError{Err: hdrListSizeErr, DoNotRetry: true}
}
firstTry = false
select {
case <-ch:
- case <-s.ctx.Done():
- return nil, ContextErr(s.ctx.Err())
+ case <-ctx.Done():
+ return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
case <-t.goAway:
- return nil, errStreamDrain
+ return nil, &NewStreamError{Err: errStreamDrain}
case <-t.ctx.Done():
- return nil, ErrConnClosing
+ return nil, &NewStreamError{Err: ErrConnClosing}
}
}
if t.statsHandler != nil {
@@ -878,12 +895,18 @@ func (t *http2Client) Close(err error) {
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
+
+ var st *status.Status
if len(goAwayDebugMessage) > 0 {
- err = fmt.Errorf("closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
+ st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
+ err = st.Err()
+ } else {
+ st = status.New(codes.Unavailable, err.Error())
}
+
// Notify all active streams.
for _, s := range streams {
- t.closeStream(s, err, false, http2.ErrCodeNo, status.New(codes.Unavailable, err.Error()), nil, false)
+ t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
}
if t.statsHandler != nil {
connEnd := &stats.ConnEnd{
@@ -1221,7 +1244,11 @@ func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
t.goAwayReason = GoAwayTooManyPings
}
}
- t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %v", f.ErrCode, string(f.DebugData()))
+ if len(f.DebugData()) == 0 {
+ t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode)
+ } else {
+ t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData()))
+ }
}
func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
@@ -1254,11 +1281,124 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}
- state := &decodeState{}
- // Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
- state.data.isGRPC = !initialHeader
- if h2code, err := state.decodeHeader(frame); err != nil {
- t.closeStream(s, err, true, h2code, status.Convert(err), nil, endStream)
+ // frame.Truncated is set to true when framer detects that the current header
+ // list size hits MaxHeaderListSize limit.
+ if frame.Truncated {
+ se := status.New(codes.Internal, "peer header list size exceeded limit")
+ t.closeStream(s, se.Err(), true, http2.ErrCodeFrameSize, se, nil, endStream)
+ return
+ }
+
+ var (
+ // If a gRPC Response-Headers has already been received, then it means
+ // that the peer is speaking gRPC and we are in gRPC mode.
+ isGRPC = !initialHeader
+ mdata = make(map[string][]string)
+ contentTypeErr = "malformed header: missing HTTP content-type"
+ grpcMessage string
+ statusGen *status.Status
+ recvCompress string
+ httpStatusCode *int
+ httpStatusErr string
+ rawStatusCode = codes.Unknown
+ // headerError is set if an error is encountered while parsing the headers
+ headerError string
+ )
+
+ if initialHeader {
+ httpStatusErr = "malformed header: missing HTTP status"
+ }
+
+ for _, hf := range frame.Fields {
+ switch hf.Name {
+ case "content-type":
+ if _, validContentType := grpcutil.ContentSubtype(hf.Value); !validContentType {
+ contentTypeErr = fmt.Sprintf("transport: received unexpected content-type %q", hf.Value)
+ break
+ }
+ contentTypeErr = ""
+ mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
+ isGRPC = true
+ case "grpc-encoding":
+ recvCompress = hf.Value
+ case "grpc-status":
+ code, err := strconv.ParseInt(hf.Value, 10, 32)
+ if err != nil {
+ se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err))
+ t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
+ return
+ }
+ rawStatusCode = codes.Code(uint32(code))
+ case "grpc-message":
+ grpcMessage = decodeGrpcMessage(hf.Value)
+ case "grpc-status-details-bin":
+ var err error
+ statusGen, err = decodeGRPCStatusDetails(hf.Value)
+ if err != nil {
+ headerError = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)
+ }
+ case ":status":
+ if hf.Value == "200" {
+ httpStatusErr = ""
+ statusCode := 200
+ httpStatusCode = &statusCode
+ break
+ }
+
+ c, err := strconv.ParseInt(hf.Value, 10, 32)
+ if err != nil {
+ se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err))
+ t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
+ return
+ }
+ statusCode := int(c)
+ httpStatusCode = &statusCode
+
+ httpStatusErr = fmt.Sprintf(
+ "unexpected HTTP status code received from server: %d (%s)",
+ statusCode,
+ http.StatusText(statusCode),
+ )
+ default:
+ if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
+ break
+ }
+ v, err := decodeMetadataHeader(hf.Name, hf.Value)
+ if err != nil {
+ headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err)
+ logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
+ break
+ }
+ mdata[hf.Name] = append(mdata[hf.Name], v)
+ }
+ }
+
+ if !isGRPC || httpStatusErr != "" {
+ var code = codes.Internal // when header does not include HTTP status, return INTERNAL
+
+ if httpStatusCode != nil {
+ var ok bool
+ code, ok = HTTPStatusConvTab[*httpStatusCode]
+ if !ok {
+ code = codes.Unknown
+ }
+ }
+ var errs []string
+ if httpStatusErr != "" {
+ errs = append(errs, httpStatusErr)
+ }
+ if contentTypeErr != "" {
+ errs = append(errs, contentTypeErr)
+ }
+ // Verify the HTTP response is a 200.
+ se := status.New(code, strings.Join(errs, "; "))
+ t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
+ return
+ }
+
+ if headerError != "" {
+ se := status.New(codes.Internal, headerError)
+ t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
@@ -1293,9 +1433,9 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
// These values can be set without any synchronization because
// stream goroutine will read it only after seeing a closed
// headerChan which we'll close after setting this.
- s.recvCompress = state.data.encoding
- if len(state.data.mdata) > 0 {
- s.header = state.data.mdata
+ s.recvCompress = recvCompress
+ if len(mdata) > 0 {
+ s.header = mdata
}
} else {
// HEADERS frame block carries a Trailers-Only.
@@ -1308,9 +1448,13 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}
+ if statusGen == nil {
+ statusGen = status.New(rawStatusCode, grpcMessage)
+ }
+
// if client received END_STREAM from server while stream was still active, send RST_STREAM
rst := s.getState() == streamActive
- t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
+ t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
}
// reader runs as a separate goroutine in charge of reading data from network
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 11be5599c..19c13e041 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -102,11 +102,11 @@ type http2Server struct {
mu sync.Mutex // guard the following
- // drainChan is initialized when drain(...) is called the first time.
+ // drainChan is initialized when Drain() is called the first time.
// After which the server writes out the first GoAway(with ID 2^31-1) frame.
// Then an independent goroutine will be launched to later send the second GoAway.
// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
- // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
+ // Thus call to Drain() will be a no-op if drainChan is already initialized since draining is
// already underway.
drainChan chan struct{}
state transportState
@@ -125,9 +125,30 @@ type http2Server struct {
connectionID uint64
}
-// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
-// returned if something goes wrong.
-func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
+// NewServerTransport creates a http2 transport with conn and configuration
+// options from config.
+//
+// It returns a non-nil transport and a nil error on success. On failure, it
+// returns a non-nil transport and a nil-error. For a special case where the
+// underlying conn gets closed before the client preface could be read, it
+// returns a nil transport and a nil error.
+func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
+ var authInfo credentials.AuthInfo
+ rawConn := conn
+ if config.Credentials != nil {
+ var err error
+ conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
+ if err != nil {
+ // ErrConnDispatched means that the connection was dispatched away
+ // from gRPC; those connections should be left open. io.EOF means
+ // the connection was closed before handshaking completed, which can
+ // happen naturally from probers. Return these errors directly.
+ if err == credentials.ErrConnDispatched || err == io.EOF {
+ return nil, err
+ }
+ return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
+ }
+ }
writeBufSize := config.WriteBufferSize
readBufSize := config.ReadBufferSize
maxHeaderListSize := defaultServerMaxHeaderListSize
@@ -210,14 +231,15 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
+
done := make(chan struct{})
t := &http2Server{
- ctx: context.Background(),
+ ctx: setConnection(context.Background(), rawConn),
done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
- authInfo: config.AuthInfo,
+ authInfo: authInfo,
framer: framer,
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
@@ -266,6 +288,13 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
// Check the validity of client preface.
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
+ // In deployments where a gRPC server runs behind a cloud load balancer
+ // which performs regular TCP level health checks, the connection is
+ // closed immediately by the latter. Skipping the error here will help
+ // reduce log clutter.
+ if err == io.EOF {
+ return nil, nil
+ }
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
}
if !bytes.Equal(preface, clientPreface) {
@@ -295,6 +324,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
}
}
t.conn.Close()
+ t.controlBuf.finish()
close(t.writerDone)
}()
go t.keepalive()
@@ -304,37 +334,92 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
streamID := frame.Header().StreamID
- state := &decodeState{
- serverSide: true,
- }
- if h2code, err := state.decodeHeader(frame); err != nil {
- if _, ok := status.FromError(err); ok {
- t.controlBuf.put(&cleanupStream{
- streamID: streamID,
- rst: true,
- rstCode: h2code,
- onWrite: func() {},
- })
- }
+
+ // frame.Truncated is set to true when framer detects that the current header
+ // list size hits MaxHeaderListSize limit.
+ if frame.Truncated {
+ t.controlBuf.put(&cleanupStream{
+ streamID: streamID,
+ rst: true,
+ rstCode: http2.ErrCodeFrameSize,
+ onWrite: func() {},
+ })
return false
}
buf := newRecvBuffer()
s := &Stream{
- id: streamID,
- st: t,
- buf: buf,
- fc: &inFlow{limit: uint32(t.initialWindowSize)},
- recvCompress: state.data.encoding,
- method: state.data.method,
- contentSubtype: state.data.contentSubtype,
+ id: streamID,
+ st: t,
+ buf: buf,
+ fc: &inFlow{limit: uint32(t.initialWindowSize)},
+ }
+
+ var (
+ // If a gRPC Response-Headers has already been received, then it means
+ // that the peer is speaking gRPC and we are in gRPC mode.
+ isGRPC = false
+ mdata = make(map[string][]string)
+ httpMethod string
+ // headerError is set if an error is encountered while parsing the headers
+ headerError bool
+
+ timeoutSet bool
+ timeout time.Duration
+ )
+
+ for _, hf := range frame.Fields {
+ switch hf.Name {
+ case "content-type":
+ contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
+ if !validContentType {
+ break
+ }
+ mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
+ s.contentSubtype = contentSubtype
+ isGRPC = true
+ case "grpc-encoding":
+ s.recvCompress = hf.Value
+ case ":method":
+ httpMethod = hf.Value
+ case ":path":
+ s.method = hf.Value
+ case "grpc-timeout":
+ timeoutSet = true
+ var err error
+ if timeout, err = decodeTimeout(hf.Value); err != nil {
+ headerError = true
+ }
+ default:
+ if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
+ break
+ }
+ v, err := decodeMetadataHeader(hf.Name, hf.Value)
+ if err != nil {
+ headerError = true
+ logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
+ break
+ }
+ mdata[hf.Name] = append(mdata[hf.Name], v)
+ }
}
+
+ if !isGRPC || headerError {
+ t.controlBuf.put(&cleanupStream{
+ streamID: streamID,
+ rst: true,
+ rstCode: http2.ErrCodeProtocol,
+ onWrite: func() {},
+ })
+ return false
+ }
+
if frame.StreamEnded() {
// s is just created by the caller. No lock needed.
s.state = streamReadDone
}
- if state.data.timeoutSet {
- s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
+ if timeoutSet {
+ s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
}
@@ -347,14 +432,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.ctx = peer.NewContext(s.ctx, pr)
// Attach the received metadata to the context.
- if len(state.data.mdata) > 0 {
- s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
- }
- if state.data.statsTags != nil {
- s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
- }
- if state.data.statsTrace != nil {
- s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
+ if len(mdata) > 0 {
+ s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
+ if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 {
+ s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1]))
+ }
+ if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 {
+ s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1]))
+ }
}
t.mu.Lock()
if t.state != reachable {
@@ -383,10 +468,10 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
return true
}
t.maxStreamID = streamID
- if state.data.httpMethod != http.MethodPost {
+ if httpMethod != http.MethodPost {
t.mu.Unlock()
if logger.V(logLevel) {
- logger.Warningf("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", state.data.httpMethod)
+ logger.Infof("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
}
t.controlBuf.put(&cleanupStream{
streamID: streamID,
@@ -399,7 +484,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
if t.inTapHandle != nil {
var err error
- if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: state.data.method}); err != nil {
+ if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
t.mu.Unlock()
if logger.V(logLevel) {
logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
@@ -437,7 +522,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
LocalAddr: t.localAddr,
Compression: s.recvCompress,
WireLength: int(frame.Header().Length),
- Header: metadata.MD(state.data.mdata).Copy(),
+ Header: metadata.MD(mdata).Copy(),
}
t.stats.HandleRPC(s.ctx, inHeader)
}
@@ -1004,12 +1089,12 @@ func (t *http2Server) keepalive() {
if val <= 0 {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
- t.drain(http2.ErrCodeNo, []byte{})
+ t.Drain()
return
}
idleTimer.Reset(val)
case <-ageTimer.C:
- t.drain(http2.ErrCodeNo, []byte{})
+ t.Drain()
ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
select {
case <-ageTimer.C:
@@ -1063,11 +1148,11 @@ func (t *http2Server) keepalive() {
// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
-func (t *http2Server) Close() error {
+func (t *http2Server) Close() {
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
- return errors.New("transport: Close() was already called")
+ return
}
t.state = closing
streams := t.activeStreams
@@ -1075,7 +1160,9 @@ func (t *http2Server) Close() error {
t.mu.Unlock()
t.controlBuf.finish()
close(t.done)
- err := t.conn.Close()
+ if err := t.conn.Close(); err != nil && logger.V(logLevel) {
+ logger.Infof("transport: error closing conn during Close: %v", err)
+ }
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
@@ -1087,7 +1174,6 @@ func (t *http2Server) Close() error {
connEnd := &stats.ConnEnd{}
t.stats.HandleConn(t.ctx, connEnd)
}
- return err
}
// deleteStream deletes the stream s from transport's active streams.
@@ -1152,17 +1238,13 @@ func (t *http2Server) RemoteAddr() net.Addr {
}
func (t *http2Server) Drain() {
- t.drain(http2.ErrCodeNo, []byte{})
-}
-
-func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
t.mu.Lock()
defer t.mu.Unlock()
if t.drainChan != nil {
return
}
t.drainChan = make(chan struct{})
- t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
+ t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true})
}
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
@@ -1280,3 +1362,18 @@ func getJitter(v time.Duration) time.Duration {
j := grpcrand.Int63n(2*r) - r
return time.Duration(j)
}
+
+type connectionKey struct{}
+
+// GetConnection gets the connection from the context.
+func GetConnection(ctx context.Context) net.Conn {
+ conn, _ := ctx.Value(connectionKey{}).(net.Conn)
+ return conn
+}
+
+// SetConnection adds the connection to the context to be able to get
+// information about the destination ip and port for an incoming RPC. This also
+// allows any unary or streaming interceptors to see the connection.
+func setConnection(ctx context.Context, conn net.Conn) context.Context {
+ return context.WithValue(ctx, connectionKey{}, conn)
+}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index c7dee140c..d8247bcdf 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -39,7 +39,6 @@ import (
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/status"
)
@@ -96,53 +95,6 @@ var (
logger = grpclog.Component("transport")
)
-type parsedHeaderData struct {
- encoding string
- // statusGen caches the stream status received from the trailer the server
- // sent. Client side only. Do not access directly. After all trailers are
- // parsed, use the status method to retrieve the status.
- statusGen *status.Status
- // rawStatusCode and rawStatusMsg are set from the raw trailer fields and are not
- // intended for direct access outside of parsing.
- rawStatusCode *int
- rawStatusMsg string
- httpStatus *int
- // Server side only fields.
- timeoutSet bool
- timeout time.Duration
- method string
- httpMethod string
- // key-value metadata map from the peer.
- mdata map[string][]string
- statsTags []byte
- statsTrace []byte
- contentSubtype string
-
- // isGRPC field indicates whether the peer is speaking gRPC (otherwise HTTP).
- //
- // We are in gRPC mode (peer speaking gRPC) if:
- // * We are client side and have already received a HEADER frame that indicates gRPC peer.
- // * The header contains valid a content-type, i.e. a string starts with "application/grpc"
- // And we should handle error specific to gRPC.
- //
- // Otherwise (i.e. a content-type string starts without "application/grpc", or does not exist), we
- // are in HTTP fallback mode, and should handle error specific to HTTP.
- isGRPC bool
- grpcErr error
- httpErr error
- contentTypeErr string
-}
-
-// decodeState configures decoding criteria and records the decoded data.
-type decodeState struct {
- // whether decoding on server side or not
- serverSide bool
-
- // Records the states during HPACK decoding. It will be filled with info parsed from HTTP HEADERS
- // frame once decodeHeader function has been invoked and returned.
- data parsedHeaderData
-}
-
// isReservedHeader checks whether hdr belongs to HTTP2 headers
// reserved by gRPC protocol. Any other headers are classified as the
// user-specified metadata.
@@ -180,14 +132,6 @@ func isWhitelistedHeader(hdr string) bool {
}
}
-func (d *decodeState) status() *status.Status {
- if d.data.statusGen == nil {
- // No status-details were provided; generate status using code/msg.
- d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg)
- }
- return d.data.statusGen
-}
-
const binHdrSuffix = "-bin"
func encodeBinHeader(v []byte) string {
@@ -217,168 +161,16 @@ func decodeMetadataHeader(k, v string) (string, error) {
return v, nil
}
-func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) (http2.ErrCode, error) {
- // frame.Truncated is set to true when framer detects that the current header
- // list size hits MaxHeaderListSize limit.
- if frame.Truncated {
- return http2.ErrCodeFrameSize, status.Error(codes.Internal, "peer header list size exceeded limit")
- }
-
- for _, hf := range frame.Fields {
- d.processHeaderField(hf)
- }
-
- if d.data.isGRPC {
- if d.data.grpcErr != nil {
- return http2.ErrCodeProtocol, d.data.grpcErr
- }
- if d.serverSide {
- return http2.ErrCodeNo, nil
- }
- if d.data.rawStatusCode == nil && d.data.statusGen == nil {
- // gRPC status doesn't exist.
- // Set rawStatusCode to be unknown and return nil error.
- // So that, if the stream has ended this Unknown status
- // will be propagated to the user.
- // Otherwise, it will be ignored. In which case, status from
- // a later trailer, that has StreamEnded flag set, is propagated.
- code := int(codes.Unknown)
- d.data.rawStatusCode = &code
- }
- return http2.ErrCodeNo, nil
- }
-
- // HTTP fallback mode
- if d.data.httpErr != nil {
- return http2.ErrCodeProtocol, d.data.httpErr
- }
-
- var (
- code = codes.Internal // when header does not include HTTP status, return INTERNAL
- ok bool
- )
-
- if d.data.httpStatus != nil {
- code, ok = HTTPStatusConvTab[*(d.data.httpStatus)]
- if !ok {
- code = codes.Unknown
- }
- }
-
- return http2.ErrCodeProtocol, status.Error(code, d.constructHTTPErrMsg())
-}
-
-// constructErrMsg constructs error message to be returned in HTTP fallback mode.
-// Format: HTTP status code and its corresponding message + content-type error message.
-func (d *decodeState) constructHTTPErrMsg() string {
- var errMsgs []string
-
- if d.data.httpStatus == nil {
- errMsgs = append(errMsgs, "malformed header: missing HTTP status")
- } else {
- errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus))
- }
-
- if d.data.contentTypeErr == "" {
- errMsgs = append(errMsgs, "transport: missing content-type field")
- } else {
- errMsgs = append(errMsgs, d.data.contentTypeErr)
- }
-
- return strings.Join(errMsgs, "; ")
-}
-
-func (d *decodeState) addMetadata(k, v string) {
- if d.data.mdata == nil {
- d.data.mdata = make(map[string][]string)
+func decodeGRPCStatusDetails(rawDetails string) (*status.Status, error) {
+ v, err := decodeBinHeader(rawDetails)
+ if err != nil {
+ return nil, err
}
- d.data.mdata[k] = append(d.data.mdata[k], v)
-}
-
-func (d *decodeState) processHeaderField(f hpack.HeaderField) {
- switch f.Name {
- case "content-type":
- contentSubtype, validContentType := grpcutil.ContentSubtype(f.Value)
- if !validContentType {
- d.data.contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", f.Value)
- return
- }
- d.data.contentSubtype = contentSubtype
- // TODO: do we want to propagate the whole content-type in the metadata,
- // or come up with a way to just propagate the content-subtype if it was set?
- // ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"}
- // in the metadata?
- d.addMetadata(f.Name, f.Value)
- d.data.isGRPC = true
- case "grpc-encoding":
- d.data.encoding = f.Value
- case "grpc-status":
- code, err := strconv.Atoi(f.Value)
- if err != nil {
- d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err)
- return
- }
- d.data.rawStatusCode = &code
- case "grpc-message":
- d.data.rawStatusMsg = decodeGrpcMessage(f.Value)
- case "grpc-status-details-bin":
- v, err := decodeBinHeader(f.Value)
- if err != nil {
- d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
- return
- }
- s := &spb.Status{}
- if err := proto.Unmarshal(v, s); err != nil {
- d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
- return
- }
- d.data.statusGen = status.FromProto(s)
- case "grpc-timeout":
- d.data.timeoutSet = true
- var err error
- if d.data.timeout, err = decodeTimeout(f.Value); err != nil {
- d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed time-out: %v", err)
- }
- case ":path":
- d.data.method = f.Value
- case ":status":
- code, err := strconv.Atoi(f.Value)
- if err != nil {
- d.data.httpErr = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err)
- return
- }
- d.data.httpStatus = &code
- case "grpc-tags-bin":
- v, err := decodeBinHeader(f.Value)
- if err != nil {
- d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
- return
- }
- d.data.statsTags = v
- d.addMetadata(f.Name, string(v))
- case "grpc-trace-bin":
- v, err := decodeBinHeader(f.Value)
- if err != nil {
- d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
- return
- }
- d.data.statsTrace = v
- d.addMetadata(f.Name, string(v))
- case ":method":
- d.data.httpMethod = f.Value
- default:
- if isReservedHeader(f.Name) && !isWhitelistedHeader(f.Name) {
- break
- }
- v, err := decodeMetadataHeader(f.Name, f.Value)
- if err != nil {
- if logger.V(logLevel) {
- logger.Errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err)
- }
- return
- }
- d.addMetadata(f.Name, v)
+ st := &spb.Status{}
+ if err = proto.Unmarshal(v, st); err != nil {
+ return nil, err
}
+ return status.FromProto(st), nil
}
type timeoutUnit uint8
diff --git a/vendor/google.golang.org/grpc/internal/transport/networktype/networktype.go b/vendor/google.golang.org/grpc/internal/transport/networktype/networktype.go
index 96967428b..7bb53cff1 100644
--- a/vendor/google.golang.org/grpc/internal/transport/networktype/networktype.go
+++ b/vendor/google.golang.org/grpc/internal/transport/networktype/networktype.go
@@ -17,7 +17,7 @@
*/
// Package networktype declares the network type to be used in the default
-// dailer. Attribute of a resolver.Address.
+// dialer. Attribute of a resolver.Address.
package networktype
import (
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 6cc1031fd..d3bf65b2b 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -30,6 +30,7 @@ import (
"net"
"sync"
"sync/atomic"
+ "time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
@@ -518,7 +519,8 @@ const (
// ServerConfig consists of all the configurations to establish a server transport.
type ServerConfig struct {
MaxStreams uint32
- AuthInfo credentials.AuthInfo
+ ConnectionTimeout time.Duration
+ Credentials credentials.TransportCredentials
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
KeepaliveParams keepalive.ServerParameters
@@ -532,12 +534,6 @@ type ServerConfig struct {
HeaderTableSize *uint32
}
-// NewServerTransport creates a ServerTransport with conn or non-nil error
-// if it fails.
-func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
- return newHTTP2Server(conn, config)
-}
-
// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
// UserAgent is the application user agent.
@@ -694,7 +690,7 @@ type ServerTransport interface {
// Close tears down the transport. Once it is called, the transport
// should not be accessed any more. All the pending streams and their
// handlers will be terminated asynchronously.
- Close() error
+ Close()
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
diff --git a/vendor/google.golang.org/grpc/internal/xds/env/env.go b/vendor/google.golang.org/grpc/internal/xds/env/env.go
new file mode 100644
index 000000000..b171ac91f
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/xds/env/env.go
@@ -0,0 +1,95 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package env acts a single source of definition for all environment variables
+// related to the xDS implementation in gRPC.
+package env
+
+import (
+ "os"
+ "strings"
+)
+
+const (
+ // BootstrapFileNameEnv is the env variable to set bootstrap file name.
+ // Do not use this and read from env directly. Its value is read and kept in
+ // variable BootstrapFileName.
+ //
+ // When both bootstrap FileName and FileContent are set, FileName is used.
+ BootstrapFileNameEnv = "GRPC_XDS_BOOTSTRAP"
+ // BootstrapFileContentEnv is the env variable to set bootstrapp file
+ // content. Do not use this and read from env directly. Its value is read
+ // and kept in variable BootstrapFileName.
+ //
+ // When both bootstrap FileName and FileContent are set, FileName is used.
+ BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
+
+ ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
+ clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
+ aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
+ retrySupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"
+ rbacSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RBAC"
+
+ c2pResolverSupportEnv = "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER"
+ c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
+)
+
+var (
+ // BootstrapFileName holds the name of the file which contains xDS bootstrap
+ // configuration. Users can specify the location of the bootstrap file by
+ // setting the environment variable "GRPC_XDS_BOOTSTRAP".
+ //
+ // When both bootstrap FileName and FileContent are set, FileName is used.
+ BootstrapFileName = os.Getenv(BootstrapFileNameEnv)
+ // BootstrapFileContent holds the content of the xDS bootstrap
+ // configuration. Users can specify the bootstrap config by
+ // setting the environment variable "GRPC_XDS_BOOTSTRAP_CONFIG".
+ //
+ // When both bootstrap FileName and FileContent are set, FileName is used.
+ BootstrapFileContent = os.Getenv(BootstrapFileContentEnv)
+ // RingHashSupport indicates whether ring hash support is enabled, which can
+ // be disabled by setting the environment variable
+ // "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "false".
+ RingHashSupport = !strings.EqualFold(os.Getenv(ringHashSupportEnv), "false")
+ // ClientSideSecuritySupport is used to control processing of security
+ // configuration on the client-side.
+ //
+ // Note that there is no env var protection for the server-side because we
+ // have a brand new API on the server-side and users explicitly need to use
+ // the new API to get security integration on the server.
+ ClientSideSecuritySupport = !strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "false")
+ // AggregateAndDNSSupportEnv indicates whether processing of aggregated
+ // cluster and DNS cluster is enabled, which can be enabled by setting the
+ // environment variable
+ // "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
+ // "true".
+ AggregateAndDNSSupportEnv = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")
+
+ // RetrySupport indicates whether xDS retry is enabled.
+ RetrySupport = !strings.EqualFold(os.Getenv(retrySupportEnv), "false")
+
+ // RBACSupport indicates whether xDS configured RBAC HTTP Filter is enabled.
+ RBACSupport = strings.EqualFold(os.Getenv(rbacSupportEnv), "true")
+
+ // C2PResolverSupport indicates whether support for C2P resolver is enabled.
+ // This can be enabled by setting the environment variable
+ // "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
+ C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
+ // C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
+ C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv)
+)
diff --git a/vendor/google.golang.org/grpc/metadata/metadata.go b/vendor/google.golang.org/grpc/metadata/metadata.go
index e4cbea917..3604c7819 100644
--- a/vendor/google.golang.org/grpc/metadata/metadata.go
+++ b/vendor/google.golang.org/grpc/metadata/metadata.go
@@ -93,12 +93,16 @@ func (md MD) Copy() MD {
}
// Get obtains the values for a given key.
+//
+// k is converted to lowercase before searching in md.
func (md MD) Get(k string) []string {
k = strings.ToLower(k)
return md[k]
}
// Set sets the value of a given key with a slice of values.
+//
+// k is converted to lowercase before storing in md.
func (md MD) Set(k string, vals ...string) {
if len(vals) == 0 {
return
@@ -107,7 +111,10 @@ func (md MD) Set(k string, vals ...string) {
md[k] = vals
}
-// Append adds the values to key k, not overwriting what was already stored at that key.
+// Append adds the values to key k, not overwriting what was already stored at
+// that key.
+//
+// k is converted to lowercase before storing in md.
func (md MD) Append(k string, vals ...string) {
if len(vals) == 0 {
return
@@ -116,9 +123,17 @@ func (md MD) Append(k string, vals ...string) {
md[k] = append(md[k], vals...)
}
+// Delete removes the values for a given key k which is converted to lowercase
+// before removing it from md.
+func (md MD) Delete(k string) {
+ k = strings.ToLower(k)
+ delete(md, k)
+}
+
// Join joins any number of mds into a single MD.
-// The order of values for each key is determined by the order in which
-// the mds containing those values are presented to Join.
+//
+// The order of values for each key is determined by the order in which the mds
+// containing those values are presented to Join.
func Join(mds ...MD) MD {
out := MD{}
for _, md := range mds {
@@ -145,8 +160,8 @@ func NewOutgoingContext(ctx context.Context, md MD) context.Context {
}
// AppendToOutgoingContext returns a new context with the provided kv merged
-// with any existing metadata in the context. Please refer to the
-// documentation of Pairs for a description of kv.
+// with any existing metadata in the context. Please refer to the documentation
+// of Pairs for a description of kv.
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context {
if len(kv)%2 == 1 {
panic(fmt.Sprintf("metadata: AppendToOutgoingContext got an odd number of input pairs for metadata: %d", len(kv)))
@@ -159,20 +174,34 @@ func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context
return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md.md, added: added})
}
-// FromIncomingContext returns the incoming metadata in ctx if it exists. The
-// returned MD should not be modified. Writing to it may cause races.
-// Modification should be made to copies of the returned MD.
-func FromIncomingContext(ctx context.Context) (md MD, ok bool) {
- md, ok = ctx.Value(mdIncomingKey{}).(MD)
- return
+// FromIncomingContext returns the incoming metadata in ctx if it exists.
+//
+// All keys in the returned MD are lowercase.
+func FromIncomingContext(ctx context.Context) (MD, bool) {
+ md, ok := ctx.Value(mdIncomingKey{}).(MD)
+ if !ok {
+ return nil, false
+ }
+ out := MD{}
+ for k, v := range md {
+ // We need to manually convert all keys to lower case, because MD is a
+ // map, and there's no guarantee that the MD attached to the context is
+ // created using our helper functions.
+ key := strings.ToLower(k)
+ out[key] = v
+ }
+ return out, true
}
-// FromOutgoingContextRaw returns the un-merged, intermediary contents
-// of rawMD. Remember to perform strings.ToLower on the keys. The returned
-// MD should not be modified. Writing to it may cause races. Modification
-// should be made to copies of the returned MD.
+// FromOutgoingContextRaw returns the un-merged, intermediary contents of rawMD.
+//
+// Remember to perform strings.ToLower on the keys, for both the returned MD (MD
+// is a map, there's no guarantee it's created using our helper functions) and
+// the extra kv pairs (AppendToOutgoingContext doesn't turn them into
+// lowercase).
//
-// This is intended for gRPC-internal use ONLY.
+// This is intended for gRPC-internal use ONLY. Users should use
+// FromOutgoingContext instead.
func FromOutgoingContextRaw(ctx context.Context) (MD, [][]string, bool) {
raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD)
if !ok {
@@ -182,16 +211,23 @@ func FromOutgoingContextRaw(ctx context.Context) (MD, [][]string, bool) {
return raw.md, raw.added, true
}
-// FromOutgoingContext returns the outgoing metadata in ctx if it exists. The
-// returned MD should not be modified. Writing to it may cause races.
-// Modification should be made to copies of the returned MD.
+// FromOutgoingContext returns the outgoing metadata in ctx if it exists.
+//
+// All keys in the returned MD are lowercase.
func FromOutgoingContext(ctx context.Context) (MD, bool) {
raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD)
if !ok {
return nil, false
}
- out := raw.md.Copy()
+ out := MD{}
+ for k, v := range raw.md {
+ // We need to manually convert all keys to lower case, because MD is a
+ // map, and there's no guarantee that the MD attached to the context is
+ // created using our helper functions.
+ key := strings.ToLower(k)
+ out[key] = v
+ }
for _, added := range raw.added {
if len(added)%2 == 1 {
panic(fmt.Sprintf("metadata: FromOutgoingContext got an odd number of input pairs for metadata: %d", len(added)))
diff --git a/vendor/google.golang.org/grpc/picker_wrapper.go b/vendor/google.golang.org/grpc/picker_wrapper.go
index a58174b6f..0878ada9d 100644
--- a/vendor/google.golang.org/grpc/picker_wrapper.go
+++ b/vendor/google.golang.org/grpc/picker_wrapper.go
@@ -147,7 +147,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
logger.Error("subconn returned from pick is not *acBalancerWrapper")
continue
}
- if t, ok := acw.getAddrConn().getReadyTransport(); ok {
+ if t := acw.getAddrConn().getReadyTransport(); t != nil {
if channelz.IsOn() {
return t, doneChannelzWrapper(acw, pickResult.Done), nil
}
diff --git a/vendor/google.golang.org/grpc/pickfirst.go b/vendor/google.golang.org/grpc/pickfirst.go
index b858c2a5e..f194d14a0 100644
--- a/vendor/google.golang.org/grpc/pickfirst.go
+++ b/vendor/google.golang.org/grpc/pickfirst.go
@@ -107,10 +107,12 @@ func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.S
}
switch s.ConnectivityState {
- case connectivity.Ready, connectivity.Idle:
+ case connectivity.Ready:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: sc}}})
case connectivity.Connecting:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}})
+ case connectivity.Idle:
+ b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &idlePicker{sc: sc}})
case connectivity.TransientFailure:
b.cc.UpdateState(balancer.State{
ConnectivityState: s.ConnectivityState,
@@ -122,6 +124,12 @@ func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.S
func (b *pickfirstBalancer) Close() {
}
+func (b *pickfirstBalancer) ExitIdle() {
+ if b.state == connectivity.Idle {
+ b.sc.Connect()
+ }
+}
+
type picker struct {
result balancer.PickResult
err error
@@ -131,6 +139,17 @@ func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
return p.result, p.err
}
+// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
+// CONNECTING when Pick is called.
+type idlePicker struct {
+ sc balancer.SubConn
+}
+
+func (i *idlePicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
+ i.sc.Connect()
+ return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
+}
+
func init() {
balancer.Register(newPickfirstBuilder())
}
diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
index 4118de571..2c47cd54f 100644
--- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
+++ b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
@@ -39,6 +39,8 @@ type ccResolverWrapper struct {
resolver resolver.Resolver
done *grpcsync.Event
curState resolver.State
+
+ incomingMu sync.Mutex // Synchronizes all the incoming calls.
}
// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
@@ -90,6 +92,8 @@ func (ccr *ccResolverWrapper) close() {
}
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
+ ccr.incomingMu.Lock()
+ defer ccr.incomingMu.Unlock()
if ccr.done.HasFired() {
return nil
}
@@ -105,6 +109,8 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
}
func (ccr *ccResolverWrapper) ReportError(err error) {
+ ccr.incomingMu.Lock()
+ defer ccr.incomingMu.Unlock()
if ccr.done.HasFired() {
return
}
@@ -114,6 +120,8 @@ func (ccr *ccResolverWrapper) ReportError(err error) {
// NewAddress is called by the resolver implementation to send addresses to gRPC.
func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
+ ccr.incomingMu.Lock()
+ defer ccr.incomingMu.Unlock()
if ccr.done.HasFired() {
return
}
@@ -128,6 +136,8 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
// NewServiceConfig is called by the resolver implementation to send service
// configs to gRPC.
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
+ ccr.incomingMu.Lock()
+ defer ccr.incomingMu.Unlock()
if ccr.done.HasFired() {
return
}
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index 6db356fa5..87987a2e6 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -258,7 +258,8 @@ func (o PeerCallOption) after(c *callInfo, attempt *csAttempt) {
}
// WaitForReady configures the action to take when an RPC is attempted on broken
-// connections or unreachable servers. If waitForReady is false, the RPC will fail
+// connections or unreachable servers. If waitForReady is false and the
+// connection is in the TRANSIENT_FAILURE state, the RPC will fail
// immediately. Otherwise, the RPC client will block the call until a
// connection is available (or the call is canceled or times out) and will
// retry the call if it fails due to a transient error. gRPC will not retry if
@@ -828,26 +829,28 @@ func Errorf(c codes.Code, format string, a ...interface{}) error {
// toRPCErr converts an error into an error from the status package.
func toRPCErr(err error) error {
- if err == nil || err == io.EOF {
+ switch err {
+ case nil, io.EOF:
return err
- }
- if err == io.ErrUnexpectedEOF {
+ case context.DeadlineExceeded:
+ return status.Error(codes.DeadlineExceeded, err.Error())
+ case context.Canceled:
+ return status.Error(codes.Canceled, err.Error())
+ case io.ErrUnexpectedEOF:
return status.Error(codes.Internal, err.Error())
}
- if _, ok := status.FromError(err); ok {
- return err
- }
+
switch e := err.(type) {
case transport.ConnectionError:
return status.Error(codes.Unavailable, e.Desc)
- default:
- switch err {
- case context.DeadlineExceeded:
- return status.Error(codes.DeadlineExceeded, err.Error())
- case context.Canceled:
- return status.Error(codes.Canceled, err.Error())
- }
+ case *transport.NewStreamError:
+ return toRPCErr(e.Err)
+ }
+
+ if _, ok := status.FromError(err); ok {
+ return err
}
+
return status.Error(codes.Unknown, err.Error())
}
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 0a151dee4..557f29559 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -710,13 +710,6 @@ func (s *Server) GetServiceInfo() map[string]ServiceInfo {
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
-func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
- if s.opts.creds == nil {
- return rawConn, nil, nil
- }
- return s.opts.creds.ServerHandshake(rawConn)
-}
-
type listenSocket struct {
net.Listener
channelzID int64
@@ -839,28 +832,14 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
return
}
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
- conn, authInfo, err := s.useTransportAuthenticator(rawConn)
- if err != nil {
- // ErrConnDispatched means that the connection was dispatched away from
- // gRPC; those connections should be left open.
- if err != credentials.ErrConnDispatched {
- s.mu.Lock()
- s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
- s.mu.Unlock()
- channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
- rawConn.Close()
- }
- rawConn.SetDeadline(time.Time{})
- return
- }
// Finish handshaking (HTTP2)
- st := s.newHTTP2Transport(conn, authInfo)
+ st := s.newHTTP2Transport(rawConn)
+ rawConn.SetDeadline(time.Time{})
if st == nil {
return
}
- rawConn.SetDeadline(time.Time{})
if !s.addConn(lisAddr, st) {
return
}
@@ -881,10 +860,11 @@ func (s *Server) drainServerTransports(addr string) {
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
-func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
+func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
config := &transport.ServerConfig{
MaxStreams: s.opts.maxConcurrentStreams,
- AuthInfo: authInfo,
+ ConnectionTimeout: s.opts.connectionTimeout,
+ Credentials: s.opts.creds,
InTapHandle: s.opts.inTapHandle,
StatsHandler: s.opts.statsHandler,
KeepaliveParams: s.opts.keepaliveParams,
@@ -897,13 +877,22 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
MaxHeaderListSize: s.opts.maxHeaderListSize,
HeaderTableSize: s.opts.headerTableSize,
}
- st, err := transport.NewServerTransport("http2", c, config)
+ st, err := transport.NewServerTransport(c, config)
if err != nil {
s.mu.Lock()
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
s.mu.Unlock()
- c.Close()
- channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
+ // ErrConnDispatched means that the connection was dispatched away from
+ // gRPC; those connections should be left open.
+ if err != credentials.ErrConnDispatched {
+ c.Close()
+ }
+ // Don't log on ErrConnDispatched and io.EOF to prevent log spam.
+ if err != credentials.ErrConnDispatched {
+ if err != io.EOF {
+ channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
+ }
+ }
return nil
}
@@ -1109,22 +1098,24 @@ func chainUnaryServerInterceptors(s *Server) {
} else if len(interceptors) == 1 {
chainedInt = interceptors[0]
} else {
- chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
- return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
- }
+ chainedInt = chainUnaryInterceptors(interceptors)
}
s.opts.unaryInt = chainedInt
}
-// getChainUnaryHandler recursively generate the chained UnaryHandler
-func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
- if curr == len(interceptors)-1 {
- return finalHandler
- }
-
- return func(ctx context.Context, req interface{}) (interface{}, error) {
- return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
+func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
+ return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
+ var i int
+ var next UnaryHandler
+ next = func(ctx context.Context, req interface{}) (interface{}, error) {
+ if i == len(interceptors)-1 {
+ return interceptors[i](ctx, req, info, handler)
+ }
+ i++
+ return interceptors[i-1](ctx, req, info, next)
+ }
+ return next(ctx, req)
}
}
@@ -1138,7 +1129,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if sh != nil {
beginTime := time.Now()
statsBegin = &stats.Begin{
- BeginTime: beginTime,
+ BeginTime: beginTime,
+ IsClientStream: false,
+ IsServerStream: false,
}
sh.HandleRPC(stream.Context(), statsBegin)
}
@@ -1390,22 +1383,24 @@ func chainStreamServerInterceptors(s *Server) {
} else if len(interceptors) == 1 {
chainedInt = interceptors[0]
} else {
- chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
- return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
- }
+ chainedInt = chainStreamInterceptors(interceptors)
}
s.opts.streamInt = chainedInt
}
-// getChainStreamHandler recursively generate the chained StreamHandler
-func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
- if curr == len(interceptors)-1 {
- return finalHandler
- }
-
- return func(srv interface{}, ss ServerStream) error {
- return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
+func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
+ return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
+ var i int
+ var next StreamHandler
+ next = func(srv interface{}, ss ServerStream) error {
+ if i == len(interceptors)-1 {
+ return interceptors[i](srv, ss, info, handler)
+ }
+ i++
+ return interceptors[i-1](srv, ss, info, next)
+ }
+ return next(srv, ss)
}
}
@@ -1418,7 +1413,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if sh != nil {
beginTime := time.Now()
statsBegin = &stats.Begin{
- BeginTime: beginTime,
+ BeginTime: beginTime,
+ IsClientStream: sd.ClientStreams,
+ IsServerStream: sd.ServerStreams,
}
sh.HandleRPC(stream.Context(), statsBegin)
}
@@ -1521,6 +1518,8 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}
}
+ ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)
+
if trInfo != nil {
trInfo.tr.LazyLog(&trInfo.firstLine, false)
}
@@ -1588,7 +1587,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
- if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
+ if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
diff --git a/vendor/google.golang.org/grpc/stats/stats.go b/vendor/google.golang.org/grpc/stats/stats.go
index 63e476ee7..0285dcc6a 100644
--- a/vendor/google.golang.org/grpc/stats/stats.go
+++ b/vendor/google.golang.org/grpc/stats/stats.go
@@ -36,15 +36,22 @@ type RPCStats interface {
IsClient() bool
}
-// Begin contains stats when an RPC begins.
+// Begin contains stats when an RPC attempt begins.
// FailFast is only valid if this Begin is from client side.
type Begin struct {
// Client is true if this Begin is from client side.
Client bool
- // BeginTime is the time when the RPC begins.
+ // BeginTime is the time when the RPC attempt begins.
BeginTime time.Time
// FailFast indicates if this RPC is failfast.
FailFast bool
+ // IsClientStream indicates whether the RPC is a client streaming RPC.
+ IsClientStream bool
+ // IsServerStream indicates whether the RPC is a server streaming RPC.
+ IsServerStream bool
+ // IsTransparentRetryAttempt indicates whether this attempt was initiated
+ // due to transparently retrying a previous attempt.
+ IsTransparentRetryAttempt bool
}
// IsClient indicates if the stats information is from client side.
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 1f3e70d2c..625d47b34 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -274,33 +274,6 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
if c.creds != nil {
callHdr.Creds = c.creds
}
- var trInfo *traceInfo
- if EnableTracing {
- trInfo = &traceInfo{
- tr: trace.New("grpc.Sent."+methodFamily(method), method),
- firstLine: firstLine{
- client: true,
- },
- }
- if deadline, ok := ctx.Deadline(); ok {
- trInfo.firstLine.deadline = time.Until(deadline)
- }
- trInfo.tr.LazyLog(&trInfo.firstLine, false)
- ctx = trace.NewContext(ctx, trInfo.tr)
- }
- ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
- sh := cc.dopts.copts.StatsHandler
- var beginTime time.Time
- if sh != nil {
- ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
- beginTime = time.Now()
- begin := &stats.Begin{
- Client: true,
- BeginTime: beginTime,
- FailFast: c.failFast,
- }
- sh.HandleRPC(ctx, begin)
- }
cs := &clientStream{
callHdr: callHdr,
@@ -314,7 +287,6 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
cp: cp,
comp: comp,
cancel: cancel,
- beginTime: beginTime,
firstAttempt: true,
onCommit: onCommit,
}
@@ -323,9 +295,7 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
}
cs.binlog = binarylog.GetMethodLogger(method)
- // Only this initial attempt has stats/tracing.
- // TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
- if err := cs.newAttemptLocked(sh, trInfo); err != nil {
+ if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
cs.finish(err)
return nil, err
}
@@ -373,8 +343,43 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
// newAttemptLocked creates a new attempt with a transport.
// If it succeeds, then it replaces clientStream's attempt with this new attempt.
-func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
+func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
+ ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
+ method := cs.callHdr.Method
+ sh := cs.cc.dopts.copts.StatsHandler
+ var beginTime time.Time
+ if sh != nil {
+ ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
+ beginTime = time.Now()
+ begin := &stats.Begin{
+ Client: true,
+ BeginTime: beginTime,
+ FailFast: cs.callInfo.failFast,
+ IsClientStream: cs.desc.ClientStreams,
+ IsServerStream: cs.desc.ServerStreams,
+ IsTransparentRetryAttempt: isTransparent,
+ }
+ sh.HandleRPC(ctx, begin)
+ }
+
+ var trInfo *traceInfo
+ if EnableTracing {
+ trInfo = &traceInfo{
+ tr: trace.New("grpc.Sent."+methodFamily(method), method),
+ firstLine: firstLine{
+ client: true,
+ },
+ }
+ if deadline, ok := ctx.Deadline(); ok {
+ trInfo.firstLine.deadline = time.Until(deadline)
+ }
+ trInfo.tr.LazyLog(&trInfo.firstLine, false)
+ ctx = trace.NewContext(ctx, trInfo.tr)
+ }
+
newAttempt := &csAttempt{
+ ctx: ctx,
+ beginTime: beginTime,
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
@@ -389,15 +394,14 @@ func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (r
}
}()
- if err := cs.ctx.Err(); err != nil {
+ if err := ctx.Err(); err != nil {
return toRPCErr(err)
}
- ctx := cs.ctx
if cs.cc.parsedTarget.Scheme == "xds" {
// Add extra metadata (metadata that will be added by transport) to context
// so the balancer can see them.
- ctx = grpcutil.WithExtraMetadata(cs.ctx, metadata.Pairs(
+ ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
))
}
@@ -417,14 +421,11 @@ func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (r
func (a *csAttempt) newStream() error {
cs := a.cs
cs.callHdr.PreviousAttempts = cs.numRetries
- s, err := a.t.NewStream(cs.ctx, cs.callHdr)
+ s, err := a.t.NewStream(a.ctx, cs.callHdr)
if err != nil {
- if _, ok := err.(transport.PerformedIOError); ok {
- // Return without converting to an RPC error so retry code can
- // inspect.
- return err
- }
- return toRPCErr(err)
+ // Return without converting to an RPC error so retry code can
+ // inspect.
+ return err
}
cs.attempt.s = s
cs.attempt.p = &parser{r: s}
@@ -445,8 +446,7 @@ type clientStream struct {
cancel context.CancelFunc // cancels all attempts
- sentLast bool // sent an end stream
- beginTime time.Time
+ sentLast bool // sent an end stream
methodConfig *MethodConfig
@@ -486,6 +486,7 @@ type clientStream struct {
// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
+ ctx context.Context
cs *clientStream
t transport.ClientTransport
s *transport.Stream
@@ -504,6 +505,7 @@ type csAttempt struct {
trInfo *traceInfo
statsHandler stats.Handler
+ beginTime time.Time
}
func (cs *clientStream) commitAttemptLocked() {
@@ -521,46 +523,57 @@ func (cs *clientStream) commitAttempt() {
}
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
-// the error that should be returned by the operation.
-func (cs *clientStream) shouldRetry(err error) error {
- unprocessed := false
+// the error that should be returned by the operation. If the RPC should be
+// retried, the bool indicates whether it is being retried transparently.
+func (cs *clientStream) shouldRetry(err error) (bool, error) {
if cs.attempt.s == nil {
- pioErr, ok := err.(transport.PerformedIOError)
- if ok {
- // Unwrap error.
- err = toRPCErr(pioErr.Err)
- } else {
- unprocessed = true
+ // Error from NewClientStream.
+ nse, ok := err.(*transport.NewStreamError)
+ if !ok {
+ // Unexpected, but assume no I/O was performed and the RPC is not
+ // fatal, so retry indefinitely.
+ return true, nil
}
- if !ok && !cs.callInfo.failFast {
- // In the event of a non-IO operation error from NewStream, we
- // never attempted to write anything to the wire, so we can retry
- // indefinitely for non-fail-fast RPCs.
- return nil
+
+ // Unwrap and convert error.
+ err = toRPCErr(nse.Err)
+
+ // Never retry DoNotRetry errors, which indicate the RPC should not be
+ // retried due to max header list size violation, etc.
+ if nse.DoNotRetry {
+ return false, err
+ }
+
+ // In the event of a non-IO operation error from NewStream, we never
+ // attempted to write anything to the wire, so we can retry
+ // indefinitely.
+ if !nse.DoNotTransparentRetry {
+ return true, nil
}
}
if cs.finished || cs.committed {
// RPC is finished or committed; cannot retry.
- return err
+ return false, err
}
// Wait for the trailers.
+ unprocessed := false
if cs.attempt.s != nil {
<-cs.attempt.s.Done()
unprocessed = cs.attempt.s.Unprocessed()
}
if cs.firstAttempt && unprocessed {
// First attempt, stream unprocessed: transparently retry.
- return nil
+ return true, nil
}
if cs.cc.dopts.disableRetry {
- return err
+ return false, err
}
pushback := 0
hasPushback := false
if cs.attempt.s != nil {
if !cs.attempt.s.TrailersOnly() {
- return err
+ return false, err
}
// TODO(retry): Move down if the spec changes to not check server pushback
@@ -571,13 +584,13 @@ func (cs *clientStream) shouldRetry(err error) error {
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
cs.retryThrottler.throttle() // This counts as a failure for throttling.
- return err
+ return false, err
}
hasPushback = true
} else if len(sps) > 1 {
channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
cs.retryThrottler.throttle() // This counts as a failure for throttling.
- return err
+ return false, err
}
}
@@ -590,16 +603,16 @@ func (cs *clientStream) shouldRetry(err error) error {
rp := cs.methodConfig.RetryPolicy
if rp == nil || !rp.RetryableStatusCodes[code] {
- return err
+ return false, err
}
// Note: the ordering here is important; we count this as a failure
// only if the code matched a retryable code.
if cs.retryThrottler.throttle() {
- return err
+ return false, err
}
if cs.numRetries+1 >= rp.MaxAttempts {
- return err
+ return false, err
}
var dur time.Duration
@@ -622,23 +635,24 @@ func (cs *clientStream) shouldRetry(err error) error {
select {
case <-t.C:
cs.numRetries++
- return nil
+ return false, nil
case <-cs.ctx.Done():
t.Stop()
- return status.FromContextError(cs.ctx.Err()).Err()
+ return false, status.FromContextError(cs.ctx.Err()).Err()
}
}
// Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(lastErr error) error {
for {
- cs.attempt.finish(lastErr)
- if err := cs.shouldRetry(lastErr); err != nil {
+ cs.attempt.finish(toRPCErr(lastErr))
+ isTransparent, err := cs.shouldRetry(lastErr)
+ if err != nil {
cs.commitAttemptLocked()
return err
}
cs.firstAttempt = false
- if err := cs.newAttemptLocked(nil, nil); err != nil {
+ if err := cs.newAttemptLocked(isTransparent); err != nil {
return err
}
if lastErr = cs.replayBufferLocked(); lastErr == nil {
@@ -659,7 +673,11 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
for {
if cs.committed {
cs.mu.Unlock()
- return op(cs.attempt)
+ // toRPCErr is used in case the error from the attempt comes from
+ // NewClientStream, which intentionally doesn't return a status
+ // error to allow for further inspection; all other errors should
+ // already be status errors.
+ return toRPCErr(op(cs.attempt))
}
a := cs.attempt
cs.mu.Unlock()
@@ -924,7 +942,7 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
return io.EOF
}
if a.statsHandler != nil {
- a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
+ a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
}
if channelz.IsOn() {
a.t.IncrMsgSent()
@@ -972,7 +990,7 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
a.mu.Unlock()
}
if a.statsHandler != nil {
- a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
+ a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
Client: true,
RecvTime: time.Now(),
Payload: m,
@@ -1034,12 +1052,12 @@ func (a *csAttempt) finish(err error) {
if a.statsHandler != nil {
end := &stats.End{
Client: true,
- BeginTime: a.cs.beginTime,
+ BeginTime: a.beginTime,
EndTime: time.Now(),
Trailer: tr,
Error: err,
}
- a.statsHandler.HandleRPC(a.cs.ctx, end)
+ a.statsHandler.HandleRPC(a.ctx, end)
}
if a.trInfo != nil && a.trInfo.tr != nil {
if err == nil {
diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go
index bfe5cf887..48594bc24 100644
--- a/vendor/google.golang.org/grpc/version.go
+++ b/vendor/google.golang.org/grpc/version.go
@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
-const Version = "1.38.0"
+const Version = "1.41.0"
diff --git a/vendor/google.golang.org/grpc/vet.sh b/vendor/google.golang.org/grpc/vet.sh
index 1a0dbd7ee..d923187a7 100644
--- a/vendor/google.golang.org/grpc/vet.sh
+++ b/vendor/google.golang.org/grpc/vet.sh
@@ -32,26 +32,14 @@ PATH="${HOME}/go/bin:${GOROOT}/bin:${PATH}"
go version
if [[ "$1" = "-install" ]]; then
- # Check for module support
- if go help mod >& /dev/null; then
- # Install the pinned versions as defined in module tools.
- pushd ./test/tools
- go install \
- golang.org/x/lint/golint \
- golang.org/x/tools/cmd/goimports \
- honnef.co/go/tools/cmd/staticcheck \
- github.com/client9/misspell/cmd/misspell
- popd
- else
- # Ye olde `go get` incantation.
- # Note: this gets the latest version of all tools (vs. the pinned versions
- # with Go modules).
- go get -u \
- golang.org/x/lint/golint \
- golang.org/x/tools/cmd/goimports \
- honnef.co/go/tools/cmd/staticcheck \
- github.com/client9/misspell/cmd/misspell
- fi
+ # Install the pinned versions as defined in module tools.
+ pushd ./test/tools
+ go install \
+ golang.org/x/lint/golint \
+ golang.org/x/tools/cmd/goimports \
+ honnef.co/go/tools/cmd/staticcheck \
+ github.com/client9/misspell/cmd/misspell
+ popd
if [[ -z "${VET_SKIP_PROTO}" ]]; then
if [[ "${TRAVIS}" = "true" ]]; then
PROTOBUF_VERSION=3.14.0
@@ -101,10 +89,6 @@ not git grep "\(import \|^\s*\)\"github.com/golang/protobuf/ptypes/" -- "*.go"
# - Ensure all xds proto imports are renamed to *pb or *grpc.
git grep '"github.com/envoyproxy/go-control-plane/envoy' -- '*.go' ':(exclude)*.pb.go' | not grep -v 'pb "\|grpc "'
-# - Check imports that are illegal in appengine (until Go 1.11).
-# TODO: Remove when we drop Go 1.10 support
-go list -f {{.Dir}} ./... | xargs go run test/go_vet/vet.go
-
misspell -error .
# - Check that generated proto files are up to date.