summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc')
-rw-r--r--vendor/google.golang.org/grpc/.travis.yml2
-rw-r--r--vendor/google.golang.org/grpc/Makefile30
-rw-r--r--vendor/google.golang.org/grpc/SECURITY.md3
-rw-r--r--vendor/google.golang.org/grpc/balancer/balancer.go14
-rw-r--r--vendor/google.golang.org/grpc/balancer/base/balancer.go53
-rw-r--r--vendor/google.golang.org/grpc/balancer_conn_wrappers.go12
-rw-r--r--vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go40
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go97
-rw-r--r--vendor/google.golang.org/grpc/credentials/credentials.go43
-rw-r--r--vendor/google.golang.org/grpc/dialoptions.go22
-rw-r--r--vendor/google.golang.org/grpc/encoding/proto/proto.go70
-rw-r--r--vendor/google.golang.org/grpc/go.mod6
-rw-r--r--vendor/google.golang.org/grpc/go.sum26
-rw-r--r--vendor/google.golang.org/grpc/interceptor.go36
-rw-r--r--vendor/google.golang.org/grpc/internal/credentials/credentials.go49
-rw-r--r--vendor/google.golang.org/grpc/internal/grpcutil/target.go42
-rw-r--r--vendor/google.golang.org/grpc/internal/internal.go19
-rw-r--r--vendor/google.golang.org/grpc/internal/metadata/metadata.go50
-rw-r--r--vendor/google.golang.org/grpc/internal/resolver/config_selector.go164
-rw-r--r--vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go43
-rw-r--r--vendor/google.golang.org/grpc/internal/resolver/unix/unix.go63
-rw-r--r--vendor/google.golang.org/grpc/internal/serviceconfig/serviceconfig.go72
-rw-r--r--vendor/google.golang.org/grpc/internal/syscall/syscall_linux.go20
-rw-r--r--vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go2
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/controlbuf.go32
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go149
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_server.go54
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http_util.go32
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/networktype/networktype.go46
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/proxy.go (renamed from vendor/google.golang.org/grpc/proxy.go)52
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/transport.go12
-rw-r--r--vendor/google.golang.org/grpc/internal/xds_handshake_cluster.go40
-rw-r--r--vendor/google.golang.org/grpc/metadata/metadata.go26
-rw-r--r--vendor/google.golang.org/grpc/pickfirst.go2
-rw-r--r--vendor/google.golang.org/grpc/regenerate.sh42
-rw-r--r--vendor/google.golang.org/grpc/resolver/resolver.go2
-rw-r--r--vendor/google.golang.org/grpc/resolver_conn_wrapper.go63
-rw-r--r--vendor/google.golang.org/grpc/rpc_util.go57
-rw-r--r--vendor/google.golang.org/grpc/server.go120
-rw-r--r--vendor/google.golang.org/grpc/service_config.go74
-rw-r--r--vendor/google.golang.org/grpc/status/status.go8
-rw-r--r--vendor/google.golang.org/grpc/stream.go81
-rw-r--r--vendor/google.golang.org/grpc/tap/tap.go16
-rw-r--r--vendor/google.golang.org/grpc/version.go2
-rw-r--r--vendor/google.golang.org/grpc/vet.sh47
45 files changed, 1306 insertions, 629 deletions
diff --git a/vendor/google.golang.org/grpc/.travis.yml b/vendor/google.golang.org/grpc/.travis.yml
index 3e495fa23..5847d94e5 100644
--- a/vendor/google.golang.org/grpc/.travis.yml
+++ b/vendor/google.golang.org/grpc/.travis.yml
@@ -35,7 +35,7 @@ install:
script:
- set -e
- - if [[ -n "${TESTEXTRAS}" ]]; then examples/examples_test.sh; interop/interop_test.sh; make testsubmodule; exit 0; fi
+ - if [[ -n "${TESTEXTRAS}" ]]; then examples/examples_test.sh; security/advancedtls/examples/examples_test.sh; interop/interop_test.sh; make testsubmodule; exit 0; fi
- if [[ -n "${VET}" ]]; then ./vet.sh; fi
- if [[ -n "${GAE}" ]]; then make testappengine; exit 0; fi
- if [[ -n "${RACE}" ]]; then make testrace; exit 0; fi
diff --git a/vendor/google.golang.org/grpc/Makefile b/vendor/google.golang.org/grpc/Makefile
index 3f661a787..1f0722f16 100644
--- a/vendor/google.golang.org/grpc/Makefile
+++ b/vendor/google.golang.org/grpc/Makefile
@@ -1,13 +1,13 @@
all: vet test testrace
-build: deps
+build:
go build google.golang.org/grpc/...
clean:
go clean -i google.golang.org/grpc/...
deps:
- go get -d -v google.golang.org/grpc/...
+ GO111MODULE=on go get -d -v google.golang.org/grpc/...
proto:
@ if ! which protoc > /dev/null; then \
@@ -16,30 +16,18 @@ proto:
fi
go generate google.golang.org/grpc/...
-test: testdeps
+test:
go test -cpu 1,4 -timeout 7m google.golang.org/grpc/...
-testsubmodule: testdeps
+testsubmodule:
cd security/advancedtls && go test -cpu 1,4 -timeout 7m google.golang.org/grpc/security/advancedtls/...
cd security/authorization && go test -cpu 1,4 -timeout 7m google.golang.org/grpc/security/authorization/...
-testappengine: testappenginedeps
- goapp test -cpu 1,4 -timeout 7m google.golang.org/grpc/...
-
-testappenginedeps:
- goapp get -d -v -t -tags 'appengine appenginevm' google.golang.org/grpc/...
-
-testdeps:
- go get -d -v -t google.golang.org/grpc/...
-
-testrace: testdeps
+testrace:
go test -race -cpu 1,4 -timeout 7m google.golang.org/grpc/...
-updatedeps:
- go get -d -v -u -f google.golang.org/grpc/...
-
-updatetestdeps:
- go get -d -v -t -u -f google.golang.org/grpc/...
+testdeps:
+ GO111MODULE=on go get -d -v -t google.golang.org/grpc/...
vet: vetdeps
./vet.sh
@@ -51,14 +39,10 @@ vetdeps:
all \
build \
clean \
- deps \
proto \
test \
testappengine \
testappenginedeps \
- testdeps \
testrace \
- updatedeps \
- updatetestdeps \
vet \
vetdeps
diff --git a/vendor/google.golang.org/grpc/SECURITY.md b/vendor/google.golang.org/grpc/SECURITY.md
new file mode 100644
index 000000000..be6e10870
--- /dev/null
+++ b/vendor/google.golang.org/grpc/SECURITY.md
@@ -0,0 +1,3 @@
+# Security Policy
+
+For information on gRPC Security Policy and reporting potentional security issues, please see [gRPC CVE Process](https://github.com/grpc/proposal/blob/master/P4-grpc-cve-process.md).
diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go
index 8bf359dbf..ab531f4c0 100644
--- a/vendor/google.golang.org/grpc/balancer/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/balancer.go
@@ -101,6 +101,9 @@ type SubConn interface {
// a new connection will be created.
//
// This will trigger a state transition for the SubConn.
+ //
+ // Deprecated: This method is now part of the ClientConn interface and will
+ // eventually be removed from here.
UpdateAddresses([]resolver.Address)
// Connect starts the connecting for this SubConn.
Connect()
@@ -143,6 +146,13 @@ type ClientConn interface {
// RemoveSubConn removes the SubConn from ClientConn.
// The SubConn will be shutdown.
RemoveSubConn(SubConn)
+ // UpdateAddresses updates the addresses used in the passed in SubConn.
+ // gRPC checks if the currently connected address is still in the new list.
+ // If so, the connection will be kept. Else, the connection will be
+ // gracefully closed, and a new connection will be created.
+ //
+ // This will trigger a state transition for the SubConn.
+ UpdateAddresses(SubConn, []resolver.Address)
// UpdateState notifies gRPC that the balancer's internal state has
// changed.
@@ -174,6 +184,10 @@ type BuildOptions struct {
Dialer func(context.Context, string) (net.Conn, error)
// ChannelzParentID is the entity parent's channelz unique identification number.
ChannelzParentID int64
+ // CustomUserAgent is the custom user agent set on the parent ClientConn.
+ // The balancer should set the same custom user agent if it creates a
+ // ClientConn.
+ CustomUserAgent string
// Target contains the parsed address info of the dial target. It is the same resolver.Target as
// passed to the resolver.
// See the documentation for the resolver.Target type for details about what it contains.
diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go
index 32d782f1c..c883efa0b 100644
--- a/vendor/google.golang.org/grpc/balancer/base/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go
@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
+ "google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
@@ -41,7 +42,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
cc: cc,
pickerBuilder: bb.pickerBuilder,
- subConns: make(map[resolver.Address]balancer.SubConn),
+ subConns: make(map[resolver.Address]subConnInfo),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config,
@@ -57,6 +58,11 @@ func (bb *baseBuilder) Name() string {
return bb.name
}
+type subConnInfo struct {
+ subConn balancer.SubConn
+ attrs *attributes.Attributes
+}
+
type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder
@@ -64,7 +70,7 @@ type baseBalancer struct {
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
- subConns map[resolver.Address]balancer.SubConn
+ subConns map[resolver.Address]subConnInfo // `attributes` is stripped from the keys of this map (the addresses)
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
config Config
@@ -101,23 +107,49 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range s.ResolverState.Addresses {
- addrsSet[a] = struct{}{}
- if _, ok := b.subConns[a]; !ok {
+ // Strip attributes from addresses before using them as map keys. So
+ // that when two addresses only differ in attributes pointers (but with
+ // the same attribute content), they are considered the same address.
+ //
+ // Note that this doesn't handle the case where the attribute content is
+ // different. So if users want to set different attributes to create
+ // duplicate connections to the same backend, it doesn't work. This is
+ // fine for now, because duplicate is done by setting Metadata today.
+ //
+ // TODO: read attributes to handle duplicate connections.
+ aNoAttrs := a
+ aNoAttrs.Attributes = nil
+ addrsSet[aNoAttrs] = struct{}{}
+ if scInfo, ok := b.subConns[aNoAttrs]; !ok {
// a is a new address (not existing in b.subConns).
+ //
+ // When creating SubConn, the original address with attributes is
+ // passed through. So that connection configurations in attributes
+ // (like creds) will be used.
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
if err != nil {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
- b.subConns[a] = sc
+ b.subConns[aNoAttrs] = subConnInfo{subConn: sc, attrs: a.Attributes}
b.scStates[sc] = connectivity.Idle
sc.Connect()
+ } else {
+ // Always update the subconn's address in case the attributes
+ // changed.
+ //
+ // The SubConn does a reflect.DeepEqual of the new and old
+ // addresses. So this is a noop if the current address is the same
+ // as the old one (including attributes).
+ scInfo.attrs = a.Attributes
+ b.subConns[aNoAttrs] = scInfo
+ b.cc.UpdateAddresses(scInfo.subConn, []resolver.Address{a})
}
}
- for a, sc := range b.subConns {
+ for a, scInfo := range b.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
- b.cc.RemoveSubConn(sc)
+ b.cc.RemoveSubConn(scInfo.subConn)
delete(b.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
@@ -160,9 +192,10 @@ func (b *baseBalancer) regeneratePicker() {
readySCs := make(map[balancer.SubConn]SubConnInfo)
// Filter out all ready SCs from full subConn map.
- for addr, sc := range b.subConns {
- if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
- readySCs[sc] = SubConnInfo{Address: addr}
+ for addr, scInfo := range b.subConns {
+ if st, ok := b.scStates[scInfo.subConn]; ok && st == connectivity.Ready {
+ addr.Attributes = scInfo.attrs
+ readySCs[scInfo.subConn] = SubConnInfo{Address: addr}
}
}
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index 11e592aab..4cc7f9159 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -163,6 +163,14 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
+func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
+ acbw, ok := sc.(*acBalancerWrapper)
+ if !ok {
+ return
+ }
+ acbw.UpdateAddresses(addrs)
+}
+
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
ccb.mu.Lock()
defer ccb.mu.Unlock()
@@ -197,7 +205,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
acbw.mu.Lock()
defer acbw.mu.Unlock()
if len(addrs) <= 0 {
- acbw.ac.tearDown(errConnDrain)
+ acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)
return
}
if !acbw.ac.tryUpdateAddrs(addrs) {
@@ -212,7 +220,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
acbw.ac.acbw = nil
acbw.ac.mu.Unlock()
acState := acbw.ac.getState()
- acbw.ac.tearDown(errConnDrain)
+ acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)
if acState == connectivity.Shutdown {
return
diff --git a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go
index da0472967..ed75290cd 100644
--- a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go
+++ b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go
@@ -19,17 +19,17 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.25.0
-// protoc v3.3.0
+// protoc v3.14.0
// source: grpc/binlog/v1/binarylog.proto
package grpc_binarylog_v1
import (
proto "github.com/golang/protobuf/proto"
- duration "github.com/golang/protobuf/ptypes/duration"
- timestamp "github.com/golang/protobuf/ptypes/timestamp"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ durationpb "google.golang.org/protobuf/types/known/durationpb"
+ timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
)
@@ -243,7 +243,7 @@ type GrpcLogEntry struct {
unknownFields protoimpl.UnknownFields
// The timestamp of the binary log message
- Timestamp *timestamp.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
+ Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// Uniquely identifies a call. The value must not be 0 in order to disambiguate
// from an unset value.
// Each call may have several log entries, they will all have the same call_id.
@@ -308,7 +308,7 @@ func (*GrpcLogEntry) Descriptor() ([]byte, []int) {
return file_grpc_binlog_v1_binarylog_proto_rawDescGZIP(), []int{0}
}
-func (x *GrpcLogEntry) GetTimestamp() *timestamp.Timestamp {
+func (x *GrpcLogEntry) GetTimestamp() *timestamppb.Timestamp {
if x != nil {
return x.Timestamp
}
@@ -439,7 +439,7 @@ type ClientHeader struct {
// <host> or <host>:<port> .
Authority string `protobuf:"bytes,3,opt,name=authority,proto3" json:"authority,omitempty"`
// the RPC timeout
- Timeout *duration.Duration `protobuf:"bytes,4,opt,name=timeout,proto3" json:"timeout,omitempty"`
+ Timeout *durationpb.Duration `protobuf:"bytes,4,opt,name=timeout,proto3" json:"timeout,omitempty"`
}
func (x *ClientHeader) Reset() {
@@ -495,7 +495,7 @@ func (x *ClientHeader) GetAuthority() string {
return ""
}
-func (x *ClientHeader) GetTimeout() *duration.Duration {
+func (x *ClientHeader) GetTimeout() *durationpb.Duration {
if x != nil {
return x.Timeout
}
@@ -1020,19 +1020,19 @@ func file_grpc_binlog_v1_binarylog_proto_rawDescGZIP() []byte {
var file_grpc_binlog_v1_binarylog_proto_enumTypes = make([]protoimpl.EnumInfo, 3)
var file_grpc_binlog_v1_binarylog_proto_msgTypes = make([]protoimpl.MessageInfo, 8)
var file_grpc_binlog_v1_binarylog_proto_goTypes = []interface{}{
- (GrpcLogEntry_EventType)(0), // 0: grpc.binarylog.v1.GrpcLogEntry.EventType
- (GrpcLogEntry_Logger)(0), // 1: grpc.binarylog.v1.GrpcLogEntry.Logger
- (Address_Type)(0), // 2: grpc.binarylog.v1.Address.Type
- (*GrpcLogEntry)(nil), // 3: grpc.binarylog.v1.GrpcLogEntry
- (*ClientHeader)(nil), // 4: grpc.binarylog.v1.ClientHeader
- (*ServerHeader)(nil), // 5: grpc.binarylog.v1.ServerHeader
- (*Trailer)(nil), // 6: grpc.binarylog.v1.Trailer
- (*Message)(nil), // 7: grpc.binarylog.v1.Message
- (*Metadata)(nil), // 8: grpc.binarylog.v1.Metadata
- (*MetadataEntry)(nil), // 9: grpc.binarylog.v1.MetadataEntry
- (*Address)(nil), // 10: grpc.binarylog.v1.Address
- (*timestamp.Timestamp)(nil), // 11: google.protobuf.Timestamp
- (*duration.Duration)(nil), // 12: google.protobuf.Duration
+ (GrpcLogEntry_EventType)(0), // 0: grpc.binarylog.v1.GrpcLogEntry.EventType
+ (GrpcLogEntry_Logger)(0), // 1: grpc.binarylog.v1.GrpcLogEntry.Logger
+ (Address_Type)(0), // 2: grpc.binarylog.v1.Address.Type
+ (*GrpcLogEntry)(nil), // 3: grpc.binarylog.v1.GrpcLogEntry
+ (*ClientHeader)(nil), // 4: grpc.binarylog.v1.ClientHeader
+ (*ServerHeader)(nil), // 5: grpc.binarylog.v1.ServerHeader
+ (*Trailer)(nil), // 6: grpc.binarylog.v1.Trailer
+ (*Message)(nil), // 7: grpc.binarylog.v1.Message
+ (*Metadata)(nil), // 8: grpc.binarylog.v1.Metadata
+ (*MetadataEntry)(nil), // 9: grpc.binarylog.v1.MetadataEntry
+ (*Address)(nil), // 10: grpc.binarylog.v1.Address
+ (*timestamppb.Timestamp)(nil), // 11: google.protobuf.Timestamp
+ (*durationpb.Duration)(nil), // 12: google.protobuf.Duration
}
var file_grpc_binlog_v1_binarylog_proto_depIdxs = []int32{
11, // 0: grpc.binarylog.v1.GrpcLogEntry.timestamp:type_name -> google.protobuf.Timestamp
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index cbd671a85..24109264f 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"math"
- "net"
"reflect"
"strings"
"sync"
@@ -39,6 +38,7 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
+ iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
@@ -48,6 +48,7 @@ import (
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
_ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
+ _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
)
const (
@@ -104,6 +105,17 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
+type defaultConfigSelector struct {
+ sc *ServiceConfig
+}
+
+func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
+ return &iresolver.RPCConfig{
+ Context: rpcInfo.Context,
+ MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
+ }, nil
+}
+
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
@@ -131,6 +143,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
firstResolveEvent: grpcsync.NewEvent(),
}
cc.retryThrottler.Store((*retryThrottler)(nil))
+ cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background())
for _, opt := range opts {
@@ -191,16 +204,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
cc.mkp = cc.dopts.copts.KeepaliveParams
- if cc.dopts.copts.Dialer == nil {
- cc.dopts.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
- network, addr := parseDialTarget(addr)
- return (&net.Dialer{}).DialContext(ctx, network, addr)
- }
- if cc.dopts.withProxy {
- cc.dopts.copts.Dialer = newProxyDialer(cc.dopts.copts.Dialer)
- }
- }
-
if cc.dopts.copts.UserAgent != "" {
cc.dopts.copts.UserAgent += " " + grpcUA
} else {
@@ -234,6 +237,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
+ cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
scSet = true
}
default:
@@ -244,8 +248,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
// Determine the resolver to use.
- cc.parsedTarget = grpcutil.ParseTarget(cc.target)
- unixScheme := strings.HasPrefix(cc.target, "unix:")
+ cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
@@ -268,8 +271,10 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.authority = creds.Info().ServerName
} else if cc.dopts.insecure && cc.dopts.authority != "" {
cc.authority = cc.dopts.authority
- } else if unixScheme {
+ } else if strings.HasPrefix(cc.target, "unix:") || strings.HasPrefix(cc.target, "unix-abstract:") {
cc.authority = "localhost"
+ } else if strings.HasPrefix(cc.parsedTarget.Endpoint, ":") {
+ cc.authority = "localhost" + cc.parsedTarget.Endpoint
} else {
// Use endpoint from "scheme://authority/endpoint" as the default
// authority for ClientConn.
@@ -282,6 +287,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
+ cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
}
case <-ctx.Done():
return nil, ctx.Err()
@@ -299,6 +305,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
+ CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
}
@@ -487,6 +494,8 @@ type ClientConn struct {
balancerBuildOpts balancer.BuildOptions
blockingpicker *pickerWrapper
+ safeConfigSelector iresolver.SafeConfigSelector
+
mu sync.RWMutex
resolverWrapper *ccResolverWrapper
sc *ServiceConfig
@@ -547,6 +556,7 @@ func (cc *ClientConn) scWatcher() {
// TODO: load balance policy runtime change is ignored.
// We may revisit this decision in the future.
cc.sc = &sc
+ cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
cc.mu.Unlock()
case <-cc.ctx.Done():
return
@@ -585,13 +595,13 @@ func init() {
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
if cc.sc != nil {
- cc.applyServiceConfigAndBalancer(cc.sc, addrs)
+ cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
return
}
if cc.dopts.defaultServiceConfig != nil {
- cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
+ cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
} else {
- cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
+ cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
}
}
@@ -628,7 +638,15 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
// default, per the error handling design?
} else {
if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
- cc.applyServiceConfigAndBalancer(sc, s.Addresses)
+ configSelector := iresolver.GetConfigSelector(s)
+ if configSelector != nil {
+ if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
+ channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
+ }
+ } else {
+ configSelector = &defaultConfigSelector{sc}
+ }
+ cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
} else {
ret = balancer.ErrBadResolverState
if cc.balancerWrapper == nil {
@@ -638,6 +656,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
} else {
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
}
+ cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
cc.mu.Unlock()
@@ -872,6 +891,20 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
return curAddrFound
}
+func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
+ if sc == nil {
+ return MethodConfig{}
+ }
+ if m, ok := sc.Methods[method]; ok {
+ return m
+ }
+ i := strings.LastIndex(method, "/")
+ if m, ok := sc.Methods[method[:i+1]]; ok {
+ return m
+ }
+ return sc.Methods[""]
+}
+
// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
@@ -884,17 +917,7 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
// TODO: Avoid the locking here.
cc.mu.RLock()
defer cc.mu.RUnlock()
- if cc.sc == nil {
- return MethodConfig{}
- }
- if m, ok := cc.sc.Methods[method]; ok {
- return m
- }
- i := strings.LastIndex(method, "/")
- if m, ok := cc.sc.Methods[method[:i+1]]; ok {
- return m
- }
- return cc.sc.Methods[""]
+ return getMethodConfig(cc.sc, method)
}
func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
@@ -917,12 +940,15 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st
return t, done, nil
}
-func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
+func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
if sc == nil {
// should never reach here.
return
}
cc.sc = sc
+ if configSelector != nil {
+ cc.safeConfigSelector.UpdateConfigSelector(configSelector)
+ }
if cc.sc.retryThrottling != nil {
newThrottler := &retryThrottler{
@@ -1172,7 +1198,7 @@ func (ac *addrConn) resetTransport() {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
- newTr.Close()
+ newTr.Close(fmt.Errorf("reached connectivity state: SHUTDOWN"))
return
}
ac.curAddr = addr
@@ -1304,7 +1330,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
select {
case <-time.After(time.Until(connectDeadline)):
// We didn't get the preface in time.
- newTr.Close()
+ newTr.Close(fmt.Errorf("failed to receive server preface within timeout"))
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
@@ -1421,10 +1447,9 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
}
// tearDown starts to tear down the addrConn.
-// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
-// some edge cases (e.g., the caller opens and closes many addrConn's in a
-// tight loop.
-// tearDown doesn't remove ac from ac.cc.conns.
+//
+// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
+// will leak. In most cases, call cc.removeAddrConn() instead.
func (ac *addrConn) tearDown(err error) {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
diff --git a/vendor/google.golang.org/grpc/credentials/credentials.go b/vendor/google.golang.org/grpc/credentials/credentials.go
index 02766443a..7eee7e4ec 100644
--- a/vendor/google.golang.org/grpc/credentials/credentials.go
+++ b/vendor/google.golang.org/grpc/credentials/credentials.go
@@ -30,7 +30,7 @@ import (
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/attributes"
- "google.golang.org/grpc/internal"
+ icredentials "google.golang.org/grpc/internal/credentials"
)
// PerRPCCredentials defines the common interface for the credentials which need to
@@ -58,9 +58,9 @@ type PerRPCCredentials interface {
type SecurityLevel int
const (
- // Invalid indicates an invalid security level.
+ // InvalidSecurityLevel indicates an invalid security level.
// The zero SecurityLevel value is invalid for backward compatibility.
- Invalid SecurityLevel = iota
+ InvalidSecurityLevel SecurityLevel = iota
// NoSecurity indicates a connection is insecure.
NoSecurity
// IntegrityOnly indicates a connection only provides integrity protection.
@@ -92,7 +92,7 @@ type CommonAuthInfo struct {
}
// GetCommonAuthInfo returns the pointer to CommonAuthInfo struct.
-func (c *CommonAuthInfo) GetCommonAuthInfo() *CommonAuthInfo {
+func (c CommonAuthInfo) GetCommonAuthInfo() CommonAuthInfo {
return c
}
@@ -188,15 +188,12 @@ type RequestInfo struct {
AuthInfo AuthInfo
}
-// requestInfoKey is a struct to be used as the key when attaching a RequestInfo to a context object.
-type requestInfoKey struct{}
-
// RequestInfoFromContext extracts the RequestInfo from the context if it exists.
//
// This API is experimental.
func RequestInfoFromContext(ctx context.Context) (ri RequestInfo, ok bool) {
- ri, ok = ctx.Value(requestInfoKey{}).(RequestInfo)
- return
+ ri, ok = icredentials.RequestInfoFromContext(ctx).(RequestInfo)
+ return ri, ok
}
// ClientHandshakeInfo holds data to be passed to ClientHandshake. This makes
@@ -211,16 +208,12 @@ type ClientHandshakeInfo struct {
Attributes *attributes.Attributes
}
-// clientHandshakeInfoKey is a struct used as the key to store
-// ClientHandshakeInfo in a context.
-type clientHandshakeInfoKey struct{}
-
// ClientHandshakeInfoFromContext returns the ClientHandshakeInfo struct stored
// in ctx.
//
// This API is experimental.
func ClientHandshakeInfoFromContext(ctx context.Context) ClientHandshakeInfo {
- chi, _ := ctx.Value(clientHandshakeInfoKey{}).(ClientHandshakeInfo)
+ chi, _ := icredentials.ClientHandshakeInfoFromContext(ctx).(ClientHandshakeInfo)
return chi
}
@@ -229,17 +222,16 @@ func ClientHandshakeInfoFromContext(ctx context.Context) ClientHandshakeInfo {
// or 3) CommonAuthInfo.SecurityLevel has an invalid zero value. For 2) and 3), it is for the purpose of backward-compatibility.
//
// This API is experimental.
-func CheckSecurityLevel(ctx context.Context, level SecurityLevel) error {
+func CheckSecurityLevel(ai AuthInfo, level SecurityLevel) error {
type internalInfo interface {
- GetCommonAuthInfo() *CommonAuthInfo
+ GetCommonAuthInfo() CommonAuthInfo
}
- ri, _ := RequestInfoFromContext(ctx)
- if ri.AuthInfo == nil {
- return errors.New("unable to obtain SecurityLevel from context")
+ if ai == nil {
+ return errors.New("AuthInfo is nil")
}
- if ci, ok := ri.AuthInfo.(internalInfo); ok {
+ if ci, ok := ai.(internalInfo); ok {
// CommonAuthInfo.SecurityLevel has an invalid value.
- if ci.GetCommonAuthInfo().SecurityLevel == Invalid {
+ if ci.GetCommonAuthInfo().SecurityLevel == InvalidSecurityLevel {
return nil
}
if ci.GetCommonAuthInfo().SecurityLevel < level {
@@ -250,15 +242,6 @@ func CheckSecurityLevel(ctx context.Context, level SecurityLevel) error {
return nil
}
-func init() {
- internal.NewRequestInfoContext = func(ctx context.Context, ri RequestInfo) context.Context {
- return context.WithValue(ctx, requestInfoKey{}, ri)
- }
- internal.NewClientHandshakeInfoContext = func(ctx context.Context, chi ClientHandshakeInfo) context.Context {
- return context.WithValue(ctx, clientHandshakeInfoKey{}, chi)
- }
-}
-
// ChannelzSecurityInfo defines the interface that security protocols should implement
// in order to provide security info to channelz.
//
diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go
index a93fcab8f..7a497237b 100644
--- a/vendor/google.golang.org/grpc/dialoptions.go
+++ b/vendor/google.golang.org/grpc/dialoptions.go
@@ -66,12 +66,7 @@ type dialOptions struct {
minConnectTimeout func() time.Duration
defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
defaultServiceConfigRawJSON *string
- // This is used by ccResolverWrapper to backoff between successive calls to
- // resolver.ResolveNow(). The user will have no need to configure this, but
- // we need to be able to configure this in tests.
- resolveNowBackoff func(int) time.Duration
- resolvers []resolver.Builder
- withProxy bool
+ resolvers []resolver.Builder
}
// DialOption configures how we set up the connection.
@@ -325,7 +320,7 @@ func WithInsecure() DialOption {
// later release.
func WithNoProxy() DialOption {
return newFuncDialOption(func(o *dialOptions) {
- o.withProxy = false
+ o.copts.UseProxy = false
})
}
@@ -595,9 +590,8 @@ func defaultDialOptions() dialOptions {
copts: transport.ConnectOptions{
WriteBufferSize: defaultWriteBufSize,
ReadBufferSize: defaultReadBufSize,
+ UseProxy: true,
},
- resolveNowBackoff: internalbackoff.DefaultExponential.Backoff,
- withProxy: true,
}
}
@@ -612,16 +606,6 @@ func withMinConnectDeadline(f func() time.Duration) DialOption {
})
}
-// withResolveNowBackoff specifies the function that clientconn uses to backoff
-// between successive calls to resolver.ResolveNow().
-//
-// For testing purpose only.
-func withResolveNowBackoff(f func(int) time.Duration) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.resolveNowBackoff = f
- })
-}
-
// WithResolvers allows a list of resolver implementations to be registered
// locally with the ClientConn without needing to be globally registered via
// resolver.Register. They will be matched against the scheme used for the
diff --git a/vendor/google.golang.org/grpc/encoding/proto/proto.go b/vendor/google.golang.org/grpc/encoding/proto/proto.go
index 66b97a6f6..3009b35af 100644
--- a/vendor/google.golang.org/grpc/encoding/proto/proto.go
+++ b/vendor/google.golang.org/grpc/encoding/proto/proto.go
@@ -21,8 +21,7 @@
package proto
import (
- "math"
- "sync"
+ "fmt"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/encoding"
@@ -38,73 +37,22 @@ func init() {
// codec is a Codec implementation with protobuf. It is the default codec for gRPC.
type codec struct{}
-type cachedProtoBuffer struct {
- lastMarshaledSize uint32
- proto.Buffer
-}
-
-func capToMaxInt32(val int) uint32 {
- if val > math.MaxInt32 {
- return uint32(math.MaxInt32)
- }
- return uint32(val)
-}
-
-func marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) {
- protoMsg := v.(proto.Message)
- newSlice := make([]byte, 0, cb.lastMarshaledSize)
-
- cb.SetBuf(newSlice)
- cb.Reset()
- if err := cb.Marshal(protoMsg); err != nil {
- return nil, err
- }
- out := cb.Bytes()
- cb.lastMarshaledSize = capToMaxInt32(len(out))
- return out, nil
-}
-
func (codec) Marshal(v interface{}) ([]byte, error) {
- if pm, ok := v.(proto.Marshaler); ok {
- // object can marshal itself, no need for buffer
- return pm.Marshal()
+ vv, ok := v.(proto.Message)
+ if !ok {
+ return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v)
}
-
- cb := protoBufferPool.Get().(*cachedProtoBuffer)
- out, err := marshal(v, cb)
-
- // put back buffer and lose the ref to the slice
- cb.SetBuf(nil)
- protoBufferPool.Put(cb)
- return out, err
+ return proto.Marshal(vv)
}
func (codec) Unmarshal(data []byte, v interface{}) error {
- protoMsg := v.(proto.Message)
- protoMsg.Reset()
-
- if pu, ok := protoMsg.(proto.Unmarshaler); ok {
- // object can unmarshal itself, no need for buffer
- return pu.Unmarshal(data)
+ vv, ok := v.(proto.Message)
+ if !ok {
+ return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
}
-
- cb := protoBufferPool.Get().(*cachedProtoBuffer)
- cb.SetBuf(data)
- err := cb.Unmarshal(protoMsg)
- cb.SetBuf(nil)
- protoBufferPool.Put(cb)
- return err
+ return proto.Unmarshal(data, vv)
}
func (codec) Name() string {
return Name
}
-
-var protoBufferPool = &sync.Pool{
- New: func() interface{} {
- return &cachedProtoBuffer{
- Buffer: proto.Buffer{},
- lastMarshaledSize: 16,
- }
- },
-}
diff --git a/vendor/google.golang.org/grpc/go.mod b/vendor/google.golang.org/grpc/go.mod
index d97276556..b177cfa66 100644
--- a/vendor/google.golang.org/grpc/go.mod
+++ b/vendor/google.golang.org/grpc/go.mod
@@ -3,10 +3,10 @@ module google.golang.org/grpc
go 1.11
require (
- github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f
- github.com/envoyproxy/go-control-plane v0.9.4
+ github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403
+ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
- github.com/golang/protobuf v1.4.1
+ github.com/golang/protobuf v1.4.2
github.com/google/go-cmp v0.5.0
github.com/google/uuid v1.1.2
golang.org/x/net v0.0.0-20190311183353-d8887717615a
diff --git a/vendor/google.golang.org/grpc/go.sum b/vendor/google.golang.org/grpc/go.sum
index 356a82e3d..24d2976ab 100644
--- a/vendor/google.golang.org/grpc/go.sum
+++ b/vendor/google.golang.org/grpc/go.sum
@@ -1,14 +1,17 @@
cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
-github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f h1:WBZRG4aNOuI15bLRrCgN8fCq8E5Xuty6jGbmSNEvSsU=
-github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
+github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 h1:cqQfy1jclcSy/FwLjemeg3SR1yaINm74aQyupQ0Bl8M=
+github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
+github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
-github.com/envoyproxy/go-control-plane v0.9.4 h1:rEvIZUSZ3fx39WIi3JkQqQBitGwpELBIYWeBVh6wn+E=
-github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
+github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d h1:QyzYnTnPE15SQyUeqU6qLbWxMkwyAyu+vGksa0b7j00=
+github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
@@ -21,18 +24,23 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
-github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
+github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
+github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
-github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -63,7 +71,6 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl
google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
-google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
@@ -77,8 +84,13 @@ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQ
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/vendor/google.golang.org/grpc/interceptor.go b/vendor/google.golang.org/grpc/interceptor.go
index d1a609e48..668e0adcf 100644
--- a/vendor/google.golang.org/grpc/interceptor.go
+++ b/vendor/google.golang.org/grpc/interceptor.go
@@ -25,25 +25,41 @@ import (
// UnaryInvoker is called by UnaryClientInterceptor to complete RPCs.
type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error
-// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. invoker is the handler to complete the RPC
-// and it is the responsibility of the interceptor to call it.
+// UnaryClientInterceptor intercepts the execution of a unary RPC on the client.
+// Unary interceptors can be specified as a DialOption, using
+// WithUnaryInterceptor() or WithChainUnaryInterceptor(), when creating a
+// ClientConn. When a unary interceptor(s) is set on a ClientConn, gRPC
+// delegates all unary RPC invocations to the interceptor, and it is the
+// responsibility of the interceptor to call invoker to complete the processing
+// of the RPC.
//
-// Experimental
+// method is the RPC name. req and reply are the corresponding request and
+// response messages. cc is the ClientConn on which the RPC was invoked. invoker
+// is the handler to complete the RPC and it is the responsibility of the
+// interceptor to call it. opts contain all applicable call options, including
+// defaults from the ClientConn as well as per-call options.
//
-// Notice: This type is EXPERIMENTAL and may be changed or removed in a
-// later release.
+// The returned error must be compatible with the status package.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
// Streamer is called by StreamClientInterceptor to create a ClientStream.
type Streamer func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error)
-// StreamClientInterceptor intercepts the creation of ClientStream. It may return a custom ClientStream to intercept all I/O
-// operations. streamer is the handler to create a ClientStream and it is the responsibility of the interceptor to call it.
+// StreamClientInterceptor intercepts the creation of a ClientStream. Stream
+// interceptors can be specified as a DialOption, using WithStreamInterceptor()
+// or WithChainStreamInterceptor(), when creating a ClientConn. When a stream
+// interceptor(s) is set on the ClientConn, gRPC delegates all stream creations
+// to the interceptor, and it is the responsibility of the interceptor to call
+// streamer.
//
-// Experimental
+// desc contains a description of the stream. cc is the ClientConn on which the
+// RPC was invoked. streamer is the handler to create a ClientStream and it is
+// the responsibility of the interceptor to call it. opts contain all applicable
+// call options, including defaults from the ClientConn as well as per-call
+// options.
//
-// Notice: This type is EXPERIMENTAL and may be changed or removed in a
-// later release.
+// StreamClientInterceptor may return a custom ClientStream to intercept all I/O
+// operations. The returned error must be compatible with the status package.
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
// UnaryServerInfo consists of various information about a unary RPC on
diff --git a/vendor/google.golang.org/grpc/internal/credentials/credentials.go b/vendor/google.golang.org/grpc/internal/credentials/credentials.go
new file mode 100644
index 000000000..32c9b5903
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/credentials/credentials.go
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2021 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package credentials
+
+import (
+ "context"
+)
+
+// requestInfoKey is a struct to be used as the key to store RequestInfo in a
+// context.
+type requestInfoKey struct{}
+
+// NewRequestInfoContext creates a context with ri.
+func NewRequestInfoContext(ctx context.Context, ri interface{}) context.Context {
+ return context.WithValue(ctx, requestInfoKey{}, ri)
+}
+
+// RequestInfoFromContext extracts the RequestInfo from ctx.
+func RequestInfoFromContext(ctx context.Context) interface{} {
+ return ctx.Value(requestInfoKey{})
+}
+
+// clientHandshakeInfoKey is a struct used as the key to store
+// ClientHandshakeInfo in a context.
+type clientHandshakeInfoKey struct{}
+
+// ClientHandshakeInfoFromContext extracts the ClientHandshakeInfo from ctx.
+func ClientHandshakeInfoFromContext(ctx context.Context) interface{} {
+ return ctx.Value(clientHandshakeInfoKey{})
+}
+
+// NewClientHandshakeInfoContext creates a context with chi.
+func NewClientHandshakeInfoContext(ctx context.Context, chi interface{}) context.Context {
+ return context.WithValue(ctx, clientHandshakeInfoKey{}, chi)
+}
diff --git a/vendor/google.golang.org/grpc/internal/grpcutil/target.go b/vendor/google.golang.org/grpc/internal/grpcutil/target.go
index 80b33cdaf..8833021da 100644
--- a/vendor/google.golang.org/grpc/internal/grpcutil/target.go
+++ b/vendor/google.golang.org/grpc/internal/grpcutil/target.go
@@ -37,19 +37,53 @@ func split2(s, sep string) (string, string, bool) {
}
// ParseTarget splits target into a resolver.Target struct containing scheme,
-// authority and endpoint.
+// authority and endpoint. skipUnixColonParsing indicates that the parse should
+// not parse "unix:[path]" cases. This should be true in cases where a custom
+// dialer is present, to prevent a behavior change.
//
-// If target is not a valid scheme://authority/endpoint, it returns {Endpoint:
-// target}.
-func ParseTarget(target string) (ret resolver.Target) {
+// If target is not a valid scheme://authority/endpoint as specified in
+// https://github.com/grpc/grpc/blob/master/doc/naming.md,
+// it returns {Endpoint: target}.
+func ParseTarget(target string, skipUnixColonParsing bool) (ret resolver.Target) {
var ok bool
+ if strings.HasPrefix(target, "unix-abstract:") {
+ if strings.HasPrefix(target, "unix-abstract://") {
+ // Maybe, with Authority specified, try to parse it
+ var remain string
+ ret.Scheme, remain, _ = split2(target, "://")
+ ret.Authority, ret.Endpoint, ok = split2(remain, "/")
+ if !ok {
+ // No Authority, add the "//" back
+ ret.Endpoint = "//" + remain
+ } else {
+ // Found Authority, add the "/" back
+ ret.Endpoint = "/" + ret.Endpoint
+ }
+ } else {
+ // Without Authority specified, split target on ":"
+ ret.Scheme, ret.Endpoint, _ = split2(target, ":")
+ }
+ return ret
+ }
ret.Scheme, ret.Endpoint, ok = split2(target, "://")
if !ok {
+ if strings.HasPrefix(target, "unix:") && !skipUnixColonParsing {
+ // Handle the "unix:[local/path]" and "unix:[/absolute/path]" cases,
+ // because splitting on :// only handles the
+ // "unix://[/absolute/path]" case. Only handle if the dialer is nil,
+ // to avoid a behavior change with custom dialers.
+ return resolver.Target{Scheme: "unix", Endpoint: target[len("unix:"):]}
+ }
return resolver.Target{Endpoint: target}
}
ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
if !ok {
return resolver.Target{Endpoint: target}
}
+ if ret.Scheme == "unix" {
+ // Add the "/" back in the unix case, so the unix resolver receives the
+ // actual endpoint in the "unix://[/absolute/path]" case.
+ ret.Endpoint = "/" + ret.Endpoint
+ }
return ret
}
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index 716d92800..1b596bf35 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -38,12 +38,6 @@ var (
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
- // NewRequestInfoContext creates a new context based on the argument context attaching
- // the passed in RequestInfo to the new context.
- NewRequestInfoContext interface{} // func(context.Context, credentials.RequestInfo) context.Context
- // NewClientHandshakeInfoContext returns a copy of the input context with
- // the passed in ClientHandshakeInfo struct added to it.
- NewClientHandshakeInfoContext interface{} // func(context.Context, credentials.ClientHandshakeInfo) context.Context
// ParseServiceConfigForTesting is for creating a fake
// ClientConn for resolver testing only
ParseServiceConfigForTesting interface{} // func(string) *serviceconfig.ParseResult
@@ -57,6 +51,19 @@ var (
// bootstrap code while parsing certificate provider configs in the
// bootstrap file.
GetCertificateProviderBuilder interface{} // func(string) certprovider.Builder
+ // GetXDSHandshakeInfoForTesting returns a pointer to the xds.HandshakeInfo
+ // stored in the passed in attributes. This is set by
+ // credentials/xds/xds.go.
+ GetXDSHandshakeInfoForTesting interface{} // func (*attributes.Attributes) *xds.HandshakeInfo
+ // GetServerCredentials returns the transport credentials configured on a
+ // gRPC server. An xDS-enabled server needs to know what type of credentials
+ // is configured on the underlying gRPC server. This is set by server.go.
+ GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials
+ // DrainServerTransports initiates a graceful close of existing connections
+ // on a gRPC server accepted on the provided listener address. An
+ // xDS-enabled server invokes this method on a grpc.Server when a particular
+ // listener moves to "not-serving" mode.
+ DrainServerTransports interface{} // func(*grpc.Server, string)
)
// HealthChecker defines the signature of the client-side LB channel health checking function.
diff --git a/vendor/google.golang.org/grpc/internal/metadata/metadata.go b/vendor/google.golang.org/grpc/internal/metadata/metadata.go
new file mode 100644
index 000000000..302262613
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/metadata/metadata.go
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package metadata contains functions to set and get metadata from addresses.
+//
+// This package is experimental.
+package metadata
+
+import (
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/resolver"
+)
+
+type mdKeyType string
+
+const mdKey = mdKeyType("grpc.internal.address.metadata")
+
+// Get returns the metadata of addr.
+func Get(addr resolver.Address) metadata.MD {
+ attrs := addr.Attributes
+ if attrs == nil {
+ return nil
+ }
+ md, _ := attrs.Value(mdKey).(metadata.MD)
+ return md
+}
+
+// Set sets (overrides) the metadata in addr.
+//
+// When a SubConn is created with this address, the RPCs sent on it will all
+// have this metadata.
+func Set(addr resolver.Address, md metadata.MD) resolver.Address {
+ addr.Attributes = addr.Attributes.WithValues(mdKey, md)
+ return addr
+}
diff --git a/vendor/google.golang.org/grpc/internal/resolver/config_selector.go b/vendor/google.golang.org/grpc/internal/resolver/config_selector.go
new file mode 100644
index 000000000..5e7f36703
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/resolver/config_selector.go
@@ -0,0 +1,164 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package resolver provides internal resolver-related functionality.
+package resolver
+
+import (
+ "context"
+ "sync"
+
+ "google.golang.org/grpc/internal/serviceconfig"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/resolver"
+)
+
+// ConfigSelector controls what configuration to use for every RPC.
+type ConfigSelector interface {
+ // Selects the configuration for the RPC, or terminates it using the error.
+ // This error will be converted by the gRPC library to a status error with
+ // code UNKNOWN if it is not returned as a status error.
+ SelectConfig(RPCInfo) (*RPCConfig, error)
+}
+
+// RPCInfo contains RPC information needed by a ConfigSelector.
+type RPCInfo struct {
+ // Context is the user's context for the RPC and contains headers and
+ // application timeout. It is passed for interception purposes and for
+ // efficiency reasons. SelectConfig should not be blocking.
+ Context context.Context
+ Method string // i.e. "/Service/Method"
+}
+
+// RPCConfig describes the configuration to use for each RPC.
+type RPCConfig struct {
+ // The context to use for the remainder of the RPC; can pass info to LB
+ // policy or affect timeout or metadata.
+ Context context.Context
+ MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC
+ OnCommitted func() // Called when the RPC has been committed (retries no longer possible)
+ Interceptor ClientInterceptor
+}
+
+// ClientStream is the same as grpc.ClientStream, but defined here for circular
+// dependency reasons.
+type ClientStream interface {
+ // Header returns the header metadata received from the server if there
+ // is any. It blocks if the metadata is not ready to read.
+ Header() (metadata.MD, error)
+ // Trailer returns the trailer metadata from the server, if there is any.
+ // It must only be called after stream.CloseAndRecv has returned, or
+ // stream.Recv has returned a non-nil error (including io.EOF).
+ Trailer() metadata.MD
+ // CloseSend closes the send direction of the stream. It closes the stream
+ // when non-nil error is met. It is also not safe to call CloseSend
+ // concurrently with SendMsg.
+ CloseSend() error
+ // Context returns the context for this stream.
+ //
+ // It should not be called until after Header or RecvMsg has returned. Once
+ // called, subsequent client-side retries are disabled.
+ Context() context.Context
+ // SendMsg is generally called by generated code. On error, SendMsg aborts
+ // the stream. If the error was generated by the client, the status is
+ // returned directly; otherwise, io.EOF is returned and the status of
+ // the stream may be discovered using RecvMsg.
+ //
+ // SendMsg blocks until:
+ // - There is sufficient flow control to schedule m with the transport, or
+ // - The stream is done, or
+ // - The stream breaks.
+ //
+ // SendMsg does not wait until the message is received by the server. An
+ // untimely stream closure may result in lost messages. To ensure delivery,
+ // users should ensure the RPC completed successfully using RecvMsg.
+ //
+ // It is safe to have a goroutine calling SendMsg and another goroutine
+ // calling RecvMsg on the same stream at the same time, but it is not safe
+ // to call SendMsg on the same stream in different goroutines. It is also
+ // not safe to call CloseSend concurrently with SendMsg.
+ SendMsg(m interface{}) error
+ // RecvMsg blocks until it receives a message into m or the stream is
+ // done. It returns io.EOF when the stream completes successfully. On
+ // any other error, the stream is aborted and the error contains the RPC
+ // status.
+ //
+ // It is safe to have a goroutine calling SendMsg and another goroutine
+ // calling RecvMsg on the same stream at the same time, but it is not
+ // safe to call RecvMsg on the same stream in different goroutines.
+ RecvMsg(m interface{}) error
+}
+
+// ClientInterceptor is an interceptor for gRPC client streams.
+type ClientInterceptor interface {
+ // NewStream produces a ClientStream for an RPC which may optionally use
+ // the provided function to produce a stream for delegation. Note:
+ // RPCInfo.Context should not be used (will be nil).
+ //
+ // done is invoked when the RPC is finished using its connection, or could
+ // not be assigned a connection. RPC operations may still occur on
+ // ClientStream after done is called, since the interceptor is invoked by
+ // application-layer operations. done must never be nil when called.
+ NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error)
+}
+
+// ServerInterceptor is unimplementable; do not use.
+type ServerInterceptor interface {
+ notDefined()
+}
+
+type csKeyType string
+
+const csKey = csKeyType("grpc.internal.resolver.configSelector")
+
+// SetConfigSelector sets the config selector in state and returns the new
+// state.
+func SetConfigSelector(state resolver.State, cs ConfigSelector) resolver.State {
+ state.Attributes = state.Attributes.WithValues(csKey, cs)
+ return state
+}
+
+// GetConfigSelector retrieves the config selector from state, if present, and
+// returns it or nil if absent.
+func GetConfigSelector(state resolver.State) ConfigSelector {
+ cs, _ := state.Attributes.Value(csKey).(ConfigSelector)
+ return cs
+}
+
+// SafeConfigSelector allows for safe switching of ConfigSelector
+// implementations such that previous values are guaranteed to not be in use
+// when UpdateConfigSelector returns.
+type SafeConfigSelector struct {
+ mu sync.RWMutex
+ cs ConfigSelector
+}
+
+// UpdateConfigSelector swaps to the provided ConfigSelector and blocks until
+// all uses of the previous ConfigSelector have completed.
+func (scs *SafeConfigSelector) UpdateConfigSelector(cs ConfigSelector) {
+ scs.mu.Lock()
+ defer scs.mu.Unlock()
+ scs.cs = cs
+}
+
+// SelectConfig defers to the current ConfigSelector in scs.
+func (scs *SafeConfigSelector) SelectConfig(r RPCInfo) (*RPCConfig, error) {
+ scs.mu.RLock()
+ defer scs.mu.RUnlock()
+ return scs.cs.SelectConfig(r)
+}
diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
index 304235566..03825bbe7 100644
--- a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
+++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
@@ -34,6 +34,7 @@ import (
grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
@@ -46,6 +47,13 @@ var EnableSRVLookups = false
var logger = grpclog.Component("dns")
+// Globals to stub out in tests. TODO: Perhaps these two can be combined into a
+// single variable for testing the resolver?
+var (
+ newTimer = time.NewTimer
+ newTimerDNSResRate = time.NewTimer
+)
+
func init() {
resolver.Register(NewBuilder())
}
@@ -143,7 +151,6 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
d.wg.Add(1)
go d.watcher()
- d.ResolveNow(resolver.ResolveNowOptions{})
return d, nil
}
@@ -201,28 +208,38 @@ func (d *dnsResolver) Close() {
func (d *dnsResolver) watcher() {
defer d.wg.Done()
+ backoffIndex := 1
for {
- select {
- case <-d.ctx.Done():
- return
- case <-d.rn:
- }
-
state, err := d.lookup()
if err != nil {
+ // Report error to the underlying grpc.ClientConn.
d.cc.ReportError(err)
} else {
- d.cc.UpdateState(*state)
+ err = d.cc.UpdateState(*state)
}
- // Sleep to prevent excessive re-resolutions. Incoming resolution requests
- // will be queued in d.rn.
- t := time.NewTimer(minDNSResRate)
+ var timer *time.Timer
+ if err == nil {
+ // Success resolving, wait for the next ResolveNow. However, also wait 30 seconds at the very least
+ // to prevent constantly re-resolving.
+ backoffIndex = 1
+ timer = newTimerDNSResRate(minDNSResRate)
+ select {
+ case <-d.ctx.Done():
+ timer.Stop()
+ return
+ case <-d.rn:
+ }
+ } else {
+ // Poll on an error found in DNS Resolver or an error received from ClientConn.
+ timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex))
+ backoffIndex++
+ }
select {
- case <-t.C:
case <-d.ctx.Done():
- t.Stop()
+ timer.Stop()
return
+ case <-timer.C:
}
}
}
diff --git a/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go b/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go
new file mode 100644
index 000000000..0d5a811dd
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package unix implements a resolver for unix targets.
+package unix
+
+import (
+ "fmt"
+
+ "google.golang.org/grpc/internal/transport/networktype"
+ "google.golang.org/grpc/resolver"
+)
+
+const unixScheme = "unix"
+const unixAbstractScheme = "unix-abstract"
+
+type builder struct {
+ scheme string
+}
+
+func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
+ if target.Authority != "" {
+ return nil, fmt.Errorf("invalid (non-empty) authority: %v", target.Authority)
+ }
+ addr := resolver.Address{Addr: target.Endpoint}
+ if b.scheme == unixAbstractScheme {
+ // prepend "\x00" to address for unix-abstract
+ addr.Addr = "\x00" + addr.Addr
+ }
+ cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(addr, "unix")}})
+ return &nopResolver{}, nil
+}
+
+func (b *builder) Scheme() string {
+ return b.scheme
+}
+
+type nopResolver struct {
+}
+
+func (*nopResolver) ResolveNow(resolver.ResolveNowOptions) {}
+
+func (*nopResolver) Close() {}
+
+func init() {
+ resolver.Register(&builder{scheme: unixScheme})
+ resolver.Register(&builder{scheme: unixAbstractScheme})
+}
diff --git a/vendor/google.golang.org/grpc/internal/serviceconfig/serviceconfig.go b/vendor/google.golang.org/grpc/internal/serviceconfig/serviceconfig.go
index af3e2b5f7..c0634d152 100644
--- a/vendor/google.golang.org/grpc/internal/serviceconfig/serviceconfig.go
+++ b/vendor/google.golang.org/grpc/internal/serviceconfig/serviceconfig.go
@@ -22,8 +22,10 @@ package serviceconfig
import (
"encoding/json"
"fmt"
+ "time"
"google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
externalserviceconfig "google.golang.org/grpc/serviceconfig"
)
@@ -44,6 +46,22 @@ type BalancerConfig struct {
type intermediateBalancerConfig []map[string]json.RawMessage
+// MarshalJSON implements the json.Marshaler interface.
+//
+// It marshals the balancer and config into a length-1 slice
+// ([]map[string]config).
+func (bc *BalancerConfig) MarshalJSON() ([]byte, error) {
+ if bc.Config == nil {
+ // If config is nil, return empty config `{}`.
+ return []byte(fmt.Sprintf(`[{%q: %v}]`, bc.Name, "{}")), nil
+ }
+ c, err := json.Marshal(bc.Config)
+ if err != nil {
+ return nil, err
+ }
+ return []byte(fmt.Sprintf(`[{%q: %s}]`, bc.Name, c)), nil
+}
+
// UnmarshalJSON implements the json.Unmarshaler interface.
//
// ServiceConfig contains a list of loadBalancingConfigs, each with a name and
@@ -104,3 +122,57 @@ func (bc *BalancerConfig) UnmarshalJSON(b []byte) error {
// case.
return fmt.Errorf("invalid loadBalancingConfig: no supported policies found")
}
+
+// MethodConfig defines the configuration recommended by the service providers for a
+// particular method.
+type MethodConfig struct {
+ // WaitForReady indicates whether RPCs sent to this method should wait until
+ // the connection is ready by default (!failfast). The value specified via the
+ // gRPC client API will override the value set here.
+ WaitForReady *bool
+ // Timeout is the default timeout for RPCs sent to this method. The actual
+ // deadline used will be the minimum of the value specified here and the value
+ // set by the application via the gRPC client API. If either one is not set,
+ // then the other will be used. If neither is set, then the RPC has no deadline.
+ Timeout *time.Duration
+ // MaxReqSize is the maximum allowed payload size for an individual request in a
+ // stream (client->server) in bytes. The size which is measured is the serialized
+ // payload after per-message compression (but before stream compression) in bytes.
+ // The actual value used is the minimum of the value specified here and the value set
+ // by the application via the gRPC client API. If either one is not set, then the other
+ // will be used. If neither is set, then the built-in default is used.
+ MaxReqSize *int
+ // MaxRespSize is the maximum allowed payload size for an individual response in a
+ // stream (server->client) in bytes.
+ MaxRespSize *int
+ // RetryPolicy configures retry options for the method.
+ RetryPolicy *RetryPolicy
+}
+
+// RetryPolicy defines the go-native version of the retry policy defined by the
+// service config here:
+// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config
+type RetryPolicy struct {
+ // MaxAttempts is the maximum number of attempts, including the original RPC.
+ //
+ // This field is required and must be two or greater.
+ MaxAttempts int
+
+ // Exponential backoff parameters. The initial retry attempt will occur at
+ // random(0, initialBackoff). In general, the nth attempt will occur at
+ // random(0,
+ // min(initialBackoff*backoffMultiplier**(n-1), maxBackoff)).
+ //
+ // These fields are required and must be greater than zero.
+ InitialBackoff time.Duration
+ MaxBackoff time.Duration
+ BackoffMultiplier float64
+
+ // The set of status codes which may be retried.
+ //
+ // Status codes are specified as strings, e.g., "UNAVAILABLE".
+ //
+ // This field is required and must be non-empty.
+ // Note: a set is used to store this for easy lookup.
+ RetryableStatusCodes map[codes.Code]bool
+}
diff --git a/vendor/google.golang.org/grpc/internal/syscall/syscall_linux.go b/vendor/google.golang.org/grpc/internal/syscall/syscall_linux.go
index 2fdcb76e6..4b2964f2a 100644
--- a/vendor/google.golang.org/grpc/internal/syscall/syscall_linux.go
+++ b/vendor/google.golang.org/grpc/internal/syscall/syscall_linux.go
@@ -44,25 +44,23 @@ func GetCPUTime() int64 {
}
// Rusage is an alias for syscall.Rusage under linux environment.
-type Rusage syscall.Rusage
+type Rusage = syscall.Rusage
// GetRusage returns the resource usage of current process.
-func GetRusage() (rusage *Rusage) {
- rusage = new(Rusage)
- syscall.Getrusage(syscall.RUSAGE_SELF, (*syscall.Rusage)(rusage))
- return
+func GetRusage() *Rusage {
+ rusage := new(Rusage)
+ syscall.Getrusage(syscall.RUSAGE_SELF, rusage)
+ return rusage
}
// CPUTimeDiff returns the differences of user CPU time and system CPU time used
// between two Rusage structs.
func CPUTimeDiff(first *Rusage, latest *Rusage) (float64, float64) {
- f := (*syscall.Rusage)(first)
- l := (*syscall.Rusage)(latest)
var (
- utimeDiffs = l.Utime.Sec - f.Utime.Sec
- utimeDiffus = l.Utime.Usec - f.Utime.Usec
- stimeDiffs = l.Stime.Sec - f.Stime.Sec
- stimeDiffus = l.Stime.Usec - f.Stime.Usec
+ utimeDiffs = latest.Utime.Sec - first.Utime.Sec
+ utimeDiffus = latest.Utime.Usec - first.Utime.Usec
+ stimeDiffs = latest.Stime.Sec - first.Stime.Sec
+ stimeDiffus = latest.Stime.Usec - first.Stime.Usec
)
uTimeElapsed := float64(utimeDiffs) + float64(utimeDiffus)*1.0e-6
diff --git a/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go b/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go
index adae60d65..7913ef1db 100644
--- a/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go
+++ b/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go
@@ -50,7 +50,7 @@ func GetCPUTime() int64 {
type Rusage struct{}
// GetRusage is a no-op function under non-linux or appengine environment.
-func GetRusage() (rusage *Rusage) {
+func GetRusage() *Rusage {
log()
return nil
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index 40ef23923..f63a01376 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -20,13 +20,17 @@ package transport
import (
"bytes"
+ "errors"
"fmt"
"runtime"
+ "strconv"
"sync"
"sync/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
+ "google.golang.org/grpc/internal/grpcutil"
+ "google.golang.org/grpc/status"
)
var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
@@ -128,6 +132,14 @@ type cleanupStream struct {
func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
+type earlyAbortStream struct {
+ streamID uint32
+ contentSubtype string
+ status *status.Status
+}
+
+func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
+
type dataFrame struct {
streamID uint32
endStream bool
@@ -749,6 +761,24 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
return nil
}
+func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
+ if l.side == clientSide {
+ return errors.New("earlyAbortStream not handled on client")
+ }
+
+ headerFields := []hpack.HeaderField{
+ {Name: ":status", Value: "200"},
+ {Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
+ {Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
+ {Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
+ }
+
+ if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
+ return err
+ }
+ return nil
+}
+
func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide {
l.draining = true
@@ -787,6 +817,8 @@ func (l *loopyWriter) handle(i interface{}) error {
return l.registerStreamHandler(i)
case *cleanupStream:
return l.cleanupStreamHandler(i)
+ case *earlyAbortStream:
+ return l.earlyAbortStreamHandler(i)
case *incomingGoAway:
return l.incomingGoAwayHandler(i)
case *dataFrame:
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index e73b77a15..48c5e52ed 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -32,13 +32,14 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
- "google.golang.org/grpc/internal/grpcutil"
-
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
- "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
+ icredentials "google.golang.org/grpc/internal/credentials"
+ "google.golang.org/grpc/internal/grpcutil"
+ imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/internal/syscall"
+ "google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -59,7 +60,7 @@ type http2Client struct {
cancel context.CancelFunc
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
userAgent string
- md interface{}
+ md metadata.MD
conn net.Conn // underlying communication channel
loopy *loopyWriter
remoteAddr net.Addr
@@ -114,6 +115,9 @@ type http2Client struct {
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
+ // goAwayDebugMessage contains a detailed human readable string about a
+ // GoAway frame, useful for error messages.
+ goAwayDebugMessage string
// A condition variable used to signal when the keepalive goroutine should
// go dormant. The condition for dormancy is based on the number of active
// streams and the `PermitWithoutStream` keepalive client parameter. And
@@ -137,11 +141,27 @@ type http2Client struct {
connectionID uint64
}
-func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
+func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
+ address := addr.Addr
+ networkType, ok := networktype.Get(addr)
if fn != nil {
- return fn(ctx, addr)
+ if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
+ // For backward compatibility, if the user dialed "unix:///path",
+ // the passthrough resolver would be used and the user's custom
+ // dialer would see "unix:///path". Since the unix resolver is used
+ // and the address is now "/path", prepend "unix://" so the user's
+ // custom dialer sees the same address.
+ return fn(ctx, "unix://"+address)
+ }
+ return fn(ctx, address)
}
- return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
+ if !ok {
+ networkType, address = parseDialTarget(address)
+ }
+ if networkType == "tcp" && useProxy {
+ return proxyDial(ctx, address, grpcUA)
+ }
+ return (&net.Dialer{}).DialContext(ctx, networkType, address)
}
func isTemporary(err error) bool {
@@ -172,7 +192,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
}()
- conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
+ conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
if err != nil {
if opts.FailOnNonTempDialError {
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
@@ -220,12 +240,23 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// Attributes field of resolver.Address, which is shoved into connectCtx
// and passed to the credential handshaker. This makes it possible for
// address specific arbitrary data to reach the credential handshaker.
- contextWithHandshakeInfo := internal.NewClientHandshakeInfoContext.(func(context.Context, credentials.ClientHandshakeInfo) context.Context)
- connectCtx = contextWithHandshakeInfo(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
+ connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
if err != nil {
return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
}
+ for _, cd := range perRPCCreds {
+ if cd.RequireTransportSecurity() {
+ if ci, ok := authInfo.(interface {
+ GetCommonAuthInfo() credentials.CommonAuthInfo
+ }); ok {
+ secLevel := ci.GetCommonAuthInfo().SecurityLevel
+ if secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {
+ return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
+ }
+ }
+ }
+ }
isSecure = true
if transportCreds.Info().SecurityProtocol == "tls" {
scheme = "https"
@@ -248,7 +279,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
ctxDone: ctx.Done(), // Cache Done chan.
cancel: cancel,
userAgent: opts.UserAgent,
- md: addr.Metadata,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
@@ -276,6 +306,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
}
+
+ if md, ok := addr.Metadata.(*metadata.MD); ok {
+ t.md = *md
+ } else if md := imetadata.Get(addr); md != nil {
+ t.md = md
+ }
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
t.initialWindowSize = opts.InitialWindowSize
@@ -312,12 +348,14 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// Send connection preface to server.
n, err := t.conn.Write(clientPreface)
if err != nil {
- t.Close()
- return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
+ err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
+ t.Close(err)
+ return nil, err
}
if n != len(clientPreface) {
- t.Close()
- return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
+ err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
+ t.Close(err)
+ return nil, err
}
var ss []http2.Setting
@@ -335,14 +373,16 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
err = t.framer.fr.WriteSettings(ss...)
if err != nil {
- t.Close()
- return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
+ err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
+ t.Close(err)
+ return nil, err
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
- t.Close()
- return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
+ err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
+ t.Close(err)
+ return nil, err
}
}
@@ -379,6 +419,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
buf: newRecvBuffer(),
headerChan: make(chan struct{}),
contentSubtype: callHdr.ContentSubtype,
+ doneFunc: callHdr.DoneFunc,
}
s.wq = newWriteQuota(defaultWriteQuota, s.done)
s.requestRead = func(n int) {
@@ -418,7 +459,7 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
Method: callHdr.Method,
AuthInfo: t.authInfo,
}
- ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri)
+ ctxWithRequestInfo := icredentials.NewRequestInfoContext(ctx, ri)
authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
if err != nil {
return nil, err
@@ -481,25 +522,23 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
for _, vv := range added {
for i, v := range vv {
if i%2 == 0 {
- k = v
+ k = strings.ToLower(v)
continue
}
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
if isReservedHeader(k) {
continue
}
- headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
+ headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
}
- if md, ok := t.md.(*metadata.MD); ok {
- for k, vv := range *md {
- if isReservedHeader(k) {
- continue
- }
- for _, v := range vv {
- headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
- }
+ for k, vv := range t.md {
+ if isReservedHeader(k) {
+ continue
+ }
+ for _, v := range vv {
+ headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
return headerFields, nil
@@ -549,8 +588,11 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
// Note: if these credentials are provided both via dial options and call
// options, then both sets of credentials will be applied.
if callCreds := callHdr.Creds; callCreds != nil {
- if !t.isSecure && callCreds.RequireTransportSecurity() {
- return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
+ if callCreds.RequireTransportSecurity() {
+ ri, _ := credentials.RequestInfoFromContext(ctx)
+ if !t.isSecure || credentials.CheckSecurityLevel(ri.AuthInfo, credentials.PrivacyAndIntegrity) != nil {
+ return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
+ }
}
data, err := callCreds.GetRequestMetadata(ctx, audience)
if err != nil {
@@ -796,6 +838,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
// This will unblock write.
close(s.done)
+ if s.doneFunc != nil {
+ s.doneFunc()
+ }
}
// Close kicks off the shutdown process of the transport. This should be called
@@ -805,12 +850,12 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
-func (t *http2Client) Close() error {
+func (t *http2Client) Close(err error) {
t.mu.Lock()
// Make sure we only Close once.
if t.state == closing {
t.mu.Unlock()
- return nil
+ return
}
// Call t.onClose before setting the state to closing to prevent the client
// from attempting to create new streams ASAP.
@@ -826,13 +871,19 @@ func (t *http2Client) Close() error {
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
- err := t.conn.Close()
+ t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
+ // Append info about previous goaways if there were any, since this may be important
+ // for understanding the root cause for this connection to be closed.
+ _, goAwayDebugMessage := t.GetGoAwayReason()
+ if len(goAwayDebugMessage) > 0 {
+ err = fmt.Errorf("closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
+ }
// Notify all active streams.
for _, s := range streams {
- t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
+ t.closeStream(s, err, false, http2.ErrCodeNo, status.New(codes.Unavailable, err.Error()), nil, false)
}
if t.statsHandler != nil {
connEnd := &stats.ConnEnd{
@@ -840,7 +891,6 @@ func (t *http2Client) Close() error {
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
- return err
}
// GracefulClose sets the state to draining, which prevents new streams from
@@ -859,7 +909,7 @@ func (t *http2Client) GracefulClose() {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
- t.Close()
+ t.Close(ErrConnClosing)
return
}
t.controlBuf.put(&incomingGoAway{})
@@ -1105,9 +1155,9 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
}
}
id := f.LastStreamID
- if id > 0 && id%2 != 1 {
+ if id > 0 && id%2 == 0 {
t.mu.Unlock()
- t.Close()
+ t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
return
}
// A client can receive multiple GoAways from the server (see
@@ -1125,7 +1175,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID {
t.mu.Unlock()
- t.Close()
+ t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
return
}
default:
@@ -1155,7 +1205,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
- t.Close()
+ t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
}
}
@@ -1171,12 +1221,13 @@ func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
t.goAwayReason = GoAwayTooManyPings
}
}
+ t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %v", f.ErrCode, string(f.DebugData()))
}
-func (t *http2Client) GetGoAwayReason() GoAwayReason {
+func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
t.mu.Lock()
defer t.mu.Unlock()
- return t.goAwayReason
+ return t.goAwayReason, t.goAwayDebugMessage
}
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
@@ -1273,7 +1324,8 @@ func (t *http2Client) reader() {
// Check the validity of server preface.
frame, err := t.framer.fr.ReadFrame()
if err != nil {
- t.Close() // this kicks off resetTransport, so must be last before return
+ err = connectionErrorf(true, err, "error reading server preface: %v", err)
+ t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
@@ -1282,7 +1334,8 @@ func (t *http2Client) reader() {
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
- t.Close() // this kicks off resetTransport, so must be last before return
+ // this kicks off resetTransport, so must be last before return
+ t.Close(connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame))
return
}
t.onPrefaceReceipt()
@@ -1318,7 +1371,7 @@ func (t *http2Client) reader() {
continue
} else {
// Transport error.
- t.Close()
+ t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
return
}
}
@@ -1377,7 +1430,7 @@ func (t *http2Client) keepalive() {
continue
}
if outstandingPing && timeoutLeft <= 0 {
- t.Close()
+ t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
return
}
t.mu.Lock()
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 0cf1cc320..11be5599c 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -26,6 +26,7 @@ import (
"io"
"math"
"net"
+ "net/http"
"strconv"
"sync"
"sync/atomic"
@@ -355,26 +356,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
if state.data.statsTrace != nil {
s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
}
- if t.inTapHandle != nil {
- var err error
- info := &tap.Info{
- FullMethodName: state.data.method,
- }
- s.ctx, err = t.inTapHandle(s.ctx, info)
- if err != nil {
- if logger.V(logLevel) {
- logger.Warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
- }
- t.controlBuf.put(&cleanupStream{
- streamID: s.id,
- rst: true,
- rstCode: http2.ErrCodeRefusedStream,
- onWrite: func() {},
- })
- s.cancel()
- return false
- }
- }
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
@@ -402,6 +383,39 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
return true
}
t.maxStreamID = streamID
+ if state.data.httpMethod != http.MethodPost {
+ t.mu.Unlock()
+ if logger.V(logLevel) {
+ logger.Warningf("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", state.data.httpMethod)
+ }
+ t.controlBuf.put(&cleanupStream{
+ streamID: streamID,
+ rst: true,
+ rstCode: http2.ErrCodeProtocol,
+ onWrite: func() {},
+ })
+ s.cancel()
+ return false
+ }
+ if t.inTapHandle != nil {
+ var err error
+ if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: state.data.method}); err != nil {
+ t.mu.Unlock()
+ if logger.V(logLevel) {
+ logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
+ }
+ stat, ok := status.FromError(err)
+ if !ok {
+ stat = status.New(codes.PermissionDenied, err.Error())
+ }
+ t.controlBuf.put(&earlyAbortStream{
+ streamID: s.id,
+ contentSubtype: s.contentSubtype,
+ status: stat,
+ })
+ return false
+ }
+ }
t.activeStreams[streamID] = s
if len(t.activeStreams) == 1 {
t.idle = time.Time{}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index 4d15afbf7..c7dee140c 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -27,6 +27,7 @@ import (
"math"
"net"
"net/http"
+ "net/url"
"strconv"
"strings"
"time"
@@ -110,6 +111,7 @@ type parsedHeaderData struct {
timeoutSet bool
timeout time.Duration
method string
+ httpMethod string
// key-value metadata map from the peer.
mdata map[string][]string
statsTags []byte
@@ -362,6 +364,8 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
}
d.data.statsTrace = v
d.addMetadata(f.Name, string(v))
+ case ":method":
+ d.data.httpMethod = f.Value
default:
if isReservedHeader(f.Name) && !isWhitelistedHeader(f.Name) {
break
@@ -598,3 +602,31 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList
f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
return f
}
+
+// parseDialTarget returns the network and address to pass to dialer.
+func parseDialTarget(target string) (string, string) {
+ net := "tcp"
+ m1 := strings.Index(target, ":")
+ m2 := strings.Index(target, ":/")
+ // handle unix:addr which will fail with url.Parse
+ if m1 >= 0 && m2 < 0 {
+ if n := target[0:m1]; n == "unix" {
+ return n, target[m1+1:]
+ }
+ }
+ if m2 >= 0 {
+ t, err := url.Parse(target)
+ if err != nil {
+ return net, target
+ }
+ scheme := t.Scheme
+ addr := t.Path
+ if scheme == "unix" {
+ if addr == "" {
+ addr = t.Host
+ }
+ return scheme, addr
+ }
+ }
+ return net, target
+}
diff --git a/vendor/google.golang.org/grpc/internal/transport/networktype/networktype.go b/vendor/google.golang.org/grpc/internal/transport/networktype/networktype.go
new file mode 100644
index 000000000..96967428b
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/transport/networktype/networktype.go
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package networktype declares the network type to be used in the default
+// dailer. Attribute of a resolver.Address.
+package networktype
+
+import (
+ "google.golang.org/grpc/resolver"
+)
+
+// keyType is the key to use for storing State in Attributes.
+type keyType string
+
+const key = keyType("grpc.internal.transport.networktype")
+
+// Set returns a copy of the provided address with attributes containing networkType.
+func Set(address resolver.Address, networkType string) resolver.Address {
+ address.Attributes = address.Attributes.WithValues(key, networkType)
+ return address
+}
+
+// Get returns the network type in the resolver.Address and true, or "", false
+// if not present.
+func Get(address resolver.Address) (string, bool) {
+ v := address.Attributes.Value(key)
+ if v == nil {
+ return "", false
+ }
+ return v.(string), true
+}
diff --git a/vendor/google.golang.org/grpc/proxy.go b/vendor/google.golang.org/grpc/internal/transport/proxy.go
index f8f69bfb7..a662bf39a 100644
--- a/vendor/google.golang.org/grpc/proxy.go
+++ b/vendor/google.golang.org/grpc/internal/transport/proxy.go
@@ -16,13 +16,12 @@
*
*/
-package grpc
+package transport
import (
"bufio"
"context"
"encoding/base64"
- "errors"
"fmt"
"io"
"net"
@@ -34,8 +33,6 @@ import (
const proxyAuthHeaderKey = "Proxy-Authorization"
var (
- // errDisabled indicates that proxy is disabled for the address.
- errDisabled = errors.New("proxy is disabled for the address")
// The following variable will be overwritten in the tests.
httpProxyFromEnvironment = http.ProxyFromEnvironment
)
@@ -51,9 +48,6 @@ func mapAddress(ctx context.Context, address string) (*url.URL, error) {
if err != nil {
return nil, err
}
- if url == nil {
- return nil, errDisabled
- }
return url, nil
}
@@ -76,7 +70,7 @@ func basicAuth(username, password string) string {
return base64.StdEncoding.EncodeToString([]byte(auth))
}
-func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr string, proxyURL *url.URL) (_ net.Conn, err error) {
+func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr string, proxyURL *url.URL, grpcUA string) (_ net.Conn, err error) {
defer func() {
if err != nil {
conn.Close()
@@ -115,32 +109,28 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri
return &bufConn{Conn: conn, r: r}, nil
}
-// newProxyDialer returns a dialer that connects to proxy first if necessary.
-// The returned dialer checks if a proxy is necessary, dial to the proxy with the
-// provided dialer, does HTTP CONNECT handshake and returns the connection.
-func newProxyDialer(dialer func(context.Context, string) (net.Conn, error)) func(context.Context, string) (net.Conn, error) {
- return func(ctx context.Context, addr string) (conn net.Conn, err error) {
- var newAddr string
- proxyURL, err := mapAddress(ctx, addr)
- if err != nil {
- if err != errDisabled {
- return nil, err
- }
- newAddr = addr
- } else {
- newAddr = proxyURL.Host
- }
+// proxyDial dials, connecting to a proxy first if necessary. Checks if a proxy
+// is necessary, dials, does the HTTP CONNECT handshake, and returns the
+// connection.
+func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) {
+ newAddr := addr
+ proxyURL, err := mapAddress(ctx, addr)
+ if err != nil {
+ return nil, err
+ }
+ if proxyURL != nil {
+ newAddr = proxyURL.Host
+ }
- conn, err = dialer(ctx, newAddr)
- if err != nil {
- return
- }
- if proxyURL != nil {
- // proxy is disabled if proxyURL is nil.
- conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL)
- }
+ conn, err = (&net.Dialer{}).DialContext(ctx, "tcp", newAddr)
+ if err != nil {
return
}
+ if proxyURL != nil {
+ // proxy is disabled if proxyURL is nil.
+ conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA)
+ }
+ return
}
func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error {
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index b74030a96..6cc1031fd 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -241,6 +241,7 @@ type Stream struct {
ctx context.Context // the associated context of the stream
cancel context.CancelFunc // always nil for client side Stream
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
+ doneFunc func() // invoked at the end of stream on client side.
ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
method string // the associated RPC method of the stream
recvCompress string
@@ -569,6 +570,8 @@ type ConnectOptions struct {
ChannelzParentID int64
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
MaxHeaderListSize *uint32
+ // UseProxy specifies if a proxy should be used.
+ UseProxy bool
}
// NewClientTransport establishes the transport with the required ConnectOptions
@@ -609,6 +612,8 @@ type CallHdr struct {
ContentSubtype string
PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
+
+ DoneFunc func() // called when the stream is finished
}
// ClientTransport is the common interface for all gRPC client-side transport
@@ -617,7 +622,7 @@ type ClientTransport interface {
// Close tears down this transport. Once it returns, the transport
// should not be accessed any more. The caller must make sure this
// is called only once.
- Close() error
+ Close(err error)
// GracefulClose starts to tear down the transport: the transport will stop
// accepting new RPCs and NewStream will return error. Once all streams are
@@ -651,8 +656,9 @@ type ClientTransport interface {
// HTTP/2).
GoAway() <-chan struct{}
- // GetGoAwayReason returns the reason why GoAway frame was received.
- GetGoAwayReason() GoAwayReason
+ // GetGoAwayReason returns the reason why GoAway frame was received, along
+ // with a human readable string with debug info.
+ GetGoAwayReason() (GoAwayReason, string)
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
diff --git a/vendor/google.golang.org/grpc/internal/xds_handshake_cluster.go b/vendor/google.golang.org/grpc/internal/xds_handshake_cluster.go
new file mode 100644
index 000000000..3677c3f04
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/xds_handshake_cluster.go
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2021 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package internal
+
+import (
+ "google.golang.org/grpc/attributes"
+ "google.golang.org/grpc/resolver"
+)
+
+// handshakeClusterNameKey is the type used as the key to store cluster name in
+// the Attributes field of resolver.Address.
+type handshakeClusterNameKey struct{}
+
+// SetXDSHandshakeClusterName returns a copy of addr in which the Attributes field
+// is updated with the cluster name.
+func SetXDSHandshakeClusterName(addr resolver.Address, clusterName string) resolver.Address {
+ addr.Attributes = addr.Attributes.WithValues(handshakeClusterNameKey{}, clusterName)
+ return addr
+}
+
+// GetXDSHandshakeClusterName returns cluster name stored in attr.
+func GetXDSHandshakeClusterName(attr *attributes.Attributes) (string, bool) {
+ v := attr.Value(handshakeClusterNameKey{})
+ name, ok := v.(string)
+ return name, ok
+}
diff --git a/vendor/google.golang.org/grpc/metadata/metadata.go b/vendor/google.golang.org/grpc/metadata/metadata.go
index cf6d1b947..e4cbea917 100644
--- a/vendor/google.golang.org/grpc/metadata/metadata.go
+++ b/vendor/google.golang.org/grpc/metadata/metadata.go
@@ -75,13 +75,9 @@ func Pairs(kv ...string) MD {
panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv)))
}
md := MD{}
- var key string
- for i, s := range kv {
- if i%2 == 0 {
- key = strings.ToLower(s)
- continue
- }
- md[key] = append(md[key], s)
+ for i := 0; i < len(kv); i += 2 {
+ key := strings.ToLower(kv[i])
+ md[key] = append(md[key], kv[i+1])
}
return md
}
@@ -195,12 +191,18 @@ func FromOutgoingContext(ctx context.Context) (MD, bool) {
return nil, false
}
- mds := make([]MD, 0, len(raw.added)+1)
- mds = append(mds, raw.md)
- for _, vv := range raw.added {
- mds = append(mds, Pairs(vv...))
+ out := raw.md.Copy()
+ for _, added := range raw.added {
+ if len(added)%2 == 1 {
+ panic(fmt.Sprintf("metadata: FromOutgoingContext got an odd number of input pairs for metadata: %d", len(added)))
+ }
+
+ for i := 0; i < len(added); i += 2 {
+ key := strings.ToLower(added[i])
+ out[key] = append(out[key], added[i+1])
+ }
}
- return Join(mds...), ok
+ return out, ok
}
type rawMD struct {
diff --git a/vendor/google.golang.org/grpc/pickfirst.go b/vendor/google.golang.org/grpc/pickfirst.go
index 56e33f6c7..b858c2a5e 100644
--- a/vendor/google.golang.org/grpc/pickfirst.go
+++ b/vendor/google.golang.org/grpc/pickfirst.go
@@ -84,7 +84,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) e
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}})
b.sc.Connect()
} else {
- b.sc.UpdateAddresses(cs.ResolverState.Addresses)
+ b.cc.UpdateAddresses(b.sc, cs.ResolverState.Addresses)
b.sc.Connect()
}
return nil
diff --git a/vendor/google.golang.org/grpc/regenerate.sh b/vendor/google.golang.org/grpc/regenerate.sh
index 3443ad975..dfd3226a1 100644
--- a/vendor/google.golang.org/grpc/regenerate.sh
+++ b/vendor/google.golang.org/grpc/regenerate.sh
@@ -40,29 +40,14 @@ echo "go install cmd/protoc-gen-go-grpc"
echo "git clone https://github.com/grpc/grpc-proto"
git clone --quiet https://github.com/grpc/grpc-proto ${WORKDIR}/grpc-proto
+echo "git clone https://github.com/protocolbuffers/protobuf"
+git clone --quiet https://github.com/protocolbuffers/protobuf ${WORKDIR}/protobuf
+
# Pull in code.proto as a proto dependency
mkdir -p ${WORKDIR}/googleapis/google/rpc
echo "curl https://raw.githubusercontent.com/googleapis/googleapis/master/google/rpc/code.proto"
curl --silent https://raw.githubusercontent.com/googleapis/googleapis/master/google/rpc/code.proto > ${WORKDIR}/googleapis/google/rpc/code.proto
-# Pull in the following repos to build the MeshCA config proto.
-ENVOY_API_REPOS=(
- "https://github.com/envoyproxy/data-plane-api"
- "https://github.com/cncf/udpa"
- "https://github.com/envoyproxy/protoc-gen-validate"
-)
-for repo in ${ENVOY_API_REPOS[@]}; do
- dirname=$(basename ${repo})
- mkdir -p ${WORKDIR}/${dirname}
- echo "git clone ${repo}"
- git clone --quiet ${repo} ${WORKDIR}/${dirname}
-done
-
-# Pull in the MeshCA service proto.
-mkdir -p ${WORKDIR}/istio/istio/google/security/meshca/v1
-echo "curl https://raw.githubusercontent.com/istio/istio/master/security/proto/providers/google/meshca.proto"
-curl --silent https://raw.githubusercontent.com/istio/istio/master/security/proto/providers/google/meshca.proto > ${WORKDIR}/istio/istio/google/security/meshca/v1/meshca.proto
-
mkdir -p ${WORKDIR}/out
# Generates sources without the embed requirement
@@ -84,15 +69,14 @@ SOURCES=(
${WORKDIR}/grpc-proto/grpc/lookup/v1/rls.proto
${WORKDIR}/grpc-proto/grpc/lookup/v1/rls_config.proto
${WORKDIR}/grpc-proto/grpc/service_config/service_config.proto
- ${WORKDIR}/grpc-proto/grpc/tls/provider/meshca/experimental/config.proto
- ${WORKDIR}/istio/istio/google/security/meshca/v1/meshca.proto
+ ${WORKDIR}/grpc-proto/grpc/testing/*.proto
+ ${WORKDIR}/grpc-proto/grpc/core/*.proto
)
# These options of the form 'Mfoo.proto=bar' instruct the codegen to use an
# import path of 'bar' in the generated code when 'foo.proto' is imported in
# one of the sources.
-OPTS=Mgrpc/service_config/service_config.proto=/internal/proto/grpc_service_config,\
-Menvoy/config/core/v3/config_source.proto=github.com/envoyproxy/go-control-plane/envoy/config/core/v3
+OPTS=Mgrpc/service_config/service_config.proto=/internal/proto/grpc_service_config,Mgrpc/core/stats.proto=google.golang.org/grpc/interop/grpc_testing/core
for src in ${SOURCES[@]}; do
echo "protoc ${src}"
@@ -100,9 +84,7 @@ for src in ${SOURCES[@]}; do
-I"." \
-I${WORKDIR}/grpc-proto \
-I${WORKDIR}/googleapis \
- -I${WORKDIR}/data-plane-api \
- -I${WORKDIR}/udpa \
- -I${WORKDIR}/protoc-gen-validate \
+ -I${WORKDIR}/protobuf/src \
-I${WORKDIR}/istio \
${src}
done
@@ -113,9 +95,7 @@ for src in ${LEGACY_SOURCES[@]}; do
-I"." \
-I${WORKDIR}/grpc-proto \
-I${WORKDIR}/googleapis \
- -I${WORKDIR}/data-plane-api \
- -I${WORKDIR}/udpa \
- -I${WORKDIR}/protoc-gen-validate \
+ -I${WORKDIR}/protobuf/src \
-I${WORKDIR}/istio \
${src}
done
@@ -132,8 +112,8 @@ rm ${WORKDIR}/out/google.golang.org/grpc/reflection/grpc_testingv3/*.pb.go
# grpc/service_config/service_config.proto does not have a go_package option.
mv ${WORKDIR}/out/grpc/service_config/service_config.pb.go internal/proto/grpc_service_config
-# istio/google/security/meshca/v1/meshca.proto does not have a go_package option.
-mkdir -p ${WORKDIR}/out/google.golang.org/grpc/credentials/tls/certprovider/meshca/internal/v1/
-mv ${WORKDIR}/out/istio/google/security/meshca/v1/* ${WORKDIR}/out/google.golang.org/grpc/credentials/tls/certprovider/meshca/internal/v1/
+# grpc/testing does not have a go_package option.
+mv ${WORKDIR}/out/grpc/testing/*.pb.go interop/grpc_testing/
+mv ${WORKDIR}/out/grpc/core/*.pb.go interop/grpc_testing/core/
cp -R ${WORKDIR}/out/google.golang.org/grpc/* .
diff --git a/vendor/google.golang.org/grpc/resolver/resolver.go b/vendor/google.golang.org/grpc/resolver/resolver.go
index e9fa8e33d..6a9d234a5 100644
--- a/vendor/google.golang.org/grpc/resolver/resolver.go
+++ b/vendor/google.golang.org/grpc/resolver/resolver.go
@@ -181,7 +181,7 @@ type State struct {
// gRPC to add new methods to this interface.
type ClientConn interface {
// UpdateState updates the state of the ClientConn appropriately.
- UpdateState(State)
+ UpdateState(State) error
// ReportError notifies the ClientConn that the Resolver encountered an
// error. The ClientConn will notify the load balancer and begin calling
// ResolveNow on the Resolver with exponential backoff.
diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
index f2d81968f..4118de571 100644
--- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
+++ b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
@@ -22,7 +22,6 @@ import (
"fmt"
"strings"
"sync"
- "time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/credentials"
@@ -40,9 +39,6 @@ type ccResolverWrapper struct {
resolver resolver.Resolver
done *grpcsync.Event
curState resolver.State
-
- pollingMu sync.Mutex
- polling chan struct{}
}
// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
@@ -93,59 +89,19 @@ func (ccr *ccResolverWrapper) close() {
ccr.resolverMu.Unlock()
}
-// poll begins or ends asynchronous polling of the resolver based on whether
-// err is ErrBadResolverState.
-func (ccr *ccResolverWrapper) poll(err error) {
- ccr.pollingMu.Lock()
- defer ccr.pollingMu.Unlock()
- if err != balancer.ErrBadResolverState {
- // stop polling
- if ccr.polling != nil {
- close(ccr.polling)
- ccr.polling = nil
- }
- return
- }
- if ccr.polling != nil {
- // already polling
- return
- }
- p := make(chan struct{})
- ccr.polling = p
- go func() {
- for i := 0; ; i++ {
- ccr.resolveNow(resolver.ResolveNowOptions{})
- t := time.NewTimer(ccr.cc.dopts.resolveNowBackoff(i))
- select {
- case <-p:
- t.Stop()
- return
- case <-ccr.done.Done():
- // Resolver has been closed.
- t.Stop()
- return
- case <-t.C:
- select {
- case <-p:
- return
- default:
- }
- // Timer expired; re-resolve.
- }
- }
- }()
-}
-
-func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
+func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
if ccr.done.HasFired() {
- return
+ return nil
}
channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(s)
}
ccr.curState = s
- ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
+ if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
+ return balancer.ErrBadResolverState
+ }
+ return nil
}
func (ccr *ccResolverWrapper) ReportError(err error) {
@@ -153,7 +109,7 @@ func (ccr *ccResolverWrapper) ReportError(err error) {
return
}
channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
- ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err))
+ ccr.cc.updateResolverState(resolver.State{}, err)
}
// NewAddress is called by the resolver implementation to send addresses to gRPC.
@@ -166,7 +122,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
}
ccr.curState.Addresses = addrs
- ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
+ ccr.cc.updateResolverState(ccr.curState, nil)
}
// NewServiceConfig is called by the resolver implementation to send service
@@ -183,14 +139,13 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
scpr := parseServiceConfig(sc)
if scpr.Err != nil {
channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
- ccr.poll(balancer.ErrBadResolverState)
return
}
if channelz.IsOn() {
ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
}
ccr.curState.ServiceConfig = scpr
- ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
+ ccr.cc.updateResolverState(ccr.curState, nil)
}
func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index f0609f2a4..6db356fa5 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -27,7 +27,6 @@ import (
"io"
"io/ioutil"
"math"
- "net/url"
"strings"
"sync"
"time"
@@ -430,9 +429,10 @@ func (o ContentSubtypeCallOption) before(c *callInfo) error {
}
func (o ContentSubtypeCallOption) after(c *callInfo, attempt *csAttempt) {}
-// ForceCodec returns a CallOption that will set the given Codec to be
-// used for all request and response messages for a call. The result of calling
-// String() will be used as the content-subtype in a case-insensitive manner.
+// ForceCodec returns a CallOption that will set codec to be used for all
+// request and response messages for a call. The result of calling Name() will
+// be used as the content-subtype after converting to lowercase, unless
+// CallContentSubtype is also used.
//
// See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
@@ -854,7 +854,17 @@ func toRPCErr(err error) error {
// setCallInfoCodec should only be called after CallOptions have been applied.
func setCallInfoCodec(c *callInfo) error {
if c.codec != nil {
- // codec was already set by a CallOption; use it.
+ // codec was already set by a CallOption; use it, but set the content
+ // subtype if it is not set.
+ if c.contentSubtype == "" {
+ // c.codec is a baseCodec to hide the difference between grpc.Codec and
+ // encoding.Codec (Name vs. String method name). We only support
+ // setting content subtype from encoding.Codec to avoid a behavior
+ // change with the deprecated version.
+ if ec, ok := c.codec.(encoding.Codec); ok {
+ c.contentSubtype = strings.ToLower(ec.Name())
+ }
+ }
return nil
}
@@ -872,40 +882,6 @@ func setCallInfoCodec(c *callInfo) error {
return nil
}
-// parseDialTarget returns the network and address to pass to dialer
-func parseDialTarget(target string) (net string, addr string) {
- net = "tcp"
-
- m1 := strings.Index(target, ":")
- m2 := strings.Index(target, ":/")
-
- // handle unix:addr which will fail with url.Parse
- if m1 >= 0 && m2 < 0 {
- if n := target[0:m1]; n == "unix" {
- net = n
- addr = target[m1+1:]
- return net, addr
- }
- }
- if m2 >= 0 {
- t, err := url.Parse(target)
- if err != nil {
- return net, target
- }
- scheme := t.Scheme
- addr = t.Path
- if scheme == "unix" {
- net = scheme
- if addr == "" {
- addr = t.Host
- }
- return net, addr
- }
- }
-
- return net, target
-}
-
// channelzData is used to store channelz related data for ClientConn, addrConn and Server.
// These fields cannot be embedded in the original structs (e.g. ClientConn), since to do atomic
// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
@@ -923,8 +899,7 @@ type channelzData struct {
// buffer files to ensure compatibility with the gRPC version used. The latest
// support package version is 7.
//
-// Older versions are kept for compatibility. They may be removed if
-// compatibility cannot be maintained.
+// Older versions are kept for compatibility.
//
// These constants should not be referenced from any other code.
const (
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 968eb598e..0a151dee4 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -40,6 +40,7 @@ import (
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
@@ -56,8 +57,24 @@ import (
const (
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
defaultServerMaxSendMessageSize = math.MaxInt32
+
+ // Server transports are tracked in a map which is keyed on listener
+ // address. For regular gRPC traffic, connections are accepted in Serve()
+ // through a call to Accept(), and we use the actual listener address as key
+ // when we add it to the map. But for connections received through
+ // ServeHTTP(), we do not have a listener and hence use this dummy value.
+ listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)
+func init() {
+ internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
+ return srv.opts.creds
+ }
+ internal.DrainServerTransports = func(srv *Server, addr string) {
+ srv.drainServerTransports(addr)
+ }
+}
+
var statusOK = status.New(codes.OK, "")
var logger = grpclog.Component("core")
@@ -100,9 +117,12 @@ type serverWorkerData struct {
type Server struct {
opts serverOptions
- mu sync.Mutex // guards following
- lis map[net.Listener]bool
- conns map[transport.ServerTransport]bool
+ mu sync.Mutex // guards following
+ lis map[net.Listener]bool
+ // conns contains all active server transports. It is a map keyed on a
+ // listener address with the value being the set of active transports
+ // belonging to that listener.
+ conns map[string]map[transport.ServerTransport]bool
serve bool
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
@@ -259,6 +279,35 @@ func CustomCodec(codec Codec) ServerOption {
})
}
+// ForceServerCodec returns a ServerOption that sets a codec for message
+// marshaling and unmarshaling.
+//
+// This will override any lookups by content-subtype for Codecs registered
+// with RegisterCodec.
+//
+// See Content-Type on
+// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
+// more details. Also see the documentation on RegisterCodec and
+// CallContentSubtype for more details on the interaction between encoding.Codec
+// and content-subtype.
+//
+// This function is provided for advanced users; prefer to register codecs
+// using encoding.RegisterCodec.
+// The server will automatically use registered codecs based on the incoming
+// requests' headers. See also
+// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
+// Will be supported throughout 1.x.
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func ForceServerCodec(codec encoding.Codec) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.codec = codec
+ })
+}
+
// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages. For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression. By
@@ -369,6 +418,11 @@ func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOptio
// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transport to be created. Only one can be installed.
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
func InTapHandle(h tap.ServerInHandle) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
if o.inTapHandle != nil {
@@ -512,7 +566,7 @@ func NewServer(opt ...ServerOption) *Server {
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
- conns: make(map[transport.ServerTransport]bool),
+ conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
@@ -771,7 +825,7 @@ func (s *Server) Serve(lis net.Listener) error {
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
- s.handleRawConn(rawConn)
+ s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done()
}()
}
@@ -779,7 +833,7 @@ func (s *Server) Serve(lis net.Listener) error {
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
-func (s *Server) handleRawConn(rawConn net.Conn) {
+func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
if s.quit.HasFired() {
rawConn.Close()
return
@@ -807,15 +861,24 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
}
rawConn.SetDeadline(time.Time{})
- if !s.addConn(st) {
+ if !s.addConn(lisAddr, st) {
return
}
go func() {
s.serveStreams(st)
- s.removeConn(st)
+ s.removeConn(lisAddr, st)
}()
}
+func (s *Server) drainServerTransports(addr string) {
+ s.mu.Lock()
+ conns := s.conns[addr]
+ for st := range conns {
+ st.Drain()
+ }
+ s.mu.Unlock()
+}
+
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
@@ -917,10 +980,10 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
- if !s.addConn(st) {
+ if !s.addConn(listenerAddressForServeHTTP, st) {
return
}
- defer s.removeConn(st)
+ defer s.removeConn(listenerAddressForServeHTTP, st)
s.serveStreams(st)
}
@@ -948,7 +1011,7 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
return trInfo
}
-func (s *Server) addConn(st transport.ServerTransport) bool {
+func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
@@ -960,15 +1023,28 @@ func (s *Server) addConn(st transport.ServerTransport) bool {
// immediately.
st.Drain()
}
- s.conns[st] = true
+
+ if s.conns[addr] == nil {
+ // Create a map entry if this is the first connection on this listener.
+ s.conns[addr] = make(map[transport.ServerTransport]bool)
+ }
+ s.conns[addr][st] = true
return true
}
-func (s *Server) removeConn(st transport.ServerTransport) {
+func (s *Server) removeConn(addr string, st transport.ServerTransport) {
s.mu.Lock()
defer s.mu.Unlock()
- if s.conns != nil {
- delete(s.conns, st)
+
+ conns := s.conns[addr]
+ if conns != nil {
+ delete(conns, st)
+ if len(conns) == 0 {
+ // If the last connection for this address is being removed, also
+ // remove the map entry corresponding to the address. This is used
+ // in GracefulStop() when waiting for all connections to be closed.
+ delete(s.conns, addr)
+ }
s.cv.Broadcast()
}
}
@@ -1632,7 +1708,7 @@ func (s *Server) Stop() {
s.mu.Lock()
listeners := s.lis
s.lis = nil
- st := s.conns
+ conns := s.conns
s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
s.cv.Broadcast()
@@ -1641,8 +1717,10 @@ func (s *Server) Stop() {
for lis := range listeners {
lis.Close()
}
- for c := range st {
- c.Close()
+ for _, cs := range conns {
+ for st := range cs {
+ st.Close()
+ }
}
if s.opts.numServerWorkers > 0 {
s.stopServerWorkers()
@@ -1679,8 +1757,10 @@ func (s *Server) GracefulStop() {
}
s.lis = nil
if !s.drain {
- for st := range s.conns {
- st.Drain()
+ for _, conns := range s.conns {
+ for st := range conns {
+ st.Drain()
+ }
}
s.drain = true
}
diff --git a/vendor/google.golang.org/grpc/service_config.go b/vendor/google.golang.org/grpc/service_config.go
index 5e434ca7f..22c4240cf 100644
--- a/vendor/google.golang.org/grpc/service_config.go
+++ b/vendor/google.golang.org/grpc/service_config.go
@@ -41,29 +41,7 @@ const maxInt = int(^uint(0) >> 1)
// Deprecated: Users should not use this struct. Service config should be received
// through name resolver, as specified here
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
-type MethodConfig struct {
- // WaitForReady indicates whether RPCs sent to this method should wait until
- // the connection is ready by default (!failfast). The value specified via the
- // gRPC client API will override the value set here.
- WaitForReady *bool
- // Timeout is the default timeout for RPCs sent to this method. The actual
- // deadline used will be the minimum of the value specified here and the value
- // set by the application via the gRPC client API. If either one is not set,
- // then the other will be used. If neither is set, then the RPC has no deadline.
- Timeout *time.Duration
- // MaxReqSize is the maximum allowed payload size for an individual request in a
- // stream (client->server) in bytes. The size which is measured is the serialized
- // payload after per-message compression (but before stream compression) in bytes.
- // The actual value used is the minimum of the value specified here and the value set
- // by the application via the gRPC client API. If either one is not set, then the other
- // will be used. If neither is set, then the built-in default is used.
- MaxReqSize *int
- // MaxRespSize is the maximum allowed payload size for an individual response in a
- // stream (server->client) in bytes.
- MaxRespSize *int
- // RetryPolicy configures retry options for the method.
- retryPolicy *retryPolicy
-}
+type MethodConfig = internalserviceconfig.MethodConfig
type lbConfig struct {
name string
@@ -127,34 +105,6 @@ type healthCheckConfig struct {
ServiceName string
}
-// retryPolicy defines the go-native version of the retry policy defined by the
-// service config here:
-// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config
-type retryPolicy struct {
- // MaxAttempts is the maximum number of attempts, including the original RPC.
- //
- // This field is required and must be two or greater.
- maxAttempts int
-
- // Exponential backoff parameters. The initial retry attempt will occur at
- // random(0, initialBackoff). In general, the nth attempt will occur at
- // random(0,
- // min(initialBackoff*backoffMultiplier**(n-1), maxBackoff)).
- //
- // These fields are required and must be greater than zero.
- initialBackoff time.Duration
- maxBackoff time.Duration
- backoffMultiplier float64
-
- // The set of status codes which may be retried.
- //
- // Status codes are specified as strings, e.g., "UNAVAILABLE".
- //
- // This field is required and must be non-empty.
- // Note: a set is used to store this for easy lookup.
- retryableStatusCodes map[codes.Code]bool
-}
-
type jsonRetryPolicy struct {
MaxAttempts int
InitialBackoff string
@@ -313,7 +263,7 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
WaitForReady: m.WaitForReady,
Timeout: d,
}
- if mc.retryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {
+ if mc.RetryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {
logger.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
return &serviceconfig.ParseResult{Err: err}
}
@@ -359,7 +309,7 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
return &serviceconfig.ParseResult{Config: &sc}
}
-func convertRetryPolicy(jrp *jsonRetryPolicy) (p *retryPolicy, err error) {
+func convertRetryPolicy(jrp *jsonRetryPolicy) (p *internalserviceconfig.RetryPolicy, err error) {
if jrp == nil {
return nil, nil
}
@@ -381,19 +331,19 @@ func convertRetryPolicy(jrp *jsonRetryPolicy) (p *retryPolicy, err error) {
return nil, nil
}
- rp := &retryPolicy{
- maxAttempts: jrp.MaxAttempts,
- initialBackoff: *ib,
- maxBackoff: *mb,
- backoffMultiplier: jrp.BackoffMultiplier,
- retryableStatusCodes: make(map[codes.Code]bool),
+ rp := &internalserviceconfig.RetryPolicy{
+ MaxAttempts: jrp.MaxAttempts,
+ InitialBackoff: *ib,
+ MaxBackoff: *mb,
+ BackoffMultiplier: jrp.BackoffMultiplier,
+ RetryableStatusCodes: make(map[codes.Code]bool),
}
- if rp.maxAttempts > 5 {
+ if rp.MaxAttempts > 5 {
// TODO(retry): Make the max maxAttempts configurable.
- rp.maxAttempts = 5
+ rp.MaxAttempts = 5
}
for _, code := range jrp.RetryableStatusCodes {
- rp.retryableStatusCodes[code] = true
+ rp.RetryableStatusCodes[code] = true
}
return rp, nil
}
diff --git a/vendor/google.golang.org/grpc/status/status.go b/vendor/google.golang.org/grpc/status/status.go
index 01e182c30..54d187186 100644
--- a/vendor/google.golang.org/grpc/status/status.go
+++ b/vendor/google.golang.org/grpc/status/status.go
@@ -73,9 +73,11 @@ func FromProto(s *spb.Status) *Status {
return status.FromProto(s)
}
-// FromError returns a Status representing err if it was produced from this
-// package or has a method `GRPCStatus() *Status`. Otherwise, ok is false and a
-// Status is returned with codes.Unknown and the original error message.
+// FromError returns a Status representing err if it was produced by this
+// package or has a method `GRPCStatus() *Status`.
+// If err is nil, a Status is returned with codes.OK and no message.
+// Otherwise, ok is false and a Status is returned with codes.Unknown and
+// the original error message.
func FromError(err error) (s *Status, ok bool) {
if err == nil {
return nil, true
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 5fd856a38..1f3e70d2c 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -36,6 +36,8 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcutil"
+ iresolver "google.golang.org/grpc/internal/resolver"
+ "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -50,14 +52,20 @@ import (
// of the RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error
-// StreamDesc represents a streaming RPC service's method specification.
+// StreamDesc represents a streaming RPC service's method specification. Used
+// on the server when registering services and on the client when initiating
+// new streams.
type StreamDesc struct {
- StreamName string
- Handler StreamHandler
-
- // At least one of these is true.
- ServerStreams bool
- ClientStreams bool
+ // StreamName and Handler are only used when registering handlers on a
+ // server.
+ StreamName string // the name of the method excluding the service
+ Handler StreamHandler // the handler called for the method
+
+ // ServerStreams and ClientStreams are used for registering handlers on a
+ // server as well as defining RPC behavior when passed to NewClientStream
+ // and ClientConn.NewStream. At least one must be true.
+ ServerStreams bool // indicates the server can perform streaming sends
+ ClientStreams bool // indicates the client can perform streaming sends
}
// Stream defines the common interface a client or server stream has to satisfy.
@@ -164,13 +172,48 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}()
}
- c := defaultCallInfo()
// Provide an opportunity for the first RPC to see the first service config
// provided by the resolver.
if err := cc.waitForResolvedAddrs(ctx); err != nil {
return nil, err
}
- mc := cc.GetMethodConfig(method)
+
+ var mc serviceconfig.MethodConfig
+ var onCommit func()
+ var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
+ return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
+ }
+
+ rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
+ rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
+ if err != nil {
+ return nil, toRPCErr(err)
+ }
+
+ if rpcConfig != nil {
+ if rpcConfig.Context != nil {
+ ctx = rpcConfig.Context
+ }
+ mc = rpcConfig.MethodConfig
+ onCommit = rpcConfig.OnCommitted
+ if rpcConfig.Interceptor != nil {
+ rpcInfo.Context = nil
+ ns := newStream
+ newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
+ cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
+ if err != nil {
+ return nil, toRPCErr(err)
+ }
+ return cs, nil
+ }
+ }
+ }
+
+ return newStream(ctx, func() {})
+}
+
+func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
+ c := defaultCallInfo()
if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady
}
@@ -207,6 +250,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
Host: cc.authority,
Method: method,
ContentSubtype: c.contentSubtype,
+ DoneFunc: doneFunc,
}
// Set our outgoing compression according to the UseCompressor CallOption, if
@@ -272,6 +316,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
cancel: cancel,
beginTime: beginTime,
firstAttempt: true,
+ onCommit: onCommit,
}
if !cc.dopts.disableRetry {
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
@@ -432,7 +477,8 @@ type clientStream struct {
// place where we need to check if the attempt is nil.
attempt *csAttempt
// TODO(hedging): hedging will have multiple attempts simultaneously.
- committed bool // active attempt committed for retry?
+ committed bool // active attempt committed for retry?
+ onCommit func()
buffer []func(a *csAttempt) error // operations to replay on retry
bufferSize int // current size of buffer
}
@@ -461,6 +507,9 @@ type csAttempt struct {
}
func (cs *clientStream) commitAttemptLocked() {
+ if !cs.committed && cs.onCommit != nil {
+ cs.onCommit()
+ }
cs.committed = true
cs.buffer = nil
}
@@ -539,8 +588,8 @@ func (cs *clientStream) shouldRetry(err error) error {
code = status.Convert(err).Code()
}
- rp := cs.methodConfig.retryPolicy
- if rp == nil || !rp.retryableStatusCodes[code] {
+ rp := cs.methodConfig.RetryPolicy
+ if rp == nil || !rp.RetryableStatusCodes[code] {
return err
}
@@ -549,7 +598,7 @@ func (cs *clientStream) shouldRetry(err error) error {
if cs.retryThrottler.throttle() {
return err
}
- if cs.numRetries+1 >= rp.maxAttempts {
+ if cs.numRetries+1 >= rp.MaxAttempts {
return err
}
@@ -558,9 +607,9 @@ func (cs *clientStream) shouldRetry(err error) error {
dur = time.Millisecond * time.Duration(pushback)
cs.numRetriesSincePushback = 0
} else {
- fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
- cur := float64(rp.initialBackoff) * fact
- if max := float64(rp.maxBackoff); cur > max {
+ fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
+ cur := float64(rp.InitialBackoff) * fact
+ if max := float64(rp.MaxBackoff); cur > max {
cur = max
}
dur = time.Duration(grpcrand.Int63n(int64(cur)))
diff --git a/vendor/google.golang.org/grpc/tap/tap.go b/vendor/google.golang.org/grpc/tap/tap.go
index caea1ebed..dbf34e6bb 100644
--- a/vendor/google.golang.org/grpc/tap/tap.go
+++ b/vendor/google.golang.org/grpc/tap/tap.go
@@ -37,16 +37,16 @@ type Info struct {
// TODO: More to be added.
}
-// ServerInHandle defines the function which runs before a new stream is created
-// on the server side. If it returns a non-nil error, the stream will not be
-// created and a RST_STREAM will be sent back to the client with REFUSED_STREAM.
-// The client will receive an RPC error "code = Unavailable, desc = stream
-// terminated by RST_STREAM with error code: REFUSED_STREAM".
+// ServerInHandle defines the function which runs before a new stream is
+// created on the server side. If it returns a non-nil error, the stream will
+// not be created and an error will be returned to the client. If the error
+// returned is a status error, that status code and message will be used,
+// otherwise PermissionDenied will be the code and err.Error() will be the
+// message.
//
// It's intended to be used in situations where you don't want to waste the
-// resources to accept the new stream (e.g. rate-limiting). And the content of
-// the error will be ignored and won't be sent back to the client. For other
-// general usages, please use interceptors.
+// resources to accept the new stream (e.g. rate-limiting). For other general
+// usages, please use interceptors.
//
// Note that it is executed in the per-connection I/O goroutine(s) instead of
// per-RPC goroutine. Therefore, users should NOT have any
diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go
index 144b0bf9e..bfe5cf887 100644
--- a/vendor/google.golang.org/grpc/version.go
+++ b/vendor/google.golang.org/grpc/version.go
@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
-const Version = "1.33.2"
+const Version = "1.38.0"
diff --git a/vendor/google.golang.org/grpc/vet.sh b/vendor/google.golang.org/grpc/vet.sh
index 48652f604..1a0dbd7ee 100644
--- a/vendor/google.golang.org/grpc/vet.sh
+++ b/vendor/google.golang.org/grpc/vet.sh
@@ -28,7 +28,8 @@ cleanup() {
}
trap cleanup EXIT
-PATH="${GOPATH}/bin:${GOROOT}/bin:${PATH}"
+PATH="${HOME}/go/bin:${GOROOT}/bin:${PATH}"
+go version
if [[ "$1" = "-install" ]]; then
# Check for module support
@@ -53,7 +54,7 @@ if [[ "$1" = "-install" ]]; then
fi
if [[ -z "${VET_SKIP_PROTO}" ]]; then
if [[ "${TRAVIS}" = "true" ]]; then
- PROTOBUF_VERSION=3.3.0
+ PROTOBUF_VERSION=3.14.0
PROTOC_FILENAME=protoc-${PROTOBUF_VERSION}-linux-x86_64.zip
pushd /home/travis
wget https://github.com/google/protobuf/releases/download/v${PROTOBUF_VERSION}/${PROTOC_FILENAME}
@@ -61,7 +62,7 @@ if [[ "$1" = "-install" ]]; then
bin/protoc --version
popd
elif [[ "${GITHUB_ACTIONS}" = "true" ]]; then
- PROTOBUF_VERSION=3.3.0
+ PROTOBUF_VERSION=3.14.0
PROTOC_FILENAME=protoc-${PROTOBUF_VERSION}-linux-x86_64.zip
pushd /home/runner/go
wget https://github.com/google/protobuf/releases/download/v${PROTOBUF_VERSION}/${PROTOC_FILENAME}
@@ -104,12 +105,6 @@ git grep '"github.com/envoyproxy/go-control-plane/envoy' -- '*.go' ':(exclude)*.
# TODO: Remove when we drop Go 1.10 support
go list -f {{.Dir}} ./... | xargs go run test/go_vet/vet.go
-# - gofmt, goimports, golint (with exceptions for generated code), go vet.
-gofmt -s -d -l . 2>&1 | fail_on_output
-goimports -l . 2>&1 | not grep -vE "\.pb\.go"
-golint ./... 2>&1 | not grep -vE "\.pb\.go:"
-go vet -all ./...
-
misspell -error .
# - Check that generated proto files are up to date.
@@ -119,12 +114,22 @@ if [[ -z "${VET_SKIP_PROTO}" ]]; then
(git status; git --no-pager diff; exit 1)
fi
-# - Check that our modules are tidy.
-if go help mod >& /dev/null; then
- find . -name 'go.mod' | xargs -IXXX bash -c 'cd $(dirname XXX); go mod tidy'
+# - gofmt, goimports, golint (with exceptions for generated code), go vet,
+# go mod tidy.
+# Perform these checks on each module inside gRPC.
+for MOD_FILE in $(find . -name 'go.mod'); do
+ MOD_DIR=$(dirname ${MOD_FILE})
+ pushd ${MOD_DIR}
+ go vet -all ./... | fail_on_output
+ gofmt -s -d -l . 2>&1 | fail_on_output
+ goimports -l . 2>&1 | not grep -vE "\.pb\.go"
+ golint ./... 2>&1 | not grep -vE "/testv3\.pb\.go:"
+
+ go mod tidy
git status --porcelain 2>&1 | fail_on_output || \
(git status; git --no-pager diff; exit 1)
-fi
+ popd
+done
# - Collection of static analysis checks
#
@@ -141,8 +146,11 @@ not grep -Fv '.CredsBundle
.NewAddress
.NewServiceConfig
.Type is deprecated: use Attributes
+BuildVersion is deprecated
balancer.ErrTransientFailure
balancer.Picker
+extDesc.Filename is deprecated
+github.com/golang/protobuf/jsonpb is deprecated
grpc.CallCustomCodec
grpc.Code
grpc.Compressor
@@ -164,13 +172,7 @@ grpc.WithServiceConfig
grpc.WithTimeout
http.CloseNotifier
info.SecurityVersion
-resolver.Backend
-resolver.GRPCLB
-extDesc.Filename is deprecated
-BuildVersion is deprecated
-github.com/golang/protobuf/jsonpb is deprecated
proto is deprecated
-xxx_messageInfo_
proto.InternalMessageInfo is deprecated
proto.EnumName is deprecated
proto.ErrInternalBadWireType is deprecated
@@ -184,7 +186,12 @@ proto.RegisterExtension is deprecated
proto.RegisteredExtension is deprecated
proto.RegisteredExtensions is deprecated
proto.RegisterMapType is deprecated
-proto.Unmarshaler is deprecated' "${SC_OUT}"
+proto.Unmarshaler is deprecated
+resolver.Backend
+resolver.GRPCLB
+Target is deprecated: Use the Target field in the BuildOptions instead.
+xxx_messageInfo_
+' "${SC_OUT}"
# - special golint on package comments.
lint_package_comment_per_package() {