aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc')
-rw-r--r--vendor/google.golang.org/grpc/CONTRIBUTING.md7
-rw-r--r--vendor/google.golang.org/grpc/balancer/balancer.go3
-rw-r--r--vendor/google.golang.org/grpc/balancer_conn_wrappers.go318
-rw-r--r--vendor/google.golang.org/grpc/channelz/channelz.go36
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go350
-rw-r--r--vendor/google.golang.org/grpc/credentials/insecure/insecure.go26
-rw-r--r--vendor/google.golang.org/grpc/dialoptions.go52
-rw-r--r--vendor/google.golang.org/grpc/encoding/encoding.go2
-rw-r--r--vendor/google.golang.org/grpc/go.mod12
-rw-r--r--vendor/google.golang.org/grpc/go.sum30
-rw-r--r--vendor/google.golang.org/grpc/interceptor.go9
-rw-r--r--vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go382
-rw-r--r--vendor/google.golang.org/grpc/internal/binarylog/binarylog.go91
-rw-r--r--vendor/google.golang.org/grpc/internal/binarylog/env_config.go6
-rw-r--r--vendor/google.golang.org/grpc/internal/binarylog/method_logger.go26
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/funcs.go228
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/id.go75
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/logging.go91
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/types.go23
-rw-r--r--vendor/google.golang.org/grpc/internal/envconfig/xds.go12
-rw-r--r--vendor/google.golang.org/grpc/internal/internal.go13
-rw-r--r--vendor/google.golang.org/grpc/internal/metadata/metadata.go46
-rw-r--r--vendor/google.golang.org/grpc/internal/pretty/pretty.go82
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/controlbuf.go6
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go31
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_server.go93
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/transport.go11
-rw-r--r--vendor/google.golang.org/grpc/metadata/metadata.go8
-rw-r--r--vendor/google.golang.org/grpc/picker_wrapper.go8
-rw-r--r--vendor/google.golang.org/grpc/pickfirst.go126
-rw-r--r--vendor/google.golang.org/grpc/regenerate.sh12
-rw-r--r--vendor/google.golang.org/grpc/resolver/resolver.go8
-rw-r--r--vendor/google.golang.org/grpc/resolver_conn_wrapper.go23
-rw-r--r--vendor/google.golang.org/grpc/server.go92
-rw-r--r--vendor/google.golang.org/grpc/service_config.go5
-rw-r--r--vendor/google.golang.org/grpc/stream.go235
-rw-r--r--vendor/google.golang.org/grpc/version.go2
-rw-r--r--vendor/google.golang.org/grpc/vet.sh2
38 files changed, 1749 insertions, 833 deletions
diff --git a/vendor/google.golang.org/grpc/CONTRIBUTING.md b/vendor/google.golang.org/grpc/CONTRIBUTING.md
index cd03f8c76..52338d004 100644
--- a/vendor/google.golang.org/grpc/CONTRIBUTING.md
+++ b/vendor/google.golang.org/grpc/CONTRIBUTING.md
@@ -53,9 +53,8 @@ How to get your contributions merged smoothly and quickly.
- **All tests need to be passing** before your change can be merged. We
recommend you **run tests locally** before creating your PR to catch breakages
early on.
- - `make all` to test everything, OR
- - `make vet` to catch vet errors
- - `make test` to run the tests
- - `make testrace` to run tests in race mode
+ - `VET_SKIP_PROTO=1 ./vet.sh` to catch vet errors
+ - `go test -cpu 1,4 -timeout 7m ./...` to run the tests
+ - `go test -race -cpu 1,4 -timeout 7m ./...` to run tests in race mode
- Exceptions to the rules can be made if there's a compelling reason for doing so.
diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go
index bcc6f5451..f7a7697ca 100644
--- a/vendor/google.golang.org/grpc/balancer/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/balancer.go
@@ -27,6 +27,7 @@ import (
"net"
"strings"
+ "google.golang.org/grpc/channelz"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
@@ -192,7 +193,7 @@ type BuildOptions struct {
// server can ignore this field.
Authority string
// ChannelzParentID is the parent ClientConn's channelz ID.
- ChannelzParentID int64
+ ChannelzParentID *channelz.Identifier
// CustomUserAgent is the custom user agent set on the parent ClientConn.
// The balancer should set the same custom user agent if it creates a
// ClientConn.
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index f4ea61746..b1c23eaae 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -20,130 +20,178 @@ package grpc
import (
"fmt"
+ "strings"
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
)
-// scStateUpdate contains the subConn and the new state it changed to.
-type scStateUpdate struct {
- sc balancer.SubConn
- state connectivity.State
- err error
-}
+// ccBalancerWrapper sits between the ClientConn and the Balancer.
+//
+// ccBalancerWrapper implements methods corresponding to the ones on the
+// balancer.Balancer interface. The ClientConn is free to call these methods
+// concurrently and the ccBalancerWrapper ensures that calls from the ClientConn
+// to the Balancer happen synchronously and in order.
+//
+// ccBalancerWrapper also implements the balancer.ClientConn interface and is
+// passed to the Balancer implementations. It invokes unexported methods on the
+// ClientConn to handle these calls from the Balancer.
+//
+// It uses the gracefulswitch.Balancer internally to ensure that balancer
+// switches happen in a graceful manner.
+type ccBalancerWrapper struct {
+ cc *ClientConn
-// exitIdle contains no data and is just a signal sent on the updateCh in
-// ccBalancerWrapper to instruct the balancer to exit idle.
-type exitIdle struct{}
+ // Since these fields are accessed only from handleXxx() methods which are
+ // synchronized by the watcher goroutine, we do not need a mutex to protect
+ // these fields.
+ balancer *gracefulswitch.Balancer
+ curBalancerName string
-// ccBalancerWrapper is a wrapper on top of cc for balancers.
-// It implements balancer.ClientConn interface.
-type ccBalancerWrapper struct {
- cc *ClientConn
- balancerMu sync.Mutex // synchronizes calls to the balancer
- balancer balancer.Balancer
- hasExitIdle bool
- updateCh *buffer.Unbounded
- closed *grpcsync.Event
- done *grpcsync.Event
-
- mu sync.Mutex
- subConns map[*acBalancerWrapper]struct{}
+ updateCh *buffer.Unbounded // Updates written on this channel are processed by watcher().
+ resultCh *buffer.Unbounded // Results of calls to UpdateClientConnState() are pushed here.
+ closed *grpcsync.Event // Indicates if close has been called.
+ done *grpcsync.Event // Indicates if close has completed its work.
}
-func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
+// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
+// is not created until the switchTo() method is invoked.
+func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
ccb := &ccBalancerWrapper{
cc: cc,
updateCh: buffer.NewUnbounded(),
+ resultCh: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
- subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
- ccb.balancer = b.Build(ccb, bopts)
- _, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler)
+ ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
return ccb
}
-// watcher balancer functions sequentially, so the balancer can be implemented
-// lock-free.
+// The following xxxUpdate structs wrap the arguments received as part of the
+// corresponding update. The watcher goroutine uses the 'type' of the update to
+// invoke the appropriate handler routine to handle the update.
+
+type ccStateUpdate struct {
+ ccs *balancer.ClientConnState
+}
+
+type scStateUpdate struct {
+ sc balancer.SubConn
+ state connectivity.State
+ err error
+}
+
+type exitIdleUpdate struct{}
+
+type resolverErrorUpdate struct {
+ err error
+}
+
+type switchToUpdate struct {
+ name string
+}
+
+type subConnUpdate struct {
+ acbw *acBalancerWrapper
+}
+
+// watcher is a long-running goroutine which reads updates from a channel and
+// invokes corresponding methods on the underlying balancer. It ensures that
+// these methods are invoked in a synchronous fashion. It also ensures that
+// these methods are invoked in the order in which the updates were received.
func (ccb *ccBalancerWrapper) watcher() {
for {
select {
- case t := <-ccb.updateCh.Get():
+ case u := <-ccb.updateCh.Get():
ccb.updateCh.Load()
if ccb.closed.HasFired() {
break
}
- switch u := t.(type) {
+ switch update := u.(type) {
+ case *ccStateUpdate:
+ ccb.handleClientConnStateChange(update.ccs)
case *scStateUpdate:
- ccb.balancerMu.Lock()
- ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})
- ccb.balancerMu.Unlock()
- case *acBalancerWrapper:
- ccb.mu.Lock()
- if ccb.subConns != nil {
- delete(ccb.subConns, u)
- ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)
- }
- ccb.mu.Unlock()
- case exitIdle:
- if ccb.cc.GetState() == connectivity.Idle {
- if ei, ok := ccb.balancer.(balancer.ExitIdler); ok {
- // We already checked that the balancer implements
- // ExitIdle before pushing the event to updateCh, but
- // check conditionally again as defensive programming.
- ccb.balancerMu.Lock()
- ei.ExitIdle()
- ccb.balancerMu.Unlock()
- }
- }
+ ccb.handleSubConnStateChange(update)
+ case *exitIdleUpdate:
+ ccb.handleExitIdle()
+ case *resolverErrorUpdate:
+ ccb.handleResolverError(update.err)
+ case *switchToUpdate:
+ ccb.handleSwitchTo(update.name)
+ case *subConnUpdate:
+ ccb.handleRemoveSubConn(update.acbw)
default:
- logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
+ logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", update, update)
}
case <-ccb.closed.Done():
}
if ccb.closed.HasFired() {
- ccb.balancerMu.Lock()
- ccb.balancer.Close()
- ccb.balancerMu.Unlock()
- ccb.mu.Lock()
- scs := ccb.subConns
- ccb.subConns = nil
- ccb.mu.Unlock()
- ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})
- ccb.done.Fire()
- // Fire done before removing the addr conns. We can safely unblock
- // ccb.close and allow the removeAddrConns to happen
- // asynchronously.
- for acbw := range scs {
- ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
- }
+ ccb.handleClose()
return
}
}
}
-func (ccb *ccBalancerWrapper) close() {
- ccb.closed.Fire()
- <-ccb.done.Done()
+// updateClientConnState is invoked by grpc to push a ClientConnState update to
+// the underlying balancer.
+//
+// Unlike other methods invoked by grpc to push updates to the underlying
+// balancer, this method cannot simply push the update onto the update channel
+// and return. It needs to return the error returned by the underlying balancer
+// back to grpc which propagates that to the resolver.
+func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
+ ccb.updateCh.Put(&ccStateUpdate{ccs: ccs})
+
+ var res interface{}
+ select {
+ case res = <-ccb.resultCh.Get():
+ ccb.resultCh.Load()
+ case <-ccb.closed.Done():
+ // Return early if the balancer wrapper is closed while we are waiting for
+ // the underlying balancer to process a ClientConnState update.
+ return nil
+ }
+ // If the returned error is nil, attempting to type assert to error leads to
+ // panic. So, this needs to handled separately.
+ if res == nil {
+ return nil
+ }
+ return res.(error)
}
-func (ccb *ccBalancerWrapper) exitIdle() bool {
- if !ccb.hasExitIdle {
- return false
+// handleClientConnStateChange handles a ClientConnState update from the update
+// channel and invokes the appropriate method on the underlying balancer.
+//
+// If the addresses specified in the update contain addresses of type "grpclb"
+// and the selected LB policy is not "grpclb", these addresses will be filtered
+// out and ccs will be modified with the updated address list.
+func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) {
+ if ccb.curBalancerName != grpclbName {
+ // Filter any grpclb addresses since we don't have the grpclb balancer.
+ var addrs []resolver.Address
+ for _, addr := range ccs.ResolverState.Addresses {
+ if addr.Type == resolver.GRPCLB {
+ continue
+ }
+ addrs = append(addrs, addr)
+ }
+ ccs.ResolverState.Addresses = addrs
}
- ccb.updateCh.Put(exitIdle{})
- return true
+ ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs))
}
-func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
+// updateSubConnState is invoked by grpc to push a subConn state update to the
+// underlying balancer.
+func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
// When updating addresses for a SubConn, if the address in use is not in
// the new addresses, the old ac will be tearDown() and a new ac will be
// created. tearDown() generates a state change with Shutdown state, we
@@ -161,44 +209,125 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
})
}
-func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
- ccb.balancerMu.Lock()
- defer ccb.balancerMu.Unlock()
- return ccb.balancer.UpdateClientConnState(*ccs)
+// handleSubConnStateChange handles a SubConnState update from the update
+// channel and invokes the appropriate method on the underlying balancer.
+func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) {
+ ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err})
+}
+
+func (ccb *ccBalancerWrapper) exitIdle() {
+ ccb.updateCh.Put(&exitIdleUpdate{})
+}
+
+func (ccb *ccBalancerWrapper) handleExitIdle() {
+ if ccb.cc.GetState() != connectivity.Idle {
+ return
+ }
+ ccb.balancer.ExitIdle()
}
func (ccb *ccBalancerWrapper) resolverError(err error) {
- ccb.balancerMu.Lock()
- defer ccb.balancerMu.Unlock()
+ ccb.updateCh.Put(&resolverErrorUpdate{err: err})
+}
+
+func (ccb *ccBalancerWrapper) handleResolverError(err error) {
ccb.balancer.ResolverError(err)
}
+// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
+// LB policy identified by name.
+//
+// ClientConn calls newCCBalancerWrapper() at creation time. Upon receipt of the
+// first good update from the name resolver, it determines the LB policy to use
+// and invokes the switchTo() method. Upon receipt of every subsequent update
+// from the name resolver, it invokes this method.
+//
+// the ccBalancerWrapper keeps track of the current LB policy name, and skips
+// the graceful balancer switching process if the name does not change.
+func (ccb *ccBalancerWrapper) switchTo(name string) {
+ ccb.updateCh.Put(&switchToUpdate{name: name})
+}
+
+// handleSwitchTo handles a balancer switch update from the update channel. It
+// calls the SwitchTo() method on the gracefulswitch.Balancer with a
+// balancer.Builder corresponding to name. If no balancer.Builder is registered
+// for the given name, it uses the default LB policy which is "pick_first".
+func (ccb *ccBalancerWrapper) handleSwitchTo(name string) {
+ // TODO: Other languages use case-insensitive balancer registries. We should
+ // switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
+ if strings.EqualFold(ccb.curBalancerName, name) {
+ return
+ }
+
+ // TODO: Ensure that name is a registered LB policy when we get here.
+ // We currently only validate the `loadBalancingConfig` field. We need to do
+ // the same for the `loadBalancingPolicy` field and reject the service config
+ // if the specified policy is not registered.
+ builder := balancer.Get(name)
+ if builder == nil {
+ channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
+ builder = newPickfirstBuilder()
+ } else {
+ channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
+ }
+
+ if err := ccb.balancer.SwitchTo(builder); err != nil {
+ channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
+ return
+ }
+ ccb.curBalancerName = builder.Name()
+}
+
+// handleRemoveSucConn handles a request from the underlying balancer to remove
+// a subConn.
+//
+// See comments in RemoveSubConn() for more details.
+func (ccb *ccBalancerWrapper) handleRemoveSubConn(acbw *acBalancerWrapper) {
+ ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
+}
+
+func (ccb *ccBalancerWrapper) close() {
+ ccb.closed.Fire()
+ <-ccb.done.Done()
+}
+
+func (ccb *ccBalancerWrapper) handleClose() {
+ ccb.balancer.Close()
+ ccb.done.Fire()
+}
+
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
if len(addrs) <= 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
- ccb.mu.Lock()
- defer ccb.mu.Unlock()
- if ccb.subConns == nil {
- return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
- }
ac, err := ccb.cc.newAddrConn(addrs, opts)
if err != nil {
+ channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err
}
acbw := &acBalancerWrapper{ac: ac}
acbw.ac.mu.Lock()
ac.acbw = acbw
acbw.ac.mu.Unlock()
- ccb.subConns[acbw] = struct{}{}
return acbw, nil
}
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
- // The RemoveSubConn() is handled in the run() goroutine, to avoid deadlock
- // during switchBalancer() if the old balancer calls RemoveSubConn() in its
- // Close().
- ccb.updateCh.Put(sc)
+ // Before we switched the ccBalancerWrapper to use gracefulswitch.Balancer, it
+ // was required to handle the RemoveSubConn() method asynchronously by pushing
+ // the update onto the update channel. This was done to avoid a deadlock as
+ // switchBalancer() was holding cc.mu when calling Close() on the old
+ // balancer, which would in turn call RemoveSubConn().
+ //
+ // With the use of gracefulswitch.Balancer in ccBalancerWrapper, handling this
+ // asynchronously is probably not required anymore since the switchTo() method
+ // handles the balancer switch by pushing the update onto the channel.
+ // TODO(easwars): Handle this inline.
+ acbw, ok := sc.(*acBalancerWrapper)
+ if !ok {
+ return
+ }
+ ccb.updateCh.Put(&subConnUpdate{acbw: acbw})
}
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
@@ -210,11 +339,6 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol
}
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
- ccb.mu.Lock()
- defer ccb.mu.Unlock()
- if ccb.subConns == nil {
- return
- }
// Update picker before updating state. Even though the ordering here does
// not matter, it can lead to multiple calls of Pick in the common start-up
// case where we wait for ready and then perform an RPC. If the picker is
diff --git a/vendor/google.golang.org/grpc/channelz/channelz.go b/vendor/google.golang.org/grpc/channelz/channelz.go
new file mode 100644
index 000000000..a220c47c5
--- /dev/null
+++ b/vendor/google.golang.org/grpc/channelz/channelz.go
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package channelz exports internals of the channelz implementation as required
+// by other gRPC packages.
+//
+// The implementation of the channelz spec as defined in
+// https://github.com/grpc/proposal/blob/master/A14-channelz.md, is provided by
+// the `internal/channelz` package.
+//
+// Experimental
+//
+// Notice: All APIs in this package are experimental and may be removed in a
+// later release.
+package channelz
+
+import "google.golang.org/grpc/internal/channelz"
+
+// Identifier is an opaque identifier which uniquely identifies an entity in the
+// channelz database.
+type Identifier = channelz.Identifier
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 28f09dc87..de6d41c23 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -79,7 +79,7 @@ var (
// errNoTransportSecurity indicates that there is no transport security
// being set for ClientConn. Users should either set one or explicitly
// call WithInsecure DialOption to disable security.
- errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
+ errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithTransportCredentials(insecure.NewCredentials()) explicitly or set credentials)")
// errTransportCredsAndBundle indicates that creds bundle is used together
// with other individual Transport Credentials.
errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
@@ -159,23 +159,20 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}()
- if channelz.IsOn() {
- if cc.dopts.channelzParentID != 0 {
- cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
- channelz.AddTraceEvent(logger, cc.channelzID, 0, &channelz.TraceEventDesc{
- Desc: "Channel Created",
- Severity: channelz.CtInfo,
- Parent: &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
- Severity: channelz.CtInfo,
- },
- })
- } else {
- cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
- channelz.Info(logger, cc.channelzID, "Channel Created")
+ pid := cc.dopts.channelzParentID
+ cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, pid, target)
+ ted := &channelz.TraceEventDesc{
+ Desc: "Channel created",
+ Severity: channelz.CtInfo,
+ }
+ if cc.dopts.channelzParentID != nil {
+ ted.Parent = &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID.Int()),
+ Severity: channelz.CtInfo,
}
- cc.csMgr.channelzID = cc.channelzID
}
+ channelz.AddTraceEvent(logger, cc.channelzID, 1, ted)
+ cc.csMgr.channelzID = cc.channelzID
if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
return nil, errNoTransportSecurity
@@ -281,7 +278,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
- cc.balancerBuildOpts = balancer.BuildOptions{
+ cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
@@ -289,7 +286,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
- }
+ })
// Build the resolver.
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
@@ -398,7 +395,7 @@ type connectivityStateManager struct {
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
- channelzID int64
+ channelzID *channelz.Identifier
}
// updateState updates the connectivity.State of ClientConn.
@@ -464,34 +461,36 @@ var _ ClientConnInterface = (*ClientConn)(nil)
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
- ctx context.Context
- cancel context.CancelFunc
-
- target string
- parsedTarget resolver.Target
- authority string
- dopts dialOptions
- csMgr *connectivityStateManager
-
- balancerBuildOpts balancer.BuildOptions
- blockingpicker *pickerWrapper
-
+ ctx context.Context // Initialized using the background context at dial time.
+ cancel context.CancelFunc // Cancelled on close.
+
+ // The following are initialized at dial time, and are read-only after that.
+ target string // User's dial target.
+ parsedTarget resolver.Target // See parseTargetAndFindResolver().
+ authority string // See determineAuthority().
+ dopts dialOptions // Default and user specified dial options.
+ channelzID *channelz.Identifier // Channelz identifier for the channel.
+ balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath.
+
+ // The following provide their own synchronization, and therefore don't
+ // require cc.mu to be held to access them.
+ csMgr *connectivityStateManager
+ blockingpicker *pickerWrapper
safeConfigSelector iresolver.SafeConfigSelector
+ czData *channelzData
+ retryThrottler atomic.Value // Updated from service config.
- mu sync.RWMutex
- resolverWrapper *ccResolverWrapper
- sc *ServiceConfig
- conns map[*addrConn]struct{}
- // Keepalive parameter can be updated if a GoAway is received.
- mkp keepalive.ClientParameters
- curBalancerName string
- balancerWrapper *ccBalancerWrapper
- retryThrottler atomic.Value
-
+ // firstResolveEvent is used to track whether the name resolver sent us at
+ // least one update. RPCs block on this event.
firstResolveEvent *grpcsync.Event
- channelzID int64 // channelz unique identification number
- czData *channelzData
+ // mu protects the following fields.
+ // TODO: split mu so the same mutex isn't used for everything.
+ mu sync.RWMutex
+ resolverWrapper *ccResolverWrapper // Initialized in Dial; cleared in Close.
+ sc *ServiceConfig // Latest service config received from the resolver.
+ conns map[*addrConn]struct{} // Set to nil on close.
+ mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway.
lceMu sync.Mutex // protects lastConnectionError
lastConnectionError error
@@ -536,14 +535,7 @@ func (cc *ClientConn) GetState() connectivity.State {
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
- cc.mu.Lock()
- defer cc.mu.Unlock()
- if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() {
- return
- }
- for ac := range cc.conns {
- go ac.connect()
- }
+ cc.balancerWrapper.exitIdle()
}
func (cc *ClientConn) scWatcher() {
@@ -623,9 +615,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
// with the new addresses.
cc.maybeApplyDefaultServiceConfig(nil)
- if cc.balancerWrapper != nil {
- cc.balancerWrapper.resolverError(err)
- }
+ cc.balancerWrapper.resolverError(err)
// No addresses are valid with err set; return early.
cc.mu.Unlock()
@@ -653,16 +643,10 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
} else {
ret = balancer.ErrBadResolverState
- if cc.balancerWrapper == nil {
- var err error
- if s.ServiceConfig.Err != nil {
- err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
- } else {
- err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
- }
- cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
- cc.blockingpicker.updatePicker(base.NewErrPicker(err))
- cc.csMgr.updateState(connectivity.TransientFailure)
+ if cc.sc == nil {
+ // Apply the failing LB only if we haven't received valid service config
+ // from the name resolver in the past.
+ cc.applyFailingLB(s.ServiceConfig)
cc.mu.Unlock()
return ret
}
@@ -670,24 +654,12 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
}
var balCfg serviceconfig.LoadBalancingConfig
- if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
+ if cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig.cfg
}
-
- cbn := cc.curBalancerName
bw := cc.balancerWrapper
cc.mu.Unlock()
- if cbn != grpclbName {
- // Filter any grpclb addresses since we don't have the grpclb balancer.
- for i := 0; i < len(s.Addresses); {
- if s.Addresses[i].Type == resolver.GRPCLB {
- copy(s.Addresses[i:], s.Addresses[i+1:])
- s.Addresses = s.Addresses[:len(s.Addresses)-1]
- continue
- }
- i++
- }
- }
+
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
if ret == nil {
ret = uccsErr // prefer ErrBadResolver state since any other error is
@@ -696,56 +668,28 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
return ret
}
-// switchBalancer starts the switching from current balancer to the balancer
-// with the given name.
-//
-// It will NOT send the current address list to the new balancer. If needed,
-// caller of this function should send address list to the new balancer after
-// this function returns.
+// applyFailingLB is akin to configuring an LB policy on the channel which
+// always fails RPCs. Here, an actual LB policy is not configured, but an always
+// erroring picker is configured, which returns errors with information about
+// what was invalid in the received service config. A config selector with no
+// service config is configured, and the connectivity state of the channel is
+// set to TransientFailure.
//
// Caller must hold cc.mu.
-func (cc *ClientConn) switchBalancer(name string) {
- if strings.EqualFold(cc.curBalancerName, name) {
- return
- }
-
- channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
- if cc.dopts.balancerBuilder != nil {
- channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
- return
- }
- if cc.balancerWrapper != nil {
- // Don't hold cc.mu while closing the balancers. The balancers may call
- // methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
- // would cause a deadlock in that case.
- cc.mu.Unlock()
- cc.balancerWrapper.close()
- cc.mu.Lock()
- }
-
- builder := balancer.Get(name)
- if builder == nil {
- channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
- channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
- builder = newPickfirstBuilder()
+func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {
+ var err error
+ if sc.Err != nil {
+ err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err)
} else {
- channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
+ err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
}
-
- cc.curBalancerName = builder.Name()
- cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
+ cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
+ cc.blockingpicker.updatePicker(base.NewErrPicker(err))
+ cc.csMgr.updateState(connectivity.TransientFailure)
}
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
- cc.mu.Lock()
- if cc.conns == nil {
- cc.mu.Unlock()
- return
- }
- // TODO(bar switching) send updates to all balancer wrappers when balancer
- // gracefully switching is supported.
- cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
- cc.mu.Unlock()
+ cc.balancerWrapper.updateSubConnState(sc, s, err)
}
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
@@ -768,17 +712,21 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub
cc.mu.Unlock()
return nil, ErrClientConnClosing
}
- if channelz.IsOn() {
- ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
- channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
- Desc: "Subchannel Created",
- Severity: channelz.CtInfo,
- Parent: &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
- Severity: channelz.CtInfo,
- },
- })
+
+ var err error
+ ac.channelzID, err = channelz.RegisterSubChannel(ac, cc.channelzID, "")
+ if err != nil {
+ return nil, err
}
+ channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
+ Desc: "Subchannel created",
+ Severity: channelz.CtInfo,
+ Parent: &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID.Int()),
+ Severity: channelz.CtInfo,
+ },
+ })
+
cc.conns[ac] = struct{}{}
cc.mu.Unlock()
return ac, nil
@@ -853,16 +801,31 @@ func (ac *addrConn) connect() error {
return nil
}
+func equalAddresses(a, b []resolver.Address) bool {
+ if len(a) != len(b) {
+ return false
+ }
+ for i, v := range a {
+ if !v.Equal(b[i]) {
+ return false
+ }
+ }
+ return true
+}
+
// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
//
-// If ac is Connecting, it returns false. The caller should tear down the ac and
-// create a new one. Note that the backoff will be reset when this happens.
-//
// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
// addresses will be picked up by retry in the next iteration after backoff.
//
// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
//
+// If the addresses is the same as the old list, it does nothing and returns
+// true.
+//
+// If ac is Connecting, it returns false. The caller should tear down the ac and
+// create a new one. Note that the backoff will be reset when this happens.
+//
// If ac is Ready, it checks whether current connected address of ac is in the
// new addrs list.
// - If true, it updates ac.addrs and returns true. The ac will keep using
@@ -879,6 +842,10 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
return true
}
+ if equalAddresses(ac.addrs, addrs) {
+ return true
+ }
+
if ac.state == connectivity.Connecting {
return false
}
@@ -959,14 +926,10 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
}
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
- t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
+ return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx,
FullMethodName: method,
})
- if err != nil {
- return nil, nil, toRPCErr(err)
- }
- return t, done, nil
}
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
@@ -991,35 +954,26 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
cc.retryThrottler.Store((*retryThrottler)(nil))
}
- if cc.dopts.balancerBuilder == nil {
- // Only look at balancer types and switch balancer if balancer dial
- // option is not set.
- var newBalancerName string
- if cc.sc != nil && cc.sc.lbConfig != nil {
- newBalancerName = cc.sc.lbConfig.name
- } else {
- var isGRPCLB bool
- for _, a := range addrs {
- if a.Type == resolver.GRPCLB {
- isGRPCLB = true
- break
- }
- }
- if isGRPCLB {
- newBalancerName = grpclbName
- } else if cc.sc != nil && cc.sc.LB != nil {
- newBalancerName = *cc.sc.LB
- } else {
- newBalancerName = PickFirstBalancerName
+ var newBalancerName string
+ if cc.sc != nil && cc.sc.lbConfig != nil {
+ newBalancerName = cc.sc.lbConfig.name
+ } else {
+ var isGRPCLB bool
+ for _, a := range addrs {
+ if a.Type == resolver.GRPCLB {
+ isGRPCLB = true
+ break
}
}
- cc.switchBalancer(newBalancerName)
- } else if cc.balancerWrapper == nil {
- // Balancer dial option was set, and this is the first time handling
- // resolved addresses. Build a balancer with dopts.balancerBuilder.
- cc.curBalancerName = cc.dopts.balancerBuilder.Name()
- cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
+ if isGRPCLB {
+ newBalancerName = grpclbName
+ } else if cc.sc != nil && cc.sc.LB != nil {
+ newBalancerName = *cc.sc.LB
+ } else {
+ newBalancerName = PickFirstBalancerName
+ }
}
+ cc.balancerWrapper.switchTo(newBalancerName)
}
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
@@ -1070,11 +1024,11 @@ func (cc *ClientConn) Close() error {
rWrapper := cc.resolverWrapper
cc.resolverWrapper = nil
bWrapper := cc.balancerWrapper
- cc.balancerWrapper = nil
cc.mu.Unlock()
+ // The order of closing matters here since the balancer wrapper assumes the
+ // picker is closed before it is closed.
cc.blockingpicker.close()
-
if bWrapper != nil {
bWrapper.close()
}
@@ -1085,22 +1039,22 @@ func (cc *ClientConn) Close() error {
for ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
- if channelz.IsOn() {
- ted := &channelz.TraceEventDesc{
- Desc: "Channel Deleted",
+ ted := &channelz.TraceEventDesc{
+ Desc: "Channel deleted",
+ Severity: channelz.CtInfo,
+ }
+ if cc.dopts.channelzParentID != nil {
+ ted.Parent = &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID.Int()),
Severity: channelz.CtInfo,
}
- if cc.dopts.channelzParentID != 0 {
- ted.Parent = &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
- Severity: channelz.CtInfo,
- }
- }
- channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
- // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
- // the entity being deleted, and thus prevent it from being deleted right away.
- channelz.RemoveEntry(cc.channelzID)
}
+ channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
+ // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
+ // trace reference to the entity being deleted, and thus prevent it from being
+ // deleted right away.
+ channelz.RemoveEntry(cc.channelzID)
+
return nil
}
@@ -1130,7 +1084,7 @@ type addrConn struct {
backoffIdx int // Needs to be stateful for resetConnectBackoff.
resetBackoff chan struct{}
- channelzID int64 // channelz unique identification number.
+ channelzID *channelz.Identifier
czData *channelzData
}
@@ -1284,6 +1238,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
ac.mu.Lock()
defer ac.mu.Unlock()
defer connClosed.Fire()
+ defer hcancel()
if !hcStarted || hctx.Err() != nil {
// We didn't start the health check or set the state to READY, so
// no need to do anything else here.
@@ -1294,7 +1249,6 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
// state, since there may be a new transport in this addrConn.
return
}
- hcancel()
ac.transport = nil
// Refresh the name resolver
ac.cc.resolveNow(resolver.ResolveNowOptions{})
@@ -1312,14 +1266,13 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
defer cancel()
- if channelz.IsOn() {
- copts.ChannelzParentID = ac.channelzID
- }
+ copts.ChannelzParentID = ac.channelzID
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)
if err != nil {
// newTr is either nil, or closed.
- channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v", addr, err)
+ hcancel()
+ channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)
return err
}
@@ -1332,7 +1285,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
newTr.Close(transport.ErrConnClosing)
if connectCtx.Err() == context.DeadlineExceeded {
err := errors.New("failed to receive server preface within timeout")
- channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: %v", addr, err)
+ channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s: %v", addr, err)
return err
}
return nil
@@ -1497,19 +1450,18 @@ func (ac *addrConn) tearDown(err error) {
curTr.GracefulClose()
ac.mu.Lock()
}
- if channelz.IsOn() {
- channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
- Desc: "Subchannel Deleted",
+ channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
+ Desc: "Subchannel deleted",
+ Severity: channelz.CtInfo,
+ Parent: &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelzID.Int()),
Severity: channelz.CtInfo,
- Parent: &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
- Severity: channelz.CtInfo,
- },
- })
- // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
- // the entity being deleted, and thus prevent it from being deleted right away.
- channelz.RemoveEntry(ac.channelzID)
- }
+ },
+ })
+ // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
+ // trace reference to the entity being deleted, and thus prevent it from
+ // being deleted right away.
+ channelz.RemoveEntry(ac.channelzID)
ac.mu.Unlock()
}
diff --git a/vendor/google.golang.org/grpc/credentials/insecure/insecure.go b/vendor/google.golang.org/grpc/credentials/insecure/insecure.go
index 4fbed1256..82bee1443 100644
--- a/vendor/google.golang.org/grpc/credentials/insecure/insecure.go
+++ b/vendor/google.golang.org/grpc/credentials/insecure/insecure.go
@@ -70,3 +70,29 @@ type info struct {
func (info) AuthType() string {
return "insecure"
}
+
+// insecureBundle implements an insecure bundle.
+// An insecure bundle provides a thin wrapper around insecureTC to support
+// the credentials.Bundle interface.
+type insecureBundle struct{}
+
+// NewBundle returns a bundle with disabled transport security and no per rpc credential.
+func NewBundle() credentials.Bundle {
+ return insecureBundle{}
+}
+
+// NewWithMode returns a new insecure Bundle. The mode is ignored.
+func (insecureBundle) NewWithMode(string) (credentials.Bundle, error) {
+ return insecureBundle{}, nil
+}
+
+// PerRPCCredentials returns an nil implementation as insecure
+// bundle does not support a per rpc credential.
+func (insecureBundle) PerRPCCredentials() credentials.PerRPCCredentials {
+ return nil
+}
+
+// TransportCredentials returns the underlying insecure transport credential.
+func (insecureBundle) TransportCredentials() credentials.TransportCredentials {
+ return NewCredentials()
+}
diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go
index c4bf09f9e..f2f605a17 100644
--- a/vendor/google.golang.org/grpc/dialoptions.go
+++ b/vendor/google.golang.org/grpc/dialoptions.go
@@ -20,12 +20,11 @@ package grpc
import (
"context"
- "fmt"
"net"
"time"
"google.golang.org/grpc/backoff"
- "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/channelz"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
@@ -45,19 +44,17 @@ type dialOptions struct {
chainUnaryInts []UnaryClientInterceptor
chainStreamInts []StreamClientInterceptor
- cp Compressor
- dc Decompressor
- bs internalbackoff.Strategy
- block bool
- returnLastError bool
- timeout time.Duration
- scChan <-chan ServiceConfig
- authority string
- copts transport.ConnectOptions
- callOptions []CallOption
- // This is used by WithBalancerName dial option.
- balancerBuilder balancer.Builder
- channelzParentID int64
+ cp Compressor
+ dc Decompressor
+ bs internalbackoff.Strategy
+ block bool
+ returnLastError bool
+ timeout time.Duration
+ scChan <-chan ServiceConfig
+ authority string
+ copts transport.ConnectOptions
+ callOptions []CallOption
+ channelzParentID *channelz.Identifier
disableServiceConfig bool
disableRetry bool
disableHealthCheck bool
@@ -195,25 +192,6 @@ func WithDecompressor(dc Decompressor) DialOption {
})
}
-// WithBalancerName sets the balancer that the ClientConn will be initialized
-// with. Balancer registered with balancerName will be used. This function
-// panics if no balancer was registered by balancerName.
-//
-// The balancer cannot be overridden by balancer option specified by service
-// config.
-//
-// Deprecated: use WithDefaultServiceConfig and WithDisableServiceConfig
-// instead. Will be removed in a future 1.x release.
-func WithBalancerName(balancerName string) DialOption {
- builder := balancer.Get(balancerName)
- if builder == nil {
- panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
- }
- return newFuncDialOption(func(o *dialOptions) {
- o.balancerBuilder = builder
- })
-}
-
// WithServiceConfig returns a DialOption which has a channel to read the
// service configuration.
//
@@ -304,8 +282,8 @@ func WithReturnConnectionError() DialOption {
// WithCredentialsBundle or WithPerRPCCredentials) which require transport
// security is incompatible and will cause grpc.Dial() to fail.
//
-// Deprecated: use WithTransportCredentials and insecure.NewCredentials() instead.
-// Will be supported throughout 1.x.
+// Deprecated: use WithTransportCredentials and insecure.NewCredentials()
+// instead. Will be supported throughout 1.x.
func WithInsecure() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.TransportCredentials = insecure.NewCredentials()
@@ -498,7 +476,7 @@ func WithAuthority(a string) DialOption {
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
-func WithChannelzParentID(id int64) DialOption {
+func WithChannelzParentID(id *channelz.Identifier) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.channelzParentID = id
})
diff --git a/vendor/google.golang.org/grpc/encoding/encoding.go b/vendor/google.golang.org/grpc/encoding/encoding.go
index 6d84f74c7..18e530fc9 100644
--- a/vendor/google.golang.org/grpc/encoding/encoding.go
+++ b/vendor/google.golang.org/grpc/encoding/encoding.go
@@ -108,7 +108,7 @@ var registeredCodecs = make(map[string]Codec)
// more details.
//
// NOTE: this function must only be called during initialization time (i.e. in
-// an init() function), and is not thread-safe. If multiple Compressors are
+// an init() function), and is not thread-safe. If multiple Codecs are
// registered with the same name, the one registered last will take effect.
func RegisterCodec(codec Codec) {
if codec == nil {
diff --git a/vendor/google.golang.org/grpc/go.mod b/vendor/google.golang.org/grpc/go.mod
index fcffdceef..6a760ed74 100644
--- a/vendor/google.golang.org/grpc/go.mod
+++ b/vendor/google.golang.org/grpc/go.mod
@@ -6,14 +6,14 @@ require (
github.com/cespare/xxhash/v2 v2.1.1
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1
- github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021
+ github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
- github.com/golang/protobuf v1.4.3
- github.com/google/go-cmp v0.5.0
+ github.com/golang/protobuf v1.5.2
+ github.com/google/go-cmp v0.5.6
github.com/google/uuid v1.1.2
- golang.org/x/net v0.0.0-20200822124328-c89045814202
+ golang.org/x/net v0.0.0-20201021035429-f5854403a974
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
- golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd
+ golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
- google.golang.org/protobuf v1.25.0
+ google.golang.org/protobuf v1.27.1
)
diff --git a/vendor/google.golang.org/grpc/go.sum b/vendor/google.golang.org/grpc/go.sum
index 8b542e0be..5f418dba1 100644
--- a/vendor/google.golang.org/grpc/go.sum
+++ b/vendor/google.golang.org/grpc/go.sum
@@ -12,8 +12,8 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 h1:hzAQntlaYRkVSFEfj9OTWlVV1H155FMD8BTKktLv0QI=
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
-github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 h1:zH8ljVhhq7yC0MIeUL/IviMtY8hx2mK8cN9wEYb8ggw=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
@@ -22,8 +22,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
-github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021 h1:fP+fF0up6oPY49OrjPrhIJ8yQfdIM85NXMLkMg1EXVs=
-github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
+github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1 h1:xvqufLtNVwAhN8NMyWklVgxnWohi+wtMGQMhtxexlm0=
+github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@@ -40,14 +40,18 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
-github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
+github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
@@ -72,8 +76,9 @@ golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
-golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -84,10 +89,14 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k=
+golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
@@ -117,8 +126,11 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
-google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
+google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/vendor/google.golang.org/grpc/interceptor.go b/vendor/google.golang.org/grpc/interceptor.go
index 668e0adcf..bb96ef57b 100644
--- a/vendor/google.golang.org/grpc/interceptor.go
+++ b/vendor/google.golang.org/grpc/interceptor.go
@@ -72,9 +72,12 @@ type UnaryServerInfo struct {
}
// UnaryHandler defines the handler invoked by UnaryServerInterceptor to complete the normal
-// execution of a unary RPC. If a UnaryHandler returns an error, it should be produced by the
-// status package, or else gRPC will use codes.Unknown as the status code and err.Error() as
-// the status message of the RPC.
+// execution of a unary RPC.
+//
+// If a UnaryHandler returns an error, it should either be produced by the
+// status package, or be one of the context errors. Otherwise, gRPC will use
+// codes.Unknown as the status code and err.Error() as the status message of the
+// RPC.
type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)
// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info
diff --git a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go
new file mode 100644
index 000000000..7ba8f4d18
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go
@@ -0,0 +1,382 @@
+/*
+ *
+ * Copyright 2022 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package gracefulswitch implements a graceful switch load balancer.
+package gracefulswitch
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/balancer/base"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/resolver"
+)
+
+var errBalancerClosed = errors.New("gracefulSwitchBalancer is closed")
+var _ balancer.Balancer = (*Balancer)(nil)
+
+// NewBalancer returns a graceful switch Balancer.
+func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions) *Balancer {
+ return &Balancer{
+ cc: cc,
+ bOpts: opts,
+ }
+}
+
+// Balancer is a utility to gracefully switch from one balancer to
+// a new balancer. It implements the balancer.Balancer interface.
+type Balancer struct {
+ bOpts balancer.BuildOptions
+ cc balancer.ClientConn
+
+ // mu protects the following fields and all fields within balancerCurrent
+ // and balancerPending. mu does not need to be held when calling into the
+ // child balancers, as all calls into these children happen only as a direct
+ // result of a call into the gracefulSwitchBalancer, which are also
+ // guaranteed to be synchronous. There is one exception: an UpdateState call
+ // from a child balancer when current and pending are populated can lead to
+ // calling Close() on the current. To prevent that racing with an
+ // UpdateSubConnState from the channel, we hold currentMu during Close and
+ // UpdateSubConnState calls.
+ mu sync.Mutex
+ balancerCurrent *balancerWrapper
+ balancerPending *balancerWrapper
+ closed bool // set to true when this balancer is closed
+
+ // currentMu must be locked before mu. This mutex guards against this
+ // sequence of events: UpdateSubConnState() called, finds the
+ // balancerCurrent, gives up lock, updateState comes in, causes Close() on
+ // balancerCurrent before the UpdateSubConnState is called on the
+ // balancerCurrent.
+ currentMu sync.Mutex
+}
+
+// swap swaps out the current lb with the pending lb and updates the ClientConn.
+// The caller must hold gsb.mu.
+func (gsb *Balancer) swap() {
+ gsb.cc.UpdateState(gsb.balancerPending.lastState)
+ cur := gsb.balancerCurrent
+ gsb.balancerCurrent = gsb.balancerPending
+ gsb.balancerPending = nil
+ go func() {
+ gsb.currentMu.Lock()
+ defer gsb.currentMu.Unlock()
+ cur.Close()
+ }()
+}
+
+// Helper function that checks if the balancer passed in is current or pending.
+// The caller must hold gsb.mu.
+func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool {
+ return bw == gsb.balancerCurrent || bw == gsb.balancerPending
+}
+
+// SwitchTo initializes the graceful switch process, which completes based on
+// connectivity state changes on the current/pending balancer. Thus, the switch
+// process is not complete when this method returns. This method must be called
+// synchronously alongside the rest of the balancer.Balancer methods this
+// Graceful Switch Balancer implements.
+func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
+ gsb.mu.Lock()
+ if gsb.closed {
+ gsb.mu.Unlock()
+ return errBalancerClosed
+ }
+ bw := &balancerWrapper{
+ gsb: gsb,
+ lastState: balancer.State{
+ ConnectivityState: connectivity.Connecting,
+ Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
+ },
+ subconns: make(map[balancer.SubConn]bool),
+ }
+ balToClose := gsb.balancerPending // nil if there is no pending balancer
+ if gsb.balancerCurrent == nil {
+ gsb.balancerCurrent = bw
+ } else {
+ gsb.balancerPending = bw
+ }
+ gsb.mu.Unlock()
+ balToClose.Close()
+ // This function takes a builder instead of a balancer because builder.Build
+ // can call back inline, and this utility needs to handle the callbacks.
+ newBalancer := builder.Build(bw, gsb.bOpts)
+ if newBalancer == nil {
+ // This is illegal and should never happen; we clear the balancerWrapper
+ // we were constructing if it happens to avoid a potential panic.
+ gsb.mu.Lock()
+ if gsb.balancerPending != nil {
+ gsb.balancerPending = nil
+ } else {
+ gsb.balancerCurrent = nil
+ }
+ gsb.mu.Unlock()
+ return balancer.ErrBadResolverState
+ }
+
+ // This write doesn't need to take gsb.mu because this field never gets read
+ // or written to on any calls from the current or pending. Calls from grpc
+ // to this balancer are guaranteed to be called synchronously, so this
+ // bw.Balancer field will never be forwarded to until this SwitchTo()
+ // function returns.
+ bw.Balancer = newBalancer
+ return nil
+}
+
+// Returns nil if the graceful switch balancer is closed.
+func (gsb *Balancer) latestBalancer() *balancerWrapper {
+ gsb.mu.Lock()
+ defer gsb.mu.Unlock()
+ if gsb.balancerPending != nil {
+ return gsb.balancerPending
+ }
+ return gsb.balancerCurrent
+}
+
+// UpdateClientConnState forwards the update to the latest balancer created.
+func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
+ // The resolver data is only relevant to the most recent LB Policy.
+ balToUpdate := gsb.latestBalancer()
+ if balToUpdate == nil {
+ return errBalancerClosed
+ }
+ // Perform this call without gsb.mu to prevent deadlocks if the child calls
+ // back into the channel. The latest balancer can never be closed during a
+ // call from the channel, even without gsb.mu held.
+ return balToUpdate.UpdateClientConnState(state)
+}
+
+// ResolverError forwards the error to the latest balancer created.
+func (gsb *Balancer) ResolverError(err error) {
+ // The resolver data is only relevant to the most recent LB Policy.
+ balToUpdate := gsb.latestBalancer()
+ if balToUpdate == nil {
+ return
+ }
+ // Perform this call without gsb.mu to prevent deadlocks if the child calls
+ // back into the channel. The latest balancer can never be closed during a
+ // call from the channel, even without gsb.mu held.
+ balToUpdate.ResolverError(err)
+}
+
+// ExitIdle forwards the call to the latest balancer created.
+//
+// If the latest balancer does not support ExitIdle, the subConns are
+// re-connected to manually.
+func (gsb *Balancer) ExitIdle() {
+ balToUpdate := gsb.latestBalancer()
+ if balToUpdate == nil {
+ return
+ }
+ // There is no need to protect this read with a mutex, as the write to the
+ // Balancer field happens in SwitchTo, which completes before this can be
+ // called.
+ if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok {
+ ei.ExitIdle()
+ return
+ }
+ for sc := range balToUpdate.subconns {
+ sc.Connect()
+ }
+}
+
+// UpdateSubConnState forwards the update to the appropriate child.
+func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
+ gsb.currentMu.Lock()
+ defer gsb.currentMu.Unlock()
+ gsb.mu.Lock()
+ // Forward update to the appropriate child. Even if there is a pending
+ // balancer, the current balancer should continue to get SubConn updates to
+ // maintain the proper state while the pending is still connecting.
+ var balToUpdate *balancerWrapper
+ if gsb.balancerCurrent != nil && gsb.balancerCurrent.subconns[sc] {
+ balToUpdate = gsb.balancerCurrent
+ } else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
+ balToUpdate = gsb.balancerPending
+ }
+ gsb.mu.Unlock()
+ if balToUpdate == nil {
+ // SubConn belonged to a stale lb policy that has not yet fully closed,
+ // or the balancer was already closed.
+ return
+ }
+ balToUpdate.UpdateSubConnState(sc, state)
+}
+
+// Close closes any active child balancers.
+func (gsb *Balancer) Close() {
+ gsb.mu.Lock()
+ gsb.closed = true
+ currentBalancerToClose := gsb.balancerCurrent
+ gsb.balancerCurrent = nil
+ pendingBalancerToClose := gsb.balancerPending
+ gsb.balancerPending = nil
+ gsb.mu.Unlock()
+
+ currentBalancerToClose.Close()
+ pendingBalancerToClose.Close()
+}
+
+// balancerWrapper wraps a balancer.Balancer, and overrides some Balancer
+// methods to help cleanup SubConns created by the wrapped balancer.
+//
+// It implements the balancer.ClientConn interface and is passed down in that
+// capacity to the wrapped balancer. It maintains a set of subConns created by
+// the wrapped balancer and calls from the latter to create/update/remove
+// SubConns update this set before being forwarded to the parent ClientConn.
+// State updates from the wrapped balancer can result in invocation of the
+// graceful switch logic.
+type balancerWrapper struct {
+ balancer.Balancer
+ gsb *Balancer
+
+ lastState balancer.State
+ subconns map[balancer.SubConn]bool // subconns created by this balancer
+}
+
+func (bw *balancerWrapper) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
+ if state.ConnectivityState == connectivity.Shutdown {
+ bw.gsb.mu.Lock()
+ delete(bw.subconns, sc)
+ bw.gsb.mu.Unlock()
+ }
+ // There is no need to protect this read with a mutex, as the write to the
+ // Balancer field happens in SwitchTo, which completes before this can be
+ // called.
+ bw.Balancer.UpdateSubConnState(sc, state)
+}
+
+// Close closes the underlying LB policy and removes the subconns it created. bw
+// must not be referenced via balancerCurrent or balancerPending in gsb when
+// called. gsb.mu must not be held. Does not panic with a nil receiver.
+func (bw *balancerWrapper) Close() {
+ // before Close is called.
+ if bw == nil {
+ return
+ }
+ // There is no need to protect this read with a mutex, as Close() is
+ // impossible to be called concurrently with the write in SwitchTo(). The
+ // callsites of Close() for this balancer in Graceful Switch Balancer will
+ // never be called until SwitchTo() returns.
+ bw.Balancer.Close()
+ bw.gsb.mu.Lock()
+ for sc := range bw.subconns {
+ bw.gsb.cc.RemoveSubConn(sc)
+ }
+ bw.gsb.mu.Unlock()
+}
+
+func (bw *balancerWrapper) UpdateState(state balancer.State) {
+ // Hold the mutex for this entire call to ensure it cannot occur
+ // concurrently with other updateState() calls. This causes updates to
+ // lastState and calls to cc.UpdateState to happen atomically.
+ bw.gsb.mu.Lock()
+ defer bw.gsb.mu.Unlock()
+ bw.lastState = state
+
+ if !bw.gsb.balancerCurrentOrPending(bw) {
+ return
+ }
+
+ if bw == bw.gsb.balancerCurrent {
+ // In the case that the current balancer exits READY, and there is a pending
+ // balancer, you can forward the pending balancer's cached State up to
+ // ClientConn and swap the pending into the current. This is because there
+ // is no reason to gracefully switch from and keep using the old policy as
+ // the ClientConn is not connected to any backends.
+ if state.ConnectivityState != connectivity.Ready && bw.gsb.balancerPending != nil {
+ bw.gsb.swap()
+ return
+ }
+ // Even if there is a pending balancer waiting to be gracefully switched to,
+ // continue to forward current balancer updates to the Client Conn. Ignoring
+ // state + picker from the current would cause undefined behavior/cause the
+ // system to behave incorrectly from the current LB policies perspective.
+ // Also, the current LB is still being used by grpc to choose SubConns per
+ // RPC, and thus should use the most updated form of the current balancer.
+ bw.gsb.cc.UpdateState(state)
+ return
+ }
+ // This method is now dealing with a state update from the pending balancer.
+ // If the current balancer is currently in a state other than READY, the new
+ // policy can be swapped into place immediately. This is because there is no
+ // reason to gracefully switch from and keep using the old policy as the
+ // ClientConn is not connected to any backends.
+ if state.ConnectivityState != connectivity.Connecting || bw.gsb.balancerCurrent.lastState.ConnectivityState != connectivity.Ready {
+ bw.gsb.swap()
+ }
+}
+
+func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
+ bw.gsb.mu.Lock()
+ if !bw.gsb.balancerCurrentOrPending(bw) {
+ bw.gsb.mu.Unlock()
+ return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
+ }
+ bw.gsb.mu.Unlock()
+
+ sc, err := bw.gsb.cc.NewSubConn(addrs, opts)
+ if err != nil {
+ return nil, err
+ }
+ bw.gsb.mu.Lock()
+ if !bw.gsb.balancerCurrentOrPending(bw) { // balancer was closed during this call
+ bw.gsb.cc.RemoveSubConn(sc)
+ bw.gsb.mu.Unlock()
+ return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
+ }
+ bw.subconns[sc] = true
+ bw.gsb.mu.Unlock()
+ return sc, nil
+}
+
+func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
+ // Ignore ResolveNow requests from anything other than the most recent
+ // balancer, because older balancers were already removed from the config.
+ if bw != bw.gsb.latestBalancer() {
+ return
+ }
+ bw.gsb.cc.ResolveNow(opts)
+}
+
+func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
+ bw.gsb.mu.Lock()
+ if !bw.gsb.balancerCurrentOrPending(bw) {
+ bw.gsb.mu.Unlock()
+ return
+ }
+ bw.gsb.mu.Unlock()
+ bw.gsb.cc.RemoveSubConn(sc)
+}
+
+func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
+ bw.gsb.mu.Lock()
+ if !bw.gsb.balancerCurrentOrPending(bw) {
+ bw.gsb.mu.Unlock()
+ return
+ }
+ bw.gsb.mu.Unlock()
+ bw.gsb.cc.UpdateAddresses(sc, addrs)
+}
+
+func (bw *balancerWrapper) Target() string {
+ return bw.gsb.cc.Target()
+}
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
index 5cc3aeddb..0a25ce43f 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
@@ -31,7 +31,7 @@ import (
// Logger is the global binary logger. It can be used to get binary logger for
// each method.
type Logger interface {
- getMethodLogger(methodName string) *MethodLogger
+ GetMethodLogger(methodName string) MethodLogger
}
// binLogger is the global binary logger for the binary. One of this should be
@@ -49,17 +49,24 @@ func SetLogger(l Logger) {
binLogger = l
}
+// GetLogger gets the binarg logger.
+//
+// Only call this at init time.
+func GetLogger() Logger {
+ return binLogger
+}
+
// GetMethodLogger returns the methodLogger for the given methodName.
//
// methodName should be in the format of "/service/method".
//
// Each methodLogger returned by this method is a new instance. This is to
// generate sequence id within the call.
-func GetMethodLogger(methodName string) *MethodLogger {
+func GetMethodLogger(methodName string) MethodLogger {
if binLogger == nil {
return nil
}
- return binLogger.getMethodLogger(methodName)
+ return binLogger.GetMethodLogger(methodName)
}
func init() {
@@ -68,17 +75,29 @@ func init() {
binLogger = NewLoggerFromConfigString(configStr)
}
-type methodLoggerConfig struct {
+// MethodLoggerConfig contains the setting for logging behavior of a method
+// logger. Currently, it contains the max length of header and message.
+type MethodLoggerConfig struct {
// Max length of header and message.
- hdr, msg uint64
+ Header, Message uint64
+}
+
+// LoggerConfig contains the config for loggers to create method loggers.
+type LoggerConfig struct {
+ All *MethodLoggerConfig
+ Services map[string]*MethodLoggerConfig
+ Methods map[string]*MethodLoggerConfig
+
+ Blacklist map[string]struct{}
}
type logger struct {
- all *methodLoggerConfig
- services map[string]*methodLoggerConfig
- methods map[string]*methodLoggerConfig
+ config LoggerConfig
+}
- blacklist map[string]struct{}
+// NewLoggerFromConfig builds a logger with the given LoggerConfig.
+func NewLoggerFromConfig(config LoggerConfig) Logger {
+ return &logger{config: config}
}
// newEmptyLogger creates an empty logger. The map fields need to be filled in
@@ -88,57 +107,57 @@ func newEmptyLogger() *logger {
}
// Set method logger for "*".
-func (l *logger) setDefaultMethodLogger(ml *methodLoggerConfig) error {
- if l.all != nil {
+func (l *logger) setDefaultMethodLogger(ml *MethodLoggerConfig) error {
+ if l.config.All != nil {
return fmt.Errorf("conflicting global rules found")
}
- l.all = ml
+ l.config.All = ml
return nil
}
// Set method logger for "service/*".
//
// New methodLogger with same service overrides the old one.
-func (l *logger) setServiceMethodLogger(service string, ml *methodLoggerConfig) error {
- if _, ok := l.services[service]; ok {
+func (l *logger) setServiceMethodLogger(service string, ml *MethodLoggerConfig) error {
+ if _, ok := l.config.Services[service]; ok {
return fmt.Errorf("conflicting service rules for service %v found", service)
}
- if l.services == nil {
- l.services = make(map[string]*methodLoggerConfig)
+ if l.config.Services == nil {
+ l.config.Services = make(map[string]*MethodLoggerConfig)
}
- l.services[service] = ml
+ l.config.Services[service] = ml
return nil
}
// Set method logger for "service/method".
//
// New methodLogger with same method overrides the old one.
-func (l *logger) setMethodMethodLogger(method string, ml *methodLoggerConfig) error {
- if _, ok := l.blacklist[method]; ok {
+func (l *logger) setMethodMethodLogger(method string, ml *MethodLoggerConfig) error {
+ if _, ok := l.config.Blacklist[method]; ok {
return fmt.Errorf("conflicting blacklist rules for method %v found", method)
}
- if _, ok := l.methods[method]; ok {
+ if _, ok := l.config.Methods[method]; ok {
return fmt.Errorf("conflicting method rules for method %v found", method)
}
- if l.methods == nil {
- l.methods = make(map[string]*methodLoggerConfig)
+ if l.config.Methods == nil {
+ l.config.Methods = make(map[string]*MethodLoggerConfig)
}
- l.methods[method] = ml
+ l.config.Methods[method] = ml
return nil
}
// Set blacklist method for "-service/method".
func (l *logger) setBlacklist(method string) error {
- if _, ok := l.blacklist[method]; ok {
+ if _, ok := l.config.Blacklist[method]; ok {
return fmt.Errorf("conflicting blacklist rules for method %v found", method)
}
- if _, ok := l.methods[method]; ok {
+ if _, ok := l.config.Methods[method]; ok {
return fmt.Errorf("conflicting method rules for method %v found", method)
}
- if l.blacklist == nil {
- l.blacklist = make(map[string]struct{})
+ if l.config.Blacklist == nil {
+ l.config.Blacklist = make(map[string]struct{})
}
- l.blacklist[method] = struct{}{}
+ l.config.Blacklist[method] = struct{}{}
return nil
}
@@ -148,23 +167,23 @@ func (l *logger) setBlacklist(method string) error {
//
// Each methodLogger returned by this method is a new instance. This is to
// generate sequence id within the call.
-func (l *logger) getMethodLogger(methodName string) *MethodLogger {
+func (l *logger) GetMethodLogger(methodName string) MethodLogger {
s, m, err := grpcutil.ParseMethod(methodName)
if err != nil {
grpclogLogger.Infof("binarylogging: failed to parse %q: %v", methodName, err)
return nil
}
- if ml, ok := l.methods[s+"/"+m]; ok {
- return newMethodLogger(ml.hdr, ml.msg)
+ if ml, ok := l.config.Methods[s+"/"+m]; ok {
+ return newMethodLogger(ml.Header, ml.Message)
}
- if _, ok := l.blacklist[s+"/"+m]; ok {
+ if _, ok := l.config.Blacklist[s+"/"+m]; ok {
return nil
}
- if ml, ok := l.services[s]; ok {
- return newMethodLogger(ml.hdr, ml.msg)
+ if ml, ok := l.config.Services[s]; ok {
+ return newMethodLogger(ml.Header, ml.Message)
}
- if l.all == nil {
+ if l.config.All == nil {
return nil
}
- return newMethodLogger(l.all.hdr, l.all.msg)
+ return newMethodLogger(l.config.All.Header, l.config.All.Message)
}
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/env_config.go b/vendor/google.golang.org/grpc/internal/binarylog/env_config.go
index d8f4e7602..ab589a76b 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/env_config.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/env_config.go
@@ -89,7 +89,7 @@ func (l *logger) fillMethodLoggerWithConfigString(config string) error {
if err != nil {
return fmt.Errorf("invalid config: %q, %v", config, err)
}
- if err := l.setDefaultMethodLogger(&methodLoggerConfig{hdr: hdr, msg: msg}); err != nil {
+ if err := l.setDefaultMethodLogger(&MethodLoggerConfig{Header: hdr, Message: msg}); err != nil {
return fmt.Errorf("invalid config: %v", err)
}
return nil
@@ -104,11 +104,11 @@ func (l *logger) fillMethodLoggerWithConfigString(config string) error {
return fmt.Errorf("invalid header/message length config: %q, %v", suffix, err)
}
if m == "*" {
- if err := l.setServiceMethodLogger(s, &methodLoggerConfig{hdr: hdr, msg: msg}); err != nil {
+ if err := l.setServiceMethodLogger(s, &MethodLoggerConfig{Header: hdr, Message: msg}); err != nil {
return fmt.Errorf("invalid config: %v", err)
}
} else {
- if err := l.setMethodMethodLogger(s+"/"+m, &methodLoggerConfig{hdr: hdr, msg: msg}); err != nil {
+ if err := l.setMethodMethodLogger(s+"/"+m, &MethodLoggerConfig{Header: hdr, Message: msg}); err != nil {
return fmt.Errorf("invalid config: %v", err)
}
}
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
index 0cdb41831..24df0a1a0 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
@@ -48,7 +48,11 @@ func (g *callIDGenerator) reset() {
var idGen callIDGenerator
// MethodLogger is the sub-logger for each method.
-type MethodLogger struct {
+type MethodLogger interface {
+ Log(LogEntryConfig)
+}
+
+type methodLogger struct {
headerMaxLen, messageMaxLen uint64
callID uint64
@@ -57,8 +61,8 @@ type MethodLogger struct {
sink Sink // TODO(blog): make this plugable.
}
-func newMethodLogger(h, m uint64) *MethodLogger {
- return &MethodLogger{
+func newMethodLogger(h, m uint64) *methodLogger {
+ return &methodLogger{
headerMaxLen: h,
messageMaxLen: m,
@@ -69,8 +73,10 @@ func newMethodLogger(h, m uint64) *MethodLogger {
}
}
-// Log creates a proto binary log entry, and logs it to the sink.
-func (ml *MethodLogger) Log(c LogEntryConfig) {
+// Build is an internal only method for building the proto message out of the
+// input event. It's made public to enable other library to reuse as much logic
+// in methodLogger as possible.
+func (ml *methodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
m := c.toProto()
timestamp, _ := ptypes.TimestampProto(time.Now())
m.Timestamp = timestamp
@@ -85,11 +91,15 @@ func (ml *MethodLogger) Log(c LogEntryConfig) {
case *pb.GrpcLogEntry_Message:
m.PayloadTruncated = ml.truncateMessage(pay.Message)
}
+ return m
+}
- ml.sink.Write(m)
+// Log creates a proto binary log entry, and logs it to the sink.
+func (ml *methodLogger) Log(c LogEntryConfig) {
+ ml.sink.Write(ml.Build(c))
}
-func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
+func (ml *methodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
if ml.headerMaxLen == maxUInt {
return false
}
@@ -119,7 +129,7 @@ func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
return truncated
}
-func (ml *MethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
+func (ml *methodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
if ml.messageMaxLen == maxUInt {
return false
}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
index cd1807543..777cbcd79 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
@@ -24,6 +24,8 @@
package channelz
import (
+ "context"
+ "errors"
"fmt"
"sort"
"sync"
@@ -49,7 +51,8 @@ var (
// TurnOn turns on channelz data collection.
func TurnOn() {
if !IsOn() {
- NewChannelzStorage()
+ db.set(newChannelMap())
+ idGen.reset()
atomic.StoreInt32(&curState, 1)
}
}
@@ -94,46 +97,40 @@ func (d *dbWrapper) get() *channelMap {
return d.DB
}
-// NewChannelzStorage initializes channelz data storage and id generator.
+// NewChannelzStorageForTesting initializes channelz data storage and id
+// generator for testing purposes.
//
-// This function returns a cleanup function to wait for all channelz state to be reset by the
-// grpc goroutines when those entities get closed. By using this cleanup function, we make sure tests
-// don't mess up each other, i.e. lingering goroutine from previous test doing entity removal happen
-// to remove some entity just register by the new test, since the id space is the same.
-//
-// Note: This function is exported for testing purpose only. User should not call
-// it in most cases.
-func NewChannelzStorage() (cleanup func() error) {
- db.set(&channelMap{
- topLevelChannels: make(map[int64]struct{}),
- channels: make(map[int64]*channel),
- listenSockets: make(map[int64]*listenSocket),
- normalSockets: make(map[int64]*normalSocket),
- servers: make(map[int64]*server),
- subChannels: make(map[int64]*subChannel),
- })
+// Returns a cleanup function to be invoked by the test, which waits for up to
+// 10s for all channelz state to be reset by the grpc goroutines when those
+// entities get closed. This cleanup function helps with ensuring that tests
+// don't mess up each other.
+func NewChannelzStorageForTesting() (cleanup func() error) {
+ db.set(newChannelMap())
idGen.reset()
+
return func() error {
- var err error
cm := db.get()
if cm == nil {
return nil
}
- for i := 0; i < 1000; i++ {
- cm.mu.Lock()
- if len(cm.topLevelChannels) == 0 && len(cm.servers) == 0 && len(cm.channels) == 0 && len(cm.subChannels) == 0 && len(cm.listenSockets) == 0 && len(cm.normalSockets) == 0 {
- cm.mu.Unlock()
- // all things stored in the channelz map have been cleared.
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ ticker := time.NewTicker(10 * time.Millisecond)
+ defer ticker.Stop()
+ for {
+ cm.mu.RLock()
+ topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets := len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets)
+ cm.mu.RUnlock()
+
+ if err := ctx.Err(); err != nil {
+ return fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets)
+ }
+ if topLevelChannels == 0 && servers == 0 && channels == 0 && subChannels == 0 && listenSockets == 0 && normalSockets == 0 {
return nil
}
- cm.mu.Unlock()
- time.Sleep(10 * time.Millisecond)
+ <-ticker.C
}
-
- cm.mu.Lock()
- err = fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets))
- cm.mu.Unlock()
- return err
}
}
@@ -188,54 +185,77 @@ func GetServer(id int64) *ServerMetric {
return db.get().GetServer(id)
}
-// RegisterChannel registers the given channel c in channelz database with ref
-// as its reference name, and add it to the child list of its parent (identified
-// by pid). pid = 0 means no parent. It returns the unique channelz tracking id
-// assigned to this channel.
-func RegisterChannel(c Channel, pid int64, ref string) int64 {
+// RegisterChannel registers the given channel c in the channelz database with
+// ref as its reference name, and adds it to the child list of its parent
+// (identified by pid). pid == nil means no parent.
+//
+// Returns a unique channelz identifier assigned to this channel.
+//
+// If channelz is not turned ON, the channelz database is not mutated.
+func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
id := idGen.genID()
+ var parent int64
+ isTopChannel := true
+ if pid != nil {
+ isTopChannel = false
+ parent = pid.Int()
+ }
+
+ if !IsOn() {
+ return newIdentifer(RefChannel, id, pid)
+ }
+
cn := &channel{
refName: ref,
c: c,
subChans: make(map[int64]string),
nestedChans: make(map[int64]string),
id: id,
- pid: pid,
+ pid: parent,
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
}
- if pid == 0 {
- db.get().addChannel(id, cn, true, pid)
- } else {
- db.get().addChannel(id, cn, false, pid)
- }
- return id
+ db.get().addChannel(id, cn, isTopChannel, parent)
+ return newIdentifer(RefChannel, id, pid)
}
-// RegisterSubChannel registers the given channel c in channelz database with ref
-// as its reference name, and add it to the child list of its parent (identified
-// by pid). It returns the unique channelz tracking id assigned to this subchannel.
-func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
- if pid == 0 {
- logger.Error("a SubChannel's parent id cannot be 0")
- return 0
+// RegisterSubChannel registers the given subChannel c in the channelz database
+// with ref as its reference name, and adds it to the child list of its parent
+// (identified by pid).
+//
+// Returns a unique channelz identifier assigned to this subChannel.
+//
+// If channelz is not turned ON, the channelz database is not mutated.
+func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, error) {
+ if pid == nil {
+ return nil, errors.New("a SubChannel's parent id cannot be nil")
}
id := idGen.genID()
+ if !IsOn() {
+ return newIdentifer(RefSubChannel, id, pid), nil
+ }
+
sc := &subChannel{
refName: ref,
c: c,
sockets: make(map[int64]string),
id: id,
- pid: pid,
+ pid: pid.Int(),
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
}
- db.get().addSubChannel(id, sc, pid)
- return id
+ db.get().addSubChannel(id, sc, pid.Int())
+ return newIdentifer(RefSubChannel, id, pid), nil
}
// RegisterServer registers the given server s in channelz database. It returns
// the unique channelz tracking id assigned to this server.
-func RegisterServer(s Server, ref string) int64 {
+//
+// If channelz is not turned ON, the channelz database is not mutated.
+func RegisterServer(s Server, ref string) *Identifier {
id := idGen.genID()
+ if !IsOn() {
+ return newIdentifer(RefServer, id, nil)
+ }
+
svr := &server{
refName: ref,
s: s,
@@ -244,71 +264,92 @@ func RegisterServer(s Server, ref string) int64 {
id: id,
}
db.get().addServer(id, svr)
- return id
+ return newIdentifer(RefServer, id, nil)
}
// RegisterListenSocket registers the given listen socket s in channelz database
// with ref as its reference name, and add it to the child list of its parent
// (identified by pid). It returns the unique channelz tracking id assigned to
// this listen socket.
-func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
- if pid == 0 {
- logger.Error("a ListenSocket's parent id cannot be 0")
- return 0
+//
+// If channelz is not turned ON, the channelz database is not mutated.
+func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
+ if pid == nil {
+ return nil, errors.New("a ListenSocket's parent id cannot be 0")
}
id := idGen.genID()
- ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
- db.get().addListenSocket(id, ls, pid)
- return id
+ if !IsOn() {
+ return newIdentifer(RefListenSocket, id, pid), nil
+ }
+
+ ls := &listenSocket{refName: ref, s: s, id: id, pid: pid.Int()}
+ db.get().addListenSocket(id, ls, pid.Int())
+ return newIdentifer(RefListenSocket, id, pid), nil
}
// RegisterNormalSocket registers the given normal socket s in channelz database
-// with ref as its reference name, and add it to the child list of its parent
+// with ref as its reference name, and adds it to the child list of its parent
// (identified by pid). It returns the unique channelz tracking id assigned to
// this normal socket.
-func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
- if pid == 0 {
- logger.Error("a NormalSocket's parent id cannot be 0")
- return 0
+//
+// If channelz is not turned ON, the channelz database is not mutated.
+func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
+ if pid == nil {
+ return nil, errors.New("a NormalSocket's parent id cannot be 0")
}
id := idGen.genID()
- ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
- db.get().addNormalSocket(id, ns, pid)
- return id
+ if !IsOn() {
+ return newIdentifer(RefNormalSocket, id, pid), nil
+ }
+
+ ns := &normalSocket{refName: ref, s: s, id: id, pid: pid.Int()}
+ db.get().addNormalSocket(id, ns, pid.Int())
+ return newIdentifer(RefNormalSocket, id, pid), nil
}
// RemoveEntry removes an entry with unique channelz tracking id to be id from
// channelz database.
-func RemoveEntry(id int64) {
- db.get().removeEntry(id)
+//
+// If channelz is not turned ON, this function is a no-op.
+func RemoveEntry(id *Identifier) {
+ if !IsOn() {
+ return
+ }
+ db.get().removeEntry(id.Int())
}
-// TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added
-// to the channel trace.
-// The Parent field is optional. It is used for event that will be recorded in the entity's parent
-// trace also.
+// TraceEventDesc is what the caller of AddTraceEvent should provide to describe
+// the event to be added to the channel trace.
+//
+// The Parent field is optional. It is used for an event that will be recorded
+// in the entity's parent trace.
type TraceEventDesc struct {
Desc string
Severity Severity
Parent *TraceEventDesc
}
-// AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
-func AddTraceEvent(l grpclog.DepthLoggerV2, id int64, depth int, desc *TraceEventDesc) {
- for d := desc; d != nil; d = d.Parent {
- switch d.Severity {
- case CtUnknown, CtInfo:
- l.InfoDepth(depth+1, d.Desc)
- case CtWarning:
- l.WarningDepth(depth+1, d.Desc)
- case CtError:
- l.ErrorDepth(depth+1, d.Desc)
- }
+// AddTraceEvent adds trace related to the entity with specified id, using the
+// provided TraceEventDesc.
+//
+// If channelz is not turned ON, this will simply log the event descriptions.
+func AddTraceEvent(l grpclog.DepthLoggerV2, id *Identifier, depth int, desc *TraceEventDesc) {
+ // Log only the trace description associated with the bottom most entity.
+ switch desc.Severity {
+ case CtUnknown, CtInfo:
+ l.InfoDepth(depth+1, withParens(id)+desc.Desc)
+ case CtWarning:
+ l.WarningDepth(depth+1, withParens(id)+desc.Desc)
+ case CtError:
+ l.ErrorDepth(depth+1, withParens(id)+desc.Desc)
}
+
if getMaxTraceEntry() == 0 {
return
}
- db.get().traceEvent(id, desc)
+ if IsOn() {
+ db.get().traceEvent(id.Int(), desc)
+ }
}
// channelMap is the storage data structure for channelz.
@@ -326,6 +367,17 @@ type channelMap struct {
normalSockets map[int64]*normalSocket
}
+func newChannelMap() *channelMap {
+ return &channelMap{
+ topLevelChannels: make(map[int64]struct{}),
+ channels: make(map[int64]*channel),
+ listenSockets: make(map[int64]*listenSocket),
+ normalSockets: make(map[int64]*normalSocket),
+ servers: make(map[int64]*server),
+ subChannels: make(map[int64]*subChannel),
+ }
+}
+
func (c *channelMap) addServer(id int64, s *server) {
c.mu.Lock()
s.cm = c
diff --git a/vendor/google.golang.org/grpc/internal/channelz/id.go b/vendor/google.golang.org/grpc/internal/channelz/id.go
new file mode 100644
index 000000000..c9a27acd3
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/channelz/id.go
@@ -0,0 +1,75 @@
+/*
+ *
+ * Copyright 2022 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package channelz
+
+import "fmt"
+
+// Identifier is an opaque identifier which uniquely identifies an entity in the
+// channelz database.
+type Identifier struct {
+ typ RefChannelType
+ id int64
+ str string
+ pid *Identifier
+}
+
+// Type returns the entity type corresponding to id.
+func (id *Identifier) Type() RefChannelType {
+ return id.typ
+}
+
+// Int returns the integer identifier corresponding to id.
+func (id *Identifier) Int() int64 {
+ return id.id
+}
+
+// String returns a string representation of the entity corresponding to id.
+//
+// This includes some information about the parent as well. Examples:
+// Top-level channel: [Channel #channel-number]
+// Nested channel: [Channel #parent-channel-number Channel #channel-number]
+// Sub channel: [Channel #parent-channel SubChannel #subchannel-number]
+func (id *Identifier) String() string {
+ return id.str
+}
+
+// Equal returns true if other is the same as id.
+func (id *Identifier) Equal(other *Identifier) bool {
+ if (id != nil) != (other != nil) {
+ return false
+ }
+ if id == nil && other == nil {
+ return true
+ }
+ return id.typ == other.typ && id.id == other.id && id.pid == other.pid
+}
+
+// NewIdentifierForTesting returns a new opaque identifier to be used only for
+// testing purposes.
+func NewIdentifierForTesting(typ RefChannelType, id int64, pid *Identifier) *Identifier {
+ return newIdentifer(typ, id, pid)
+}
+
+func newIdentifer(typ RefChannelType, id int64, pid *Identifier) *Identifier {
+ str := fmt.Sprintf("%s #%d", typ, id)
+ if pid != nil {
+ str = fmt.Sprintf("%s %s", pid, str)
+ }
+ return &Identifier{typ: typ, id: id, str: str, pid: pid}
+}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/logging.go b/vendor/google.golang.org/grpc/internal/channelz/logging.go
index b0013f9c8..8e13a3d2c 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/logging.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/logging.go
@@ -26,77 +26,54 @@ import (
var logger = grpclog.Component("channelz")
+func withParens(id *Identifier) string {
+ return "[" + id.String() + "] "
+}
+
// Info logs and adds a trace event if channelz is on.
-func Info(l grpclog.DepthLoggerV2, id int64, args ...interface{}) {
- if IsOn() {
- AddTraceEvent(l, id, 1, &TraceEventDesc{
- Desc: fmt.Sprint(args...),
- Severity: CtInfo,
- })
- } else {
- l.InfoDepth(1, args...)
- }
+func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
+ AddTraceEvent(l, id, 1, &TraceEventDesc{
+ Desc: fmt.Sprint(args...),
+ Severity: CtInfo,
+ })
}
// Infof logs and adds a trace event if channelz is on.
-func Infof(l grpclog.DepthLoggerV2, id int64, format string, args ...interface{}) {
- msg := fmt.Sprintf(format, args...)
- if IsOn() {
- AddTraceEvent(l, id, 1, &TraceEventDesc{
- Desc: msg,
- Severity: CtInfo,
- })
- } else {
- l.InfoDepth(1, msg)
- }
+func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) {
+ AddTraceEvent(l, id, 1, &TraceEventDesc{
+ Desc: fmt.Sprintf(format, args...),
+ Severity: CtInfo,
+ })
}
// Warning logs and adds a trace event if channelz is on.
-func Warning(l grpclog.DepthLoggerV2, id int64, args ...interface{}) {
- if IsOn() {
- AddTraceEvent(l, id, 1, &TraceEventDesc{
- Desc: fmt.Sprint(args...),
- Severity: CtWarning,
- })
- } else {
- l.WarningDepth(1, args...)
- }
+func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
+ AddTraceEvent(l, id, 1, &TraceEventDesc{
+ Desc: fmt.Sprint(args...),
+ Severity: CtWarning,
+ })
}
// Warningf logs and adds a trace event if channelz is on.
-func Warningf(l grpclog.DepthLoggerV2, id int64, format string, args ...interface{}) {
- msg := fmt.Sprintf(format, args...)
- if IsOn() {
- AddTraceEvent(l, id, 1, &TraceEventDesc{
- Desc: msg,
- Severity: CtWarning,
- })
- } else {
- l.WarningDepth(1, msg)
- }
+func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) {
+ AddTraceEvent(l, id, 1, &TraceEventDesc{
+ Desc: fmt.Sprintf(format, args...),
+ Severity: CtWarning,
+ })
}
// Error logs and adds a trace event if channelz is on.
-func Error(l grpclog.DepthLoggerV2, id int64, args ...interface{}) {
- if IsOn() {
- AddTraceEvent(l, id, 1, &TraceEventDesc{
- Desc: fmt.Sprint(args...),
- Severity: CtError,
- })
- } else {
- l.ErrorDepth(1, args...)
- }
+func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
+ AddTraceEvent(l, id, 1, &TraceEventDesc{
+ Desc: fmt.Sprint(args...),
+ Severity: CtError,
+ })
}
// Errorf logs and adds a trace event if channelz is on.
-func Errorf(l grpclog.DepthLoggerV2, id int64, format string, args ...interface{}) {
- msg := fmt.Sprintf(format, args...)
- if IsOn() {
- AddTraceEvent(l, id, 1, &TraceEventDesc{
- Desc: msg,
- Severity: CtError,
- })
- } else {
- l.ErrorDepth(1, msg)
- }
+func Errorf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) {
+ AddTraceEvent(l, id, 1, &TraceEventDesc{
+ Desc: fmt.Sprintf(format, args...),
+ Severity: CtError,
+ })
}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/types.go b/vendor/google.golang.org/grpc/internal/channelz/types.go
index 3c595d154..ad0ce4dab 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/types.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/types.go
@@ -686,12 +686,33 @@ const (
type RefChannelType int
const (
+ // RefUnknown indicates an unknown entity type, the zero value for this type.
+ RefUnknown RefChannelType = iota
// RefChannel indicates the referenced entity is a Channel.
- RefChannel RefChannelType = iota
+ RefChannel
// RefSubChannel indicates the referenced entity is a SubChannel.
RefSubChannel
+ // RefServer indicates the referenced entity is a Server.
+ RefServer
+ // RefListenSocket indicates the referenced entity is a ListenSocket.
+ RefListenSocket
+ // RefNormalSocket indicates the referenced entity is a NormalSocket.
+ RefNormalSocket
)
+var refChannelTypeToString = map[RefChannelType]string{
+ RefUnknown: "Unknown",
+ RefChannel: "Channel",
+ RefSubChannel: "SubChannel",
+ RefServer: "Server",
+ RefListenSocket: "ListenSocket",
+ RefNormalSocket: "NormalSocket",
+}
+
+func (r RefChannelType) String() string {
+ return refChannelTypeToString[r]
+}
+
func (c *channelTrace) dumpData() *ChannelTrace {
c.mu.Lock()
ct := &ChannelTrace{EventNum: c.eventCount, CreationTime: c.createdTime}
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/xds.go b/vendor/google.golang.org/grpc/internal/envconfig/xds.go
index 9bad03cec..7d996e51b 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/xds.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/xds.go
@@ -26,13 +26,13 @@ import (
const (
// XDSBootstrapFileNameEnv is the env variable to set bootstrap file name.
// Do not use this and read from env directly. Its value is read and kept in
- // variable BootstrapFileName.
+ // variable XDSBootstrapFileName.
//
// When both bootstrap FileName and FileContent are set, FileName is used.
XDSBootstrapFileNameEnv = "GRPC_XDS_BOOTSTRAP"
- // XDSBootstrapFileContentEnv is the env variable to set bootstrapp file
+ // XDSBootstrapFileContentEnv is the env variable to set bootstrap file
// content. Do not use this and read from env directly. Its value is read
- // and kept in variable BootstrapFileName.
+ // and kept in variable XDSBootstrapFileContent.
//
// When both bootstrap FileName and FileContent are set, FileName is used.
XDSBootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
@@ -41,6 +41,7 @@ const (
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
rbacSupportEnv = "GRPC_XDS_EXPERIMENTAL_RBAC"
+ outlierDetectionSupportEnv = "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION"
federationEnv = "GRPC_EXPERIMENTAL_XDS_FEDERATION"
rlsInXDSEnv = "GRPC_EXPERIMENTAL_XDS_RLS_LB"
@@ -82,7 +83,10 @@ var (
// which can be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_RBAC" to "false".
XDSRBAC = !strings.EqualFold(os.Getenv(rbacSupportEnv), "false")
-
+ // XDSOutlierDetection indicates whether outlier detection support is
+ // enabled, which can be enabled by setting the environment variable
+ // "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION" to "true".
+ XDSOutlierDetection = strings.EqualFold(os.Getenv(outlierDetectionSupportEnv), "true")
// XDSFederation indicates whether federation support is enabled.
XDSFederation = strings.EqualFold(os.Getenv(federationEnv), "true")
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index 1b596bf35..6d355b0b0 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -38,11 +38,10 @@ var (
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
- // ParseServiceConfigForTesting is for creating a fake
- // ClientConn for resolver testing only
- ParseServiceConfigForTesting interface{} // func(string) *serviceconfig.ParseResult
+ // ParseServiceConfig parses a JSON representation of the service config.
+ ParseServiceConfig interface{} // func(string) *serviceconfig.ParseResult
// EqualServiceConfigForTesting is for testing service config generation and
- // parsing. Both a and b should be returned by ParseServiceConfigForTesting.
+ // parsing. Both a and b should be returned by ParseServiceConfig.
// This function compares the config without rawJSON stripped, in case the
// there's difference in white space.
EqualServiceConfigForTesting func(a, b serviceconfig.Config) bool
@@ -86,3 +85,9 @@ const (
// that supports backend returned by grpclb balancer.
CredsBundleModeBackendFromBalancer = "backend-from-balancer"
)
+
+// RLSLoadBalancingPolicyName is the name of the RLS LB policy.
+//
+// It currently has an experimental suffix which would be removed once
+// end-to-end testing of the policy is completed.
+const RLSLoadBalancingPolicyName = "rls_experimental"
diff --git a/vendor/google.golang.org/grpc/internal/metadata/metadata.go b/vendor/google.golang.org/grpc/internal/metadata/metadata.go
index b8733dbf3..b2980f8ac 100644
--- a/vendor/google.golang.org/grpc/internal/metadata/metadata.go
+++ b/vendor/google.golang.org/grpc/internal/metadata/metadata.go
@@ -22,6 +22,9 @@
package metadata
import (
+ "fmt"
+ "strings"
+
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
)
@@ -72,3 +75,46 @@ func Set(addr resolver.Address, md metadata.MD) resolver.Address {
addr.Attributes = addr.Attributes.WithValue(mdKey, mdValue(md))
return addr
}
+
+// Validate returns an error if the input md contains invalid keys or values.
+//
+// If the header is not a pseudo-header, the following items are checked:
+// - header names must contain one or more characters from this set [0-9 a-z _ - .].
+// - if the header-name ends with a "-bin" suffix, no validation of the header value is performed.
+// - otherwise, the header value must contain one or more characters from the set [%x20-%x7E].
+func Validate(md metadata.MD) error {
+ for k, vals := range md {
+ // pseudo-header will be ignored
+ if k[0] == ':' {
+ continue
+ }
+ // check key, for i that saving a conversion if not using for range
+ for i := 0; i < len(k); i++ {
+ r := k[i]
+ if !(r >= 'a' && r <= 'z') && !(r >= '0' && r <= '9') && r != '.' && r != '-' && r != '_' {
+ return fmt.Errorf("header key %q contains illegal characters not in [0-9a-z-_.]", k)
+ }
+ }
+ if strings.HasSuffix(k, "-bin") {
+ continue
+ }
+ // check value
+ for _, val := range vals {
+ if hasNotPrintable(val) {
+ return fmt.Errorf("header key %q contains value with non-printable ASCII characters", k)
+ }
+ }
+ }
+ return nil
+}
+
+// hasNotPrintable return true if msg contains any characters which are not in %x20-%x7E
+func hasNotPrintable(msg string) bool {
+ // for i that saving a conversion if not using for range
+ for i := 0; i < len(msg); i++ {
+ if msg[i] < 0x20 || msg[i] > 0x7E {
+ return true
+ }
+ }
+ return false
+}
diff --git a/vendor/google.golang.org/grpc/internal/pretty/pretty.go b/vendor/google.golang.org/grpc/internal/pretty/pretty.go
new file mode 100644
index 000000000..0177af4b5
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/pretty/pretty.go
@@ -0,0 +1,82 @@
+/*
+ *
+ * Copyright 2021 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package pretty defines helper functions to pretty-print structs for logging.
+package pretty
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+
+ "github.com/golang/protobuf/jsonpb"
+ protov1 "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/encoding/protojson"
+ protov2 "google.golang.org/protobuf/proto"
+)
+
+const jsonIndent = " "
+
+// ToJSON marshals the input into a json string.
+//
+// If marshal fails, it falls back to fmt.Sprintf("%+v").
+func ToJSON(e interface{}) string {
+ switch ee := e.(type) {
+ case protov1.Message:
+ mm := jsonpb.Marshaler{Indent: jsonIndent}
+ ret, err := mm.MarshalToString(ee)
+ if err != nil {
+ // This may fail for proto.Anys, e.g. for xDS v2, LDS, the v2
+ // messages are not imported, and this will fail because the message
+ // is not found.
+ return fmt.Sprintf("%+v", ee)
+ }
+ return ret
+ case protov2.Message:
+ mm := protojson.MarshalOptions{
+ Multiline: true,
+ Indent: jsonIndent,
+ }
+ ret, err := mm.Marshal(ee)
+ if err != nil {
+ // This may fail for proto.Anys, e.g. for xDS v2, LDS, the v2
+ // messages are not imported, and this will fail because the message
+ // is not found.
+ return fmt.Sprintf("%+v", ee)
+ }
+ return string(ret)
+ default:
+ ret, err := json.MarshalIndent(ee, "", jsonIndent)
+ if err != nil {
+ return fmt.Sprintf("%+v", ee)
+ }
+ return string(ret)
+ }
+}
+
+// FormatJSON formats the input json bytes with indentation.
+//
+// If Indent fails, it returns the unchanged input as string.
+func FormatJSON(b []byte) string {
+ var out bytes.Buffer
+ err := json.Indent(&out, b, "", jsonIndent)
+ if err != nil {
+ return string(b)
+ }
+ return out.String()
+}
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index 8394d252d..244f4b081 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -137,6 +137,7 @@ type earlyAbortStream struct {
streamID uint32
contentSubtype string
status *status.Status
+ rst bool
}
func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
@@ -786,6 +787,11 @@ func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
return err
}
+ if eas.rst {
+ if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
+ return err
+ }
+ }
return nil
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index f0c72d337..24ca59084 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -132,7 +132,7 @@ type http2Client struct {
kpDormant bool
// Fields below are for channelz metric collection.
- channelzID int64 // channelz unique identification number
+ channelzID *channelz.Identifier
czData *channelzData
onGoAway func(GoAwayReason)
@@ -351,8 +351,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
t.statsHandler.HandleConn(t.ctx, connBegin)
}
- if channelz.IsOn() {
- t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
+ t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
+ if err != nil {
+ return nil, err
}
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
@@ -630,8 +631,8 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
// the wire. However, there are two notable exceptions:
//
// 1. If the stream headers violate the max header list size allowed by the
-// server. In this case there is no reason to retry at all, as it is
-// assumed the RPC would continue to fail on subsequent attempts.
+// server. It's possible this could succeed on another transport, even if
+// it's unlikely, but do not transparently retry.
// 2. If the credentials errored when requesting their headers. In this case,
// it's possible a retry can fix the problem, but indefinitely transparently
// retrying is not appropriate as it is likely the credentials, if they can
@@ -639,8 +640,7 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
type NewStreamError struct {
Err error
- DoNotRetry bool
- DoNotTransparentRetry bool
+ AllowTransparentRetry bool
}
func (e NewStreamError) Error() string {
@@ -649,11 +649,11 @@ func (e NewStreamError) Error() string {
// NewStream creates a stream and registers it into the transport as "active"
// streams. All non-nil errors returned will be *NewStreamError.
-func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
+func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
ctx = peer.NewContext(ctx, t.getPeer())
headerFields, err := t.createHeaderFields(ctx, callHdr)
if err != nil {
- return nil, &NewStreamError{Err: err, DoNotTransparentRetry: true}
+ return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
}
s := t.newStream(ctx, callHdr)
cleanup := func(err error) {
@@ -753,13 +753,14 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return true
}, hdr)
if err != nil {
- return nil, &NewStreamError{Err: err}
+ // Connection closed.
+ return nil, &NewStreamError{Err: err, AllowTransparentRetry: true}
}
if success {
break
}
if hdrListSizeErr != nil {
- return nil, &NewStreamError{Err: hdrListSizeErr, DoNotRetry: true}
+ return nil, &NewStreamError{Err: hdrListSizeErr}
}
firstTry = false
select {
@@ -767,9 +768,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
case <-ctx.Done():
return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
case <-t.goAway:
- return nil, &NewStreamError{Err: errStreamDrain}
+ return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
case <-t.ctx.Done():
- return nil, &NewStreamError{Err: ErrConnClosing}
+ return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
}
}
if t.statsHandler != nil {
@@ -898,9 +899,7 @@ func (t *http2Client) Close(err error) {
t.controlBuf.finish()
t.cancel()
t.conn.Close()
- if channelz.IsOn() {
- channelz.RemoveEntry(t.channelzID)
- }
+ channelz.RemoveEntry(t.channelzID)
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 2c6eaf0e5..45d7bd145 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -21,7 +21,6 @@ package transport
import (
"bytes"
"context"
- "errors"
"fmt"
"io"
"math"
@@ -36,6 +35,7 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpcutil"
+ "google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
@@ -52,10 +52,10 @@ import (
var (
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
// the stream's state.
- ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
+ ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
// than the limit set by peer.
- ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
+ ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
)
// serverConnectionCounter counts the number of connections a server has seen
@@ -117,7 +117,7 @@ type http2Server struct {
idle time.Time
// Fields below are for channelz metric collection.
- channelzID int64 // channelz unique identification number
+ channelzID *channelz.Identifier
czData *channelzData
bufferPool *bufferPool
@@ -231,6 +231,11 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
if kp.Timeout == 0 {
kp.Timeout = defaultServerKeepaliveTimeout
}
+ if kp.Time != infinity {
+ if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
+ return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
+ }
+ }
kep := config.KeepalivePolicy
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
@@ -275,12 +280,12 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin)
}
- if channelz.IsOn() {
- t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
+ t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
+ if err != nil {
+ return nil, err
}
t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
-
t.framer.writer.Flush()
defer func() {
@@ -443,6 +448,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
streamID: streamID,
contentSubtype: s.contentSubtype,
status: status.New(codes.Internal, errMsg),
+ rst: !frame.StreamEnded(),
})
return false
}
@@ -516,14 +522,16 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
if httpMethod != http.MethodPost {
t.mu.Unlock()
+ errMsg := fmt.Sprintf("http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
if logger.V(logLevel) {
- logger.Infof("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
+ logger.Infof("transport: %v", errMsg)
}
- t.controlBuf.put(&cleanupStream{
- streamID: streamID,
- rst: true,
- rstCode: http2.ErrCodeProtocol,
- onWrite: func() {},
+ t.controlBuf.put(&earlyAbortStream{
+ httpStatus: 405,
+ streamID: streamID,
+ contentSubtype: s.contentSubtype,
+ status: status.New(codes.Internal, errMsg),
+ rst: !frame.StreamEnded(),
})
s.cancel()
return false
@@ -544,6 +552,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
streamID: s.id,
contentSubtype: s.contentSubtype,
status: stat,
+ rst: !frame.StreamEnded(),
})
return false
}
@@ -925,11 +934,25 @@ func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
return true
}
+func (t *http2Server) streamContextErr(s *Stream) error {
+ select {
+ case <-t.done:
+ return ErrConnClosing
+ default:
+ }
+ return ContextErr(s.ctx.Err())
+}
+
// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
- if s.updateHeaderSent() || s.getState() == streamDone {
+ if s.updateHeaderSent() {
return ErrIllegalHeaderWrite
}
+
+ if s.getState() == streamDone {
+ return t.streamContextErr(s)
+ }
+
s.hdrMu.Lock()
if md.Len() > 0 {
if s.header.Len() > 0 {
@@ -940,7 +963,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
}
if err := t.writeHeaderLocked(s); err != nil {
s.hdrMu.Unlock()
- return err
+ return status.Convert(err).Err()
}
s.hdrMu.Unlock()
return nil
@@ -1056,23 +1079,12 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.WriteHeader(s, nil); err != nil {
- if _, ok := err.(ConnectionError); ok {
- return err
- }
- // TODO(mmukhi, dfawley): Make sure this is the right code to return.
- return status.Errorf(codes.Internal, "transport: %v", err)
+ return err
}
} else {
// Writing headers checks for this condition.
if s.getState() == streamDone {
- // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
- s.cancel()
- select {
- case <-t.done:
- return ErrConnClosing
- default:
- }
- return ContextErr(s.ctx.Err())
+ return t.streamContextErr(s)
}
}
df := &dataFrame{
@@ -1082,12 +1094,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
onEachWrite: t.setResetPingStrikes,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
- select {
- case <-t.done:
- return ErrConnClosing
- default:
- }
- return ContextErr(s.ctx.Err())
+ return t.streamContextErr(s)
}
return t.controlBuf.put(df)
}
@@ -1210,9 +1217,7 @@ func (t *http2Server) Close() {
if err := t.conn.Close(); err != nil && logger.V(logLevel) {
logger.Infof("transport: error closing conn during Close: %v", err)
}
- if channelz.IsOn() {
- channelz.RemoveEntry(t.channelzID)
- }
+ channelz.RemoveEntry(t.channelzID)
// Cancel all active streams.
for _, s := range streams {
s.cancel()
@@ -1225,10 +1230,6 @@ func (t *http2Server) Close() {
// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
- // In case stream sending and receiving are invoked in separate
- // goroutines (e.g., bi-directional streaming), cancel needs to be
- // called to interrupt the potential blocking on other goroutines.
- s.cancel()
t.mu.Lock()
if _, ok := t.activeStreams[s.id]; ok {
@@ -1250,6 +1251,11 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
+ // In case stream sending and receiving are invoked in separate
+ // goroutines (e.g., bi-directional streaming), cancel needs to be
+ // called to interrupt the potential blocking on other goroutines.
+ s.cancel()
+
oldState := s.swapState(streamDone)
if oldState == streamDone {
// If the stream was already done, return.
@@ -1269,6 +1275,11 @@ func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, h
// closeStream clears the footprint of a stream when the stream is not needed any more.
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
+ // In case stream sending and receiving are invoked in separate
+ // goroutines (e.g., bi-directional streaming), cancel needs to be
+ // called to interrupt the potential blocking on other goroutines.
+ s.cancel()
+
s.swapState(streamDone)
t.deleteStream(s, eosReceived)
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index d3bf65b2b..a9ce717f1 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
@@ -529,7 +530,7 @@ type ServerConfig struct {
InitialConnWindowSize int32
WriteBufferSize int
ReadBufferSize int
- ChannelzParentID int64
+ ChannelzParentID *channelz.Identifier
MaxHeaderListSize *uint32
HeaderTableSize *uint32
}
@@ -563,7 +564,7 @@ type ConnectOptions struct {
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
ReadBufferSize int
// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
- ChannelzParentID int64
+ ChannelzParentID *channelz.Identifier
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
MaxHeaderListSize *uint32
// UseProxy specifies if a proxy should be used.
@@ -741,6 +742,12 @@ func (e ConnectionError) Origin() error {
return e.err
}
+// Unwrap returns the original error of this connection error or nil when the
+// origin is nil.
+func (e ConnectionError) Unwrap() error {
+ return e.err
+}
+
var (
// ErrConnClosing indicates that the transport is closing.
ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
diff --git a/vendor/google.golang.org/grpc/metadata/metadata.go b/vendor/google.golang.org/grpc/metadata/metadata.go
index 3604c7819..8e0f6abe8 100644
--- a/vendor/google.golang.org/grpc/metadata/metadata.go
+++ b/vendor/google.golang.org/grpc/metadata/metadata.go
@@ -188,7 +188,9 @@ func FromIncomingContext(ctx context.Context) (MD, bool) {
// map, and there's no guarantee that the MD attached to the context is
// created using our helper functions.
key := strings.ToLower(k)
- out[key] = v
+ s := make([]string, len(v))
+ copy(s, v)
+ out[key] = s
}
return out, true
}
@@ -226,7 +228,9 @@ func FromOutgoingContext(ctx context.Context) (MD, bool) {
// map, and there's no guarantee that the MD attached to the context is
// created using our helper functions.
key := strings.ToLower(k)
- out[key] = v
+ s := make([]string, len(v))
+ copy(s, v)
+ out[key] = s
}
for _, added := range raw.added {
if len(added)%2 == 1 {
diff --git a/vendor/google.golang.org/grpc/picker_wrapper.go b/vendor/google.golang.org/grpc/picker_wrapper.go
index e8367cb89..843633c91 100644
--- a/vendor/google.golang.org/grpc/picker_wrapper.go
+++ b/vendor/google.golang.org/grpc/picker_wrapper.go
@@ -131,7 +131,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
}
if _, ok := status.FromError(err); ok {
// Status error: end the RPC unconditionally with this status.
- return nil, nil, err
+ return nil, nil, dropError{error: err}
}
// For all other errors, wait for ready RPCs should block and other
// RPCs should fail with unavailable.
@@ -175,3 +175,9 @@ func (pw *pickerWrapper) close() {
pw.done = true
close(pw.blockingCh)
}
+
+// dropError is a wrapper error that indicates the LB policy wishes to drop the
+// RPC and not retry it.
+type dropError struct {
+ error
+}
diff --git a/vendor/google.golang.org/grpc/pickfirst.go b/vendor/google.golang.org/grpc/pickfirst.go
index 5168b62b0..fb7a99e0a 100644
--- a/vendor/google.golang.org/grpc/pickfirst.go
+++ b/vendor/google.golang.org/grpc/pickfirst.go
@@ -44,79 +44,107 @@ func (*pickfirstBuilder) Name() string {
}
type pickfirstBalancer struct {
- state connectivity.State
- cc balancer.ClientConn
- sc balancer.SubConn
+ state connectivity.State
+ cc balancer.ClientConn
+ subConn balancer.SubConn
}
func (b *pickfirstBalancer) ResolverError(err error) {
- switch b.state {
- case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
- // Set a failing picker if we don't have a good picker.
- b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,
- Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
- })
- }
if logger.V(2) {
logger.Infof("pickfirstBalancer: ResolverError called with error %v", err)
}
+ if b.subConn == nil {
+ b.state = connectivity.TransientFailure
+ }
+
+ if b.state != connectivity.TransientFailure {
+ // The picker will not change since the balancer does not currently
+ // report an error.
+ return
+ }
+ b.cc.UpdateState(balancer.State{
+ ConnectivityState: connectivity.TransientFailure,
+ Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
+ })
}
-func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error {
- if len(cs.ResolverState.Addresses) == 0 {
+func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
+ if len(state.ResolverState.Addresses) == 0 {
+ // The resolver reported an empty address list. Treat it like an error by
+ // calling b.ResolverError.
+ if b.subConn != nil {
+ // Remove the old subConn. All addresses were removed, so it is no longer
+ // valid.
+ b.cc.RemoveSubConn(b.subConn)
+ b.subConn = nil
+ }
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
- if b.sc == nil {
- var err error
- b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{})
- if err != nil {
- if logger.V(2) {
- logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
- }
- b.state = connectivity.TransientFailure
- b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,
- Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},
- })
- return balancer.ErrBadResolverState
+
+ if b.subConn != nil {
+ b.cc.UpdateAddresses(b.subConn, state.ResolverState.Addresses)
+ return nil
+ }
+
+ subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{})
+ if err != nil {
+ if logger.V(2) {
+ logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
}
- b.state = connectivity.Idle
- b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}})
- b.sc.Connect()
- } else {
- b.cc.UpdateAddresses(b.sc, cs.ResolverState.Addresses)
- b.sc.Connect()
+ b.state = connectivity.TransientFailure
+ b.cc.UpdateState(balancer.State{
+ ConnectivityState: connectivity.TransientFailure,
+ Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},
+ })
+ return balancer.ErrBadResolverState
}
+ b.subConn = subConn
+ b.state = connectivity.Idle
+ b.cc.UpdateState(balancer.State{
+ ConnectivityState: connectivity.Idle,
+ Picker: &picker{result: balancer.PickResult{SubConn: b.subConn}},
+ })
+ b.subConn.Connect()
return nil
}
-func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
+func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
if logger.V(2) {
- logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", sc, s)
+ logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", subConn, state)
}
- if b.sc != sc {
+ if b.subConn != subConn {
if logger.V(2) {
- logger.Infof("pickfirstBalancer: ignored state change because sc is not recognized")
+ logger.Infof("pickfirstBalancer: ignored state change because subConn is not recognized")
}
return
}
- b.state = s.ConnectivityState
- if s.ConnectivityState == connectivity.Shutdown {
- b.sc = nil
+ b.state = state.ConnectivityState
+ if state.ConnectivityState == connectivity.Shutdown {
+ b.subConn = nil
return
}
- switch s.ConnectivityState {
+ switch state.ConnectivityState {
case connectivity.Ready:
- b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: sc}}})
+ b.cc.UpdateState(balancer.State{
+ ConnectivityState: state.ConnectivityState,
+ Picker: &picker{result: balancer.PickResult{SubConn: subConn}},
+ })
case connectivity.Connecting:
- b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}})
+ b.cc.UpdateState(balancer.State{
+ ConnectivityState: state.ConnectivityState,
+ Picker: &picker{err: balancer.ErrNoSubConnAvailable},
+ })
case connectivity.Idle:
- b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &idlePicker{sc: sc}})
+ b.cc.UpdateState(balancer.State{
+ ConnectivityState: state.ConnectivityState,
+ Picker: &idlePicker{subConn: subConn},
+ })
case connectivity.TransientFailure:
b.cc.UpdateState(balancer.State{
- ConnectivityState: s.ConnectivityState,
- Picker: &picker{err: s.ConnectionError},
+ ConnectivityState: state.ConnectivityState,
+ Picker: &picker{err: state.ConnectionError},
})
}
}
@@ -125,8 +153,8 @@ func (b *pickfirstBalancer) Close() {
}
func (b *pickfirstBalancer) ExitIdle() {
- if b.sc != nil && b.state == connectivity.Idle {
- b.sc.Connect()
+ if b.subConn != nil && b.state == connectivity.Idle {
+ b.subConn.Connect()
}
}
@@ -135,18 +163,18 @@ type picker struct {
err error
}
-func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
+func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
return p.result, p.err
}
// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
// CONNECTING when Pick is called.
type idlePicker struct {
- sc balancer.SubConn
+ subConn balancer.SubConn
}
-func (i *idlePicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
- i.sc.Connect()
+func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
+ i.subConn.Connect()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
diff --git a/vendor/google.golang.org/grpc/regenerate.sh b/vendor/google.golang.org/grpc/regenerate.sh
index 58c802f8a..978b89f37 100644
--- a/vendor/google.golang.org/grpc/regenerate.sh
+++ b/vendor/google.golang.org/grpc/regenerate.sh
@@ -27,9 +27,9 @@ export PATH=${GOBIN}:${PATH}
mkdir -p ${GOBIN}
echo "remove existing generated files"
-# grpc_testingv3/testv3.pb.go is not re-generated because it was
-# intentionally generated by an older version of protoc-gen-go.
-rm -f $(find . -name '*.pb.go' | grep -v 'grpc_testingv3/testv3.pb.go')
+# grpc_testing_not_regenerate/*.pb.go is not re-generated,
+# see grpc_testing_not_regenerate/README.md for details.
+rm -f $(find . -name '*.pb.go' | grep -v 'grpc_testing_not_regenerate')
echo "go install google.golang.org/protobuf/cmd/protoc-gen-go"
(cd test/tools && go install google.golang.org/protobuf/cmd/protoc-gen-go)
@@ -117,9 +117,9 @@ done
mkdir -p ${WORKDIR}/out/google.golang.org/grpc/internal/proto/grpc_lookup_v1
mv ${WORKDIR}/out/google.golang.org/grpc/lookup/grpc_lookup_v1/* ${WORKDIR}/out/google.golang.org/grpc/internal/proto/grpc_lookup_v1
-# grpc_testingv3/testv3.pb.go is not re-generated because it was
-# intentionally generated by an older version of protoc-gen-go.
-rm ${WORKDIR}/out/google.golang.org/grpc/reflection/grpc_testingv3/*.pb.go
+# grpc_testing_not_regenerate/*.pb.go are not re-generated,
+# see grpc_testing_not_regenerate/README.md for details.
+rm ${WORKDIR}/out/google.golang.org/grpc/reflection/grpc_testing_not_regenerate/*.pb.go
# grpc/service_config/service_config.proto does not have a go_package option.
mv ${WORKDIR}/out/grpc/service_config/service_config.pb.go internal/proto/grpc_service_config
diff --git a/vendor/google.golang.org/grpc/resolver/resolver.go b/vendor/google.golang.org/grpc/resolver/resolver.go
index e28b68026..ca2e35a35 100644
--- a/vendor/google.golang.org/grpc/resolver/resolver.go
+++ b/vendor/google.golang.org/grpc/resolver/resolver.go
@@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/serviceconfig"
)
@@ -139,13 +140,18 @@ type Address struct {
// Equal returns whether a and o are identical. Metadata is compared directly,
// not with any recursive introspection.
-func (a *Address) Equal(o Address) bool {
+func (a Address) Equal(o Address) bool {
return a.Addr == o.Addr && a.ServerName == o.ServerName &&
a.Attributes.Equal(o.Attributes) &&
a.BalancerAttributes.Equal(o.BalancerAttributes) &&
a.Type == o.Type && a.Metadata == o.Metadata
}
+// String returns JSON formatted string representation of the address.
+func (a Address) String() string {
+ return pretty.ToJSON(a)
+}
+
// BuildOptions includes additional information for the builder to create
// the resolver.
type BuildOptions struct {
diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
index 2c47cd54f..05a9d4e0b 100644
--- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
+++ b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
@@ -19,7 +19,6 @@
package grpc
import (
- "fmt"
"strings"
"sync"
@@ -27,6 +26,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
+ "google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
@@ -97,10 +97,7 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
if ccr.done.HasFired() {
return nil
}
- channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
- if channelz.IsOn() {
- ccr.addChannelzTraceEvent(s)
- }
+ ccr.addChannelzTraceEvent(s)
ccr.curState = s
if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
return balancer.ErrBadResolverState
@@ -125,10 +122,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
if ccr.done.HasFired() {
return
}
- channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending new addresses to cc: %v", addrs)
- if channelz.IsOn() {
- ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
- }
+ ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
ccr.curState.Addresses = addrs
ccr.cc.updateResolverState(ccr.curState, nil)
}
@@ -141,7 +135,7 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
if ccr.done.HasFired() {
return
}
- channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %v", sc)
+ channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %s", sc)
if ccr.cc.dopts.disableServiceConfig {
channelz.Info(logger, ccr.cc.channelzID, "Service config lookups disabled; ignoring config")
return
@@ -151,9 +145,7 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
return
}
- if channelz.IsOn() {
- ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
- }
+ ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
ccr.curState.ServiceConfig = scpr
ccr.cc.updateResolverState(ccr.curState, nil)
}
@@ -180,8 +172,5 @@ func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
updates = append(updates, "resolver returned new addresses")
}
- channelz.AddTraceEvent(logger, ccr.cc.channelzID, 0, &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")),
- Severity: channelz.CtInfo,
- })
+ channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
}
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index eadf9e05f..65de84b30 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -134,7 +134,7 @@ type Server struct {
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
- channelzID int64 // channelz unique identification number
+ channelzID *channelz.Identifier
czData *channelzData
serverWorkerChannels []chan *serverWorkerData
@@ -584,9 +584,8 @@ func NewServer(opt ...ServerOption) *Server {
s.initServerWorkers()
}
- if channelz.IsOn() {
- s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
- }
+ s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
+ channelz.Info(logger, s.channelzID, "Server created")
return s
}
@@ -712,7 +711,7 @@ var ErrServerStopped = errors.New("grpc: the server has been stopped")
type listenSocket struct {
net.Listener
- channelzID int64
+ channelzID *channelz.Identifier
}
func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
@@ -724,9 +723,8 @@ func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
func (l *listenSocket) Close() error {
err := l.Listener.Close()
- if channelz.IsOn() {
- channelz.RemoveEntry(l.channelzID)
- }
+ channelz.RemoveEntry(l.channelzID)
+ channelz.Info(logger, l.channelzID, "ListenSocket deleted")
return err
}
@@ -759,11 +757,6 @@ func (s *Server) Serve(lis net.Listener) error {
ls := &listenSocket{Listener: lis}
s.lis[ls] = true
- if channelz.IsOn() {
- ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
- }
- s.mu.Unlock()
-
defer func() {
s.mu.Lock()
if s.lis != nil && s.lis[ls] {
@@ -773,8 +766,16 @@ func (s *Server) Serve(lis net.Listener) error {
s.mu.Unlock()
}()
- var tempDelay time.Duration // how long to sleep on accept failure
+ var err error
+ ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
+ if err != nil {
+ s.mu.Unlock()
+ return err
+ }
+ s.mu.Unlock()
+ channelz.Info(logger, ls.channelzID, "ListenSocket created")
+ var tempDelay time.Duration // how long to sleep on accept failure
for {
rawConn, err := lis.Accept()
if err != nil {
@@ -1283,9 +1284,10 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if appErr != nil {
appStatus, ok := status.FromError(appErr)
if !ok {
- // Convert appErr if it is not a grpc status error.
- appErr = status.Error(codes.Unknown, appErr.Error())
- appStatus, _ = status.FromError(appErr)
+ // Convert non-status application error to a status error with code
+ // Unknown, but handle context errors specifically.
+ appStatus = status.FromContextError(appErr)
+ appErr = appStatus.Err()
}
if trInfo != nil {
trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
@@ -1549,7 +1551,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if appErr != nil {
appStatus, ok := status.FromError(appErr)
if !ok {
- appStatus = status.New(codes.Unknown, appErr.Error())
+ // Convert non-status application error to a status error with code
+ // Unknown, but handle context errors specifically.
+ appStatus = status.FromContextError(appErr)
appErr = appStatus.Err()
}
if trInfo != nil {
@@ -1706,11 +1710,7 @@ func (s *Server) Stop() {
s.done.Fire()
}()
- s.channelzRemoveOnce.Do(func() {
- if channelz.IsOn() {
- channelz.RemoveEntry(s.channelzID)
- }
- })
+ s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
s.mu.Lock()
listeners := s.lis
@@ -1748,11 +1748,7 @@ func (s *Server) GracefulStop() {
s.quit.Fire()
defer s.done.Fire()
- s.channelzRemoveOnce.Do(func() {
- if channelz.IsOn() {
- channelz.RemoveEntry(s.channelzID)
- }
- })
+ s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
s.mu.Lock()
if s.conns == nil {
s.mu.Unlock()
@@ -1805,12 +1801,26 @@ func (s *Server) getCodec(contentSubtype string) baseCodec {
return codec
}
-// SetHeader sets the header metadata.
-// When called multiple times, all the provided metadata will be merged.
-// All the metadata will be sent out when one of the following happens:
-// - grpc.SendHeader() is called;
-// - The first response is sent out;
-// - An RPC status is sent out (error or success).
+// SetHeader sets the header metadata to be sent from the server to the client.
+// The context provided must be the context passed to the server's handler.
+//
+// Streaming RPCs should prefer the SetHeader method of the ServerStream.
+//
+// When called multiple times, all the provided metadata will be merged. All
+// the metadata will be sent out when one of the following happens:
+//
+// - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
+// - The first response message is sent. For unary handlers, this occurs when
+// the handler returns; for streaming handlers, this can happen when stream's
+// SendMsg method is called.
+// - An RPC status is sent out (error or success). This occurs when the handler
+// returns.
+//
+// SetHeader will fail if called after any of the events above.
+//
+// The error returned is compatible with the status package. However, the
+// status code will often not match the RPC status as seen by the client
+// application, and therefore, should not be relied upon for this purpose.
func SetHeader(ctx context.Context, md metadata.MD) error {
if md.Len() == 0 {
return nil
@@ -1822,8 +1832,14 @@ func SetHeader(ctx context.Context, md metadata.MD) error {
return stream.SetHeader(md)
}
-// SendHeader sends header metadata. It may be called at most once.
-// The provided md and headers set by SetHeader() will be sent.
+// SendHeader sends header metadata. It may be called at most once, and may not
+// be called after any event that causes headers to be sent (see SetHeader for
+// a complete list). The provided md and headers set by SetHeader() will be
+// sent.
+//
+// The error returned is compatible with the status package. However, the
+// status code will often not match the RPC status as seen by the client
+// application, and therefore, should not be relied upon for this purpose.
func SendHeader(ctx context.Context, md metadata.MD) error {
stream := ServerTransportStreamFromContext(ctx)
if stream == nil {
@@ -1837,6 +1853,10 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
+//
+// The error returned is compatible with the status package. However, the
+// status code will often not match the RPC status as seen by the client
+// application, and therefore, should not be relied upon for this purpose.
func SetTrailer(ctx context.Context, md metadata.MD) error {
if md.Len() == 0 {
return nil
diff --git a/vendor/google.golang.org/grpc/service_config.go b/vendor/google.golang.org/grpc/service_config.go
index 22c4240cf..b01c548bb 100644
--- a/vendor/google.golang.org/grpc/service_config.go
+++ b/vendor/google.golang.org/grpc/service_config.go
@@ -218,7 +218,7 @@ type jsonSC struct {
}
func init() {
- internal.ParseServiceConfigForTesting = parseServiceConfig
+ internal.ParseServiceConfig = parseServiceConfig
}
func parseServiceConfig(js string) *serviceconfig.ParseResult {
if len(js) == 0 {
@@ -381,6 +381,9 @@ func init() {
//
// If any of them is NOT *ServiceConfig, return false.
func equalServiceConfig(a, b serviceconfig.Config) bool {
+ if a == nil && b == nil {
+ return true
+ }
aa, ok := a.(*ServiceConfig)
if !ok {
return false
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 625d47b34..236fc17ec 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcutil"
+ imetadata "google.golang.org/grpc/internal/metadata"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/transport"
@@ -46,10 +47,12 @@ import (
)
// StreamHandler defines the handler called by gRPC server to complete the
-// execution of a streaming RPC. If a StreamHandler returns an error, it
-// should be produced by the status package, or else gRPC will use
-// codes.Unknown as the status code and err.Error() as the status message
-// of the RPC.
+// execution of a streaming RPC.
+//
+// If a StreamHandler returns an error, it should either be produced by the
+// status package, or be one of the context errors. Otherwise, gRPC will use
+// codes.Unknown as the status code and err.Error() as the status message of the
+// RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error
// StreamDesc represents a streaming RPC service's method specification. Used
@@ -164,6 +167,11 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
+ if md, _, ok := metadata.FromOutgoingContextRaw(ctx); ok {
+ if err := imetadata.Validate(md); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
+ }
if channelz.IsOn() {
cc.incrCallsStarted()
defer func() {
@@ -295,14 +303,28 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
}
cs.binlog = binarylog.GetMethodLogger(method)
- if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
+ cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */)
+ if err != nil {
cs.finish(err)
return nil, err
}
- op := func(a *csAttempt) error { return a.newStream() }
+ // Pick the transport to use and create a new stream on the transport.
+ // Assign cs.attempt upon success.
+ op := func(a *csAttempt) error {
+ if err := a.getTransport(); err != nil {
+ return err
+ }
+ if err := a.newStream(); err != nil {
+ return err
+ }
+ // Because this operation is always called either here (while creating
+ // the clientStream) or by the retry code while locked when replaying
+ // the operation, it is safe to access cs.attempt directly.
+ cs.attempt = a
+ return nil
+ }
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
- cs.finish(err)
return nil, err
}
@@ -341,9 +363,15 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
return cs, nil
}
-// newAttemptLocked creates a new attempt with a transport.
-// If it succeeds, then it replaces clientStream's attempt with this new attempt.
-func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
+// newAttemptLocked creates a new csAttempt without a transport or stream.
+func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
+ if err := cs.ctx.Err(); err != nil {
+ return nil, toRPCErr(err)
+ }
+ if err := cs.cc.ctx.Err(); err != nil {
+ return nil, ErrClientConnClosing
+ }
+
ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
method := cs.callHdr.Method
sh := cs.cc.dopts.copts.StatsHandler
@@ -377,44 +405,39 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
ctx = trace.NewContext(ctx, trInfo.tr)
}
- newAttempt := &csAttempt{
+ if cs.cc.parsedTarget.Scheme == "xds" {
+ // Add extra metadata (metadata that will be added by transport) to context
+ // so the balancer can see them.
+ ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
+ "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
+ ))
+ }
+
+ return &csAttempt{
ctx: ctx,
beginTime: beginTime,
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
trInfo: trInfo,
- }
- defer func() {
- if retErr != nil {
- // This attempt is not set in the clientStream, so it's finish won't
- // be called. Call it here for stats and trace in case they are not
- // nil.
- newAttempt.finish(retErr)
- }
- }()
+ }, nil
+}
- if err := ctx.Err(); err != nil {
- return toRPCErr(err)
- }
+func (a *csAttempt) getTransport() error {
+ cs := a.cs
- if cs.cc.parsedTarget.Scheme == "xds" {
- // Add extra metadata (metadata that will be added by transport) to context
- // so the balancer can see them.
- ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
- "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
- ))
- }
- t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
+ var err error
+ a.t, a.done, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
if err != nil {
+ if de, ok := err.(dropError); ok {
+ err = de.error
+ a.drop = true
+ }
return err
}
- if trInfo != nil {
- trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
+ if a.trInfo != nil {
+ a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr())
}
- newAttempt.t = t
- newAttempt.done = done
- cs.attempt = newAttempt
return nil
}
@@ -423,12 +446,21 @@ func (a *csAttempt) newStream() error {
cs.callHdr.PreviousAttempts = cs.numRetries
s, err := a.t.NewStream(a.ctx, cs.callHdr)
if err != nil {
- // Return without converting to an RPC error so retry code can
- // inspect.
- return err
+ nse, ok := err.(*transport.NewStreamError)
+ if !ok {
+ // Unexpected.
+ return err
+ }
+
+ if nse.AllowTransparentRetry {
+ a.allowTransparentRetry = true
+ }
+
+ // Unwrap and convert error.
+ return toRPCErr(nse.Err)
}
- cs.attempt.s = s
- cs.attempt.p = &parser{r: s}
+ a.s = s
+ a.p = &parser{r: s}
return nil
}
@@ -454,7 +486,7 @@ type clientStream struct {
retryThrottler *retryThrottler // The throttler active when the RPC began.
- binlog *binarylog.MethodLogger // Binary logger, can be nil.
+ binlog binarylog.MethodLogger // Binary logger, can be nil.
// serverHeaderBinlogged is a boolean for whether server header has been
// logged. Server header will be logged when the first time one of those
// happens: stream.Header(), stream.Recv().
@@ -506,6 +538,11 @@ type csAttempt struct {
statsHandler stats.Handler
beginTime time.Time
+
+ // set for newStream errors that may be transparently retried
+ allowTransparentRetry bool
+ // set for pick errors that are returned as a status
+ drop bool
}
func (cs *clientStream) commitAttemptLocked() {
@@ -525,41 +562,21 @@ func (cs *clientStream) commitAttempt() {
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation. If the RPC should be
// retried, the bool indicates whether it is being retried transparently.
-func (cs *clientStream) shouldRetry(err error) (bool, error) {
- if cs.attempt.s == nil {
- // Error from NewClientStream.
- nse, ok := err.(*transport.NewStreamError)
- if !ok {
- // Unexpected, but assume no I/O was performed and the RPC is not
- // fatal, so retry indefinitely.
- return true, nil
- }
-
- // Unwrap and convert error.
- err = toRPCErr(nse.Err)
-
- // Never retry DoNotRetry errors, which indicate the RPC should not be
- // retried due to max header list size violation, etc.
- if nse.DoNotRetry {
- return false, err
- }
+func (a *csAttempt) shouldRetry(err error) (bool, error) {
+ cs := a.cs
- // In the event of a non-IO operation error from NewStream, we never
- // attempted to write anything to the wire, so we can retry
- // indefinitely.
- if !nse.DoNotTransparentRetry {
- return true, nil
- }
- }
- if cs.finished || cs.committed {
- // RPC is finished or committed; cannot retry.
+ if cs.finished || cs.committed || a.drop {
+ // RPC is finished or committed or was dropped by the picker; cannot retry.
return false, err
}
+ if a.s == nil && a.allowTransparentRetry {
+ return true, nil
+ }
// Wait for the trailers.
unprocessed := false
- if cs.attempt.s != nil {
- <-cs.attempt.s.Done()
- unprocessed = cs.attempt.s.Unprocessed()
+ if a.s != nil {
+ <-a.s.Done()
+ unprocessed = a.s.Unprocessed()
}
if cs.firstAttempt && unprocessed {
// First attempt, stream unprocessed: transparently retry.
@@ -571,14 +588,14 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
pushback := 0
hasPushback := false
- if cs.attempt.s != nil {
- if !cs.attempt.s.TrailersOnly() {
+ if a.s != nil {
+ if !a.s.TrailersOnly() {
return false, err
}
// TODO(retry): Move down if the spec changes to not check server pushback
// before considering this a failure for throttling.
- sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
+ sps := a.s.Trailer()["grpc-retry-pushback-ms"]
if len(sps) == 1 {
var e error
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
@@ -595,10 +612,10 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
}
var code codes.Code
- if cs.attempt.s != nil {
- code = cs.attempt.s.Status().Code()
+ if a.s != nil {
+ code = a.s.Status().Code()
} else {
- code = status.Convert(err).Code()
+ code = status.Code(err)
}
rp := cs.methodConfig.RetryPolicy
@@ -643,19 +660,24 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
}
// Returns nil if a retry was performed and succeeded; error otherwise.
-func (cs *clientStream) retryLocked(lastErr error) error {
+func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
for {
- cs.attempt.finish(toRPCErr(lastErr))
- isTransparent, err := cs.shouldRetry(lastErr)
+ attempt.finish(toRPCErr(lastErr))
+ isTransparent, err := attempt.shouldRetry(lastErr)
if err != nil {
cs.commitAttemptLocked()
return err
}
cs.firstAttempt = false
- if err := cs.newAttemptLocked(isTransparent); err != nil {
+ attempt, err = cs.newAttemptLocked(isTransparent)
+ if err != nil {
+ // Only returns error if the clientconn is closed or the context of
+ // the stream is canceled.
return err
}
- if lastErr = cs.replayBufferLocked(); lastErr == nil {
+ // Note that the first op in the replay buffer always sets cs.attempt
+ // if it is able to pick a transport and create a stream.
+ if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
return nil
}
}
@@ -665,7 +687,10 @@ func (cs *clientStream) Context() context.Context {
cs.commitAttempt()
// No need to lock before using attempt, since we know it is committed and
// cannot change.
- return cs.attempt.s.Context()
+ if cs.attempt.s != nil {
+ return cs.attempt.s.Context()
+ }
+ return cs.ctx
}
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
@@ -695,7 +720,7 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
cs.mu.Unlock()
return err
}
- if err := cs.retryLocked(err); err != nil {
+ if err := cs.retryLocked(a, err); err != nil {
cs.mu.Unlock()
return err
}
@@ -726,7 +751,7 @@ func (cs *clientStream) Header() (metadata.MD, error) {
cs.binlog.Log(logEntry)
cs.serverHeaderBinlogged = true
}
- return m, err
+ return m, nil
}
func (cs *clientStream) Trailer() metadata.MD {
@@ -744,10 +769,9 @@ func (cs *clientStream) Trailer() metadata.MD {
return cs.attempt.s.Trailer()
}
-func (cs *clientStream) replayBufferLocked() error {
- a := cs.attempt
+func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
for _, f := range cs.buffer {
- if err := f(a); err != nil {
+ if err := f(attempt); err != nil {
return err
}
}
@@ -795,22 +819,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if len(payload) > *cs.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
}
- msgBytes := data // Store the pointer before setting to nil. For binary logging.
op := func(a *csAttempt) error {
- err := a.sendMsg(m, hdr, payload, data)
- // nil out the message and uncomp when replaying; they are only needed for
- // stats which is disabled for subsequent attempts.
- m, data = nil, nil
- return err
+ return a.sendMsg(m, hdr, payload, data)
}
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ClientMessage{
OnClientSide: true,
- Message: msgBytes,
+ Message: data,
})
}
- return
+ return err
}
func (cs *clientStream) RecvMsg(m interface{}) error {
@@ -1362,8 +1381,10 @@ func (as *addrConnStream) finish(err error) {
// ServerStream defines the server-side behavior of a streaming RPC.
//
-// All errors returned from ServerStream methods are compatible with the
-// status package.
+// Errors returned from ServerStream methods are compatible with the status
+// package. However, the status code will often not match the RPC status as
+// seen by the client application, and therefore, should not be relied upon for
+// this purpose.
type ServerStream interface {
// SetHeader sets the header metadata. It may be called multiple times.
// When call multiple times, all the provided metadata will be merged.
@@ -1426,7 +1447,7 @@ type serverStream struct {
statsHandler stats.Handler
- binlog *binarylog.MethodLogger
+ binlog binarylog.MethodLogger
// serverHeaderBinlogged indicates whether server header has been logged. It
// will happen when one of the following two happens: stream.SendHeader(),
// stream.Send().
@@ -1446,11 +1467,20 @@ func (ss *serverStream) SetHeader(md metadata.MD) error {
if md.Len() == 0 {
return nil
}
+ err := imetadata.Validate(md)
+ if err != nil {
+ return status.Error(codes.Internal, err.Error())
+ }
return ss.s.SetHeader(md)
}
func (ss *serverStream) SendHeader(md metadata.MD) error {
- err := ss.t.WriteHeader(ss.s, md)
+ err := imetadata.Validate(md)
+ if err != nil {
+ return status.Error(codes.Internal, err.Error())
+ }
+
+ err = ss.t.WriteHeader(ss.s, md)
if ss.binlog != nil && !ss.serverHeaderBinlogged {
h, _ := ss.s.Header()
ss.binlog.Log(&binarylog.ServerHeader{
@@ -1465,6 +1495,9 @@ func (ss *serverStream) SetTrailer(md metadata.MD) {
if md.Len() == 0 {
return
}
+ if err := imetadata.Validate(md); err != nil {
+ logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
+ }
ss.s.SetTrailer(md)
}
diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go
index 9d3fd73da..5bc03f9b3 100644
--- a/vendor/google.golang.org/grpc/version.go
+++ b/vendor/google.golang.org/grpc/version.go
@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
-const Version = "1.44.1-dev"
+const Version = "1.47.0"
diff --git a/vendor/google.golang.org/grpc/vet.sh b/vendor/google.golang.org/grpc/vet.sh
index d923187a7..ceb436c6c 100644
--- a/vendor/google.golang.org/grpc/vet.sh
+++ b/vendor/google.golang.org/grpc/vet.sh
@@ -107,7 +107,7 @@ for MOD_FILE in $(find . -name 'go.mod'); do
go vet -all ./... | fail_on_output
gofmt -s -d -l . 2>&1 | fail_on_output
goimports -l . 2>&1 | not grep -vE "\.pb\.go"
- golint ./... 2>&1 | not grep -vE "/testv3\.pb\.go:"
+ golint ./... 2>&1 | not grep -vE "/grpc_testing_not_regenerate/.*\.pb\.go:"
go mod tidy
git status --porcelain 2>&1 | fail_on_output || \