aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc')
-rw-r--r--vendor/google.golang.org/grpc/LICENSE28
-rw-r--r--vendor/google.golang.org/grpc/PATENTS22
-rw-r--r--vendor/google.golang.org/grpc/README.md32
-rw-r--r--vendor/google.golang.org/grpc/backoff.go80
-rw-r--r--vendor/google.golang.org/grpc/balancer.go400
-rw-r--r--vendor/google.golang.org/grpc/call.go232
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go888
-rw-r--r--vendor/google.golang.org/grpc/codes/code_string.go16
-rw-r--r--vendor/google.golang.org/grpc/codes/codes.go159
-rw-r--r--vendor/google.golang.org/grpc/credentials/credentials.go232
-rw-r--r--vendor/google.golang.org/grpc/credentials/credentials_util_go17.go76
-rw-r--r--vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go74
-rw-r--r--vendor/google.golang.org/grpc/doc.go6
-rw-r--r--vendor/google.golang.org/grpc/grpclog/logger.go93
-rw-r--r--vendor/google.golang.org/grpc/interceptor.go90
-rw-r--r--vendor/google.golang.org/grpc/internal/internal.go49
-rw-r--r--vendor/google.golang.org/grpc/metadata/metadata.go147
-rw-r--r--vendor/google.golang.org/grpc/naming/naming.go74
-rw-r--r--vendor/google.golang.org/grpc/peer/peer.go65
-rw-r--r--vendor/google.golang.org/grpc/rpc_util.go457
-rw-r--r--vendor/google.golang.org/grpc/server.go944
-rw-r--r--vendor/google.golang.org/grpc/stream.go526
-rw-r--r--vendor/google.golang.org/grpc/trace.go119
-rw-r--r--vendor/google.golang.org/grpc/transport/control.go215
-rw-r--r--vendor/google.golang.org/grpc/transport/go16.go46
-rw-r--r--vendor/google.golang.org/grpc/transport/go17.go46
-rw-r--r--vendor/google.golang.org/grpc/transport/handler_server.go397
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_client.go1072
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_server.go775
-rw-r--r--vendor/google.golang.org/grpc/transport/http_util.go513
-rw-r--r--vendor/google.golang.org/grpc/transport/pre_go16.go51
-rw-r--r--vendor/google.golang.org/grpc/transport/transport.go595
32 files changed, 0 insertions, 8519 deletions
diff --git a/vendor/google.golang.org/grpc/LICENSE b/vendor/google.golang.org/grpc/LICENSE
deleted file mode 100644
index f4988b450..000000000
--- a/vendor/google.golang.org/grpc/LICENSE
+++ /dev/null
@@ -1,28 +0,0 @@
-Copyright 2014, Google Inc.
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are
-met:
-
- * Redistributions of source code must retain the above copyright
-notice, this list of conditions and the following disclaimer.
- * Redistributions in binary form must reproduce the above
-copyright notice, this list of conditions and the following disclaimer
-in the documentation and/or other materials provided with the
-distribution.
- * Neither the name of Google Inc. nor the names of its
-contributors may be used to endorse or promote products derived from
-this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/vendor/google.golang.org/grpc/PATENTS b/vendor/google.golang.org/grpc/PATENTS
deleted file mode 100644
index 69b47959f..000000000
--- a/vendor/google.golang.org/grpc/PATENTS
+++ /dev/null
@@ -1,22 +0,0 @@
-Additional IP Rights Grant (Patents)
-
-"This implementation" means the copyrightable works distributed by
-Google as part of the gRPC project.
-
-Google hereby grants to You a perpetual, worldwide, non-exclusive,
-no-charge, royalty-free, irrevocable (except as stated in this section)
-patent license to make, have made, use, offer to sell, sell, import,
-transfer and otherwise run, modify and propagate the contents of this
-implementation of gRPC, where such license applies only to those patent
-claims, both currently owned or controlled by Google and acquired in
-the future, licensable by Google that are necessarily infringed by this
-implementation of gRPC. This grant does not include claims that would be
-infringed only as a consequence of further modification of this
-implementation. If you or your agent or exclusive licensee institute or
-order or agree to the institution of patent litigation against any
-entity (including a cross-claim or counterclaim in a lawsuit) alleging
-that this implementation of gRPC or any code incorporated within this
-implementation of gRPC constitutes direct or contributory patent
-infringement, or inducement of patent infringement, then any patent
-rights granted to you under this License for this implementation of gRPC
-shall terminate as of the date such litigation is filed.
diff --git a/vendor/google.golang.org/grpc/README.md b/vendor/google.golang.org/grpc/README.md
deleted file mode 100644
index 110a8cf42..000000000
--- a/vendor/google.golang.org/grpc/README.md
+++ /dev/null
@@ -1,32 +0,0 @@
-#gRPC-Go
-
-[![Build Status](https://travis-ci.org/grpc/grpc-go.svg)](https://travis-ci.org/grpc/grpc-go) [![GoDoc](https://godoc.org/google.golang.org/grpc?status.svg)](https://godoc.org/google.golang.org/grpc)
-
-The Go implementation of [gRPC](http://www.grpc.io/): A high performance, open source, general RPC framework that puts mobile and HTTP/2 first. For more information see the [gRPC Quick Start](http://www.grpc.io/docs/) guide.
-
-Installation
-------------
-
-To install this package, you need to install Go and setup your Go workspace on your computer. The simplest way to install the library is to run:
-
-```
-$ go get google.golang.org/grpc
-```
-
-Prerequisites
--------------
-
-This requires Go 1.5 or later.
-
-Constraints
------------
-The grpc package should only depend on standard Go packages and a small number of exceptions. If your contribution introduces new dependencies which are NOT in the [list](http://godoc.org/google.golang.org/grpc?imports), you need a discussion with gRPC-Go authors and consultants.
-
-Documentation
--------------
-See [API documentation](https://godoc.org/google.golang.org/grpc) for package and API descriptions and find examples in the [examples directory](examples/).
-
-Status
-------
-GA
-
diff --git a/vendor/google.golang.org/grpc/backoff.go b/vendor/google.golang.org/grpc/backoff.go
deleted file mode 100644
index c99024ee3..000000000
--- a/vendor/google.golang.org/grpc/backoff.go
+++ /dev/null
@@ -1,80 +0,0 @@
-package grpc
-
-import (
- "math/rand"
- "time"
-)
-
-// DefaultBackoffConfig uses values specified for backoff in
-// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
-var (
- DefaultBackoffConfig = BackoffConfig{
- MaxDelay: 120 * time.Second,
- baseDelay: 1.0 * time.Second,
- factor: 1.6,
- jitter: 0.2,
- }
-)
-
-// backoffStrategy defines the methodology for backing off after a grpc
-// connection failure.
-//
-// This is unexported until the gRPC project decides whether or not to allow
-// alternative backoff strategies. Once a decision is made, this type and its
-// method may be exported.
-type backoffStrategy interface {
- // backoff returns the amount of time to wait before the next retry given
- // the number of consecutive failures.
- backoff(retries int) time.Duration
-}
-
-// BackoffConfig defines the parameters for the default gRPC backoff strategy.
-type BackoffConfig struct {
- // MaxDelay is the upper bound of backoff delay.
- MaxDelay time.Duration
-
- // TODO(stevvooe): The following fields are not exported, as allowing
- // changes would violate the current gRPC specification for backoff. If
- // gRPC decides to allow more interesting backoff strategies, these fields
- // may be opened up in the future.
-
- // baseDelay is the amount of time to wait before retrying after the first
- // failure.
- baseDelay time.Duration
-
- // factor is applied to the backoff after each retry.
- factor float64
-
- // jitter provides a range to randomize backoff delays.
- jitter float64
-}
-
-func setDefaults(bc *BackoffConfig) {
- md := bc.MaxDelay
- *bc = DefaultBackoffConfig
-
- if md > 0 {
- bc.MaxDelay = md
- }
-}
-
-func (bc BackoffConfig) backoff(retries int) time.Duration {
- if retries == 0 {
- return bc.baseDelay
- }
- backoff, max := float64(bc.baseDelay), float64(bc.MaxDelay)
- for backoff < max && retries > 0 {
- backoff *= bc.factor
- retries--
- }
- if backoff > max {
- backoff = max
- }
- // Randomize backoff delays so that if a cluster of requests start at
- // the same time, they won't operate in lockstep.
- backoff *= 1 + bc.jitter*(rand.Float64()*2-1)
- if backoff < 0 {
- return 0
- }
- return time.Duration(backoff)
-}
diff --git a/vendor/google.golang.org/grpc/balancer.go b/vendor/google.golang.org/grpc/balancer.go
deleted file mode 100644
index 9d943fbad..000000000
--- a/vendor/google.golang.org/grpc/balancer.go
+++ /dev/null
@@ -1,400 +0,0 @@
-/*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package grpc
-
-import (
- "fmt"
- "sync"
-
- "golang.org/x/net/context"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/naming"
-)
-
-// Address represents a server the client connects to.
-// This is the EXPERIMENTAL API and may be changed or extended in the future.
-type Address struct {
- // Addr is the server address on which a connection will be established.
- Addr string
- // Metadata is the information associated with Addr, which may be used
- // to make load balancing decision.
- Metadata interface{}
-}
-
-// BalancerConfig specifies the configurations for Balancer.
-type BalancerConfig struct {
- // DialCreds is the transport credential the Balancer implementation can
- // use to dial to a remote load balancer server. The Balancer implementations
- // can ignore this if it does not need to talk to another party securely.
- DialCreds credentials.TransportCredentials
-}
-
-// BalancerGetOptions configures a Get call.
-// This is the EXPERIMENTAL API and may be changed or extended in the future.
-type BalancerGetOptions struct {
- // BlockingWait specifies whether Get should block when there is no
- // connected address.
- BlockingWait bool
-}
-
-// Balancer chooses network addresses for RPCs.
-// This is the EXPERIMENTAL API and may be changed or extended in the future.
-type Balancer interface {
- // Start does the initialization work to bootstrap a Balancer. For example,
- // this function may start the name resolution and watch the updates. It will
- // be called when dialing.
- Start(target string, config BalancerConfig) error
- // Up informs the Balancer that gRPC has a connection to the server at
- // addr. It returns down which is called once the connection to addr gets
- // lost or closed.
- // TODO: It is not clear how to construct and take advantage of the meaningful error
- // parameter for down. Need realistic demands to guide.
- Up(addr Address) (down func(error))
- // Get gets the address of a server for the RPC corresponding to ctx.
- // i) If it returns a connected address, gRPC internals issues the RPC on the
- // connection to this address;
- // ii) If it returns an address on which the connection is under construction
- // (initiated by Notify(...)) but not connected, gRPC internals
- // * fails RPC if the RPC is fail-fast and connection is in the TransientFailure or
- // Shutdown state;
- // or
- // * issues RPC on the connection otherwise.
- // iii) If it returns an address on which the connection does not exist, gRPC
- // internals treats it as an error and will fail the corresponding RPC.
- //
- // Therefore, the following is the recommended rule when writing a custom Balancer.
- // If opts.BlockingWait is true, it should return a connected address or
- // block if there is no connected address. It should respect the timeout or
- // cancellation of ctx when blocking. If opts.BlockingWait is false (for fail-fast
- // RPCs), it should return an address it has notified via Notify(...) immediately
- // instead of blocking.
- //
- // The function returns put which is called once the rpc has completed or failed.
- // put can collect and report RPC stats to a remote load balancer.
- //
- // This function should only return the errors Balancer cannot recover by itself.
- // gRPC internals will fail the RPC if an error is returned.
- Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
- // Notify returns a channel that is used by gRPC internals to watch the addresses
- // gRPC needs to connect. The addresses might be from a name resolver or remote
- // load balancer. gRPC internals will compare it with the existing connected
- // addresses. If the address Balancer notified is not in the existing connected
- // addresses, gRPC starts to connect the address. If an address in the existing
- // connected addresses is not in the notification list, the corresponding connection
- // is shutdown gracefully. Otherwise, there are no operations to take. Note that
- // the Address slice must be the full list of the Addresses which should be connected.
- // It is NOT delta.
- Notify() <-chan []Address
- // Close shuts down the balancer.
- Close() error
-}
-
-// downErr implements net.Error. It is constructed by gRPC internals and passed to the down
-// call of Balancer.
-type downErr struct {
- timeout bool
- temporary bool
- desc string
-}
-
-func (e downErr) Error() string { return e.desc }
-func (e downErr) Timeout() bool { return e.timeout }
-func (e downErr) Temporary() bool { return e.temporary }
-
-func downErrorf(timeout, temporary bool, format string, a ...interface{}) downErr {
- return downErr{
- timeout: timeout,
- temporary: temporary,
- desc: fmt.Sprintf(format, a...),
- }
-}
-
-// RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch
-// the name resolution updates and updates the addresses available correspondingly.
-func RoundRobin(r naming.Resolver) Balancer {
- return &roundRobin{r: r}
-}
-
-type addrInfo struct {
- addr Address
- connected bool
-}
-
-type roundRobin struct {
- r naming.Resolver
- w naming.Watcher
- addrs []*addrInfo // all the addresses the client should potentially connect
- mu sync.Mutex
- addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to.
- next int // index of the next address to return for Get()
- waitCh chan struct{} // the channel to block when there is no connected address available
- done bool // The Balancer is closed.
-}
-
-func (rr *roundRobin) watchAddrUpdates() error {
- updates, err := rr.w.Next()
- if err != nil {
- grpclog.Printf("grpc: the naming watcher stops working due to %v.\n", err)
- return err
- }
- rr.mu.Lock()
- defer rr.mu.Unlock()
- for _, update := range updates {
- addr := Address{
- Addr: update.Addr,
- Metadata: update.Metadata,
- }
- switch update.Op {
- case naming.Add:
- var exist bool
- for _, v := range rr.addrs {
- if addr == v.addr {
- exist = true
- grpclog.Println("grpc: The name resolver wanted to add an existing address: ", addr)
- break
- }
- }
- if exist {
- continue
- }
- rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
- case naming.Delete:
- for i, v := range rr.addrs {
- if addr == v.addr {
- copy(rr.addrs[i:], rr.addrs[i+1:])
- rr.addrs = rr.addrs[:len(rr.addrs)-1]
- break
- }
- }
- default:
- grpclog.Println("Unknown update.Op ", update.Op)
- }
- }
- // Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified.
- open := make([]Address, len(rr.addrs))
- for i, v := range rr.addrs {
- open[i] = v.addr
- }
- if rr.done {
- return ErrClientConnClosing
- }
- rr.addrCh <- open
- return nil
-}
-
-func (rr *roundRobin) Start(target string, config BalancerConfig) error {
- rr.mu.Lock()
- defer rr.mu.Unlock()
- if rr.done {
- return ErrClientConnClosing
- }
- if rr.r == nil {
- // If there is no name resolver installed, it is not needed to
- // do name resolution. In this case, target is added into rr.addrs
- // as the only address available and rr.addrCh stays nil.
- rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})
- return nil
- }
- w, err := rr.r.Resolve(target)
- if err != nil {
- return err
- }
- rr.w = w
- rr.addrCh = make(chan []Address)
- go func() {
- for {
- if err := rr.watchAddrUpdates(); err != nil {
- return
- }
- }
- }()
- return nil
-}
-
-// Up sets the connected state of addr and sends notification if there are pending
-// Get() calls.
-func (rr *roundRobin) Up(addr Address) func(error) {
- rr.mu.Lock()
- defer rr.mu.Unlock()
- var cnt int
- for _, a := range rr.addrs {
- if a.addr == addr {
- if a.connected {
- return nil
- }
- a.connected = true
- }
- if a.connected {
- cnt++
- }
- }
- // addr is only one which is connected. Notify the Get() callers who are blocking.
- if cnt == 1 && rr.waitCh != nil {
- close(rr.waitCh)
- rr.waitCh = nil
- }
- return func(err error) {
- rr.down(addr, err)
- }
-}
-
-// down unsets the connected state of addr.
-func (rr *roundRobin) down(addr Address, err error) {
- rr.mu.Lock()
- defer rr.mu.Unlock()
- for _, a := range rr.addrs {
- if addr == a.addr {
- a.connected = false
- break
- }
- }
-}
-
-// Get returns the next addr in the rotation.
-func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
- var ch chan struct{}
- rr.mu.Lock()
- if rr.done {
- rr.mu.Unlock()
- err = ErrClientConnClosing
- return
- }
-
- if len(rr.addrs) > 0 {
- if rr.next >= len(rr.addrs) {
- rr.next = 0
- }
- next := rr.next
- for {
- a := rr.addrs[next]
- next = (next + 1) % len(rr.addrs)
- if a.connected {
- addr = a.addr
- rr.next = next
- rr.mu.Unlock()
- return
- }
- if next == rr.next {
- // Has iterated all the possible address but none is connected.
- break
- }
- }
- }
- if !opts.BlockingWait {
- if len(rr.addrs) == 0 {
- rr.mu.Unlock()
- err = Errorf(codes.Unavailable, "there is no address available")
- return
- }
- // Returns the next addr on rr.addrs for failfast RPCs.
- addr = rr.addrs[rr.next].addr
- rr.next++
- rr.mu.Unlock()
- return
- }
- // Wait on rr.waitCh for non-failfast RPCs.
- if rr.waitCh == nil {
- ch = make(chan struct{})
- rr.waitCh = ch
- } else {
- ch = rr.waitCh
- }
- rr.mu.Unlock()
- for {
- select {
- case <-ctx.Done():
- err = ctx.Err()
- return
- case <-ch:
- rr.mu.Lock()
- if rr.done {
- rr.mu.Unlock()
- err = ErrClientConnClosing
- return
- }
-
- if len(rr.addrs) > 0 {
- if rr.next >= len(rr.addrs) {
- rr.next = 0
- }
- next := rr.next
- for {
- a := rr.addrs[next]
- next = (next + 1) % len(rr.addrs)
- if a.connected {
- addr = a.addr
- rr.next = next
- rr.mu.Unlock()
- return
- }
- if next == rr.next {
- // Has iterated all the possible address but none is connected.
- break
- }
- }
- }
- // The newly added addr got removed by Down() again.
- if rr.waitCh == nil {
- ch = make(chan struct{})
- rr.waitCh = ch
- } else {
- ch = rr.waitCh
- }
- rr.mu.Unlock()
- }
- }
-}
-
-func (rr *roundRobin) Notify() <-chan []Address {
- return rr.addrCh
-}
-
-func (rr *roundRobin) Close() error {
- rr.mu.Lock()
- defer rr.mu.Unlock()
- rr.done = true
- if rr.w != nil {
- rr.w.Close()
- }
- if rr.waitCh != nil {
- close(rr.waitCh)
- rr.waitCh = nil
- }
- if rr.addrCh != nil {
- close(rr.addrCh)
- }
- return nil
-}
diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go
deleted file mode 100644
index 772c817ed..000000000
--- a/vendor/google.golang.org/grpc/call.go
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package grpc
-
-import (
- "bytes"
- "io"
- "math"
- "time"
-
- "golang.org/x/net/context"
- "golang.org/x/net/trace"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/transport"
-)
-
-// recvResponse receives and parses an RPC response.
-// On error, it returns the error and indicates whether the call should be retried.
-//
-// TODO(zhaoq): Check whether the received message sequence is valid.
-func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
- // Try to acquire header metadata from the server if there is any.
- defer func() {
- if err != nil {
- if _, ok := err.(transport.ConnectionError); !ok {
- t.CloseStream(stream, err)
- }
- }
- }()
- c.headerMD, err = stream.Header()
- if err != nil {
- return
- }
- p := &parser{r: stream}
- for {
- if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32); err != nil {
- if err == io.EOF {
- break
- }
- return
- }
- }
- c.trailerMD = stream.Trailer()
- return nil
-}
-
-// sendRequest writes out various information of an RPC such as Context and Message.
-func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {
- stream, err := t.NewStream(ctx, callHdr)
- if err != nil {
- return nil, err
- }
- defer func() {
- if err != nil {
- // If err is connection error, t will be closed, no need to close stream here.
- if _, ok := err.(transport.ConnectionError); !ok {
- t.CloseStream(stream, err)
- }
- }
- }()
- var cbuf *bytes.Buffer
- if compressor != nil {
- cbuf = new(bytes.Buffer)
- }
- outBuf, err := encode(codec, args, compressor, cbuf)
- if err != nil {
- return nil, Errorf(codes.Internal, "grpc: %v", err)
- }
- err = t.Write(stream, outBuf, opts)
- // t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
- // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
- // recvResponse to get the final status.
- if err != nil && err != io.EOF {
- return nil, err
- }
- // Sent successfully.
- return stream, nil
-}
-
-// Invoke sends the RPC request on the wire and returns after response is received.
-// Invoke is called by generated code. Also users can call Invoke directly when it
-// is really needed in their use cases.
-func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error {
- if cc.dopts.unaryInt != nil {
- return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
- }
- return invoke(ctx, method, args, reply, cc, opts...)
-}
-
-func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
- c := defaultCallInfo
- for _, o := range opts {
- if err := o.before(&c); err != nil {
- return toRPCErr(err)
- }
- }
- defer func() {
- for _, o := range opts {
- o.after(&c)
- }
- }()
- if EnableTracing {
- c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
- defer c.traceInfo.tr.Finish()
- c.traceInfo.firstLine.client = true
- if deadline, ok := ctx.Deadline(); ok {
- c.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
- }
- c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
- // TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.
- defer func() {
- if err != nil {
- c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
- c.traceInfo.tr.SetError()
- }
- }()
- }
- topts := &transport.Options{
- Last: true,
- Delay: false,
- }
- for {
- var (
- err error
- t transport.ClientTransport
- stream *transport.Stream
- // Record the put handler from Balancer.Get(...). It is called once the
- // RPC has completed or failed.
- put func()
- )
- // TODO(zhaoq): Need a formal spec of fail-fast.
- callHdr := &transport.CallHdr{
- Host: cc.authority,
- Method: method,
- }
- if cc.dopts.cp != nil {
- callHdr.SendCompress = cc.dopts.cp.Type()
- }
- gopts := BalancerGetOptions{
- BlockingWait: !c.failFast,
- }
- t, put, err = cc.getTransport(ctx, gopts)
- if err != nil {
- // TODO(zhaoq): Probably revisit the error handling.
- if _, ok := err.(*rpcError); ok {
- return err
- }
- if err == errConnClosing || err == errConnUnavailable {
- if c.failFast {
- return Errorf(codes.Unavailable, "%v", err)
- }
- continue
- }
- // All the other errors are treated as Internal errors.
- return Errorf(codes.Internal, "%v", err)
- }
- if c.traceInfo.tr != nil {
- c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
- }
- stream, err = sendRequest(ctx, cc.dopts.codec, cc.dopts.cp, callHdr, t, args, topts)
- if err != nil {
- if put != nil {
- put()
- put = nil
- }
- // Retry a non-failfast RPC when
- // i) there is a connection error; or
- // ii) the server started to drain before this RPC was initiated.
- if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
- if c.failFast {
- return toRPCErr(err)
- }
- continue
- }
- return toRPCErr(err)
- }
- err = recvResponse(cc.dopts, t, &c, stream, reply)
- if err != nil {
- if put != nil {
- put()
- put = nil
- }
- if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
- if c.failFast {
- return toRPCErr(err)
- }
- continue
- }
- return toRPCErr(err)
- }
- if c.traceInfo.tr != nil {
- c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
- }
- t.CloseStream(stream, nil)
- if put != nil {
- put()
- put = nil
- }
- return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
- }
-}
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
deleted file mode 100644
index 61674729a..000000000
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ /dev/null
@@ -1,888 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package grpc
-
-import (
- "errors"
- "fmt"
- "net"
- "strings"
- "sync"
- "time"
-
- "golang.org/x/net/context"
- "golang.org/x/net/trace"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/transport"
-)
-
-var (
- // ErrClientConnClosing indicates that the operation is illegal because
- // the ClientConn is closing.
- ErrClientConnClosing = errors.New("grpc: the client connection is closing")
- // ErrClientConnTimeout indicates that the ClientConn cannot establish the
- // underlying connections within the specified timeout.
- ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
-
- // errNoTransportSecurity indicates that there is no transport security
- // being set for ClientConn. Users should either set one or explicitly
- // call WithInsecure DialOption to disable security.
- errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
- // errTransportCredentialsMissing indicates that users want to transmit security
- // information (e.g., oauth2 token) which requires secure connection on an insecure
- // connection.
- errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
- // errCredentialsConflict indicates that grpc.WithTransportCredentials()
- // and grpc.WithInsecure() are both called for a connection.
- errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
- // errNetworkIO indicates that the connection is down due to some network I/O error.
- errNetworkIO = errors.New("grpc: failed with network I/O error")
- // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
- errConnDrain = errors.New("grpc: the connection is drained")
- // errConnClosing indicates that the connection is closing.
- errConnClosing = errors.New("grpc: the connection is closing")
- // errConnUnavailable indicates that the connection is unavailable.
- errConnUnavailable = errors.New("grpc: the connection is unavailable")
- errNoAddr = errors.New("grpc: there is no address available to dial")
- // minimum time to give a connection to complete
- minConnectTimeout = 20 * time.Second
-)
-
-// dialOptions configure a Dial call. dialOptions are set by the DialOption
-// values passed to Dial.
-type dialOptions struct {
- unaryInt UnaryClientInterceptor
- streamInt StreamClientInterceptor
- codec Codec
- cp Compressor
- dc Decompressor
- bs backoffStrategy
- balancer Balancer
- block bool
- insecure bool
- timeout time.Duration
- copts transport.ConnectOptions
-}
-
-// DialOption configures how we set up the connection.
-type DialOption func(*dialOptions)
-
-// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
-func WithCodec(c Codec) DialOption {
- return func(o *dialOptions) {
- o.codec = c
- }
-}
-
-// WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
-// compressor.
-func WithCompressor(cp Compressor) DialOption {
- return func(o *dialOptions) {
- o.cp = cp
- }
-}
-
-// WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
-// message decompressor.
-func WithDecompressor(dc Decompressor) DialOption {
- return func(o *dialOptions) {
- o.dc = dc
- }
-}
-
-// WithBalancer returns a DialOption which sets a load balancer.
-func WithBalancer(b Balancer) DialOption {
- return func(o *dialOptions) {
- o.balancer = b
- }
-}
-
-// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
-// when backing off after failed connection attempts.
-func WithBackoffMaxDelay(md time.Duration) DialOption {
- return WithBackoffConfig(BackoffConfig{MaxDelay: md})
-}
-
-// WithBackoffConfig configures the dialer to use the provided backoff
-// parameters after connection failures.
-//
-// Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
-// for use.
-func WithBackoffConfig(b BackoffConfig) DialOption {
- // Set defaults to ensure that provided BackoffConfig is valid and
- // unexported fields get default values.
- setDefaults(&b)
- return withBackoff(b)
-}
-
-// withBackoff sets the backoff strategy used for retries after a
-// failed connection attempt.
-//
-// This can be exported if arbitrary backoff strategies are allowed by gRPC.
-func withBackoff(bs backoffStrategy) DialOption {
- return func(o *dialOptions) {
- o.bs = bs
- }
-}
-
-// WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
-// connection is up. Without this, Dial returns immediately and connecting the server
-// happens in background.
-func WithBlock() DialOption {
- return func(o *dialOptions) {
- o.block = true
- }
-}
-
-// WithInsecure returns a DialOption which disables transport security for this ClientConn.
-// Note that transport security is required unless WithInsecure is set.
-func WithInsecure() DialOption {
- return func(o *dialOptions) {
- o.insecure = true
- }
-}
-
-// WithTransportCredentials returns a DialOption which configures a
-// connection level security credentials (e.g., TLS/SSL).
-func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
- return func(o *dialOptions) {
- o.copts.TransportCredentials = creds
- }
-}
-
-// WithPerRPCCredentials returns a DialOption which sets
-// credentials which will place auth state on each outbound RPC.
-func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
- return func(o *dialOptions) {
- o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
- }
-}
-
-// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
-// initially. This is valid if and only if WithBlock() is present.
-func WithTimeout(d time.Duration) DialOption {
- return func(o *dialOptions) {
- o.timeout = d
- }
-}
-
-// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
-func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
- return func(o *dialOptions) {
- o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
- if deadline, ok := ctx.Deadline(); ok {
- return f(addr, deadline.Sub(time.Now()))
- }
- return f(addr, 0)
- }
- }
-}
-
-// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
-func WithUserAgent(s string) DialOption {
- return func(o *dialOptions) {
- o.copts.UserAgent = s
- }
-}
-
-// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
-func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
- return func(o *dialOptions) {
- o.unaryInt = f
- }
-}
-
-// WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
-func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
- return func(o *dialOptions) {
- o.streamInt = f
- }
-}
-
-// Dial creates a client connection to the given target.
-func Dial(target string, opts ...DialOption) (*ClientConn, error) {
- return DialContext(context.Background(), target, opts...)
-}
-
-// DialContext creates a client connection to the given target. ctx can be used to
-// cancel or expire the pending connecting. Once this function returns, the
-// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
-// to terminate all the pending operations after this function returns.
-// This is the EXPERIMENTAL API.
-func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
- cc := &ClientConn{
- target: target,
- conns: make(map[Address]*addrConn),
- }
- cc.ctx, cc.cancel = context.WithCancel(context.Background())
- defer func() {
- select {
- case <-ctx.Done():
- conn, err = nil, ctx.Err()
- default:
- }
-
- if err != nil {
- cc.Close()
- }
- }()
-
- for _, opt := range opts {
- opt(&cc.dopts)
- }
-
- // Set defaults.
- if cc.dopts.codec == nil {
- cc.dopts.codec = protoCodec{}
- }
- if cc.dopts.bs == nil {
- cc.dopts.bs = DefaultBackoffConfig
- }
- creds := cc.dopts.copts.TransportCredentials
- if creds != nil && creds.Info().ServerName != "" {
- cc.authority = creds.Info().ServerName
- } else {
- colonPos := strings.LastIndex(target, ":")
- if colonPos == -1 {
- colonPos = len(target)
- }
- cc.authority = target[:colonPos]
- }
- var ok bool
- waitC := make(chan error, 1)
- go func() {
- var addrs []Address
- if cc.dopts.balancer == nil {
- // Connect to target directly if balancer is nil.
- addrs = append(addrs, Address{Addr: target})
- } else {
- var credsClone credentials.TransportCredentials
- if creds != nil {
- credsClone = creds.Clone()
- }
- config := BalancerConfig{
- DialCreds: credsClone,
- }
- if err := cc.dopts.balancer.Start(target, config); err != nil {
- waitC <- err
- return
- }
- ch := cc.dopts.balancer.Notify()
- if ch == nil {
- // There is no name resolver installed.
- addrs = append(addrs, Address{Addr: target})
- } else {
- addrs, ok = <-ch
- if !ok || len(addrs) == 0 {
- waitC <- errNoAddr
- return
- }
- }
- }
- for _, a := range addrs {
- if err := cc.resetAddrConn(a, false, nil); err != nil {
- waitC <- err
- return
- }
- }
- close(waitC)
- }()
- var timeoutCh <-chan time.Time
- if cc.dopts.timeout > 0 {
- timeoutCh = time.After(cc.dopts.timeout)
- }
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case err := <-waitC:
- if err != nil {
- return nil, err
- }
- case <-timeoutCh:
- return nil, ErrClientConnTimeout
- }
- // If balancer is nil or balancer.Notify() is nil, ok will be false here.
- // The lbWatcher goroutine will not be created.
- if ok {
- go cc.lbWatcher()
- }
- return cc, nil
-}
-
-// ConnectivityState indicates the state of a client connection.
-type ConnectivityState int
-
-const (
- // Idle indicates the ClientConn is idle.
- Idle ConnectivityState = iota
- // Connecting indicates the ClienConn is connecting.
- Connecting
- // Ready indicates the ClientConn is ready for work.
- Ready
- // TransientFailure indicates the ClientConn has seen a failure but expects to recover.
- TransientFailure
- // Shutdown indicates the ClientConn has started shutting down.
- Shutdown
-)
-
-func (s ConnectivityState) String() string {
- switch s {
- case Idle:
- return "IDLE"
- case Connecting:
- return "CONNECTING"
- case Ready:
- return "READY"
- case TransientFailure:
- return "TRANSIENT_FAILURE"
- case Shutdown:
- return "SHUTDOWN"
- default:
- panic(fmt.Sprintf("unknown connectivity state: %d", s))
- }
-}
-
-// ClientConn represents a client connection to an RPC server.
-type ClientConn struct {
- ctx context.Context
- cancel context.CancelFunc
-
- target string
- authority string
- dopts dialOptions
-
- mu sync.RWMutex
- conns map[Address]*addrConn
-}
-
-func (cc *ClientConn) lbWatcher() {
- for addrs := range cc.dopts.balancer.Notify() {
- var (
- add []Address // Addresses need to setup connections.
- del []*addrConn // Connections need to tear down.
- )
- cc.mu.Lock()
- for _, a := range addrs {
- if _, ok := cc.conns[a]; !ok {
- add = append(add, a)
- }
- }
- for k, c := range cc.conns {
- var keep bool
- for _, a := range addrs {
- if k == a {
- keep = true
- break
- }
- }
- if !keep {
- del = append(del, c)
- delete(cc.conns, c.addr)
- }
- }
- cc.mu.Unlock()
- for _, a := range add {
- cc.resetAddrConn(a, true, nil)
- }
- for _, c := range del {
- c.tearDown(errConnDrain)
- }
- }
-}
-
-// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
-// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
-// If tearDownErr is nil, errConnDrain will be used instead.
-func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
- ac := &addrConn{
- cc: cc,
- addr: addr,
- dopts: cc.dopts,
- }
- ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
- ac.stateCV = sync.NewCond(&ac.mu)
- if EnableTracing {
- ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
- }
- if !ac.dopts.insecure {
- if ac.dopts.copts.TransportCredentials == nil {
- return errNoTransportSecurity
- }
- } else {
- if ac.dopts.copts.TransportCredentials != nil {
- return errCredentialsConflict
- }
- for _, cd := range ac.dopts.copts.PerRPCCredentials {
- if cd.RequireTransportSecurity() {
- return errTransportCredentialsMissing
- }
- }
- }
- // Track ac in cc. This needs to be done before any getTransport(...) is called.
- cc.mu.Lock()
- if cc.conns == nil {
- cc.mu.Unlock()
- return ErrClientConnClosing
- }
- stale := cc.conns[ac.addr]
- cc.conns[ac.addr] = ac
- cc.mu.Unlock()
- if stale != nil {
- // There is an addrConn alive on ac.addr already. This could be due to
- // 1) a buggy Balancer notifies duplicated Addresses;
- // 2) goaway was received, a new ac will replace the old ac.
- // The old ac should be deleted from cc.conns, but the
- // underlying transport should drain rather than close.
- if tearDownErr == nil {
- // tearDownErr is nil if resetAddrConn is called by
- // 1) Dial
- // 2) lbWatcher
- // In both cases, the stale ac should drain, not close.
- stale.tearDown(errConnDrain)
- } else {
- stale.tearDown(tearDownErr)
- }
- }
- // skipWait may overwrite the decision in ac.dopts.block.
- if ac.dopts.block && !skipWait {
- if err := ac.resetTransport(false); err != nil {
- if err != errConnClosing {
- // Tear down ac and delete it from cc.conns.
- cc.mu.Lock()
- delete(cc.conns, ac.addr)
- cc.mu.Unlock()
- ac.tearDown(err)
- }
- if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
- return e.Origin()
- }
- return err
- }
- // Start to monitor the error status of transport.
- go ac.transportMonitor()
- } else {
- // Start a goroutine connecting to the server asynchronously.
- go func() {
- if err := ac.resetTransport(false); err != nil {
- grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
- if err != errConnClosing {
- // Keep this ac in cc.conns, to get the reason it's torn down.
- ac.tearDown(err)
- }
- return
- }
- ac.transportMonitor()
- }()
- }
- return nil
-}
-
-func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
- var (
- ac *addrConn
- ok bool
- put func()
- )
- if cc.dopts.balancer == nil {
- // If balancer is nil, there should be only one addrConn available.
- cc.mu.RLock()
- if cc.conns == nil {
- cc.mu.RUnlock()
- return nil, nil, toRPCErr(ErrClientConnClosing)
- }
- for _, ac = range cc.conns {
- // Break after the first iteration to get the first addrConn.
- ok = true
- break
- }
- cc.mu.RUnlock()
- } else {
- var (
- addr Address
- err error
- )
- addr, put, err = cc.dopts.balancer.Get(ctx, opts)
- if err != nil {
- return nil, nil, toRPCErr(err)
- }
- cc.mu.RLock()
- if cc.conns == nil {
- cc.mu.RUnlock()
- return nil, nil, toRPCErr(ErrClientConnClosing)
- }
- ac, ok = cc.conns[addr]
- cc.mu.RUnlock()
- }
- if !ok {
- if put != nil {
- put()
- }
- return nil, nil, errConnClosing
- }
- t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
- if err != nil {
- if put != nil {
- put()
- }
- return nil, nil, err
- }
- return t, put, nil
-}
-
-// Close tears down the ClientConn and all underlying connections.
-func (cc *ClientConn) Close() error {
- cc.cancel()
-
- cc.mu.Lock()
- if cc.conns == nil {
- cc.mu.Unlock()
- return ErrClientConnClosing
- }
- conns := cc.conns
- cc.conns = nil
- cc.mu.Unlock()
- if cc.dopts.balancer != nil {
- cc.dopts.balancer.Close()
- }
- for _, ac := range conns {
- ac.tearDown(ErrClientConnClosing)
- }
- return nil
-}
-
-// addrConn is a network connection to a given address.
-type addrConn struct {
- ctx context.Context
- cancel context.CancelFunc
-
- cc *ClientConn
- addr Address
- dopts dialOptions
- events trace.EventLog
-
- mu sync.Mutex
- state ConnectivityState
- stateCV *sync.Cond
- down func(error) // the handler called when a connection is down.
- // ready is closed and becomes nil when a new transport is up or failed
- // due to timeout.
- ready chan struct{}
- transport transport.ClientTransport
-
- // The reason this addrConn is torn down.
- tearDownErr error
-}
-
-// printf records an event in ac's event log, unless ac has been closed.
-// REQUIRES ac.mu is held.
-func (ac *addrConn) printf(format string, a ...interface{}) {
- if ac.events != nil {
- ac.events.Printf(format, a...)
- }
-}
-
-// errorf records an error in ac's event log, unless ac has been closed.
-// REQUIRES ac.mu is held.
-func (ac *addrConn) errorf(format string, a ...interface{}) {
- if ac.events != nil {
- ac.events.Errorf(format, a...)
- }
-}
-
-// getState returns the connectivity state of the Conn
-func (ac *addrConn) getState() ConnectivityState {
- ac.mu.Lock()
- defer ac.mu.Unlock()
- return ac.state
-}
-
-// waitForStateChange blocks until the state changes to something other than the sourceState.
-func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
- ac.mu.Lock()
- defer ac.mu.Unlock()
- if sourceState != ac.state {
- return ac.state, nil
- }
- done := make(chan struct{})
- var err error
- go func() {
- select {
- case <-ctx.Done():
- ac.mu.Lock()
- err = ctx.Err()
- ac.stateCV.Broadcast()
- ac.mu.Unlock()
- case <-done:
- }
- }()
- defer close(done)
- for sourceState == ac.state {
- ac.stateCV.Wait()
- if err != nil {
- return ac.state, err
- }
- }
- return ac.state, nil
-}
-
-func (ac *addrConn) resetTransport(closeTransport bool) error {
- for retries := 0; ; retries++ {
- ac.mu.Lock()
- ac.printf("connecting")
- if ac.state == Shutdown {
- // ac.tearDown(...) has been invoked.
- ac.mu.Unlock()
- return errConnClosing
- }
- if ac.down != nil {
- ac.down(downErrorf(false, true, "%v", errNetworkIO))
- ac.down = nil
- }
- ac.state = Connecting
- ac.stateCV.Broadcast()
- t := ac.transport
- ac.mu.Unlock()
- if closeTransport && t != nil {
- t.Close()
- }
- sleepTime := ac.dopts.bs.backoff(retries)
- timeout := minConnectTimeout
- if timeout < sleepTime {
- timeout = sleepTime
- }
- ctx, cancel := context.WithTimeout(ac.ctx, timeout)
- connectTime := time.Now()
- sinfo := transport.TargetInfo{
- Addr: ac.addr.Addr,
- Metadata: ac.addr.Metadata,
- }
- newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
- if err != nil {
- cancel()
-
- if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
- return err
- }
- grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
- ac.mu.Lock()
- if ac.state == Shutdown {
- // ac.tearDown(...) has been invoked.
- ac.mu.Unlock()
- return errConnClosing
- }
- ac.errorf("transient failure: %v", err)
- ac.state = TransientFailure
- ac.stateCV.Broadcast()
- if ac.ready != nil {
- close(ac.ready)
- ac.ready = nil
- }
- ac.mu.Unlock()
- closeTransport = false
- select {
- case <-time.After(sleepTime - time.Since(connectTime)):
- case <-ac.ctx.Done():
- return ac.ctx.Err()
- }
- continue
- }
- ac.mu.Lock()
- ac.printf("ready")
- if ac.state == Shutdown {
- // ac.tearDown(...) has been invoked.
- ac.mu.Unlock()
- newTransport.Close()
- return errConnClosing
- }
- ac.state = Ready
- ac.stateCV.Broadcast()
- ac.transport = newTransport
- if ac.ready != nil {
- close(ac.ready)
- ac.ready = nil
- }
- if ac.cc.dopts.balancer != nil {
- ac.down = ac.cc.dopts.balancer.Up(ac.addr)
- }
- ac.mu.Unlock()
- return nil
- }
-}
-
-// Run in a goroutine to track the error in transport and create the
-// new transport if an error happens. It returns when the channel is closing.
-func (ac *addrConn) transportMonitor() {
- for {
- ac.mu.Lock()
- t := ac.transport
- ac.mu.Unlock()
- select {
- // This is needed to detect the teardown when
- // the addrConn is idle (i.e., no RPC in flight).
- case <-ac.ctx.Done():
- select {
- case <-t.Error():
- t.Close()
- default:
- }
- return
- case <-t.GoAway():
- // If GoAway happens without any network I/O error, ac is closed without shutting down the
- // underlying transport (the transport will be closed when all the pending RPCs finished or
- // failed.).
- // If GoAway and some network I/O error happen concurrently, ac and its underlying transport
- // are closed.
- // In both cases, a new ac is created.
- select {
- case <-t.Error():
- ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
- default:
- ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
- }
- return
- case <-t.Error():
- select {
- case <-ac.ctx.Done():
- t.Close()
- return
- case <-t.GoAway():
- ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
- return
- default:
- }
- ac.mu.Lock()
- if ac.state == Shutdown {
- // ac has been shutdown.
- ac.mu.Unlock()
- return
- }
- ac.state = TransientFailure
- ac.stateCV.Broadcast()
- ac.mu.Unlock()
- if err := ac.resetTransport(true); err != nil {
- ac.mu.Lock()
- ac.printf("transport exiting: %v", err)
- ac.mu.Unlock()
- grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
- if err != errConnClosing {
- // Keep this ac in cc.conns, to get the reason it's torn down.
- ac.tearDown(err)
- }
- return
- }
- }
- }
-}
-
-// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
-// iv) transport is in TransientFailure and there is a balancer/failfast is true.
-func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
- for {
- ac.mu.Lock()
- switch {
- case ac.state == Shutdown:
- if failfast || !hasBalancer {
- // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
- err := ac.tearDownErr
- ac.mu.Unlock()
- return nil, err
- }
- ac.mu.Unlock()
- return nil, errConnClosing
- case ac.state == Ready:
- ct := ac.transport
- ac.mu.Unlock()
- return ct, nil
- case ac.state == TransientFailure:
- if failfast || hasBalancer {
- ac.mu.Unlock()
- return nil, errConnUnavailable
- }
- }
- ready := ac.ready
- if ready == nil {
- ready = make(chan struct{})
- ac.ready = ready
- }
- ac.mu.Unlock()
- select {
- case <-ctx.Done():
- return nil, toRPCErr(ctx.Err())
- // Wait until the new transport is ready or failed.
- case <-ready:
- }
- }
-}
-
-// tearDown starts to tear down the addrConn.
-// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
-// some edge cases (e.g., the caller opens and closes many addrConn's in a
-// tight loop.
-// tearDown doesn't remove ac from ac.cc.conns.
-func (ac *addrConn) tearDown(err error) {
- ac.cancel()
-
- ac.mu.Lock()
- defer ac.mu.Unlock()
- if ac.down != nil {
- ac.down(downErrorf(false, false, "%v", err))
- ac.down = nil
- }
- if err == errConnDrain && ac.transport != nil {
- // GracefulClose(...) may be executed multiple times when
- // i) receiving multiple GoAway frames from the server; or
- // ii) there are concurrent name resolver/Balancer triggered
- // address removal and GoAway.
- ac.transport.GracefulClose()
- }
- if ac.state == Shutdown {
- return
- }
- ac.state = Shutdown
- ac.tearDownErr = err
- ac.stateCV.Broadcast()
- if ac.events != nil {
- ac.events.Finish()
- ac.events = nil
- }
- if ac.ready != nil {
- close(ac.ready)
- ac.ready = nil
- }
- if ac.transport != nil && err != errConnDrain {
- ac.transport.Close()
- }
- return
-}
diff --git a/vendor/google.golang.org/grpc/codes/code_string.go b/vendor/google.golang.org/grpc/codes/code_string.go
deleted file mode 100644
index e6762d084..000000000
--- a/vendor/google.golang.org/grpc/codes/code_string.go
+++ /dev/null
@@ -1,16 +0,0 @@
-// generated by stringer -type=Code; DO NOT EDIT
-
-package codes
-
-import "fmt"
-
-const _Code_name = "OKCanceledUnknownInvalidArgumentDeadlineExceededNotFoundAlreadyExistsPermissionDeniedResourceExhaustedFailedPreconditionAbortedOutOfRangeUnimplementedInternalUnavailableDataLossUnauthenticated"
-
-var _Code_index = [...]uint8{0, 2, 10, 17, 32, 48, 56, 69, 85, 102, 120, 127, 137, 150, 158, 169, 177, 192}
-
-func (i Code) String() string {
- if i+1 >= Code(len(_Code_index)) {
- return fmt.Sprintf("Code(%d)", i)
- }
- return _Code_name[_Code_index[i]:_Code_index[i+1]]
-}
diff --git a/vendor/google.golang.org/grpc/codes/codes.go b/vendor/google.golang.org/grpc/codes/codes.go
deleted file mode 100644
index e14b464ac..000000000
--- a/vendor/google.golang.org/grpc/codes/codes.go
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-// Package codes defines the canonical error codes used by gRPC. It is
-// consistent across various languages.
-package codes // import "google.golang.org/grpc/codes"
-
-// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
-type Code uint32
-
-//go:generate stringer -type=Code
-
-const (
- // OK is returned on success.
- OK Code = 0
-
- // Canceled indicates the operation was cancelled (typically by the caller).
- Canceled Code = 1
-
- // Unknown error. An example of where this error may be returned is
- // if a Status value received from another address space belongs to
- // an error-space that is not known in this address space. Also
- // errors raised by APIs that do not return enough error information
- // may be converted to this error.
- Unknown Code = 2
-
- // InvalidArgument indicates client specified an invalid argument.
- // Note that this differs from FailedPrecondition. It indicates arguments
- // that are problematic regardless of the state of the system
- // (e.g., a malformed file name).
- InvalidArgument Code = 3
-
- // DeadlineExceeded means operation expired before completion.
- // For operations that change the state of the system, this error may be
- // returned even if the operation has completed successfully. For
- // example, a successful response from a server could have been delayed
- // long enough for the deadline to expire.
- DeadlineExceeded Code = 4
-
- // NotFound means some requested entity (e.g., file or directory) was
- // not found.
- NotFound Code = 5
-
- // AlreadyExists means an attempt to create an entity failed because one
- // already exists.
- AlreadyExists Code = 6
-
- // PermissionDenied indicates the caller does not have permission to
- // execute the specified operation. It must not be used for rejections
- // caused by exhausting some resource (use ResourceExhausted
- // instead for those errors). It must not be
- // used if the caller cannot be identified (use Unauthenticated
- // instead for those errors).
- PermissionDenied Code = 7
-
- // Unauthenticated indicates the request does not have valid
- // authentication credentials for the operation.
- Unauthenticated Code = 16
-
- // ResourceExhausted indicates some resource has been exhausted, perhaps
- // a per-user quota, or perhaps the entire file system is out of space.
- ResourceExhausted Code = 8
-
- // FailedPrecondition indicates operation was rejected because the
- // system is not in a state required for the operation's execution.
- // For example, directory to be deleted may be non-empty, an rmdir
- // operation is applied to a non-directory, etc.
- //
- // A litmus test that may help a service implementor in deciding
- // between FailedPrecondition, Aborted, and Unavailable:
- // (a) Use Unavailable if the client can retry just the failing call.
- // (b) Use Aborted if the client should retry at a higher-level
- // (e.g., restarting a read-modify-write sequence).
- // (c) Use FailedPrecondition if the client should not retry until
- // the system state has been explicitly fixed. E.g., if an "rmdir"
- // fails because the directory is non-empty, FailedPrecondition
- // should be returned since the client should not retry unless
- // they have first fixed up the directory by deleting files from it.
- // (d) Use FailedPrecondition if the client performs conditional
- // REST Get/Update/Delete on a resource and the resource on the
- // server does not match the condition. E.g., conflicting
- // read-modify-write on the same resource.
- FailedPrecondition Code = 9
-
- // Aborted indicates the operation was aborted, typically due to a
- // concurrency issue like sequencer check failures, transaction aborts,
- // etc.
- //
- // See litmus test above for deciding between FailedPrecondition,
- // Aborted, and Unavailable.
- Aborted Code = 10
-
- // OutOfRange means operation was attempted past the valid range.
- // E.g., seeking or reading past end of file.
- //
- // Unlike InvalidArgument, this error indicates a problem that may
- // be fixed if the system state changes. For example, a 32-bit file
- // system will generate InvalidArgument if asked to read at an
- // offset that is not in the range [0,2^32-1], but it will generate
- // OutOfRange if asked to read from an offset past the current
- // file size.
- //
- // There is a fair bit of overlap between FailedPrecondition and
- // OutOfRange. We recommend using OutOfRange (the more specific
- // error) when it applies so that callers who are iterating through
- // a space can easily look for an OutOfRange error to detect when
- // they are done.
- OutOfRange Code = 11
-
- // Unimplemented indicates operation is not implemented or not
- // supported/enabled in this service.
- Unimplemented Code = 12
-
- // Internal errors. Means some invariants expected by underlying
- // system has been broken. If you see one of these errors,
- // something is very broken.
- Internal Code = 13
-
- // Unavailable indicates the service is currently unavailable.
- // This is a most likely a transient condition and may be corrected
- // by retrying with a backoff.
- //
- // See litmus test above for deciding between FailedPrecondition,
- // Aborted, and Unavailable.
- Unavailable Code = 14
-
- // DataLoss indicates unrecoverable data loss or corruption.
- DataLoss Code = 15
-)
diff --git a/vendor/google.golang.org/grpc/credentials/credentials.go b/vendor/google.golang.org/grpc/credentials/credentials.go
deleted file mode 100644
index 5555ef024..000000000
--- a/vendor/google.golang.org/grpc/credentials/credentials.go
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-// Package credentials implements various credentials supported by gRPC library,
-// which encapsulate all the state needed by a client to authenticate with a
-// server and make various assertions, e.g., about the client's identity, role,
-// or whether it is authorized to make a particular call.
-package credentials // import "google.golang.org/grpc/credentials"
-
-import (
- "crypto/tls"
- "crypto/x509"
- "errors"
- "fmt"
- "io/ioutil"
- "net"
- "strings"
-
- "golang.org/x/net/context"
-)
-
-var (
- // alpnProtoStr are the specified application level protocols for gRPC.
- alpnProtoStr = []string{"h2"}
-)
-
-// PerRPCCredentials defines the common interface for the credentials which need to
-// attach security information to every RPC (e.g., oauth2).
-type PerRPCCredentials interface {
- // GetRequestMetadata gets the current request metadata, refreshing
- // tokens if required. This should be called by the transport layer on
- // each request, and the data should be populated in headers or other
- // context. uri is the URI of the entry point for the request. When
- // supported by the underlying implementation, ctx can be used for
- // timeout and cancellation.
- // TODO(zhaoq): Define the set of the qualified keys instead of leaving
- // it as an arbitrary string.
- GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
- // RequireTransportSecurity indicates whether the credentials requires
- // transport security.
- RequireTransportSecurity() bool
-}
-
-// ProtocolInfo provides information regarding the gRPC wire protocol version,
-// security protocol, security protocol version in use, server name, etc.
-type ProtocolInfo struct {
- // ProtocolVersion is the gRPC wire protocol version.
- ProtocolVersion string
- // SecurityProtocol is the security protocol in use.
- SecurityProtocol string
- // SecurityVersion is the security protocol version.
- SecurityVersion string
- // ServerName is the user-configured server name.
- ServerName string
-}
-
-// AuthInfo defines the common interface for the auth information the users are interested in.
-type AuthInfo interface {
- AuthType() string
-}
-
-var (
- // ErrConnDispatched indicates that rawConn has been dispatched out of gRPC
- // and the caller should not close rawConn.
- ErrConnDispatched = errors.New("credentials: rawConn is dispatched out of gRPC")
-)
-
-// TransportCredentials defines the common interface for all the live gRPC wire
-// protocols and supported transport security protocols (e.g., TLS, SSL).
-type TransportCredentials interface {
- // ClientHandshake does the authentication handshake specified by the corresponding
- // authentication protocol on rawConn for clients. It returns the authenticated
- // connection and the corresponding auth information about the connection.
- // Implementations must use the provided context to implement timely cancellation.
- ClientHandshake(context.Context, string, net.Conn) (net.Conn, AuthInfo, error)
- // ServerHandshake does the authentication handshake for servers. It returns
- // the authenticated connection and the corresponding auth information about
- // the connection.
- ServerHandshake(net.Conn) (net.Conn, AuthInfo, error)
- // Info provides the ProtocolInfo of this TransportCredentials.
- Info() ProtocolInfo
- // Clone makes a copy of this TransportCredentials.
- Clone() TransportCredentials
- // OverrideServerName overrides the server name used to verify the hostname on the returned certificates from the server.
- // gRPC internals also use it to override the virtual hosting name if it is set.
- // It must be called before dialing. Currently, this is only used by grpclb.
- OverrideServerName(string) error
-}
-
-// TLSInfo contains the auth information for a TLS authenticated connection.
-// It implements the AuthInfo interface.
-type TLSInfo struct {
- State tls.ConnectionState
-}
-
-// AuthType returns the type of TLSInfo as a string.
-func (t TLSInfo) AuthType() string {
- return "tls"
-}
-
-// tlsCreds is the credentials required for authenticating a connection using TLS.
-type tlsCreds struct {
- // TLS configuration
- config *tls.Config
-}
-
-func (c tlsCreds) Info() ProtocolInfo {
- return ProtocolInfo{
- SecurityProtocol: "tls",
- SecurityVersion: "1.2",
- ServerName: c.config.ServerName,
- }
-}
-
-func (c *tlsCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (_ net.Conn, _ AuthInfo, err error) {
- // use local cfg to avoid clobbering ServerName if using multiple endpoints
- cfg := cloneTLSConfig(c.config)
- if cfg.ServerName == "" {
- colonPos := strings.LastIndex(addr, ":")
- if colonPos == -1 {
- colonPos = len(addr)
- }
- cfg.ServerName = addr[:colonPos]
- }
- conn := tls.Client(rawConn, cfg)
- errChannel := make(chan error, 1)
- go func() {
- errChannel <- conn.Handshake()
- }()
- select {
- case err := <-errChannel:
- if err != nil {
- return nil, nil, err
- }
- case <-ctx.Done():
- return nil, nil, ctx.Err()
- }
- // TODO(zhaoq): Omit the auth info for client now. It is more for
- // information than anything else.
- return conn, nil, nil
-}
-
-func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error) {
- conn := tls.Server(rawConn, c.config)
- if err := conn.Handshake(); err != nil {
- return nil, nil, err
- }
- return conn, TLSInfo{conn.ConnectionState()}, nil
-}
-
-func (c *tlsCreds) Clone() TransportCredentials {
- return NewTLS(c.config)
-}
-
-func (c *tlsCreds) OverrideServerName(serverNameOverride string) error {
- c.config.ServerName = serverNameOverride
- return nil
-}
-
-// NewTLS uses c to construct a TransportCredentials based on TLS.
-func NewTLS(c *tls.Config) TransportCredentials {
- tc := &tlsCreds{cloneTLSConfig(c)}
- tc.config.NextProtos = alpnProtoStr
- return tc
-}
-
-// NewClientTLSFromCert constructs a TLS from the input certificate for client.
-// serverNameOverride is for testing only. If set to a non empty string,
-// it will override the virtual host name of authority (e.g. :authority header field) in requests.
-func NewClientTLSFromCert(cp *x509.CertPool, serverNameOverride string) TransportCredentials {
- return NewTLS(&tls.Config{ServerName: serverNameOverride, RootCAs: cp})
-}
-
-// NewClientTLSFromFile constructs a TLS from the input certificate file for client.
-// serverNameOverride is for testing only. If set to a non empty string,
-// it will override the virtual host name of authority (e.g. :authority header field) in requests.
-func NewClientTLSFromFile(certFile, serverNameOverride string) (TransportCredentials, error) {
- b, err := ioutil.ReadFile(certFile)
- if err != nil {
- return nil, err
- }
- cp := x509.NewCertPool()
- if !cp.AppendCertsFromPEM(b) {
- return nil, fmt.Errorf("credentials: failed to append certificates")
- }
- return NewTLS(&tls.Config{ServerName: serverNameOverride, RootCAs: cp}), nil
-}
-
-// NewServerTLSFromCert constructs a TLS from the input certificate for server.
-func NewServerTLSFromCert(cert *tls.Certificate) TransportCredentials {
- return NewTLS(&tls.Config{Certificates: []tls.Certificate{*cert}})
-}
-
-// NewServerTLSFromFile constructs a TLS from the input certificate file and key
-// file for server.
-func NewServerTLSFromFile(certFile, keyFile string) (TransportCredentials, error) {
- cert, err := tls.LoadX509KeyPair(certFile, keyFile)
- if err != nil {
- return nil, err
- }
- return NewTLS(&tls.Config{Certificates: []tls.Certificate{cert}}), nil
-}
diff --git a/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go b/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go
deleted file mode 100644
index 9647b9ec8..000000000
--- a/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go
+++ /dev/null
@@ -1,76 +0,0 @@
-// +build go1.7
-
-/*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package credentials
-
-import (
- "crypto/tls"
-)
-
-// cloneTLSConfig returns a shallow clone of the exported
-// fields of cfg, ignoring the unexported sync.Once, which
-// contains a mutex and must not be copied.
-//
-// If cfg is nil, a new zero tls.Config is returned.
-//
-// TODO replace this function with official clone function.
-func cloneTLSConfig(cfg *tls.Config) *tls.Config {
- if cfg == nil {
- return &tls.Config{}
- }
- return &tls.Config{
- Rand: cfg.Rand,
- Time: cfg.Time,
- Certificates: cfg.Certificates,
- NameToCertificate: cfg.NameToCertificate,
- GetCertificate: cfg.GetCertificate,
- RootCAs: cfg.RootCAs,
- NextProtos: cfg.NextProtos,
- ServerName: cfg.ServerName,
- ClientAuth: cfg.ClientAuth,
- ClientCAs: cfg.ClientCAs,
- InsecureSkipVerify: cfg.InsecureSkipVerify,
- CipherSuites: cfg.CipherSuites,
- PreferServerCipherSuites: cfg.PreferServerCipherSuites,
- SessionTicketsDisabled: cfg.SessionTicketsDisabled,
- SessionTicketKey: cfg.SessionTicketKey,
- ClientSessionCache: cfg.ClientSessionCache,
- MinVersion: cfg.MinVersion,
- MaxVersion: cfg.MaxVersion,
- CurvePreferences: cfg.CurvePreferences,
- DynamicRecordSizingDisabled: cfg.DynamicRecordSizingDisabled,
- Renegotiation: cfg.Renegotiation,
- }
-}
diff --git a/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go b/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go
deleted file mode 100644
index 09b8d12c7..000000000
--- a/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go
+++ /dev/null
@@ -1,74 +0,0 @@
-// +build !go1.7
-
-/*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package credentials
-
-import (
- "crypto/tls"
-)
-
-// cloneTLSConfig returns a shallow clone of the exported
-// fields of cfg, ignoring the unexported sync.Once, which
-// contains a mutex and must not be copied.
-//
-// If cfg is nil, a new zero tls.Config is returned.
-//
-// TODO replace this function with official clone function.
-func cloneTLSConfig(cfg *tls.Config) *tls.Config {
- if cfg == nil {
- return &tls.Config{}
- }
- return &tls.Config{
- Rand: cfg.Rand,
- Time: cfg.Time,
- Certificates: cfg.Certificates,
- NameToCertificate: cfg.NameToCertificate,
- GetCertificate: cfg.GetCertificate,
- RootCAs: cfg.RootCAs,
- NextProtos: cfg.NextProtos,
- ServerName: cfg.ServerName,
- ClientAuth: cfg.ClientAuth,
- ClientCAs: cfg.ClientCAs,
- InsecureSkipVerify: cfg.InsecureSkipVerify,
- CipherSuites: cfg.CipherSuites,
- PreferServerCipherSuites: cfg.PreferServerCipherSuites,
- SessionTicketsDisabled: cfg.SessionTicketsDisabled,
- SessionTicketKey: cfg.SessionTicketKey,
- ClientSessionCache: cfg.ClientSessionCache,
- MinVersion: cfg.MinVersion,
- MaxVersion: cfg.MaxVersion,
- CurvePreferences: cfg.CurvePreferences,
- }
-}
diff --git a/vendor/google.golang.org/grpc/doc.go b/vendor/google.golang.org/grpc/doc.go
deleted file mode 100644
index a35f21885..000000000
--- a/vendor/google.golang.org/grpc/doc.go
+++ /dev/null
@@ -1,6 +0,0 @@
-/*
-Package grpc implements an RPC system called gRPC.
-
-See www.grpc.io for more information about gRPC.
-*/
-package grpc // import "google.golang.org/grpc"
diff --git a/vendor/google.golang.org/grpc/grpclog/logger.go b/vendor/google.golang.org/grpc/grpclog/logger.go
deleted file mode 100644
index 3b2933079..000000000
--- a/vendor/google.golang.org/grpc/grpclog/logger.go
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-/*
-Package grpclog defines logging for grpc.
-*/
-package grpclog // import "google.golang.org/grpc/grpclog"
-
-import (
- "log"
- "os"
-)
-
-// Use golang's standard logger by default.
-// Access is not mutex-protected: do not modify except in init()
-// functions.
-var logger Logger = log.New(os.Stderr, "", log.LstdFlags)
-
-// Logger mimics golang's standard Logger as an interface.
-type Logger interface {
- Fatal(args ...interface{})
- Fatalf(format string, args ...interface{})
- Fatalln(args ...interface{})
- Print(args ...interface{})
- Printf(format string, args ...interface{})
- Println(args ...interface{})
-}
-
-// SetLogger sets the logger that is used in grpc. Call only from
-// init() functions.
-func SetLogger(l Logger) {
- logger = l
-}
-
-// Fatal is equivalent to Print() followed by a call to os.Exit() with a non-zero exit code.
-func Fatal(args ...interface{}) {
- logger.Fatal(args...)
-}
-
-// Fatalf is equivalent to Printf() followed by a call to os.Exit() with a non-zero exit code.
-func Fatalf(format string, args ...interface{}) {
- logger.Fatalf(format, args...)
-}
-
-// Fatalln is equivalent to Println() followed by a call to os.Exit()) with a non-zero exit code.
-func Fatalln(args ...interface{}) {
- logger.Fatalln(args...)
-}
-
-// Print prints to the logger. Arguments are handled in the manner of fmt.Print.
-func Print(args ...interface{}) {
- logger.Print(args...)
-}
-
-// Printf prints to the logger. Arguments are handled in the manner of fmt.Printf.
-func Printf(format string, args ...interface{}) {
- logger.Printf(format, args...)
-}
-
-// Println prints to the logger. Arguments are handled in the manner of fmt.Println.
-func Println(args ...interface{}) {
- logger.Println(args...)
-}
diff --git a/vendor/google.golang.org/grpc/interceptor.go b/vendor/google.golang.org/grpc/interceptor.go
deleted file mode 100644
index 8d932efed..000000000
--- a/vendor/google.golang.org/grpc/interceptor.go
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package grpc
-
-import (
- "golang.org/x/net/context"
-)
-
-// UnaryInvoker is called by UnaryClientInterceptor to complete RPCs.
-type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error
-
-// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. inovker is the handler to complete the RPC
-// and it is the responsibility of the interceptor to call it.
-// This is the EXPERIMENTAL API.
-type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
-
-// Streamer is called by StreamClientInterceptor to create a ClientStream.
-type Streamer func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error)
-
-// StreamClientInterceptor intercepts the creation of ClientStream. It may return a custom ClientStream to intercept all I/O
-// operations. streamer is the handlder to create a ClientStream and it is the responsibility of the interceptor to call it.
-// This is the EXPERIMENTAL API.
-type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
-
-// UnaryServerInfo consists of various information about a unary RPC on
-// server side. All per-rpc information may be mutated by the interceptor.
-type UnaryServerInfo struct {
- // Server is the service implementation the user provides. This is read-only.
- Server interface{}
- // FullMethod is the full RPC method string, i.e., /package.service/method.
- FullMethod string
-}
-
-// UnaryHandler defines the handler invoked by UnaryServerInterceptor to complete the normal
-// execution of a unary RPC.
-type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)
-
-// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info
-// contains all the information of this RPC the interceptor can operate on. And handler is the wrapper
-// of the service method implementation. It is the responsibility of the interceptor to invoke handler
-// to complete the RPC.
-type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
-
-// StreamServerInfo consists of various information about a streaming RPC on
-// server side. All per-rpc information may be mutated by the interceptor.
-type StreamServerInfo struct {
- // FullMethod is the full RPC method string, i.e., /package.service/method.
- FullMethod string
- // IsClientStream indicates whether the RPC is a client streaming RPC.
- IsClientStream bool
- // IsServerStream indicates whether the RPC is a server streaming RPC.
- IsServerStream bool
-}
-
-// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC on the server.
-// info contains all the information of this RPC the interceptor can operate on. And handler is the
-// service method implementation. It is the responsibility of the interceptor to invoke handler to
-// complete the RPC.
-type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
deleted file mode 100644
index 5489143a8..000000000
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-// Package internal contains gRPC-internal code for testing, to avoid polluting
-// the godoc of the top-level grpc package.
-package internal
-
-// TestingCloseConns closes all existing transports but keeps
-// grpcServer.lis accepting new connections.
-//
-// The provided grpcServer must be of type *grpc.Server. It is untyped
-// for circular dependency reasons.
-var TestingCloseConns func(grpcServer interface{})
-
-// TestingUseHandlerImpl enables the http.Handler-based server implementation.
-// It must be called before Serve and requires TLS credentials.
-//
-// The provided grpcServer must be of type *grpc.Server. It is untyped
-// for circular dependency reasons.
-var TestingUseHandlerImpl func(grpcServer interface{})
diff --git a/vendor/google.golang.org/grpc/metadata/metadata.go b/vendor/google.golang.org/grpc/metadata/metadata.go
deleted file mode 100644
index 3c0ca7a36..000000000
--- a/vendor/google.golang.org/grpc/metadata/metadata.go
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-// Package metadata define the structure of the metadata supported by gRPC library.
-package metadata // import "google.golang.org/grpc/metadata"
-
-import (
- "encoding/base64"
- "fmt"
- "strings"
-
- "golang.org/x/net/context"
-)
-
-const (
- binHdrSuffix = "-bin"
-)
-
-// encodeKeyValue encodes key and value qualified for transmission via gRPC.
-// Transmitting binary headers violates HTTP/2 spec.
-// TODO(zhaoq): Maybe check if k is ASCII also.
-func encodeKeyValue(k, v string) (string, string) {
- k = strings.ToLower(k)
- if strings.HasSuffix(k, binHdrSuffix) {
- val := base64.StdEncoding.EncodeToString([]byte(v))
- v = string(val)
- }
- return k, v
-}
-
-// DecodeKeyValue returns the original key and value corresponding to the
-// encoded data in k, v.
-// If k is a binary header and v contains comma, v is split on comma before decoded,
-// and the decoded v will be joined with comma before returned.
-func DecodeKeyValue(k, v string) (string, string, error) {
- if !strings.HasSuffix(k, binHdrSuffix) {
- return k, v, nil
- }
- vvs := strings.Split(v, ",")
- for i, vv := range vvs {
- val, err := base64.StdEncoding.DecodeString(vv)
- if err != nil {
- return "", "", err
- }
- vvs[i] = string(val)
- }
- return k, strings.Join(vvs, ","), nil
-}
-
-// MD is a mapping from metadata keys to values. Users should use the following
-// two convenience functions New and Pairs to generate MD.
-type MD map[string][]string
-
-// New creates a MD from given key-value map.
-func New(m map[string]string) MD {
- md := MD{}
- for k, v := range m {
- key, val := encodeKeyValue(k, v)
- md[key] = append(md[key], val)
- }
- return md
-}
-
-// Pairs returns an MD formed by the mapping of key, value ...
-// Pairs panics if len(kv) is odd.
-func Pairs(kv ...string) MD {
- if len(kv)%2 == 1 {
- panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv)))
- }
- md := MD{}
- var k string
- for i, s := range kv {
- if i%2 == 0 {
- k = s
- continue
- }
- key, val := encodeKeyValue(k, s)
- md[key] = append(md[key], val)
- }
- return md
-}
-
-// Len returns the number of items in md.
-func (md MD) Len() int {
- return len(md)
-}
-
-// Copy returns a copy of md.
-func (md MD) Copy() MD {
- return Join(md)
-}
-
-// Join joins any number of MDs into a single MD.
-// The order of values for each key is determined by the order in which
-// the MDs containing those values are presented to Join.
-func Join(mds ...MD) MD {
- out := MD{}
- for _, md := range mds {
- for k, v := range md {
- out[k] = append(out[k], v...)
- }
- }
- return out
-}
-
-type mdKey struct{}
-
-// NewContext creates a new context with md attached.
-func NewContext(ctx context.Context, md MD) context.Context {
- return context.WithValue(ctx, mdKey{}, md)
-}
-
-// FromContext returns the MD in ctx if it exists.
-func FromContext(ctx context.Context) (md MD, ok bool) {
- md, ok = ctx.Value(mdKey{}).(MD)
- return
-}
diff --git a/vendor/google.golang.org/grpc/naming/naming.go b/vendor/google.golang.org/grpc/naming/naming.go
deleted file mode 100644
index c2e0871e6..000000000
--- a/vendor/google.golang.org/grpc/naming/naming.go
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-// Package naming defines the naming API and related data structures for gRPC.
-// The interface is EXPERIMENTAL and may be suject to change.
-package naming
-
-// Operation defines the corresponding operations for a name resolution change.
-type Operation uint8
-
-const (
- // Add indicates a new address is added.
- Add Operation = iota
- // Delete indicates an exisiting address is deleted.
- Delete
-)
-
-// Update defines a name resolution update. Notice that it is not valid having both
-// empty string Addr and nil Metadata in an Update.
-type Update struct {
- // Op indicates the operation of the update.
- Op Operation
- // Addr is the updated address. It is empty string if there is no address update.
- Addr string
- // Metadata is the updated metadata. It is nil if there is no metadata update.
- // Metadata is not required for a custom naming implementation.
- Metadata interface{}
-}
-
-// Resolver creates a Watcher for a target to track its resolution changes.
-type Resolver interface {
- // Resolve creates a Watcher for target.
- Resolve(target string) (Watcher, error)
-}
-
-// Watcher watches for the updates on the specified target.
-type Watcher interface {
- // Next blocks until an update or error happens. It may return one or more
- // updates. The first call should get the full set of the results. It should
- // return an error if and only if Watcher cannot recover.
- Next() ([]*Update, error)
- // Close closes the Watcher.
- Close()
-}
diff --git a/vendor/google.golang.org/grpc/peer/peer.go b/vendor/google.golang.org/grpc/peer/peer.go
deleted file mode 100644
index bfa6205ba..000000000
--- a/vendor/google.golang.org/grpc/peer/peer.go
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-// Package peer defines various peer information associated with RPCs and
-// corresponding utils.
-package peer
-
-import (
- "net"
-
- "golang.org/x/net/context"
- "google.golang.org/grpc/credentials"
-)
-
-// Peer contains the information of the peer for an RPC.
-type Peer struct {
- // Addr is the peer address.
- Addr net.Addr
- // AuthInfo is the authentication information of the transport.
- // It is nil if there is no transport security being used.
- AuthInfo credentials.AuthInfo
-}
-
-type peerKey struct{}
-
-// NewContext creates a new context with peer information attached.
-func NewContext(ctx context.Context, p *Peer) context.Context {
- return context.WithValue(ctx, peerKey{}, p)
-}
-
-// FromContext returns the peer information in ctx if it exists.
-func FromContext(ctx context.Context) (p *Peer, ok bool) {
- p, ok = ctx.Value(peerKey{}).(*Peer)
- return
-}
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
deleted file mode 100644
index a25eaa8a2..000000000
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package grpc
-
-import (
- "bytes"
- "compress/gzip"
- "encoding/binary"
- "fmt"
- "io"
- "io/ioutil"
- "math"
- "os"
-
- "github.com/golang/protobuf/proto"
- "golang.org/x/net/context"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/transport"
-)
-
-// Codec defines the interface gRPC uses to encode and decode messages.
-type Codec interface {
- // Marshal returns the wire format of v.
- Marshal(v interface{}) ([]byte, error)
- // Unmarshal parses the wire format into v.
- Unmarshal(data []byte, v interface{}) error
- // String returns the name of the Codec implementation. The returned
- // string will be used as part of content type in transmission.
- String() string
-}
-
-// protoCodec is a Codec implementation with protobuf. It is the default codec for gRPC.
-type protoCodec struct{}
-
-func (protoCodec) Marshal(v interface{}) ([]byte, error) {
- return proto.Marshal(v.(proto.Message))
-}
-
-func (protoCodec) Unmarshal(data []byte, v interface{}) error {
- return proto.Unmarshal(data, v.(proto.Message))
-}
-
-func (protoCodec) String() string {
- return "proto"
-}
-
-// Compressor defines the interface gRPC uses to compress a message.
-type Compressor interface {
- // Do compresses p into w.
- Do(w io.Writer, p []byte) error
- // Type returns the compression algorithm the Compressor uses.
- Type() string
-}
-
-// NewGZIPCompressor creates a Compressor based on GZIP.
-func NewGZIPCompressor() Compressor {
- return &gzipCompressor{}
-}
-
-type gzipCompressor struct {
-}
-
-func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
- z := gzip.NewWriter(w)
- if _, err := z.Write(p); err != nil {
- return err
- }
- return z.Close()
-}
-
-func (c *gzipCompressor) Type() string {
- return "gzip"
-}
-
-// Decompressor defines the interface gRPC uses to decompress a message.
-type Decompressor interface {
- // Do reads the data from r and uncompress them.
- Do(r io.Reader) ([]byte, error)
- // Type returns the compression algorithm the Decompressor uses.
- Type() string
-}
-
-type gzipDecompressor struct {
-}
-
-// NewGZIPDecompressor creates a Decompressor based on GZIP.
-func NewGZIPDecompressor() Decompressor {
- return &gzipDecompressor{}
-}
-
-func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) {
- z, err := gzip.NewReader(r)
- if err != nil {
- return nil, err
- }
- defer z.Close()
- return ioutil.ReadAll(z)
-}
-
-func (d *gzipDecompressor) Type() string {
- return "gzip"
-}
-
-// callInfo contains all related configuration and information about an RPC.
-type callInfo struct {
- failFast bool
- headerMD metadata.MD
- trailerMD metadata.MD
- traceInfo traceInfo // in trace.go
-}
-
-var defaultCallInfo = callInfo{failFast: true}
-
-// CallOption configures a Call before it starts or extracts information from
-// a Call after it completes.
-type CallOption interface {
- // before is called before the call is sent to any server. If before
- // returns a non-nil error, the RPC fails with that error.
- before(*callInfo) error
-
- // after is called after the call has completed. after cannot return an
- // error, so any failures should be reported via output parameters.
- after(*callInfo)
-}
-
-type beforeCall func(c *callInfo) error
-
-func (o beforeCall) before(c *callInfo) error { return o(c) }
-func (o beforeCall) after(c *callInfo) {}
-
-type afterCall func(c *callInfo)
-
-func (o afterCall) before(c *callInfo) error { return nil }
-func (o afterCall) after(c *callInfo) { o(c) }
-
-// Header returns a CallOptions that retrieves the header metadata
-// for a unary RPC.
-func Header(md *metadata.MD) CallOption {
- return afterCall(func(c *callInfo) {
- *md = c.headerMD
- })
-}
-
-// Trailer returns a CallOptions that retrieves the trailer metadata
-// for a unary RPC.
-func Trailer(md *metadata.MD) CallOption {
- return afterCall(func(c *callInfo) {
- *md = c.trailerMD
- })
-}
-
-// FailFast configures the action to take when an RPC is attempted on broken
-// connections or unreachable servers. If failfast is true, the RPC will fail
-// immediately. Otherwise, the RPC client will block the call until a
-// connection is available (or the call is canceled or times out) and will retry
-// the call if it fails due to a transient error. Please refer to
-// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md
-func FailFast(failFast bool) CallOption {
- return beforeCall(func(c *callInfo) error {
- c.failFast = failFast
- return nil
- })
-}
-
-// The format of the payload: compressed or not?
-type payloadFormat uint8
-
-const (
- compressionNone payloadFormat = iota // no compression
- compressionMade
-)
-
-// parser reads complete gRPC messages from the underlying reader.
-type parser struct {
- // r is the underlying reader.
- // See the comment on recvMsg for the permissible
- // error types.
- r io.Reader
-
- // The header of a gRPC message. Find more detail
- // at http://www.grpc.io/docs/guides/wire.html.
- header [5]byte
-}
-
-// recvMsg reads a complete gRPC message from the stream.
-//
-// It returns the message and its payload (compression/encoding)
-// format. The caller owns the returned msg memory.
-//
-// If there is an error, possible values are:
-// * io.EOF, when no messages remain
-// * io.ErrUnexpectedEOF
-// * of type transport.ConnectionError
-// * of type transport.StreamError
-// No other error values or types must be returned, which also means
-// that the underlying io.Reader must not return an incompatible
-// error.
-func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err error) {
- if _, err := io.ReadFull(p.r, p.header[:]); err != nil {
- return 0, nil, err
- }
-
- pf = payloadFormat(p.header[0])
- length := binary.BigEndian.Uint32(p.header[1:])
-
- if length == 0 {
- return pf, nil, nil
- }
- if length > uint32(maxMsgSize) {
- return 0, nil, Errorf(codes.Internal, "grpc: received message length %d exceeding the max size %d", length, maxMsgSize)
- }
- // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
- // of making it for each message:
- msg = make([]byte, int(length))
- if _, err := io.ReadFull(p.r, msg); err != nil {
- if err == io.EOF {
- err = io.ErrUnexpectedEOF
- }
- return 0, nil, err
- }
- return pf, msg, nil
-}
-
-// encode serializes msg and prepends the message header. If msg is nil, it
-// generates the message header of 0 message length.
-func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte, error) {
- var b []byte
- var length uint
- if msg != nil {
- var err error
- // TODO(zhaoq): optimize to reduce memory alloc and copying.
- b, err = c.Marshal(msg)
- if err != nil {
- return nil, err
- }
- if cp != nil {
- if err := cp.Do(cbuf, b); err != nil {
- return nil, err
- }
- b = cbuf.Bytes()
- }
- length = uint(len(b))
- }
- if length > math.MaxUint32 {
- return nil, Errorf(codes.InvalidArgument, "grpc: message too large (%d bytes)", length)
- }
-
- const (
- payloadLen = 1
- sizeLen = 4
- )
-
- var buf = make([]byte, payloadLen+sizeLen+len(b))
-
- // Write payload format
- if cp == nil {
- buf[0] = byte(compressionNone)
- } else {
- buf[0] = byte(compressionMade)
- }
- // Write length of b into buf
- binary.BigEndian.PutUint32(buf[1:], uint32(length))
- // Copy encoded msg to buf
- copy(buf[5:], b)
-
- return buf, nil
-}
-
-func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) error {
- switch pf {
- case compressionNone:
- case compressionMade:
- if dc == nil || recvCompress != dc.Type() {
- return Errorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
- }
- default:
- return Errorf(codes.Internal, "grpc: received unexpected payload format %d", pf)
- }
- return nil
-}
-
-func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int) error {
- pf, d, err := p.recvMsg(maxMsgSize)
- if err != nil {
- return err
- }
- if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
- return err
- }
- if pf == compressionMade {
- d, err = dc.Do(bytes.NewReader(d))
- if err != nil {
- return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
- }
- }
- if len(d) > maxMsgSize {
- // TODO: Revisit the error code. Currently keep it consistent with java
- // implementation.
- return Errorf(codes.Internal, "grpc: received a message of %d bytes exceeding %d limit", len(d), maxMsgSize)
- }
- if err := c.Unmarshal(d, m); err != nil {
- return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
- }
- return nil
-}
-
-// rpcError defines the status from an RPC.
-type rpcError struct {
- code codes.Code
- desc string
-}
-
-func (e *rpcError) Error() string {
- return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc)
-}
-
-// Code returns the error code for err if it was produced by the rpc system.
-// Otherwise, it returns codes.Unknown.
-func Code(err error) codes.Code {
- if err == nil {
- return codes.OK
- }
- if e, ok := err.(*rpcError); ok {
- return e.code
- }
- return codes.Unknown
-}
-
-// ErrorDesc returns the error description of err if it was produced by the rpc system.
-// Otherwise, it returns err.Error() or empty string when err is nil.
-func ErrorDesc(err error) string {
- if err == nil {
- return ""
- }
- if e, ok := err.(*rpcError); ok {
- return e.desc
- }
- return err.Error()
-}
-
-// Errorf returns an error containing an error code and a description;
-// Errorf returns nil if c is OK.
-func Errorf(c codes.Code, format string, a ...interface{}) error {
- if c == codes.OK {
- return nil
- }
- return &rpcError{
- code: c,
- desc: fmt.Sprintf(format, a...),
- }
-}
-
-// toRPCErr converts an error into a rpcError.
-func toRPCErr(err error) error {
- switch e := err.(type) {
- case *rpcError:
- return err
- case transport.StreamError:
- return &rpcError{
- code: e.Code,
- desc: e.Desc,
- }
- case transport.ConnectionError:
- return &rpcError{
- code: codes.Internal,
- desc: e.Desc,
- }
- default:
- switch err {
- case context.DeadlineExceeded:
- return &rpcError{
- code: codes.DeadlineExceeded,
- desc: err.Error(),
- }
- case context.Canceled:
- return &rpcError{
- code: codes.Canceled,
- desc: err.Error(),
- }
- case ErrClientConnClosing:
- return &rpcError{
- code: codes.FailedPrecondition,
- desc: err.Error(),
- }
- }
-
- }
- return Errorf(codes.Unknown, "%v", err)
-}
-
-// convertCode converts a standard Go error into its canonical code. Note that
-// this is only used to translate the error returned by the server applications.
-func convertCode(err error) codes.Code {
- switch err {
- case nil:
- return codes.OK
- case io.EOF:
- return codes.OutOfRange
- case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF:
- return codes.FailedPrecondition
- case os.ErrInvalid:
- return codes.InvalidArgument
- case context.Canceled:
- return codes.Canceled
- case context.DeadlineExceeded:
- return codes.DeadlineExceeded
- }
- switch {
- case os.IsExist(err):
- return codes.AlreadyExists
- case os.IsNotExist(err):
- return codes.NotFound
- case os.IsPermission(err):
- return codes.PermissionDenied
- }
- return codes.Unknown
-}
-
-// SupportPackageIsVersion4 is referenced from generated protocol buffer files
-// to assert that that code is compatible with this version of the grpc package.
-//
-// This constant may be renamed in the future if a change in the generated code
-// requires a synchronised update of grpc-go and protoc-gen-go. This constant
-// should not be referenced from any other code.
-const SupportPackageIsVersion4 = true
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
deleted file mode 100644
index e0bb187ef..000000000
--- a/vendor/google.golang.org/grpc/server.go
+++ /dev/null
@@ -1,944 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package grpc
-
-import (
- "bytes"
- "errors"
- "fmt"
- "io"
- "net"
- "net/http"
- "reflect"
- "runtime"
- "strings"
- "sync"
- "time"
-
- "golang.org/x/net/context"
- "golang.org/x/net/http2"
- "golang.org/x/net/trace"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/transport"
-)
-
-type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
-
-// MethodDesc represents an RPC service's method specification.
-type MethodDesc struct {
- MethodName string
- Handler methodHandler
-}
-
-// ServiceDesc represents an RPC service's specification.
-type ServiceDesc struct {
- ServiceName string
- // The pointer to the service interface. Used to check whether the user
- // provided implementation satisfies the interface requirements.
- HandlerType interface{}
- Methods []MethodDesc
- Streams []StreamDesc
- Metadata interface{}
-}
-
-// service consists of the information of the server serving this service and
-// the methods in this service.
-type service struct {
- server interface{} // the server for service methods
- md map[string]*MethodDesc
- sd map[string]*StreamDesc
- mdata interface{}
-}
-
-// Server is a gRPC server to serve RPC requests.
-type Server struct {
- opts options
-
- mu sync.Mutex // guards following
- lis map[net.Listener]bool
- conns map[io.Closer]bool
- drain bool
- ctx context.Context
- cancel context.CancelFunc
- // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
- // and all the transport goes away.
- cv *sync.Cond
- m map[string]*service // service name -> service info
- events trace.EventLog
-}
-
-type options struct {
- creds credentials.TransportCredentials
- codec Codec
- cp Compressor
- dc Decompressor
- maxMsgSize int
- unaryInt UnaryServerInterceptor
- streamInt StreamServerInterceptor
- maxConcurrentStreams uint32
- useHandlerImpl bool // use http.Handler-based server
-}
-
-var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
-
-// A ServerOption sets options.
-type ServerOption func(*options)
-
-// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
-func CustomCodec(codec Codec) ServerOption {
- return func(o *options) {
- o.codec = codec
- }
-}
-
-// RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
-func RPCCompressor(cp Compressor) ServerOption {
- return func(o *options) {
- o.cp = cp
- }
-}
-
-// RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
-func RPCDecompressor(dc Decompressor) ServerOption {
- return func(o *options) {
- o.dc = dc
- }
-}
-
-// MaxMsgSize returns a ServerOption to set the max message size in bytes for inbound mesages.
-// If this is not set, gRPC uses the default 4MB.
-func MaxMsgSize(m int) ServerOption {
- return func(o *options) {
- o.maxMsgSize = m
- }
-}
-
-// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
-// of concurrent streams to each ServerTransport.
-func MaxConcurrentStreams(n uint32) ServerOption {
- return func(o *options) {
- o.maxConcurrentStreams = n
- }
-}
-
-// Creds returns a ServerOption that sets credentials for server connections.
-func Creds(c credentials.TransportCredentials) ServerOption {
- return func(o *options) {
- o.creds = c
- }
-}
-
-// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
-// server. Only one unary interceptor can be installed. The construction of multiple
-// interceptors (e.g., chaining) can be implemented at the caller.
-func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
- return func(o *options) {
- if o.unaryInt != nil {
- panic("The unary server interceptor has been set.")
- }
- o.unaryInt = i
- }
-}
-
-// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
-// server. Only one stream interceptor can be installed.
-func StreamInterceptor(i StreamServerInterceptor) ServerOption {
- return func(o *options) {
- if o.streamInt != nil {
- panic("The stream server interceptor has been set.")
- }
- o.streamInt = i
- }
-}
-
-// NewServer creates a gRPC server which has no service registered and has not
-// started to accept requests yet.
-func NewServer(opt ...ServerOption) *Server {
- var opts options
- opts.maxMsgSize = defaultMaxMsgSize
- for _, o := range opt {
- o(&opts)
- }
- if opts.codec == nil {
- // Set the default codec.
- opts.codec = protoCodec{}
- }
- s := &Server{
- lis: make(map[net.Listener]bool),
- opts: opts,
- conns: make(map[io.Closer]bool),
- m: make(map[string]*service),
- }
- s.cv = sync.NewCond(&s.mu)
- s.ctx, s.cancel = context.WithCancel(context.Background())
- if EnableTracing {
- _, file, line, _ := runtime.Caller(1)
- s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
- }
- return s
-}
-
-// printf records an event in s's event log, unless s has been stopped.
-// REQUIRES s.mu is held.
-func (s *Server) printf(format string, a ...interface{}) {
- if s.events != nil {
- s.events.Printf(format, a...)
- }
-}
-
-// errorf records an error in s's event log, unless s has been stopped.
-// REQUIRES s.mu is held.
-func (s *Server) errorf(format string, a ...interface{}) {
- if s.events != nil {
- s.events.Errorf(format, a...)
- }
-}
-
-// RegisterService register a service and its implementation to the gRPC
-// server. Called from the IDL generated code. This must be called before
-// invoking Serve.
-func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
- ht := reflect.TypeOf(sd.HandlerType).Elem()
- st := reflect.TypeOf(ss)
- if !st.Implements(ht) {
- grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
- }
- s.register(sd, ss)
-}
-
-func (s *Server) register(sd *ServiceDesc, ss interface{}) {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.printf("RegisterService(%q)", sd.ServiceName)
- if _, ok := s.m[sd.ServiceName]; ok {
- grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
- }
- srv := &service{
- server: ss,
- md: make(map[string]*MethodDesc),
- sd: make(map[string]*StreamDesc),
- mdata: sd.Metadata,
- }
- for i := range sd.Methods {
- d := &sd.Methods[i]
- srv.md[d.MethodName] = d
- }
- for i := range sd.Streams {
- d := &sd.Streams[i]
- srv.sd[d.StreamName] = d
- }
- s.m[sd.ServiceName] = srv
-}
-
-// MethodInfo contains the information of an RPC including its method name and type.
-type MethodInfo struct {
- // Name is the method name only, without the service name or package name.
- Name string
- // IsClientStream indicates whether the RPC is a client streaming RPC.
- IsClientStream bool
- // IsServerStream indicates whether the RPC is a server streaming RPC.
- IsServerStream bool
-}
-
-// ServiceInfo contains unary RPC method info, streaming RPC methid info and metadata for a service.
-type ServiceInfo struct {
- Methods []MethodInfo
- // Metadata is the metadata specified in ServiceDesc when registering service.
- Metadata interface{}
-}
-
-// GetServiceInfo returns a map from service names to ServiceInfo.
-// Service names include the package names, in the form of <package>.<service>.
-func (s *Server) GetServiceInfo() map[string]ServiceInfo {
- ret := make(map[string]ServiceInfo)
- for n, srv := range s.m {
- methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
- for m := range srv.md {
- methods = append(methods, MethodInfo{
- Name: m,
- IsClientStream: false,
- IsServerStream: false,
- })
- }
- for m, d := range srv.sd {
- methods = append(methods, MethodInfo{
- Name: m,
- IsClientStream: d.ClientStreams,
- IsServerStream: d.ServerStreams,
- })
- }
-
- ret[n] = ServiceInfo{
- Methods: methods,
- Metadata: srv.mdata,
- }
- }
- return ret
-}
-
-var (
- // ErrServerStopped indicates that the operation is now illegal because of
- // the server being stopped.
- ErrServerStopped = errors.New("grpc: the server has been stopped")
-)
-
-func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
- if s.opts.creds == nil {
- return rawConn, nil, nil
- }
- return s.opts.creds.ServerHandshake(rawConn)
-}
-
-// Serve accepts incoming connections on the listener lis, creating a new
-// ServerTransport and service goroutine for each. The service goroutines
-// read gRPC requests and then call the registered handlers to reply to them.
-// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
-// this method returns.
-func (s *Server) Serve(lis net.Listener) error {
- s.mu.Lock()
- s.printf("serving")
- if s.lis == nil {
- s.mu.Unlock()
- lis.Close()
- return ErrServerStopped
- }
- s.lis[lis] = true
- s.mu.Unlock()
- defer func() {
- s.mu.Lock()
- if s.lis != nil && s.lis[lis] {
- lis.Close()
- delete(s.lis, lis)
- }
- s.mu.Unlock()
- }()
-
- var tempDelay time.Duration // how long to sleep on accept failure
-
- for {
- rawConn, err := lis.Accept()
- if err != nil {
- if ne, ok := err.(interface {
- Temporary() bool
- }); ok && ne.Temporary() {
- if tempDelay == 0 {
- tempDelay = 5 * time.Millisecond
- } else {
- tempDelay *= 2
- }
- if max := 1 * time.Second; tempDelay > max {
- tempDelay = max
- }
- s.mu.Lock()
- s.printf("Accept error: %v; retrying in %v", err, tempDelay)
- s.mu.Unlock()
- select {
- case <-time.After(tempDelay):
- case <-s.ctx.Done():
- }
- continue
- }
- s.mu.Lock()
- s.printf("done serving; Accept = %v", err)
- s.mu.Unlock()
- return err
- }
- tempDelay = 0
- // Start a new goroutine to deal with rawConn
- // so we don't stall this Accept loop goroutine.
- go s.handleRawConn(rawConn)
- }
-}
-
-// handleRawConn is run in its own goroutine and handles a just-accepted
-// connection that has not had any I/O performed on it yet.
-func (s *Server) handleRawConn(rawConn net.Conn) {
- conn, authInfo, err := s.useTransportAuthenticator(rawConn)
- if err != nil {
- s.mu.Lock()
- s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
- s.mu.Unlock()
- grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
- // If serverHandShake returns ErrConnDispatched, keep rawConn open.
- if err != credentials.ErrConnDispatched {
- rawConn.Close()
- }
- return
- }
-
- s.mu.Lock()
- if s.conns == nil {
- s.mu.Unlock()
- conn.Close()
- return
- }
- s.mu.Unlock()
-
- if s.opts.useHandlerImpl {
- s.serveUsingHandler(conn)
- } else {
- s.serveNewHTTP2Transport(conn, authInfo)
- }
-}
-
-// serveNewHTTP2Transport sets up a new http/2 transport (using the
-// gRPC http2 server transport in transport/http2_server.go) and
-// serves streams on it.
-// This is run in its own goroutine (it does network I/O in
-// transport.NewServerTransport).
-func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
- st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
- if err != nil {
- s.mu.Lock()
- s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
- s.mu.Unlock()
- c.Close()
- grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
- return
- }
- if !s.addConn(st) {
- st.Close()
- return
- }
- s.serveStreams(st)
-}
-
-func (s *Server) serveStreams(st transport.ServerTransport) {
- defer s.removeConn(st)
- defer st.Close()
- var wg sync.WaitGroup
- st.HandleStreams(func(stream *transport.Stream) {
- wg.Add(1)
- go func() {
- defer wg.Done()
- s.handleStream(st, stream, s.traceInfo(st, stream))
- }()
- })
- wg.Wait()
-}
-
-var _ http.Handler = (*Server)(nil)
-
-// serveUsingHandler is called from handleRawConn when s is configured
-// to handle requests via the http.Handler interface. It sets up a
-// net/http.Server to handle the just-accepted conn. The http.Server
-// is configured to route all incoming requests (all HTTP/2 streams)
-// to ServeHTTP, which creates a new ServerTransport for each stream.
-// serveUsingHandler blocks until conn closes.
-//
-// This codepath is only used when Server.TestingUseHandlerImpl has
-// been configured. This lets the end2end tests exercise the ServeHTTP
-// method as one of the environment types.
-//
-// conn is the *tls.Conn that's already been authenticated.
-func (s *Server) serveUsingHandler(conn net.Conn) {
- if !s.addConn(conn) {
- conn.Close()
- return
- }
- defer s.removeConn(conn)
- h2s := &http2.Server{
- MaxConcurrentStreams: s.opts.maxConcurrentStreams,
- }
- h2s.ServeConn(conn, &http2.ServeConnOpts{
- Handler: s,
- })
-}
-
-func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- st, err := transport.NewServerHandlerTransport(w, r)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- if !s.addConn(st) {
- st.Close()
- return
- }
- defer s.removeConn(st)
- s.serveStreams(st)
-}
-
-// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
-// If tracing is not enabled, it returns nil.
-func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
- if !EnableTracing {
- return nil
- }
- trInfo = &traceInfo{
- tr: trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()),
- }
- trInfo.firstLine.client = false
- trInfo.firstLine.remoteAddr = st.RemoteAddr()
- stream.TraceContext(trInfo.tr)
- if dl, ok := stream.Context().Deadline(); ok {
- trInfo.firstLine.deadline = dl.Sub(time.Now())
- }
- return trInfo
-}
-
-func (s *Server) addConn(c io.Closer) bool {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conns == nil || s.drain {
- return false
- }
- s.conns[c] = true
- return true
-}
-
-func (s *Server) removeConn(c io.Closer) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conns != nil {
- delete(s.conns, c)
- s.cv.Broadcast()
- }
-}
-
-func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
- var cbuf *bytes.Buffer
- if cp != nil {
- cbuf = new(bytes.Buffer)
- }
- p, err := encode(s.opts.codec, msg, cp, cbuf)
- if err != nil {
- // This typically indicates a fatal issue (e.g., memory
- // corruption or hardware faults) the application program
- // cannot handle.
- //
- // TODO(zhaoq): There exist other options also such as only closing the
- // faulty stream locally and remotely (Other streams can keep going). Find
- // the optimal option.
- grpclog.Fatalf("grpc: Server failed to encode response %v", err)
- }
- return t.Write(stream, p, opts)
-}
-
-func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
- if trInfo != nil {
- defer trInfo.tr.Finish()
- trInfo.firstLine.client = false
- trInfo.tr.LazyLog(&trInfo.firstLine, false)
- defer func() {
- if err != nil && err != io.EOF {
- trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
- trInfo.tr.SetError()
- }
- }()
- }
- if s.opts.cp != nil {
- // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
- stream.SetSendCompress(s.opts.cp.Type())
- }
- p := &parser{r: stream}
- for {
- pf, req, err := p.recvMsg(s.opts.maxMsgSize)
- if err == io.EOF {
- // The entire stream is done (for unary RPC only).
- return err
- }
- if err == io.ErrUnexpectedEOF {
- err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
- }
- if err != nil {
- switch err := err.(type) {
- case *rpcError:
- if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
- }
- case transport.ConnectionError:
- // Nothing to do here.
- case transport.StreamError:
- if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
- }
- default:
- panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
- }
- return err
- }
-
- if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
- switch err := err.(type) {
- case *rpcError:
- if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
- }
- default:
- if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
- }
-
- }
- return err
- }
- statusCode := codes.OK
- statusDesc := ""
- df := func(v interface{}) error {
- if pf == compressionMade {
- var err error
- req, err = s.opts.dc.Do(bytes.NewReader(req))
- if err != nil {
- if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
- }
- return err
- }
- }
- if len(req) > s.opts.maxMsgSize {
- // TODO: Revisit the error code. Currently keep it consistent with
- // java implementation.
- statusCode = codes.Internal
- statusDesc = fmt.Sprintf("grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
- }
- if err := s.opts.codec.Unmarshal(req, v); err != nil {
- return err
- }
- if trInfo != nil {
- trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
- }
- return nil
- }
- reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
- if appErr != nil {
- if err, ok := appErr.(*rpcError); ok {
- statusCode = err.code
- statusDesc = err.desc
- } else {
- statusCode = convertCode(appErr)
- statusDesc = appErr.Error()
- }
- if trInfo != nil && statusCode != codes.OK {
- trInfo.tr.LazyLog(stringer(statusDesc), true)
- trInfo.tr.SetError()
- }
- if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
- return err
- }
- return nil
- }
- if trInfo != nil {
- trInfo.tr.LazyLog(stringer("OK"), false)
- }
- opts := &transport.Options{
- Last: true,
- Delay: false,
- }
- if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
- switch err := err.(type) {
- case transport.ConnectionError:
- // Nothing to do here.
- case transport.StreamError:
- statusCode = err.Code
- statusDesc = err.Desc
- default:
- statusCode = codes.Unknown
- statusDesc = err.Error()
- }
- return err
- }
- if trInfo != nil {
- trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
- }
- return t.WriteStatus(stream, statusCode, statusDesc)
- }
-}
-
-func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
- if s.opts.cp != nil {
- stream.SetSendCompress(s.opts.cp.Type())
- }
- ss := &serverStream{
- t: t,
- s: stream,
- p: &parser{r: stream},
- codec: s.opts.codec,
- cp: s.opts.cp,
- dc: s.opts.dc,
- maxMsgSize: s.opts.maxMsgSize,
- trInfo: trInfo,
- }
- if ss.cp != nil {
- ss.cbuf = new(bytes.Buffer)
- }
- if trInfo != nil {
- trInfo.tr.LazyLog(&trInfo.firstLine, false)
- defer func() {
- ss.mu.Lock()
- if err != nil && err != io.EOF {
- ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
- ss.trInfo.tr.SetError()
- }
- ss.trInfo.tr.Finish()
- ss.trInfo.tr = nil
- ss.mu.Unlock()
- }()
- }
- var appErr error
- if s.opts.streamInt == nil {
- appErr = sd.Handler(srv.server, ss)
- } else {
- info := &StreamServerInfo{
- FullMethod: stream.Method(),
- IsClientStream: sd.ClientStreams,
- IsServerStream: sd.ServerStreams,
- }
- appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler)
- }
- if appErr != nil {
- if err, ok := appErr.(*rpcError); ok {
- ss.statusCode = err.code
- ss.statusDesc = err.desc
- } else if err, ok := appErr.(transport.StreamError); ok {
- ss.statusCode = err.Code
- ss.statusDesc = err.Desc
- } else {
- ss.statusCode = convertCode(appErr)
- ss.statusDesc = appErr.Error()
- }
- }
- if trInfo != nil {
- ss.mu.Lock()
- if ss.statusCode != codes.OK {
- ss.trInfo.tr.LazyLog(stringer(ss.statusDesc), true)
- ss.trInfo.tr.SetError()
- } else {
- ss.trInfo.tr.LazyLog(stringer("OK"), false)
- }
- ss.mu.Unlock()
- }
- return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
-
-}
-
-func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
- sm := stream.Method()
- if sm != "" && sm[0] == '/' {
- sm = sm[1:]
- }
- pos := strings.LastIndex(sm, "/")
- if pos == -1 {
- if trInfo != nil {
- trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
- trInfo.tr.SetError()
- }
- if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
- if trInfo != nil {
- trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
- trInfo.tr.SetError()
- }
- grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
- }
- if trInfo != nil {
- trInfo.tr.Finish()
- }
- return
- }
- service := sm[:pos]
- method := sm[pos+1:]
- srv, ok := s.m[service]
- if !ok {
- if trInfo != nil {
- trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
- trInfo.tr.SetError()
- }
- if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
- if trInfo != nil {
- trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
- trInfo.tr.SetError()
- }
- grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
- }
- if trInfo != nil {
- trInfo.tr.Finish()
- }
- return
- }
- // Unary RPC or Streaming RPC?
- if md, ok := srv.md[method]; ok {
- s.processUnaryRPC(t, stream, srv, md, trInfo)
- return
- }
- if sd, ok := srv.sd[method]; ok {
- s.processStreamingRPC(t, stream, srv, sd, trInfo)
- return
- }
- if trInfo != nil {
- trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
- trInfo.tr.SetError()
- }
- if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
- if trInfo != nil {
- trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
- trInfo.tr.SetError()
- }
- grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
- }
- if trInfo != nil {
- trInfo.tr.Finish()
- }
-}
-
-// Stop stops the gRPC server. It immediately closes all open
-// connections and listeners.
-// It cancels all active RPCs on the server side and the corresponding
-// pending RPCs on the client side will get notified by connection
-// errors.
-func (s *Server) Stop() {
- s.mu.Lock()
- listeners := s.lis
- s.lis = nil
- st := s.conns
- s.conns = nil
- // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
- s.cv.Broadcast()
- s.mu.Unlock()
-
- for lis := range listeners {
- lis.Close()
- }
- for c := range st {
- c.Close()
- }
-
- s.mu.Lock()
- s.cancel()
- if s.events != nil {
- s.events.Finish()
- s.events = nil
- }
- s.mu.Unlock()
-}
-
-// GracefulStop stops the gRPC server gracefully. It stops the server to accept new
-// connections and RPCs and blocks until all the pending RPCs are finished.
-func (s *Server) GracefulStop() {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conns == nil {
- return
- }
- for lis := range s.lis {
- lis.Close()
- }
- s.lis = nil
- s.cancel()
- if !s.drain {
- for c := range s.conns {
- c.(transport.ServerTransport).Drain()
- }
- s.drain = true
- }
- for len(s.conns) != 0 {
- s.cv.Wait()
- }
- s.conns = nil
- if s.events != nil {
- s.events.Finish()
- s.events = nil
- }
-}
-
-func init() {
- internal.TestingCloseConns = func(arg interface{}) {
- arg.(*Server).testingCloseConns()
- }
- internal.TestingUseHandlerImpl = func(arg interface{}) {
- arg.(*Server).opts.useHandlerImpl = true
- }
-}
-
-// testingCloseConns closes all existing transports but keeps s.lis
-// accepting new connections.
-func (s *Server) testingCloseConns() {
- s.mu.Lock()
- for c := range s.conns {
- c.Close()
- delete(s.conns, c)
- }
- s.mu.Unlock()
-}
-
-// SetHeader sets the header metadata.
-// When called multiple times, all the provided metadata will be merged.
-// All the metadata will be sent out when one of the following happens:
-// - grpc.SendHeader() is called;
-// - The first response is sent out;
-// - An RPC status is sent out (error or success).
-func SetHeader(ctx context.Context, md metadata.MD) error {
- if md.Len() == 0 {
- return nil
- }
- stream, ok := transport.StreamFromContext(ctx)
- if !ok {
- return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
- }
- return stream.SetHeader(md)
-}
-
-// SendHeader sends header metadata. It may be called at most once.
-// The provided md and headers set by SetHeader() will be sent.
-func SendHeader(ctx context.Context, md metadata.MD) error {
- stream, ok := transport.StreamFromContext(ctx)
- if !ok {
- return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
- }
- t := stream.ServerTransport()
- if t == nil {
- grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
- }
- if err := t.WriteHeader(stream, md); err != nil {
- return toRPCErr(err)
- }
- return nil
-}
-
-// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
-// When called more than once, all the provided metadata will be merged.
-func SetTrailer(ctx context.Context, md metadata.MD) error {
- if md.Len() == 0 {
- return nil
- }
- stream, ok := transport.StreamFromContext(ctx)
- if !ok {
- return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
- }
- return stream.SetTrailer(md)
-}
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
deleted file mode 100644
index 46810544f..000000000
--- a/vendor/google.golang.org/grpc/stream.go
+++ /dev/null
@@ -1,526 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package grpc
-
-import (
- "bytes"
- "errors"
- "io"
- "math"
- "sync"
- "time"
-
- "golang.org/x/net/context"
- "golang.org/x/net/trace"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/transport"
-)
-
-// StreamHandler defines the handler called by gRPC server to complete the
-// execution of a streaming RPC.
-type StreamHandler func(srv interface{}, stream ServerStream) error
-
-// StreamDesc represents a streaming RPC service's method specification.
-type StreamDesc struct {
- StreamName string
- Handler StreamHandler
-
- // At least one of these is true.
- ServerStreams bool
- ClientStreams bool
-}
-
-// Stream defines the common interface a client or server stream has to satisfy.
-type Stream interface {
- // Context returns the context for this stream.
- Context() context.Context
- // SendMsg blocks until it sends m, the stream is done or the stream
- // breaks.
- // On error, it aborts the stream and returns an RPC status on client
- // side. On server side, it simply returns the error to the caller.
- // SendMsg is called by generated code. Also Users can call SendMsg
- // directly when it is really needed in their use cases.
- SendMsg(m interface{}) error
- // RecvMsg blocks until it receives a message or the stream is
- // done. On client side, it returns io.EOF when the stream is done. On
- // any other error, it aborts the stream and returns an RPC status. On
- // server side, it simply returns the error to the caller.
- RecvMsg(m interface{}) error
-}
-
-// ClientStream defines the interface a client stream has to satisfy.
-type ClientStream interface {
- // Header returns the header metadata received from the server if there
- // is any. It blocks if the metadata is not ready to read.
- Header() (metadata.MD, error)
- // Trailer returns the trailer metadata from the server, if there is any.
- // It must only be called after stream.CloseAndRecv has returned, or
- // stream.Recv has returned a non-nil error (including io.EOF).
- Trailer() metadata.MD
- // CloseSend closes the send direction of the stream. It closes the stream
- // when non-nil error is met.
- CloseSend() error
- Stream
-}
-
-// NewClientStream creates a new Stream for the client side. This is called
-// by generated code.
-func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
- if cc.dopts.streamInt != nil {
- return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
- }
- return newClientStream(ctx, desc, cc, method, opts...)
-}
-
-func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
- var (
- t transport.ClientTransport
- s *transport.Stream
- put func()
- )
- c := defaultCallInfo
- for _, o := range opts {
- if err := o.before(&c); err != nil {
- return nil, toRPCErr(err)
- }
- }
- callHdr := &transport.CallHdr{
- Host: cc.authority,
- Method: method,
- Flush: desc.ServerStreams && desc.ClientStreams,
- }
- if cc.dopts.cp != nil {
- callHdr.SendCompress = cc.dopts.cp.Type()
- }
- var trInfo traceInfo
- if EnableTracing {
- trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
- trInfo.firstLine.client = true
- if deadline, ok := ctx.Deadline(); ok {
- trInfo.firstLine.deadline = deadline.Sub(time.Now())
- }
- trInfo.tr.LazyLog(&trInfo.firstLine, false)
- ctx = trace.NewContext(ctx, trInfo.tr)
- defer func() {
- if err != nil {
- // Need to call tr.finish() if error is returned.
- // Because tr will not be returned to caller.
- trInfo.tr.LazyPrintf("RPC: [%v]", err)
- trInfo.tr.SetError()
- trInfo.tr.Finish()
- }
- }()
- }
- gopts := BalancerGetOptions{
- BlockingWait: !c.failFast,
- }
- for {
- t, put, err = cc.getTransport(ctx, gopts)
- if err != nil {
- // TODO(zhaoq): Probably revisit the error handling.
- if _, ok := err.(*rpcError); ok {
- return nil, err
- }
- if err == errConnClosing || err == errConnUnavailable {
- if c.failFast {
- return nil, Errorf(codes.Unavailable, "%v", err)
- }
- continue
- }
- // All the other errors are treated as Internal errors.
- return nil, Errorf(codes.Internal, "%v", err)
- }
-
- s, err = t.NewStream(ctx, callHdr)
- if err != nil {
- if put != nil {
- put()
- put = nil
- }
- if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
- if c.failFast {
- return nil, toRPCErr(err)
- }
- continue
- }
- return nil, toRPCErr(err)
- }
- break
- }
- cs := &clientStream{
- opts: opts,
- c: c,
- desc: desc,
- codec: cc.dopts.codec,
- cp: cc.dopts.cp,
- dc: cc.dopts.dc,
-
- put: put,
- t: t,
- s: s,
- p: &parser{r: s},
-
- tracing: EnableTracing,
- trInfo: trInfo,
- }
- if cc.dopts.cp != nil {
- cs.cbuf = new(bytes.Buffer)
- }
- // Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination
- // when there is no pending I/O operations on this stream.
- go func() {
- select {
- case <-t.Error():
- // Incur transport error, simply exit.
- case <-s.Done():
- // TODO: The trace of the RPC is terminated here when there is no pending
- // I/O, which is probably not the optimal solution.
- if s.StatusCode() == codes.OK {
- cs.finish(nil)
- } else {
- cs.finish(Errorf(s.StatusCode(), "%s", s.StatusDesc()))
- }
- cs.closeTransportStream(nil)
- case <-s.GoAway():
- cs.finish(errConnDrain)
- cs.closeTransportStream(errConnDrain)
- case <-s.Context().Done():
- err := s.Context().Err()
- cs.finish(err)
- cs.closeTransportStream(transport.ContextErr(err))
- }
- }()
- return cs, nil
-}
-
-// clientStream implements a client side Stream.
-type clientStream struct {
- opts []CallOption
- c callInfo
- t transport.ClientTransport
- s *transport.Stream
- p *parser
- desc *StreamDesc
- codec Codec
- cp Compressor
- cbuf *bytes.Buffer
- dc Decompressor
-
- tracing bool // set to EnableTracing when the clientStream is created.
-
- mu sync.Mutex
- put func()
- closed bool
- // trInfo.tr is set when the clientStream is created (if EnableTracing is true),
- // and is set to nil when the clientStream's finish method is called.
- trInfo traceInfo
-}
-
-func (cs *clientStream) Context() context.Context {
- return cs.s.Context()
-}
-
-func (cs *clientStream) Header() (metadata.MD, error) {
- m, err := cs.s.Header()
- if err != nil {
- if _, ok := err.(transport.ConnectionError); !ok {
- cs.closeTransportStream(err)
- }
- }
- return m, err
-}
-
-func (cs *clientStream) Trailer() metadata.MD {
- return cs.s.Trailer()
-}
-
-func (cs *clientStream) SendMsg(m interface{}) (err error) {
- if cs.tracing {
- cs.mu.Lock()
- if cs.trInfo.tr != nil {
- cs.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
- }
- cs.mu.Unlock()
- }
- defer func() {
- if err != nil {
- cs.finish(err)
- }
- if err == nil {
- return
- }
- if err == io.EOF {
- // Specialize the process for server streaming. SendMesg is only called
- // once when creating the stream object. io.EOF needs to be skipped when
- // the rpc is early finished (before the stream object is created.).
- // TODO: It is probably better to move this into the generated code.
- if !cs.desc.ClientStreams && cs.desc.ServerStreams {
- err = nil
- }
- return
- }
- if _, ok := err.(transport.ConnectionError); !ok {
- cs.closeTransportStream(err)
- }
- err = toRPCErr(err)
- }()
- out, err := encode(cs.codec, m, cs.cp, cs.cbuf)
- defer func() {
- if cs.cbuf != nil {
- cs.cbuf.Reset()
- }
- }()
- if err != nil {
- return Errorf(codes.Internal, "grpc: %v", err)
- }
- return cs.t.Write(cs.s, out, &transport.Options{Last: false})
-}
-
-func (cs *clientStream) RecvMsg(m interface{}) (err error) {
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
- defer func() {
- // err != nil indicates the termination of the stream.
- if err != nil {
- cs.finish(err)
- }
- }()
- if err == nil {
- if cs.tracing {
- cs.mu.Lock()
- if cs.trInfo.tr != nil {
- cs.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
- }
- cs.mu.Unlock()
- }
- if !cs.desc.ClientStreams || cs.desc.ServerStreams {
- return
- }
- // Special handling for client streaming rpc.
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
- cs.closeTransportStream(err)
- if err == nil {
- return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
- }
- if err == io.EOF {
- if cs.s.StatusCode() == codes.OK {
- cs.finish(err)
- return nil
- }
- return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
- }
- return toRPCErr(err)
- }
- if _, ok := err.(transport.ConnectionError); !ok {
- cs.closeTransportStream(err)
- }
- if err == io.EOF {
- if cs.s.StatusCode() == codes.OK {
- // Returns io.EOF to indicate the end of the stream.
- return
- }
- return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
- }
- return toRPCErr(err)
-}
-
-func (cs *clientStream) CloseSend() (err error) {
- err = cs.t.Write(cs.s, nil, &transport.Options{Last: true})
- defer func() {
- if err != nil {
- cs.finish(err)
- }
- }()
- if err == nil || err == io.EOF {
- return nil
- }
- if _, ok := err.(transport.ConnectionError); !ok {
- cs.closeTransportStream(err)
- }
- err = toRPCErr(err)
- return
-}
-
-func (cs *clientStream) closeTransportStream(err error) {
- cs.mu.Lock()
- if cs.closed {
- cs.mu.Unlock()
- return
- }
- cs.closed = true
- cs.mu.Unlock()
- cs.t.CloseStream(cs.s, err)
-}
-
-func (cs *clientStream) finish(err error) {
- cs.mu.Lock()
- defer cs.mu.Unlock()
- for _, o := range cs.opts {
- o.after(&cs.c)
- }
- if cs.put != nil {
- cs.put()
- cs.put = nil
- }
- if !cs.tracing {
- return
- }
- if cs.trInfo.tr != nil {
- if err == nil || err == io.EOF {
- cs.trInfo.tr.LazyPrintf("RPC: [OK]")
- } else {
- cs.trInfo.tr.LazyPrintf("RPC: [%v]", err)
- cs.trInfo.tr.SetError()
- }
- cs.trInfo.tr.Finish()
- cs.trInfo.tr = nil
- }
-}
-
-// ServerStream defines the interface a server stream has to satisfy.
-type ServerStream interface {
- // SetHeader sets the header metadata. It may be called multiple times.
- // When call multiple times, all the provided metadata will be merged.
- // All the metadata will be sent out when one of the following happens:
- // - ServerStream.SendHeader() is called;
- // - The first response is sent out;
- // - An RPC status is sent out (error or success).
- SetHeader(metadata.MD) error
- // SendHeader sends the header metadata.
- // The provided md and headers set by SetHeader() will be sent.
- // It fails if called multiple times.
- SendHeader(metadata.MD) error
- // SetTrailer sets the trailer metadata which will be sent with the RPC status.
- // When called more than once, all the provided metadata will be merged.
- SetTrailer(metadata.MD)
- Stream
-}
-
-// serverStream implements a server side Stream.
-type serverStream struct {
- t transport.ServerTransport
- s *transport.Stream
- p *parser
- codec Codec
- cp Compressor
- dc Decompressor
- cbuf *bytes.Buffer
- maxMsgSize int
- statusCode codes.Code
- statusDesc string
- trInfo *traceInfo
-
- mu sync.Mutex // protects trInfo.tr after the service handler runs.
-}
-
-func (ss *serverStream) Context() context.Context {
- return ss.s.Context()
-}
-
-func (ss *serverStream) SetHeader(md metadata.MD) error {
- if md.Len() == 0 {
- return nil
- }
- return ss.s.SetHeader(md)
-}
-
-func (ss *serverStream) SendHeader(md metadata.MD) error {
- return ss.t.WriteHeader(ss.s, md)
-}
-
-func (ss *serverStream) SetTrailer(md metadata.MD) {
- if md.Len() == 0 {
- return
- }
- ss.s.SetTrailer(md)
- return
-}
-
-func (ss *serverStream) SendMsg(m interface{}) (err error) {
- defer func() {
- if ss.trInfo != nil {
- ss.mu.Lock()
- if ss.trInfo.tr != nil {
- if err == nil {
- ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
- } else {
- ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
- ss.trInfo.tr.SetError()
- }
- }
- ss.mu.Unlock()
- }
- }()
- out, err := encode(ss.codec, m, ss.cp, ss.cbuf)
- defer func() {
- if ss.cbuf != nil {
- ss.cbuf.Reset()
- }
- }()
- if err != nil {
- err = Errorf(codes.Internal, "grpc: %v", err)
- return err
- }
- if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
- return toRPCErr(err)
- }
- return nil
-}
-
-func (ss *serverStream) RecvMsg(m interface{}) (err error) {
- defer func() {
- if ss.trInfo != nil {
- ss.mu.Lock()
- if ss.trInfo.tr != nil {
- if err == nil {
- ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
- } else if err != io.EOF {
- ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
- ss.trInfo.tr.SetError()
- }
- }
- ss.mu.Unlock()
- }
- }()
- if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize); err != nil {
- if err == io.EOF {
- return err
- }
- if err == io.ErrUnexpectedEOF {
- err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
- }
- return toRPCErr(err)
- }
- return nil
-}
diff --git a/vendor/google.golang.org/grpc/trace.go b/vendor/google.golang.org/grpc/trace.go
deleted file mode 100644
index f6747e1df..000000000
--- a/vendor/google.golang.org/grpc/trace.go
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package grpc
-
-import (
- "bytes"
- "fmt"
- "io"
- "net"
- "strings"
- "time"
-
- "golang.org/x/net/trace"
-)
-
-// EnableTracing controls whether to trace RPCs using the golang.org/x/net/trace package.
-// This should only be set before any RPCs are sent or received by this program.
-var EnableTracing = true
-
-// methodFamily returns the trace family for the given method.
-// It turns "/pkg.Service/GetFoo" into "pkg.Service".
-func methodFamily(m string) string {
- m = strings.TrimPrefix(m, "/") // remove leading slash
- if i := strings.Index(m, "/"); i >= 0 {
- m = m[:i] // remove everything from second slash
- }
- if i := strings.LastIndex(m, "."); i >= 0 {
- m = m[i+1:] // cut down to last dotted component
- }
- return m
-}
-
-// traceInfo contains tracing information for an RPC.
-type traceInfo struct {
- tr trace.Trace
- firstLine firstLine
-}
-
-// firstLine is the first line of an RPC trace.
-type firstLine struct {
- client bool // whether this is a client (outgoing) RPC
- remoteAddr net.Addr
- deadline time.Duration // may be zero
-}
-
-func (f *firstLine) String() string {
- var line bytes.Buffer
- io.WriteString(&line, "RPC: ")
- if f.client {
- io.WriteString(&line, "to")
- } else {
- io.WriteString(&line, "from")
- }
- fmt.Fprintf(&line, " %v deadline:", f.remoteAddr)
- if f.deadline != 0 {
- fmt.Fprint(&line, f.deadline)
- } else {
- io.WriteString(&line, "none")
- }
- return line.String()
-}
-
-// payload represents an RPC request or response payload.
-type payload struct {
- sent bool // whether this is an outgoing payload
- msg interface{} // e.g. a proto.Message
- // TODO(dsymonds): add stringifying info to codec, and limit how much we hold here?
-}
-
-func (p payload) String() string {
- if p.sent {
- return fmt.Sprintf("sent: %v", p.msg)
- }
- return fmt.Sprintf("recv: %v", p.msg)
-}
-
-type fmtStringer struct {
- format string
- a []interface{}
-}
-
-func (f *fmtStringer) String() string {
- return fmt.Sprintf(f.format, f.a...)
-}
-
-type stringer string
-
-func (s stringer) String() string { return string(s) }
diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go
deleted file mode 100644
index 4ef0830b5..000000000
--- a/vendor/google.golang.org/grpc/transport/control.go
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package transport
-
-import (
- "fmt"
- "sync"
-
- "golang.org/x/net/http2"
-)
-
-const (
- // The default value of flow control window size in HTTP2 spec.
- defaultWindowSize = 65535
- // The initial window size for flow control.
- initialWindowSize = defaultWindowSize // for an RPC
- initialConnWindowSize = defaultWindowSize * 16 // for a connection
-)
-
-// The following defines various control items which could flow through
-// the control buffer of transport. They represent different aspects of
-// control tasks, e.g., flow control, settings, streaming resetting, etc.
-type windowUpdate struct {
- streamID uint32
- increment uint32
-}
-
-func (*windowUpdate) item() {}
-
-type settings struct {
- ack bool
- ss []http2.Setting
-}
-
-func (*settings) item() {}
-
-type resetStream struct {
- streamID uint32
- code http2.ErrCode
-}
-
-func (*resetStream) item() {}
-
-type goAway struct {
-}
-
-func (*goAway) item() {}
-
-type flushIO struct {
-}
-
-func (*flushIO) item() {}
-
-type ping struct {
- ack bool
- data [8]byte
-}
-
-func (*ping) item() {}
-
-// quotaPool is a pool which accumulates the quota and sends it to acquire()
-// when it is available.
-type quotaPool struct {
- c chan int
-
- mu sync.Mutex
- quota int
-}
-
-// newQuotaPool creates a quotaPool which has quota q available to consume.
-func newQuotaPool(q int) *quotaPool {
- qb := &quotaPool{
- c: make(chan int, 1),
- }
- if q > 0 {
- qb.c <- q
- } else {
- qb.quota = q
- }
- return qb
-}
-
-// add adds n to the available quota and tries to send it on acquire.
-func (qb *quotaPool) add(n int) {
- qb.mu.Lock()
- defer qb.mu.Unlock()
- qb.quota += n
- if qb.quota <= 0 {
- return
- }
- select {
- case qb.c <- qb.quota:
- qb.quota = 0
- default:
- }
-}
-
-// cancel cancels the pending quota sent on acquire, if any.
-func (qb *quotaPool) cancel() {
- qb.mu.Lock()
- defer qb.mu.Unlock()
- select {
- case n := <-qb.c:
- qb.quota += n
- default:
- }
-}
-
-// reset cancels the pending quota sent on acquired, incremented by v and sends
-// it back on acquire.
-func (qb *quotaPool) reset(v int) {
- qb.mu.Lock()
- defer qb.mu.Unlock()
- select {
- case n := <-qb.c:
- qb.quota += n
- default:
- }
- qb.quota += v
- if qb.quota <= 0 {
- return
- }
- select {
- case qb.c <- qb.quota:
- qb.quota = 0
- default:
- }
-}
-
-// acquire returns the channel on which available quota amounts are sent.
-func (qb *quotaPool) acquire() <-chan int {
- return qb.c
-}
-
-// inFlow deals with inbound flow control
-type inFlow struct {
- // The inbound flow control limit for pending data.
- limit uint32
-
- mu sync.Mutex
- // pendingData is the overall data which have been received but not been
- // consumed by applications.
- pendingData uint32
- // The amount of data the application has consumed but grpc has not sent
- // window update for them. Used to reduce window update frequency.
- pendingUpdate uint32
-}
-
-// onData is invoked when some data frame is received. It updates pendingData.
-func (f *inFlow) onData(n uint32) error {
- f.mu.Lock()
- defer f.mu.Unlock()
- f.pendingData += n
- if f.pendingData+f.pendingUpdate > f.limit {
- return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
- }
- return nil
-}
-
-// onRead is invoked when the application reads the data. It returns the window size
-// to be sent to the peer.
-func (f *inFlow) onRead(n uint32) uint32 {
- f.mu.Lock()
- defer f.mu.Unlock()
- if f.pendingData == 0 {
- return 0
- }
- f.pendingData -= n
- f.pendingUpdate += n
- if f.pendingUpdate >= f.limit/4 {
- wu := f.pendingUpdate
- f.pendingUpdate = 0
- return wu
- }
- return 0
-}
-
-func (f *inFlow) resetPendingData() uint32 {
- f.mu.Lock()
- defer f.mu.Unlock()
- n := f.pendingData
- f.pendingData = 0
- return n
-}
diff --git a/vendor/google.golang.org/grpc/transport/go16.go b/vendor/google.golang.org/grpc/transport/go16.go
deleted file mode 100644
index ee1c46bad..000000000
--- a/vendor/google.golang.org/grpc/transport/go16.go
+++ /dev/null
@@ -1,46 +0,0 @@
-// +build go1.6,!go1.7
-
-/*
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package transport
-
-import (
- "net"
-
- "golang.org/x/net/context"
-)
-
-// dialContext connects to the address on the named network.
-func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
- return (&net.Dialer{Cancel: ctx.Done()}).Dial(network, address)
-}
diff --git a/vendor/google.golang.org/grpc/transport/go17.go b/vendor/google.golang.org/grpc/transport/go17.go
deleted file mode 100644
index 356f13ff1..000000000
--- a/vendor/google.golang.org/grpc/transport/go17.go
+++ /dev/null
@@ -1,46 +0,0 @@
-// +build go1.7
-
-/*
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package transport
-
-import (
- "net"
-
- "golang.org/x/net/context"
-)
-
-// dialContext connects to the address on the named network.
-func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
- return (&net.Dialer{}).DialContext(ctx, network, address)
-}
diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go
deleted file mode 100644
index 114e34906..000000000
--- a/vendor/google.golang.org/grpc/transport/handler_server.go
+++ /dev/null
@@ -1,397 +0,0 @@
-/*
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-// This file is the implementation of a gRPC server using HTTP/2 which
-// uses the standard Go http2 Server implementation (via the
-// http.Handler interface), rather than speaking low-level HTTP/2
-// frames itself. It is the implementation of *grpc.Server.ServeHTTP.
-
-package transport
-
-import (
- "errors"
- "fmt"
- "io"
- "net"
- "net/http"
- "strings"
- "sync"
- "time"
-
- "golang.org/x/net/context"
- "golang.org/x/net/http2"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/peer"
-)
-
-// NewServerHandlerTransport returns a ServerTransport handling gRPC
-// from inside an http.Handler. It requires that the http Server
-// supports HTTP/2.
-func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTransport, error) {
- if r.ProtoMajor != 2 {
- return nil, errors.New("gRPC requires HTTP/2")
- }
- if r.Method != "POST" {
- return nil, errors.New("invalid gRPC request method")
- }
- if !validContentType(r.Header.Get("Content-Type")) {
- return nil, errors.New("invalid gRPC request content-type")
- }
- if _, ok := w.(http.Flusher); !ok {
- return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
- }
- if _, ok := w.(http.CloseNotifier); !ok {
- return nil, errors.New("gRPC requires a ResponseWriter supporting http.CloseNotifier")
- }
-
- st := &serverHandlerTransport{
- rw: w,
- req: r,
- closedCh: make(chan struct{}),
- writes: make(chan func()),
- }
-
- if v := r.Header.Get("grpc-timeout"); v != "" {
- to, err := decodeTimeout(v)
- if err != nil {
- return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err)
- }
- st.timeoutSet = true
- st.timeout = to
- }
-
- var metakv []string
- if r.Host != "" {
- metakv = append(metakv, ":authority", r.Host)
- }
- for k, vv := range r.Header {
- k = strings.ToLower(k)
- if isReservedHeader(k) && !isWhitelistedPseudoHeader(k) {
- continue
- }
- for _, v := range vv {
- if k == "user-agent" {
- // user-agent is special. Copying logic of http_util.go.
- if i := strings.LastIndex(v, " "); i == -1 {
- // There is no application user agent string being set
- continue
- } else {
- v = v[:i]
- }
- }
- metakv = append(metakv, k, v)
- }
- }
- st.headerMD = metadata.Pairs(metakv...)
-
- return st, nil
-}
-
-// serverHandlerTransport is an implementation of ServerTransport
-// which replies to exactly one gRPC request (exactly one HTTP request),
-// using the net/http.Handler interface. This http.Handler is guaranteed
-// at this point to be speaking over HTTP/2, so it's able to speak valid
-// gRPC.
-type serverHandlerTransport struct {
- rw http.ResponseWriter
- req *http.Request
- timeoutSet bool
- timeout time.Duration
- didCommonHeaders bool
-
- headerMD metadata.MD
-
- closeOnce sync.Once
- closedCh chan struct{} // closed on Close
-
- // writes is a channel of code to run serialized in the
- // ServeHTTP (HandleStreams) goroutine. The channel is closed
- // when WriteStatus is called.
- writes chan func()
-}
-
-func (ht *serverHandlerTransport) Close() error {
- ht.closeOnce.Do(ht.closeCloseChanOnce)
- return nil
-}
-
-func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }
-
-func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
-
-// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
-// the empty string if unknown.
-type strAddr string
-
-func (a strAddr) Network() string {
- if a != "" {
- // Per the documentation on net/http.Request.RemoteAddr, if this is
- // set, it's set to the IP:port of the peer (hence, TCP):
- // https://golang.org/pkg/net/http/#Request
- //
- // If we want to support Unix sockets later, we can
- // add our own grpc-specific convention within the
- // grpc codebase to set RemoteAddr to a different
- // format, or probably better: we can attach it to the
- // context and use that from serverHandlerTransport.RemoteAddr.
- return "tcp"
- }
- return ""
-}
-
-func (a strAddr) String() string { return string(a) }
-
-// do runs fn in the ServeHTTP goroutine.
-func (ht *serverHandlerTransport) do(fn func()) error {
- select {
- case ht.writes <- fn:
- return nil
- case <-ht.closedCh:
- return ErrConnClosing
- }
-}
-
-func (ht *serverHandlerTransport) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
- err := ht.do(func() {
- ht.writeCommonHeaders(s)
-
- // And flush, in case no header or body has been sent yet.
- // This forces a separation of headers and trailers if this is the
- // first call (for example, in end2end tests's TestNoService).
- ht.rw.(http.Flusher).Flush()
-
- h := ht.rw.Header()
- h.Set("Grpc-Status", fmt.Sprintf("%d", statusCode))
- if statusDesc != "" {
- h.Set("Grpc-Message", encodeGrpcMessage(statusDesc))
- }
- if md := s.Trailer(); len(md) > 0 {
- for k, vv := range md {
- // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
- if isReservedHeader(k) {
- continue
- }
- for _, v := range vv {
- // http2 ResponseWriter mechanism to
- // send undeclared Trailers after the
- // headers have possibly been written.
- h.Add(http2.TrailerPrefix+k, v)
- }
- }
- }
- })
- close(ht.writes)
- return err
-}
-
-// writeCommonHeaders sets common headers on the first write
-// call (Write, WriteHeader, or WriteStatus).
-func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
- if ht.didCommonHeaders {
- return
- }
- ht.didCommonHeaders = true
-
- h := ht.rw.Header()
- h["Date"] = nil // suppress Date to make tests happy; TODO: restore
- h.Set("Content-Type", "application/grpc")
-
- // Predeclare trailers we'll set later in WriteStatus (after the body).
- // This is a SHOULD in the HTTP RFC, and the way you add (known)
- // Trailers per the net/http.ResponseWriter contract.
- // See https://golang.org/pkg/net/http/#ResponseWriter
- // and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
- h.Add("Trailer", "Grpc-Status")
- h.Add("Trailer", "Grpc-Message")
-
- if s.sendCompress != "" {
- h.Set("Grpc-Encoding", s.sendCompress)
- }
-}
-
-func (ht *serverHandlerTransport) Write(s *Stream, data []byte, opts *Options) error {
- return ht.do(func() {
- ht.writeCommonHeaders(s)
- ht.rw.Write(data)
- if !opts.Delay {
- ht.rw.(http.Flusher).Flush()
- }
- })
-}
-
-func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
- return ht.do(func() {
- ht.writeCommonHeaders(s)
- h := ht.rw.Header()
- for k, vv := range md {
- // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
- if isReservedHeader(k) {
- continue
- }
- for _, v := range vv {
- h.Add(k, v)
- }
- }
- ht.rw.WriteHeader(200)
- ht.rw.(http.Flusher).Flush()
- })
-}
-
-func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
- // With this transport type there will be exactly 1 stream: this HTTP request.
-
- var ctx context.Context
- var cancel context.CancelFunc
- if ht.timeoutSet {
- ctx, cancel = context.WithTimeout(context.Background(), ht.timeout)
- } else {
- ctx, cancel = context.WithCancel(context.Background())
- }
-
- // requestOver is closed when either the request's context is done
- // or the status has been written via WriteStatus.
- requestOver := make(chan struct{})
-
- // clientGone receives a single value if peer is gone, either
- // because the underlying connection is dead or because the
- // peer sends an http2 RST_STREAM.
- clientGone := ht.rw.(http.CloseNotifier).CloseNotify()
- go func() {
- select {
- case <-requestOver:
- return
- case <-ht.closedCh:
- case <-clientGone:
- }
- cancel()
- }()
-
- req := ht.req
-
- s := &Stream{
- id: 0, // irrelevant
- windowHandler: func(int) {}, // nothing
- cancel: cancel,
- buf: newRecvBuffer(),
- st: ht,
- method: req.URL.Path,
- recvCompress: req.Header.Get("grpc-encoding"),
- }
- pr := &peer.Peer{
- Addr: ht.RemoteAddr(),
- }
- if req.TLS != nil {
- pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
- }
- ctx = metadata.NewContext(ctx, ht.headerMD)
- ctx = peer.NewContext(ctx, pr)
- s.ctx = newContextWithStream(ctx, s)
- s.dec = &recvBufferReader{ctx: s.ctx, recv: s.buf}
-
- // readerDone is closed when the Body.Read-ing goroutine exits.
- readerDone := make(chan struct{})
- go func() {
- defer close(readerDone)
-
- // TODO: minimize garbage, optimize recvBuffer code/ownership
- const readSize = 8196
- for buf := make([]byte, readSize); ; {
- n, err := req.Body.Read(buf)
- if n > 0 {
- s.buf.put(&recvMsg{data: buf[:n:n]})
- buf = buf[n:]
- }
- if err != nil {
- s.buf.put(&recvMsg{err: mapRecvMsgError(err)})
- return
- }
- if len(buf) == 0 {
- buf = make([]byte, readSize)
- }
- }
- }()
-
- // startStream is provided by the *grpc.Server's serveStreams.
- // It starts a goroutine serving s and exits immediately.
- // The goroutine that is started is the one that then calls
- // into ht, calling WriteHeader, Write, WriteStatus, Close, etc.
- startStream(s)
-
- ht.runStream()
- close(requestOver)
-
- // Wait for reading goroutine to finish.
- req.Body.Close()
- <-readerDone
-}
-
-func (ht *serverHandlerTransport) runStream() {
- for {
- select {
- case fn, ok := <-ht.writes:
- if !ok {
- return
- }
- fn()
- case <-ht.closedCh:
- return
- }
- }
-}
-
-func (ht *serverHandlerTransport) Drain() {
- panic("Drain() is not implemented")
-}
-
-// mapRecvMsgError returns the non-nil err into the appropriate
-// error value as expected by callers of *grpc.parser.recvMsg.
-// In particular, in can only be:
-// * io.EOF
-// * io.ErrUnexpectedEOF
-// * of type transport.ConnectionError
-// * of type transport.StreamError
-func mapRecvMsgError(err error) error {
- if err == io.EOF || err == io.ErrUnexpectedEOF {
- return err
- }
- if se, ok := err.(http2.StreamError); ok {
- if code, ok := http2ErrConvTab[se.Code]; ok {
- return StreamError{
- Code: code,
- Desc: se.Error(),
- }
- }
- }
- return connectionErrorf(true, err, err.Error())
-}
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go
deleted file mode 100644
index 2b0f68016..000000000
--- a/vendor/google.golang.org/grpc/transport/http2_client.go
+++ /dev/null
@@ -1,1072 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package transport
-
-import (
- "bytes"
- "fmt"
- "io"
- "math"
- "net"
- "strings"
- "sync"
- "time"
-
- "golang.org/x/net/context"
- "golang.org/x/net/http2"
- "golang.org/x/net/http2/hpack"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/peer"
-)
-
-// http2Client implements the ClientTransport interface with HTTP2.
-type http2Client struct {
- target string // server name/addr
- userAgent string
- md interface{}
- conn net.Conn // underlying communication channel
- authInfo credentials.AuthInfo // auth info about the connection
- nextID uint32 // the next stream ID to be used
-
- // writableChan synchronizes write access to the transport.
- // A writer acquires the write lock by sending a value on writableChan
- // and releases it by receiving from writableChan.
- writableChan chan int
- // shutdownChan is closed when Close is called.
- // Blocking operations should select on shutdownChan to avoid
- // blocking forever after Close.
- // TODO(zhaoq): Maybe have a channel context?
- shutdownChan chan struct{}
- // errorChan is closed to notify the I/O error to the caller.
- errorChan chan struct{}
- // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
- // that the server sent GoAway on this transport.
- goAway chan struct{}
-
- framer *framer
- hBuf *bytes.Buffer // the buffer for HPACK encoding
- hEnc *hpack.Encoder // HPACK encoder
-
- // controlBuf delivers all the control related tasks (e.g., window
- // updates, reset streams, and various settings) to the controller.
- controlBuf *recvBuffer
- fc *inFlow
- // sendQuotaPool provides flow control to outbound message.
- sendQuotaPool *quotaPool
- // streamsQuota limits the max number of concurrent streams.
- streamsQuota *quotaPool
-
- // The scheme used: https if TLS is on, http otherwise.
- scheme string
-
- creds []credentials.PerRPCCredentials
-
- mu sync.Mutex // guard the following variables
- state transportState // the state of underlying connection
- activeStreams map[uint32]*Stream
- // The max number of concurrent streams
- maxStreams int
- // the per-stream outbound flow control window size set by the peer.
- streamSendQuota uint32
- // goAwayID records the Last-Stream-ID in the GoAway frame from the server.
- goAwayID uint32
- // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
- prevGoAwayID uint32
-}
-
-func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
- if fn != nil {
- return fn(ctx, addr)
- }
- return dialContext(ctx, "tcp", addr)
-}
-
-func isTemporary(err error) bool {
- switch err {
- case io.EOF:
- // Connection closures may be resolved upon retry, and are thus
- // treated as temporary.
- return true
- case context.DeadlineExceeded:
- // In Go 1.7, context.DeadlineExceeded implements Timeout(), and this
- // special case is not needed. Until then, we need to keep this
- // clause.
- return true
- }
-
- switch err := err.(type) {
- case interface {
- Temporary() bool
- }:
- return err.Temporary()
- case interface {
- Timeout() bool
- }:
- // Timeouts may be resolved upon retry, and are thus treated as
- // temporary.
- return err.Timeout()
- }
- return false
-}
-
-// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
-// and starts to receive messages on it. Non-nil error returns if construction
-// fails.
-func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) {
- scheme := "http"
- conn, err := dial(ctx, opts.Dialer, addr.Addr)
- if err != nil {
- return nil, connectionErrorf(true, err, "transport: %v", err)
- }
- // Any further errors will close the underlying connection
- defer func(conn net.Conn) {
- if err != nil {
- conn.Close()
- }
- }(conn)
- var authInfo credentials.AuthInfo
- if creds := opts.TransportCredentials; creds != nil {
- scheme = "https"
- conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn)
- if err != nil {
- // Credentials handshake errors are typically considered permanent
- // to avoid retrying on e.g. bad certificates.
- temp := isTemporary(err)
- return nil, connectionErrorf(temp, err, "transport: %v", err)
- }
- }
- ua := primaryUA
- if opts.UserAgent != "" {
- ua = opts.UserAgent + " " + ua
- }
- var buf bytes.Buffer
- t := &http2Client{
- target: addr.Addr,
- userAgent: ua,
- md: addr.Metadata,
- conn: conn,
- authInfo: authInfo,
- // The client initiated stream id is odd starting from 1.
- nextID: 1,
- writableChan: make(chan int, 1),
- shutdownChan: make(chan struct{}),
- errorChan: make(chan struct{}),
- goAway: make(chan struct{}),
- framer: newFramer(conn),
- hBuf: &buf,
- hEnc: hpack.NewEncoder(&buf),
- controlBuf: newRecvBuffer(),
- fc: &inFlow{limit: initialConnWindowSize},
- sendQuotaPool: newQuotaPool(defaultWindowSize),
- scheme: scheme,
- state: reachable,
- activeStreams: make(map[uint32]*Stream),
- creds: opts.PerRPCCredentials,
- maxStreams: math.MaxInt32,
- streamSendQuota: defaultWindowSize,
- }
- // Start the reader goroutine for incoming message. Each transport has
- // a dedicated goroutine which reads HTTP2 frame from network. Then it
- // dispatches the frame to the corresponding stream entity.
- go t.reader()
- // Send connection preface to server.
- n, err := t.conn.Write(clientPreface)
- if err != nil {
- t.Close()
- return nil, connectionErrorf(true, err, "transport: %v", err)
- }
- if n != len(clientPreface) {
- t.Close()
- return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
- }
- if initialWindowSize != defaultWindowSize {
- err = t.framer.writeSettings(true, http2.Setting{
- ID: http2.SettingInitialWindowSize,
- Val: uint32(initialWindowSize),
- })
- } else {
- err = t.framer.writeSettings(true)
- }
- if err != nil {
- t.Close()
- return nil, connectionErrorf(true, err, "transport: %v", err)
- }
- // Adjust the connection flow control window if needed.
- if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
- if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
- t.Close()
- return nil, connectionErrorf(true, err, "transport: %v", err)
- }
- }
- go t.controller()
- t.writableChan <- 0
- return t, nil
-}
-
-func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
- // TODO(zhaoq): Handle uint32 overflow of Stream.id.
- s := &Stream{
- id: t.nextID,
- done: make(chan struct{}),
- goAway: make(chan struct{}),
- method: callHdr.Method,
- sendCompress: callHdr.SendCompress,
- buf: newRecvBuffer(),
- fc: &inFlow{limit: initialWindowSize},
- sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
- headerChan: make(chan struct{}),
- }
- t.nextID += 2
- s.windowHandler = func(n int) {
- t.updateWindow(s, uint32(n))
- }
- // The client side stream context should have exactly the same life cycle with the user provided context.
- // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
- // So we use the original context here instead of creating a copy.
- s.ctx = ctx
- s.dec = &recvBufferReader{
- ctx: s.ctx,
- goAway: s.goAway,
- recv: s.buf,
- }
- return s
-}
-
-// NewStream creates a stream and register it into the transport as "active"
-// streams.
-func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
- pr := &peer.Peer{
- Addr: t.conn.RemoteAddr(),
- }
- // Attach Auth info if there is any.
- if t.authInfo != nil {
- pr.AuthInfo = t.authInfo
- }
- ctx = peer.NewContext(ctx, pr)
- authData := make(map[string]string)
- for _, c := range t.creds {
- // Construct URI required to get auth request metadata.
- var port string
- if pos := strings.LastIndex(t.target, ":"); pos != -1 {
- // Omit port if it is the default one.
- if t.target[pos+1:] != "443" {
- port = ":" + t.target[pos+1:]
- }
- }
- pos := strings.LastIndex(callHdr.Method, "/")
- if pos == -1 {
- return nil, streamErrorf(codes.InvalidArgument, "transport: malformed method name: %q", callHdr.Method)
- }
- audience := "https://" + callHdr.Host + port + callHdr.Method[:pos]
- data, err := c.GetRequestMetadata(ctx, audience)
- if err != nil {
- return nil, streamErrorf(codes.InvalidArgument, "transport: %v", err)
- }
- for k, v := range data {
- authData[k] = v
- }
- }
- t.mu.Lock()
- if t.activeStreams == nil {
- t.mu.Unlock()
- return nil, ErrConnClosing
- }
- if t.state == draining {
- t.mu.Unlock()
- return nil, ErrStreamDrain
- }
- if t.state != reachable {
- t.mu.Unlock()
- return nil, ErrConnClosing
- }
- checkStreamsQuota := t.streamsQuota != nil
- t.mu.Unlock()
- if checkStreamsQuota {
- sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
- if err != nil {
- return nil, err
- }
- // Returns the quota balance back.
- if sq > 1 {
- t.streamsQuota.add(sq - 1)
- }
- }
- if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
- // Return the quota back now because there is no stream returned to the caller.
- if _, ok := err.(StreamError); ok && checkStreamsQuota {
- t.streamsQuota.add(1)
- }
- return nil, err
- }
- t.mu.Lock()
- if t.state == draining {
- t.mu.Unlock()
- if checkStreamsQuota {
- t.streamsQuota.add(1)
- }
- // Need to make t writable again so that the rpc in flight can still proceed.
- t.writableChan <- 0
- return nil, ErrStreamDrain
- }
- if t.state != reachable {
- t.mu.Unlock()
- return nil, ErrConnClosing
- }
- s := t.newStream(ctx, callHdr)
- t.activeStreams[s.id] = s
-
- // This stream is not counted when applySetings(...) initialize t.streamsQuota.
- // Reset t.streamsQuota to the right value.
- var reset bool
- if !checkStreamsQuota && t.streamsQuota != nil {
- reset = true
- }
- t.mu.Unlock()
- if reset {
- t.streamsQuota.reset(-1)
- }
-
- // HPACK encodes various headers. Note that once WriteField(...) is
- // called, the corresponding headers/continuation frame has to be sent
- // because hpack.Encoder is stateful.
- t.hBuf.Reset()
- t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"})
- t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme})
- t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method})
- t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
- t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
- t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
- t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"})
-
- if callHdr.SendCompress != "" {
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
- }
- if dl, ok := ctx.Deadline(); ok {
- // Send out timeout regardless its value. The server can detect timeout context by itself.
- timeout := dl.Sub(time.Now())
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
- }
-
- for k, v := range authData {
- // Capital header names are illegal in HTTP/2.
- k = strings.ToLower(k)
- t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v})
- }
- var (
- hasMD bool
- endHeaders bool
- )
- if md, ok := metadata.FromContext(ctx); ok {
- hasMD = true
- for k, v := range md {
- // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
- if isReservedHeader(k) {
- continue
- }
- for _, entry := range v {
- t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
- }
- }
- }
- if md, ok := t.md.(*metadata.MD); ok {
- for k, v := range *md {
- if isReservedHeader(k) {
- continue
- }
- for _, entry := range v {
- t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
- }
- }
- }
- first := true
- // Sends the headers in a single batch even when they span multiple frames.
- for !endHeaders {
- size := t.hBuf.Len()
- if size > http2MaxFrameLen {
- size = http2MaxFrameLen
- } else {
- endHeaders = true
- }
- var flush bool
- if endHeaders && (hasMD || callHdr.Flush) {
- flush = true
- }
- if first {
- // Sends a HeadersFrame to server to start a new stream.
- p := http2.HeadersFrameParam{
- StreamID: s.id,
- BlockFragment: t.hBuf.Next(size),
- EndStream: false,
- EndHeaders: endHeaders,
- }
- // Do a force flush for the buffered frames iff it is the last headers frame
- // and there is header metadata to be sent. Otherwise, there is flushing until
- // the corresponding data frame is written.
- err = t.framer.writeHeaders(flush, p)
- first = false
- } else {
- // Sends Continuation frames for the leftover headers.
- err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size))
- }
- if err != nil {
- t.notifyError(err)
- return nil, connectionErrorf(true, err, "transport: %v", err)
- }
- }
- t.writableChan <- 0
- return s, nil
-}
-
-// CloseStream clears the footprint of a stream when the stream is not needed any more.
-// This must not be executed in reader's goroutine.
-func (t *http2Client) CloseStream(s *Stream, err error) {
- var updateStreams bool
- t.mu.Lock()
- if t.activeStreams == nil {
- t.mu.Unlock()
- return
- }
- if t.streamsQuota != nil {
- updateStreams = true
- }
- delete(t.activeStreams, s.id)
- if t.state == draining && len(t.activeStreams) == 0 {
- // The transport is draining and s is the last live stream on t.
- t.mu.Unlock()
- t.Close()
- return
- }
- t.mu.Unlock()
- if updateStreams {
- t.streamsQuota.add(1)
- }
- s.mu.Lock()
- if q := s.fc.resetPendingData(); q > 0 {
- if n := t.fc.onRead(q); n > 0 {
- t.controlBuf.put(&windowUpdate{0, n})
- }
- }
- if s.state == streamDone {
- s.mu.Unlock()
- return
- }
- if !s.headerDone {
- close(s.headerChan)
- s.headerDone = true
- }
- s.state = streamDone
- s.mu.Unlock()
- if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded {
- t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
- }
-}
-
-// Close kicks off the shutdown process of the transport. This should be called
-// only once on a transport. Once it is called, the transport should not be
-// accessed any more.
-func (t *http2Client) Close() (err error) {
- t.mu.Lock()
- if t.state == closing {
- t.mu.Unlock()
- return
- }
- if t.state == reachable || t.state == draining {
- close(t.errorChan)
- }
- t.state = closing
- t.mu.Unlock()
- close(t.shutdownChan)
- err = t.conn.Close()
- t.mu.Lock()
- streams := t.activeStreams
- t.activeStreams = nil
- t.mu.Unlock()
- // Notify all active streams.
- for _, s := range streams {
- s.mu.Lock()
- if !s.headerDone {
- close(s.headerChan)
- s.headerDone = true
- }
- s.mu.Unlock()
- s.write(recvMsg{err: ErrConnClosing})
- }
- return
-}
-
-func (t *http2Client) GracefulClose() error {
- t.mu.Lock()
- switch t.state {
- case unreachable:
- // The server may close the connection concurrently. t is not available for
- // any streams. Close it now.
- t.mu.Unlock()
- t.Close()
- return nil
- case closing:
- t.mu.Unlock()
- return nil
- }
- // Notify the streams which were initiated after the server sent GOAWAY.
- select {
- case <-t.goAway:
- n := t.prevGoAwayID
- if n == 0 && t.nextID > 1 {
- n = t.nextID - 2
- }
- m := t.goAwayID + 2
- if m == 2 {
- m = 1
- }
- for i := m; i <= n; i += 2 {
- if s, ok := t.activeStreams[i]; ok {
- close(s.goAway)
- }
- }
- default:
- }
- if t.state == draining {
- t.mu.Unlock()
- return nil
- }
- t.state = draining
- active := len(t.activeStreams)
- t.mu.Unlock()
- if active == 0 {
- return t.Close()
- }
- return nil
-}
-
-// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
-// should proceed only if Write returns nil.
-// TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later
-// if it improves the performance.
-func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
- r := bytes.NewBuffer(data)
- for {
- var p []byte
- if r.Len() > 0 {
- size := http2MaxFrameLen
- s.sendQuotaPool.add(0)
- // Wait until the stream has some quota to send the data.
- sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
- if err != nil {
- return err
- }
- t.sendQuotaPool.add(0)
- // Wait until the transport has some quota to send the data.
- tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
- if err != nil {
- if _, ok := err.(StreamError); ok || err == io.EOF {
- t.sendQuotaPool.cancel()
- }
- return err
- }
- if sq < size {
- size = sq
- }
- if tq < size {
- size = tq
- }
- p = r.Next(size)
- ps := len(p)
- if ps < sq {
- // Overbooked stream quota. Return it back.
- s.sendQuotaPool.add(sq - ps)
- }
- if ps < tq {
- // Overbooked transport quota. Return it back.
- t.sendQuotaPool.add(tq - ps)
- }
- }
- var (
- endStream bool
- forceFlush bool
- )
- if opts.Last && r.Len() == 0 {
- endStream = true
- }
- // Indicate there is a writer who is about to write a data frame.
- t.framer.adjustNumWriters(1)
- // Got some quota. Try to acquire writing privilege on the transport.
- if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil {
- if _, ok := err.(StreamError); ok || err == io.EOF {
- // Return the connection quota back.
- t.sendQuotaPool.add(len(p))
- }
- if t.framer.adjustNumWriters(-1) == 0 {
- // This writer is the last one in this batch and has the
- // responsibility to flush the buffered frames. It queues
- // a flush request to controlBuf instead of flushing directly
- // in order to avoid the race with other writing or flushing.
- t.controlBuf.put(&flushIO{})
- }
- return err
- }
- select {
- case <-s.ctx.Done():
- t.sendQuotaPool.add(len(p))
- if t.framer.adjustNumWriters(-1) == 0 {
- t.controlBuf.put(&flushIO{})
- }
- t.writableChan <- 0
- return ContextErr(s.ctx.Err())
- default:
- }
- if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 {
- // Do a force flush iff this is last frame for the entire gRPC message
- // and the caller is the only writer at this moment.
- forceFlush = true
- }
- // If WriteData fails, all the pending streams will be handled
- // by http2Client.Close(). No explicit CloseStream() needs to be
- // invoked.
- if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
- t.notifyError(err)
- return connectionErrorf(true, err, "transport: %v", err)
- }
- if t.framer.adjustNumWriters(-1) == 0 {
- t.framer.flushWrite()
- }
- t.writableChan <- 0
- if r.Len() == 0 {
- break
- }
- }
- if !opts.Last {
- return nil
- }
- s.mu.Lock()
- if s.state != streamDone {
- s.state = streamWriteDone
- }
- s.mu.Unlock()
- return nil
-}
-
-func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
- t.mu.Lock()
- defer t.mu.Unlock()
- s, ok := t.activeStreams[f.Header().StreamID]
- return s, ok
-}
-
-// updateWindow adjusts the inbound quota for the stream and the transport.
-// Window updates will deliver to the controller for sending when
-// the cumulative quota exceeds the corresponding threshold.
-func (t *http2Client) updateWindow(s *Stream, n uint32) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.state == streamDone {
- return
- }
- if w := t.fc.onRead(n); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
- if w := s.fc.onRead(n); w > 0 {
- t.controlBuf.put(&windowUpdate{s.id, w})
- }
-}
-
-func (t *http2Client) handleData(f *http2.DataFrame) {
- size := len(f.Data())
- if err := t.fc.onData(uint32(size)); err != nil {
- t.notifyError(connectionErrorf(true, err, "%v", err))
- return
- }
- // Select the right stream to dispatch.
- s, ok := t.getStream(f)
- if !ok {
- if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
- return
- }
- if size > 0 {
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- // The stream has been closed. Release the corresponding quota.
- if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
- return
- }
- if err := s.fc.onData(uint32(size)); err != nil {
- s.state = streamDone
- s.statusCode = codes.Internal
- s.statusDesc = err.Error()
- close(s.done)
- s.mu.Unlock()
- s.write(recvMsg{err: io.EOF})
- t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
- return
- }
- s.mu.Unlock()
- // TODO(bradfitz, zhaoq): A copy is required here because there is no
- // guarantee f.Data() is consumed before the arrival of next frame.
- // Can this copy be eliminated?
- data := make([]byte, size)
- copy(data, f.Data())
- s.write(recvMsg{data: data})
- }
- // The server has closed the stream without sending trailers. Record that
- // the read direction is closed, and set the status appropriately.
- if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return
- }
- s.state = streamDone
- s.statusCode = codes.Internal
- s.statusDesc = "server closed the stream without sending trailers"
- close(s.done)
- s.mu.Unlock()
- s.write(recvMsg{err: io.EOF})
- }
-}
-
-func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
- s, ok := t.getStream(f)
- if !ok {
- return
- }
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return
- }
- s.state = streamDone
- if !s.headerDone {
- close(s.headerChan)
- s.headerDone = true
- }
- s.statusCode, ok = http2ErrConvTab[http2.ErrCode(f.ErrCode)]
- if !ok {
- grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode)
- s.statusCode = codes.Unknown
- }
- s.statusDesc = fmt.Sprintf("stream terminated by RST_STREAM with error code: %d", f.ErrCode)
- close(s.done)
- s.mu.Unlock()
- s.write(recvMsg{err: io.EOF})
-}
-
-func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
- if f.IsAck() {
- return
- }
- var ss []http2.Setting
- f.ForeachSetting(func(s http2.Setting) error {
- ss = append(ss, s)
- return nil
- })
- // The settings will be applied once the ack is sent.
- t.controlBuf.put(&settings{ack: true, ss: ss})
-}
-
-func (t *http2Client) handlePing(f *http2.PingFrame) {
- if f.IsAck() { // Do nothing.
- return
- }
- pingAck := &ping{ack: true}
- copy(pingAck.data[:], f.Data[:])
- t.controlBuf.put(pingAck)
-}
-
-func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
- t.mu.Lock()
- if t.state == reachable || t.state == draining {
- if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {
- t.mu.Unlock()
- t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
- return
- }
- select {
- case <-t.goAway:
- id := t.goAwayID
- // t.goAway has been closed (i.e.,multiple GoAways).
- if id < f.LastStreamID {
- t.mu.Unlock()
- t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
- return
- }
- t.prevGoAwayID = id
- t.goAwayID = f.LastStreamID
- t.mu.Unlock()
- return
- default:
- }
- t.goAwayID = f.LastStreamID
- close(t.goAway)
- }
- t.mu.Unlock()
-}
-
-func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
- id := f.Header().StreamID
- incr := f.Increment
- if id == 0 {
- t.sendQuotaPool.add(int(incr))
- return
- }
- if s, ok := t.getStream(f); ok {
- s.sendQuotaPool.add(int(incr))
- }
-}
-
-// operateHeaders takes action on the decoded headers.
-func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
- s, ok := t.getStream(frame)
- if !ok {
- return
- }
- var state decodeState
- for _, hf := range frame.Fields {
- state.processHeaderField(hf)
- }
- if state.err != nil {
- s.mu.Lock()
- if !s.headerDone {
- close(s.headerChan)
- s.headerDone = true
- }
- s.mu.Unlock()
- s.write(recvMsg{err: state.err})
- // Something wrong. Stops reading even when there is remaining.
- return
- }
-
- endStream := frame.StreamEnded()
-
- s.mu.Lock()
- if !endStream {
- s.recvCompress = state.encoding
- }
- if !s.headerDone {
- if !endStream && len(state.mdata) > 0 {
- s.header = state.mdata
- }
- close(s.headerChan)
- s.headerDone = true
- }
- if !endStream || s.state == streamDone {
- s.mu.Unlock()
- return
- }
-
- if len(state.mdata) > 0 {
- s.trailer = state.mdata
- }
- s.statusCode = state.statusCode
- s.statusDesc = state.statusDesc
- close(s.done)
- s.state = streamDone
- s.mu.Unlock()
- s.write(recvMsg{err: io.EOF})
-}
-
-func handleMalformedHTTP2(s *Stream, err error) {
- s.mu.Lock()
- if !s.headerDone {
- close(s.headerChan)
- s.headerDone = true
- }
- s.mu.Unlock()
- s.write(recvMsg{err: err})
-}
-
-// reader runs as a separate goroutine in charge of reading data from network
-// connection.
-//
-// TODO(zhaoq): currently one reader per transport. Investigate whether this is
-// optimal.
-// TODO(zhaoq): Check the validity of the incoming frame sequence.
-func (t *http2Client) reader() {
- // Check the validity of server preface.
- frame, err := t.framer.readFrame()
- if err != nil {
- t.notifyError(err)
- return
- }
- sf, ok := frame.(*http2.SettingsFrame)
- if !ok {
- t.notifyError(err)
- return
- }
- t.handleSettings(sf)
-
- // loop to keep reading incoming messages on this transport.
- for {
- frame, err := t.framer.readFrame()
- if err != nil {
- // Abort an active stream if the http2.Framer returns a
- // http2.StreamError. This can happen only if the server's response
- // is malformed http2.
- if se, ok := err.(http2.StreamError); ok {
- t.mu.Lock()
- s := t.activeStreams[se.StreamID]
- t.mu.Unlock()
- if s != nil {
- // use error detail to provide better err message
- handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
- }
- continue
- } else {
- // Transport error.
- t.notifyError(err)
- return
- }
- }
- switch frame := frame.(type) {
- case *http2.MetaHeadersFrame:
- t.operateHeaders(frame)
- case *http2.DataFrame:
- t.handleData(frame)
- case *http2.RSTStreamFrame:
- t.handleRSTStream(frame)
- case *http2.SettingsFrame:
- t.handleSettings(frame)
- case *http2.PingFrame:
- t.handlePing(frame)
- case *http2.GoAwayFrame:
- t.handleGoAway(frame)
- case *http2.WindowUpdateFrame:
- t.handleWindowUpdate(frame)
- default:
- grpclog.Printf("transport: http2Client.reader got unhandled frame type %v.", frame)
- }
- }
-}
-
-func (t *http2Client) applySettings(ss []http2.Setting) {
- for _, s := range ss {
- switch s.ID {
- case http2.SettingMaxConcurrentStreams:
- // TODO(zhaoq): This is a hack to avoid significant refactoring of the
- // code to deal with the unrealistic int32 overflow. Probably will try
- // to find a better way to handle this later.
- if s.Val > math.MaxInt32 {
- s.Val = math.MaxInt32
- }
- t.mu.Lock()
- reset := t.streamsQuota != nil
- if !reset {
- t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams))
- }
- ms := t.maxStreams
- t.maxStreams = int(s.Val)
- t.mu.Unlock()
- if reset {
- t.streamsQuota.reset(int(s.Val) - ms)
- }
- case http2.SettingInitialWindowSize:
- t.mu.Lock()
- for _, stream := range t.activeStreams {
- // Adjust the sending quota for each stream.
- stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
- }
- t.streamSendQuota = s.Val
- t.mu.Unlock()
- }
- }
-}
-
-// controller running in a separate goroutine takes charge of sending control
-// frames (e.g., window update, reset stream, setting, etc.) to the server.
-func (t *http2Client) controller() {
- for {
- select {
- case i := <-t.controlBuf.get():
- t.controlBuf.load()
- select {
- case <-t.writableChan:
- switch i := i.(type) {
- case *windowUpdate:
- t.framer.writeWindowUpdate(true, i.streamID, i.increment)
- case *settings:
- if i.ack {
- t.framer.writeSettingsAck(true)
- t.applySettings(i.ss)
- } else {
- t.framer.writeSettings(true, i.ss...)
- }
- case *resetStream:
- t.framer.writeRSTStream(true, i.streamID, i.code)
- case *flushIO:
- t.framer.flushWrite()
- case *ping:
- t.framer.writePing(true, i.ack, i.data)
- default:
- grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i)
- }
- t.writableChan <- 0
- continue
- case <-t.shutdownChan:
- return
- }
- case <-t.shutdownChan:
- return
- }
- }
-}
-
-func (t *http2Client) Error() <-chan struct{} {
- return t.errorChan
-}
-
-func (t *http2Client) GoAway() <-chan struct{} {
- return t.goAway
-}
-
-func (t *http2Client) notifyError(err error) {
- t.mu.Lock()
- // make sure t.errorChan is closed only once.
- if t.state == draining {
- t.mu.Unlock()
- t.Close()
- return
- }
- if t.state == reachable {
- t.state = unreachable
- close(t.errorChan)
- grpclog.Printf("transport: http2Client.notifyError got notified that the client transport was broken %v.", err)
- }
- t.mu.Unlock()
-}
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go
deleted file mode 100644
index a62fb7c22..000000000
--- a/vendor/google.golang.org/grpc/transport/http2_server.go
+++ /dev/null
@@ -1,775 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package transport
-
-import (
- "bytes"
- "errors"
- "io"
- "math"
- "net"
- "strconv"
- "sync"
-
- "golang.org/x/net/context"
- "golang.org/x/net/http2"
- "golang.org/x/net/http2/hpack"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/peer"
-)
-
-// ErrIllegalHeaderWrite indicates that setting header is illegal because of
-// the stream's state.
-var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
-
-// http2Server implements the ServerTransport interface with HTTP2.
-type http2Server struct {
- conn net.Conn
- maxStreamID uint32 // max stream ID ever seen
- authInfo credentials.AuthInfo // auth info about the connection
- // writableChan synchronizes write access to the transport.
- // A writer acquires the write lock by receiving a value on writableChan
- // and releases it by sending on writableChan.
- writableChan chan int
- // shutdownChan is closed when Close is called.
- // Blocking operations should select on shutdownChan to avoid
- // blocking forever after Close.
- shutdownChan chan struct{}
- framer *framer
- hBuf *bytes.Buffer // the buffer for HPACK encoding
- hEnc *hpack.Encoder // HPACK encoder
-
- // The max number of concurrent streams.
- maxStreams uint32
- // controlBuf delivers all the control related tasks (e.g., window
- // updates, reset streams, and various settings) to the controller.
- controlBuf *recvBuffer
- fc *inFlow
- // sendQuotaPool provides flow control to outbound message.
- sendQuotaPool *quotaPool
-
- mu sync.Mutex // guard the following
- state transportState
- activeStreams map[uint32]*Stream
- // the per-stream outbound flow control window size set by the peer.
- streamSendQuota uint32
-}
-
-// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
-// returned if something goes wrong.
-func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) {
- framer := newFramer(conn)
- // Send initial settings as connection preface to client.
- var settings []http2.Setting
- // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
- // permitted in the HTTP2 spec.
- if maxStreams == 0 {
- maxStreams = math.MaxUint32
- } else {
- settings = append(settings, http2.Setting{
- ID: http2.SettingMaxConcurrentStreams,
- Val: maxStreams,
- })
- }
- if initialWindowSize != defaultWindowSize {
- settings = append(settings, http2.Setting{
- ID: http2.SettingInitialWindowSize,
- Val: uint32(initialWindowSize)})
- }
- if err := framer.writeSettings(true, settings...); err != nil {
- return nil, connectionErrorf(true, err, "transport: %v", err)
- }
- // Adjust the connection flow control window if needed.
- if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
- if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
- return nil, connectionErrorf(true, err, "transport: %v", err)
- }
- }
- var buf bytes.Buffer
- t := &http2Server{
- conn: conn,
- authInfo: authInfo,
- framer: framer,
- hBuf: &buf,
- hEnc: hpack.NewEncoder(&buf),
- maxStreams: maxStreams,
- controlBuf: newRecvBuffer(),
- fc: &inFlow{limit: initialConnWindowSize},
- sendQuotaPool: newQuotaPool(defaultWindowSize),
- state: reachable,
- writableChan: make(chan int, 1),
- shutdownChan: make(chan struct{}),
- activeStreams: make(map[uint32]*Stream),
- streamSendQuota: defaultWindowSize,
- }
- go t.controller()
- t.writableChan <- 0
- return t, nil
-}
-
-// operateHeader takes action on the decoded headers.
-func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
- buf := newRecvBuffer()
- s := &Stream{
- id: frame.Header().StreamID,
- st: t,
- buf: buf,
- fc: &inFlow{limit: initialWindowSize},
- }
-
- var state decodeState
- for _, hf := range frame.Fields {
- state.processHeaderField(hf)
- }
- if err := state.err; err != nil {
- if se, ok := err.(StreamError); ok {
- t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
- }
- return
- }
-
- if frame.StreamEnded() {
- // s is just created by the caller. No lock needed.
- s.state = streamReadDone
- }
- s.recvCompress = state.encoding
- if state.timeoutSet {
- s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
- } else {
- s.ctx, s.cancel = context.WithCancel(context.TODO())
- }
- pr := &peer.Peer{
- Addr: t.conn.RemoteAddr(),
- }
- // Attach Auth info if there is any.
- if t.authInfo != nil {
- pr.AuthInfo = t.authInfo
- }
- s.ctx = peer.NewContext(s.ctx, pr)
- // Cache the current stream to the context so that the server application
- // can find out. Required when the server wants to send some metadata
- // back to the client (unary call only).
- s.ctx = newContextWithStream(s.ctx, s)
- // Attach the received metadata to the context.
- if len(state.mdata) > 0 {
- s.ctx = metadata.NewContext(s.ctx, state.mdata)
- }
-
- s.dec = &recvBufferReader{
- ctx: s.ctx,
- recv: s.buf,
- }
- s.recvCompress = state.encoding
- s.method = state.method
- t.mu.Lock()
- if t.state != reachable {
- t.mu.Unlock()
- return
- }
- if uint32(len(t.activeStreams)) >= t.maxStreams {
- t.mu.Unlock()
- t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
- return
- }
- if s.id%2 != 1 || s.id <= t.maxStreamID {
- t.mu.Unlock()
- // illegal gRPC stream id.
- grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", s.id)
- return true
- }
- t.maxStreamID = s.id
- s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
- t.activeStreams[s.id] = s
- t.mu.Unlock()
- s.windowHandler = func(n int) {
- t.updateWindow(s, uint32(n))
- }
- handle(s)
- return
-}
-
-// HandleStreams receives incoming streams using the given handler. This is
-// typically run in a separate goroutine.
-func (t *http2Server) HandleStreams(handle func(*Stream)) {
- // Check the validity of client preface.
- preface := make([]byte, len(clientPreface))
- if _, err := io.ReadFull(t.conn, preface); err != nil {
- grpclog.Printf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
- t.Close()
- return
- }
- if !bytes.Equal(preface, clientPreface) {
- grpclog.Printf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
- t.Close()
- return
- }
-
- frame, err := t.framer.readFrame()
- if err == io.EOF || err == io.ErrUnexpectedEOF {
- t.Close()
- return
- }
- if err != nil {
- grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
- t.Close()
- return
- }
- sf, ok := frame.(*http2.SettingsFrame)
- if !ok {
- grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
- t.Close()
- return
- }
- t.handleSettings(sf)
-
- for {
- frame, err := t.framer.readFrame()
- if err != nil {
- if se, ok := err.(http2.StreamError); ok {
- t.mu.Lock()
- s := t.activeStreams[se.StreamID]
- t.mu.Unlock()
- if s != nil {
- t.closeStream(s)
- }
- t.controlBuf.put(&resetStream{se.StreamID, se.Code})
- continue
- }
- if err == io.EOF || err == io.ErrUnexpectedEOF {
- t.Close()
- return
- }
- grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
- t.Close()
- return
- }
- switch frame := frame.(type) {
- case *http2.MetaHeadersFrame:
- if t.operateHeaders(frame, handle) {
- t.Close()
- break
- }
- case *http2.DataFrame:
- t.handleData(frame)
- case *http2.RSTStreamFrame:
- t.handleRSTStream(frame)
- case *http2.SettingsFrame:
- t.handleSettings(frame)
- case *http2.PingFrame:
- t.handlePing(frame)
- case *http2.WindowUpdateFrame:
- t.handleWindowUpdate(frame)
- case *http2.GoAwayFrame:
- // TODO: Handle GoAway from the client appropriately.
- default:
- grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
- }
- }
-}
-
-func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
- t.mu.Lock()
- defer t.mu.Unlock()
- if t.activeStreams == nil {
- // The transport is closing.
- return nil, false
- }
- s, ok := t.activeStreams[f.Header().StreamID]
- if !ok {
- // The stream is already done.
- return nil, false
- }
- return s, true
-}
-
-// updateWindow adjusts the inbound quota for the stream and the transport.
-// Window updates will deliver to the controller for sending when
-// the cumulative quota exceeds the corresponding threshold.
-func (t *http2Server) updateWindow(s *Stream, n uint32) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.state == streamDone {
- return
- }
- if w := t.fc.onRead(n); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
- if w := s.fc.onRead(n); w > 0 {
- t.controlBuf.put(&windowUpdate{s.id, w})
- }
-}
-
-func (t *http2Server) handleData(f *http2.DataFrame) {
- size := len(f.Data())
- if err := t.fc.onData(uint32(size)); err != nil {
- grpclog.Printf("transport: http2Server %v", err)
- t.Close()
- return
- }
- // Select the right stream to dispatch.
- s, ok := t.getStream(f)
- if !ok {
- if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
- return
- }
- if size > 0 {
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- // The stream has been closed. Release the corresponding quota.
- if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
- return
- }
- if err := s.fc.onData(uint32(size)); err != nil {
- s.mu.Unlock()
- t.closeStream(s)
- t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
- return
- }
- s.mu.Unlock()
- // TODO(bradfitz, zhaoq): A copy is required here because there is no
- // guarantee f.Data() is consumed before the arrival of next frame.
- // Can this copy be eliminated?
- data := make([]byte, size)
- copy(data, f.Data())
- s.write(recvMsg{data: data})
- }
- if f.Header().Flags.Has(http2.FlagDataEndStream) {
- // Received the end of stream from the client.
- s.mu.Lock()
- if s.state != streamDone {
- s.state = streamReadDone
- }
- s.mu.Unlock()
- s.write(recvMsg{err: io.EOF})
- }
-}
-
-func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
- s, ok := t.getStream(f)
- if !ok {
- return
- }
- t.closeStream(s)
-}
-
-func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
- if f.IsAck() {
- return
- }
- var ss []http2.Setting
- f.ForeachSetting(func(s http2.Setting) error {
- ss = append(ss, s)
- return nil
- })
- // The settings will be applied once the ack is sent.
- t.controlBuf.put(&settings{ack: true, ss: ss})
-}
-
-func (t *http2Server) handlePing(f *http2.PingFrame) {
- if f.IsAck() { // Do nothing.
- return
- }
- pingAck := &ping{ack: true}
- copy(pingAck.data[:], f.Data[:])
- t.controlBuf.put(pingAck)
-}
-
-func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
- id := f.Header().StreamID
- incr := f.Increment
- if id == 0 {
- t.sendQuotaPool.add(int(incr))
- return
- }
- if s, ok := t.getStream(f); ok {
- s.sendQuotaPool.add(int(incr))
- }
-}
-
-func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error {
- first := true
- endHeaders := false
- var err error
- // Sends the headers in a single batch.
- for !endHeaders {
- size := t.hBuf.Len()
- if size > http2MaxFrameLen {
- size = http2MaxFrameLen
- } else {
- endHeaders = true
- }
- if first {
- p := http2.HeadersFrameParam{
- StreamID: s.id,
- BlockFragment: b.Next(size),
- EndStream: endStream,
- EndHeaders: endHeaders,
- }
- err = t.framer.writeHeaders(endHeaders, p)
- first = false
- } else {
- err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size))
- }
- if err != nil {
- t.Close()
- return connectionErrorf(true, err, "transport: %v", err)
- }
- }
- return nil
-}
-
-// WriteHeader sends the header metedata md back to the client.
-func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
- s.mu.Lock()
- if s.headerOk || s.state == streamDone {
- s.mu.Unlock()
- return ErrIllegalHeaderWrite
- }
- s.headerOk = true
- if md.Len() > 0 {
- if s.header.Len() > 0 {
- s.header = metadata.Join(s.header, md)
- } else {
- s.header = md
- }
- }
- md = s.header
- s.mu.Unlock()
- if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
- return err
- }
- t.hBuf.Reset()
- t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
- t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
- if s.sendCompress != "" {
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
- }
- for k, v := range md {
- if isReservedHeader(k) {
- // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
- continue
- }
- for _, entry := range v {
- t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
- }
- }
- if err := t.writeHeaders(s, t.hBuf, false); err != nil {
- return err
- }
- t.writableChan <- 0
- return nil
-}
-
-// WriteStatus sends stream status to the client and terminates the stream.
-// There is no further I/O operations being able to perform on this stream.
-// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
-// OK is adopted.
-func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
- var headersSent, hasHeader bool
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return nil
- }
- if s.headerOk {
- headersSent = true
- }
- if s.header.Len() > 0 {
- hasHeader = true
- }
- s.mu.Unlock()
-
- if !headersSent && hasHeader {
- t.WriteHeader(s, nil)
- headersSent = true
- }
-
- if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
- return err
- }
- t.hBuf.Reset()
- if !headersSent {
- t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
- t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
- }
- t.hEnc.WriteField(
- hpack.HeaderField{
- Name: "grpc-status",
- Value: strconv.Itoa(int(statusCode)),
- })
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(statusDesc)})
- // Attach the trailer metadata.
- for k, v := range s.trailer {
- // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
- if isReservedHeader(k) {
- continue
- }
- for _, entry := range v {
- t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
- }
- }
- if err := t.writeHeaders(s, t.hBuf, true); err != nil {
- t.Close()
- return err
- }
- t.closeStream(s)
- t.writableChan <- 0
- return nil
-}
-
-// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
-// is returns if it fails (e.g., framing error, transport error).
-func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
- // TODO(zhaoq): Support multi-writers for a single stream.
- var writeHeaderFrame bool
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return streamErrorf(codes.Unknown, "the stream has been done")
- }
- if !s.headerOk {
- writeHeaderFrame = true
- }
- s.mu.Unlock()
- if writeHeaderFrame {
- t.WriteHeader(s, nil)
- }
- r := bytes.NewBuffer(data)
- for {
- if r.Len() == 0 {
- return nil
- }
- size := http2MaxFrameLen
- s.sendQuotaPool.add(0)
- // Wait until the stream has some quota to send the data.
- sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
- if err != nil {
- return err
- }
- t.sendQuotaPool.add(0)
- // Wait until the transport has some quota to send the data.
- tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
- if err != nil {
- if _, ok := err.(StreamError); ok {
- t.sendQuotaPool.cancel()
- }
- return err
- }
- if sq < size {
- size = sq
- }
- if tq < size {
- size = tq
- }
- p := r.Next(size)
- ps := len(p)
- if ps < sq {
- // Overbooked stream quota. Return it back.
- s.sendQuotaPool.add(sq - ps)
- }
- if ps < tq {
- // Overbooked transport quota. Return it back.
- t.sendQuotaPool.add(tq - ps)
- }
- t.framer.adjustNumWriters(1)
- // Got some quota. Try to acquire writing privilege on the
- // transport.
- if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
- if _, ok := err.(StreamError); ok {
- // Return the connection quota back.
- t.sendQuotaPool.add(ps)
- }
- if t.framer.adjustNumWriters(-1) == 0 {
- // This writer is the last one in this batch and has the
- // responsibility to flush the buffered frames. It queues
- // a flush request to controlBuf instead of flushing directly
- // in order to avoid the race with other writing or flushing.
- t.controlBuf.put(&flushIO{})
- }
- return err
- }
- select {
- case <-s.ctx.Done():
- t.sendQuotaPool.add(ps)
- if t.framer.adjustNumWriters(-1) == 0 {
- t.controlBuf.put(&flushIO{})
- }
- t.writableChan <- 0
- return ContextErr(s.ctx.Err())
- default:
- }
- var forceFlush bool
- if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last {
- forceFlush = true
- }
- if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
- t.Close()
- return connectionErrorf(true, err, "transport: %v", err)
- }
- if t.framer.adjustNumWriters(-1) == 0 {
- t.framer.flushWrite()
- }
- t.writableChan <- 0
- }
-
-}
-
-func (t *http2Server) applySettings(ss []http2.Setting) {
- for _, s := range ss {
- if s.ID == http2.SettingInitialWindowSize {
- t.mu.Lock()
- defer t.mu.Unlock()
- for _, stream := range t.activeStreams {
- stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
- }
- t.streamSendQuota = s.Val
- }
-
- }
-}
-
-// controller running in a separate goroutine takes charge of sending control
-// frames (e.g., window update, reset stream, setting, etc.) to the server.
-func (t *http2Server) controller() {
- for {
- select {
- case i := <-t.controlBuf.get():
- t.controlBuf.load()
- select {
- case <-t.writableChan:
- switch i := i.(type) {
- case *windowUpdate:
- t.framer.writeWindowUpdate(true, i.streamID, i.increment)
- case *settings:
- if i.ack {
- t.framer.writeSettingsAck(true)
- t.applySettings(i.ss)
- } else {
- t.framer.writeSettings(true, i.ss...)
- }
- case *resetStream:
- t.framer.writeRSTStream(true, i.streamID, i.code)
- case *goAway:
- t.mu.Lock()
- if t.state == closing {
- t.mu.Unlock()
- // The transport is closing.
- return
- }
- sid := t.maxStreamID
- t.state = draining
- t.mu.Unlock()
- t.framer.writeGoAway(true, sid, http2.ErrCodeNo, nil)
- case *flushIO:
- t.framer.flushWrite()
- case *ping:
- t.framer.writePing(true, i.ack, i.data)
- default:
- grpclog.Printf("transport: http2Server.controller got unexpected item type %v\n", i)
- }
- t.writableChan <- 0
- continue
- case <-t.shutdownChan:
- return
- }
- case <-t.shutdownChan:
- return
- }
- }
-}
-
-// Close starts shutting down the http2Server transport.
-// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
-// could cause some resource issue. Revisit this later.
-func (t *http2Server) Close() (err error) {
- t.mu.Lock()
- if t.state == closing {
- t.mu.Unlock()
- return errors.New("transport: Close() was already called")
- }
- t.state = closing
- streams := t.activeStreams
- t.activeStreams = nil
- t.mu.Unlock()
- close(t.shutdownChan)
- err = t.conn.Close()
- // Cancel all active streams.
- for _, s := range streams {
- s.cancel()
- }
- return
-}
-
-// closeStream clears the footprint of a stream when the stream is not needed
-// any more.
-func (t *http2Server) closeStream(s *Stream) {
- t.mu.Lock()
- delete(t.activeStreams, s.id)
- if t.state == draining && len(t.activeStreams) == 0 {
- defer t.Close()
- }
- t.mu.Unlock()
- // In case stream sending and receiving are invoked in separate
- // goroutines (e.g., bi-directional streaming), cancel needs to be
- // called to interrupt the potential blocking on other goroutines.
- s.cancel()
- s.mu.Lock()
- if q := s.fc.resetPendingData(); q > 0 {
- if w := t.fc.onRead(q); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
- }
- if s.state == streamDone {
- s.mu.Unlock()
- return
- }
- s.state = streamDone
- s.mu.Unlock()
-}
-
-func (t *http2Server) RemoteAddr() net.Addr {
- return t.conn.RemoteAddr()
-}
-
-func (t *http2Server) Drain() {
- t.controlBuf.put(&goAway{})
-}
diff --git a/vendor/google.golang.org/grpc/transport/http_util.go b/vendor/google.golang.org/grpc/transport/http_util.go
deleted file mode 100644
index a3c68d4ca..000000000
--- a/vendor/google.golang.org/grpc/transport/http_util.go
+++ /dev/null
@@ -1,513 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package transport
-
-import (
- "bufio"
- "bytes"
- "fmt"
- "io"
- "net"
- "strconv"
- "strings"
- "sync/atomic"
- "time"
-
- "golang.org/x/net/http2"
- "golang.org/x/net/http2/hpack"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/metadata"
-)
-
-const (
- // The primary user agent
- primaryUA = "grpc-go/1.0"
- // http2MaxFrameLen specifies the max length of a HTTP2 frame.
- http2MaxFrameLen = 16384 // 16KB frame
- // http://http2.github.io/http2-spec/#SettingValues
- http2InitHeaderTableSize = 4096
- // http2IOBufSize specifies the buffer size for sending frames.
- http2IOBufSize = 32 * 1024
-)
-
-var (
- clientPreface = []byte(http2.ClientPreface)
- http2ErrConvTab = map[http2.ErrCode]codes.Code{
- http2.ErrCodeNo: codes.Internal,
- http2.ErrCodeProtocol: codes.Internal,
- http2.ErrCodeInternal: codes.Internal,
- http2.ErrCodeFlowControl: codes.ResourceExhausted,
- http2.ErrCodeSettingsTimeout: codes.Internal,
- http2.ErrCodeStreamClosed: codes.Internal,
- http2.ErrCodeFrameSize: codes.Internal,
- http2.ErrCodeRefusedStream: codes.Unavailable,
- http2.ErrCodeCancel: codes.Canceled,
- http2.ErrCodeCompression: codes.Internal,
- http2.ErrCodeConnect: codes.Internal,
- http2.ErrCodeEnhanceYourCalm: codes.ResourceExhausted,
- http2.ErrCodeInadequateSecurity: codes.PermissionDenied,
- http2.ErrCodeHTTP11Required: codes.FailedPrecondition,
- }
- statusCodeConvTab = map[codes.Code]http2.ErrCode{
- codes.Internal: http2.ErrCodeInternal,
- codes.Canceled: http2.ErrCodeCancel,
- codes.Unavailable: http2.ErrCodeRefusedStream,
- codes.ResourceExhausted: http2.ErrCodeEnhanceYourCalm,
- codes.PermissionDenied: http2.ErrCodeInadequateSecurity,
- }
-)
-
-// Records the states during HPACK decoding. Must be reset once the
-// decoding of the entire headers are finished.
-type decodeState struct {
- err error // first error encountered decoding
-
- encoding string
- // statusCode caches the stream status received from the trailer
- // the server sent. Client side only.
- statusCode codes.Code
- statusDesc string
- // Server side only fields.
- timeoutSet bool
- timeout time.Duration
- method string
- // key-value metadata map from the peer.
- mdata map[string][]string
-}
-
-// isReservedHeader checks whether hdr belongs to HTTP2 headers
-// reserved by gRPC protocol. Any other headers are classified as the
-// user-specified metadata.
-func isReservedHeader(hdr string) bool {
- if hdr != "" && hdr[0] == ':' {
- return true
- }
- switch hdr {
- case "content-type",
- "grpc-message-type",
- "grpc-encoding",
- "grpc-message",
- "grpc-status",
- "grpc-timeout",
- "te":
- return true
- default:
- return false
- }
-}
-
-// isWhitelistedPseudoHeader checks whether hdr belongs to HTTP2 pseudoheaders
-// that should be propagated into metadata visible to users.
-func isWhitelistedPseudoHeader(hdr string) bool {
- switch hdr {
- case ":authority":
- return true
- default:
- return false
- }
-}
-
-func (d *decodeState) setErr(err error) {
- if d.err == nil {
- d.err = err
- }
-}
-
-func validContentType(t string) bool {
- e := "application/grpc"
- if !strings.HasPrefix(t, e) {
- return false
- }
- // Support variations on the content-type
- // (e.g. "application/grpc+blah", "application/grpc;blah").
- if len(t) > len(e) && t[len(e)] != '+' && t[len(e)] != ';' {
- return false
- }
- return true
-}
-
-func (d *decodeState) processHeaderField(f hpack.HeaderField) {
- switch f.Name {
- case "content-type":
- if !validContentType(f.Value) {
- d.setErr(streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
- return
- }
- case "grpc-encoding":
- d.encoding = f.Value
- case "grpc-status":
- code, err := strconv.Atoi(f.Value)
- if err != nil {
- d.setErr(streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
- return
- }
- d.statusCode = codes.Code(code)
- case "grpc-message":
- d.statusDesc = decodeGrpcMessage(f.Value)
- case "grpc-timeout":
- d.timeoutSet = true
- var err error
- d.timeout, err = decodeTimeout(f.Value)
- if err != nil {
- d.setErr(streamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
- return
- }
- case ":path":
- d.method = f.Value
- default:
- if !isReservedHeader(f.Name) || isWhitelistedPseudoHeader(f.Name) {
- if f.Name == "user-agent" {
- i := strings.LastIndex(f.Value, " ")
- if i == -1 {
- // There is no application user agent string being set.
- return
- }
- // Extract the application user agent string.
- f.Value = f.Value[:i]
- }
- if d.mdata == nil {
- d.mdata = make(map[string][]string)
- }
- k, v, err := metadata.DecodeKeyValue(f.Name, f.Value)
- if err != nil {
- grpclog.Printf("Failed to decode (%q, %q): %v", f.Name, f.Value, err)
- return
- }
- d.mdata[k] = append(d.mdata[k], v)
- }
- }
-}
-
-type timeoutUnit uint8
-
-const (
- hour timeoutUnit = 'H'
- minute timeoutUnit = 'M'
- second timeoutUnit = 'S'
- millisecond timeoutUnit = 'm'
- microsecond timeoutUnit = 'u'
- nanosecond timeoutUnit = 'n'
-)
-
-func timeoutUnitToDuration(u timeoutUnit) (d time.Duration, ok bool) {
- switch u {
- case hour:
- return time.Hour, true
- case minute:
- return time.Minute, true
- case second:
- return time.Second, true
- case millisecond:
- return time.Millisecond, true
- case microsecond:
- return time.Microsecond, true
- case nanosecond:
- return time.Nanosecond, true
- default:
- }
- return
-}
-
-const maxTimeoutValue int64 = 100000000 - 1
-
-// div does integer division and round-up the result. Note that this is
-// equivalent to (d+r-1)/r but has less chance to overflow.
-func div(d, r time.Duration) int64 {
- if m := d % r; m > 0 {
- return int64(d/r + 1)
- }
- return int64(d / r)
-}
-
-// TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it.
-func encodeTimeout(t time.Duration) string {
- if t <= 0 {
- return "0n"
- }
- if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
- return strconv.FormatInt(d, 10) + "n"
- }
- if d := div(t, time.Microsecond); d <= maxTimeoutValue {
- return strconv.FormatInt(d, 10) + "u"
- }
- if d := div(t, time.Millisecond); d <= maxTimeoutValue {
- return strconv.FormatInt(d, 10) + "m"
- }
- if d := div(t, time.Second); d <= maxTimeoutValue {
- return strconv.FormatInt(d, 10) + "S"
- }
- if d := div(t, time.Minute); d <= maxTimeoutValue {
- return strconv.FormatInt(d, 10) + "M"
- }
- // Note that maxTimeoutValue * time.Hour > MaxInt64.
- return strconv.FormatInt(div(t, time.Hour), 10) + "H"
-}
-
-func decodeTimeout(s string) (time.Duration, error) {
- size := len(s)
- if size < 2 {
- return 0, fmt.Errorf("transport: timeout string is too short: %q", s)
- }
- unit := timeoutUnit(s[size-1])
- d, ok := timeoutUnitToDuration(unit)
- if !ok {
- return 0, fmt.Errorf("transport: timeout unit is not recognized: %q", s)
- }
- t, err := strconv.ParseInt(s[:size-1], 10, 64)
- if err != nil {
- return 0, err
- }
- return d * time.Duration(t), nil
-}
-
-const (
- spaceByte = ' '
- tildaByte = '~'
- percentByte = '%'
-)
-
-// encodeGrpcMessage is used to encode status code in header field
-// "grpc-message".
-// It checks to see if each individual byte in msg is an
-// allowable byte, and then either percent encoding or passing it through.
-// When percent encoding, the byte is converted into hexadecimal notation
-// with a '%' prepended.
-func encodeGrpcMessage(msg string) string {
- if msg == "" {
- return ""
- }
- lenMsg := len(msg)
- for i := 0; i < lenMsg; i++ {
- c := msg[i]
- if !(c >= spaceByte && c < tildaByte && c != percentByte) {
- return encodeGrpcMessageUnchecked(msg)
- }
- }
- return msg
-}
-
-func encodeGrpcMessageUnchecked(msg string) string {
- var buf bytes.Buffer
- lenMsg := len(msg)
- for i := 0; i < lenMsg; i++ {
- c := msg[i]
- if c >= spaceByte && c < tildaByte && c != percentByte {
- buf.WriteByte(c)
- } else {
- buf.WriteString(fmt.Sprintf("%%%02X", c))
- }
- }
- return buf.String()
-}
-
-// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
-func decodeGrpcMessage(msg string) string {
- if msg == "" {
- return ""
- }
- lenMsg := len(msg)
- for i := 0; i < lenMsg; i++ {
- if msg[i] == percentByte && i+2 < lenMsg {
- return decodeGrpcMessageUnchecked(msg)
- }
- }
- return msg
-}
-
-func decodeGrpcMessageUnchecked(msg string) string {
- var buf bytes.Buffer
- lenMsg := len(msg)
- for i := 0; i < lenMsg; i++ {
- c := msg[i]
- if c == percentByte && i+2 < lenMsg {
- parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
- if err != nil {
- buf.WriteByte(c)
- } else {
- buf.WriteByte(byte(parsed))
- i += 2
- }
- } else {
- buf.WriteByte(c)
- }
- }
- return buf.String()
-}
-
-type framer struct {
- numWriters int32
- reader io.Reader
- writer *bufio.Writer
- fr *http2.Framer
-}
-
-func newFramer(conn net.Conn) *framer {
- f := &framer{
- reader: bufio.NewReaderSize(conn, http2IOBufSize),
- writer: bufio.NewWriterSize(conn, http2IOBufSize),
- }
- f.fr = http2.NewFramer(f.writer, f.reader)
- f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
- return f
-}
-
-func (f *framer) adjustNumWriters(i int32) int32 {
- return atomic.AddInt32(&f.numWriters, i)
-}
-
-// The following writeXXX functions can only be called when the caller gets
-// unblocked from writableChan channel (i.e., owns the privilege to write).
-
-func (f *framer) writeContinuation(forceFlush bool, streamID uint32, endHeaders bool, headerBlockFragment []byte) error {
- if err := f.fr.WriteContinuation(streamID, endHeaders, headerBlockFragment); err != nil {
- return err
- }
- if forceFlush {
- return f.writer.Flush()
- }
- return nil
-}
-
-func (f *framer) writeData(forceFlush bool, streamID uint32, endStream bool, data []byte) error {
- if err := f.fr.WriteData(streamID, endStream, data); err != nil {
- return err
- }
- if forceFlush {
- return f.writer.Flush()
- }
- return nil
-}
-
-func (f *framer) writeGoAway(forceFlush bool, maxStreamID uint32, code http2.ErrCode, debugData []byte) error {
- if err := f.fr.WriteGoAway(maxStreamID, code, debugData); err != nil {
- return err
- }
- if forceFlush {
- return f.writer.Flush()
- }
- return nil
-}
-
-func (f *framer) writeHeaders(forceFlush bool, p http2.HeadersFrameParam) error {
- if err := f.fr.WriteHeaders(p); err != nil {
- return err
- }
- if forceFlush {
- return f.writer.Flush()
- }
- return nil
-}
-
-func (f *framer) writePing(forceFlush, ack bool, data [8]byte) error {
- if err := f.fr.WritePing(ack, data); err != nil {
- return err
- }
- if forceFlush {
- return f.writer.Flush()
- }
- return nil
-}
-
-func (f *framer) writePriority(forceFlush bool, streamID uint32, p http2.PriorityParam) error {
- if err := f.fr.WritePriority(streamID, p); err != nil {
- return err
- }
- if forceFlush {
- return f.writer.Flush()
- }
- return nil
-}
-
-func (f *framer) writePushPromise(forceFlush bool, p http2.PushPromiseParam) error {
- if err := f.fr.WritePushPromise(p); err != nil {
- return err
- }
- if forceFlush {
- return f.writer.Flush()
- }
- return nil
-}
-
-func (f *framer) writeRSTStream(forceFlush bool, streamID uint32, code http2.ErrCode) error {
- if err := f.fr.WriteRSTStream(streamID, code); err != nil {
- return err
- }
- if forceFlush {
- return f.writer.Flush()
- }
- return nil
-}
-
-func (f *framer) writeSettings(forceFlush bool, settings ...http2.Setting) error {
- if err := f.fr.WriteSettings(settings...); err != nil {
- return err
- }
- if forceFlush {
- return f.writer.Flush()
- }
- return nil
-}
-
-func (f *framer) writeSettingsAck(forceFlush bool) error {
- if err := f.fr.WriteSettingsAck(); err != nil {
- return err
- }
- if forceFlush {
- return f.writer.Flush()
- }
- return nil
-}
-
-func (f *framer) writeWindowUpdate(forceFlush bool, streamID, incr uint32) error {
- if err := f.fr.WriteWindowUpdate(streamID, incr); err != nil {
- return err
- }
- if forceFlush {
- return f.writer.Flush()
- }
- return nil
-}
-
-func (f *framer) flushWrite() error {
- return f.writer.Flush()
-}
-
-func (f *framer) readFrame() (http2.Frame, error) {
- return f.fr.ReadFrame()
-}
-
-func (f *framer) errorDetail() error {
- return f.fr.ErrorDetail()
-}
diff --git a/vendor/google.golang.org/grpc/transport/pre_go16.go b/vendor/google.golang.org/grpc/transport/pre_go16.go
deleted file mode 100644
index 33d91c17c..000000000
--- a/vendor/google.golang.org/grpc/transport/pre_go16.go
+++ /dev/null
@@ -1,51 +0,0 @@
-// +build !go1.6
-
-/*
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-package transport
-
-import (
- "net"
- "time"
-
- "golang.org/x/net/context"
-)
-
-// dialContext connects to the address on the named network.
-func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
- var dialer net.Dialer
- if deadline, ok := ctx.Deadline(); ok {
- dialer.Timeout = deadline.Sub(time.Now())
- }
- return dialer.Dial(network, address)
-}
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go
deleted file mode 100644
index 413f7493b..000000000
--- a/vendor/google.golang.org/grpc/transport/transport.go
+++ /dev/null
@@ -1,595 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-/*
-Package transport defines and implements message oriented communication channel
-to complete various transactions (e.g., an RPC).
-*/
-package transport // import "google.golang.org/grpc/transport"
-
-import (
- "bytes"
- "fmt"
- "io"
- "net"
- "sync"
-
- "golang.org/x/net/context"
- "golang.org/x/net/trace"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/metadata"
-)
-
-// recvMsg represents the received msg from the transport. All transport
-// protocol specific info has been removed.
-type recvMsg struct {
- data []byte
- // nil: received some data
- // io.EOF: stream is completed. data is nil.
- // other non-nil error: transport failure. data is nil.
- err error
-}
-
-func (*recvMsg) item() {}
-
-// All items in an out of a recvBuffer should be the same type.
-type item interface {
- item()
-}
-
-// recvBuffer is an unbounded channel of item.
-type recvBuffer struct {
- c chan item
- mu sync.Mutex
- backlog []item
-}
-
-func newRecvBuffer() *recvBuffer {
- b := &recvBuffer{
- c: make(chan item, 1),
- }
- return b
-}
-
-func (b *recvBuffer) put(r item) {
- b.mu.Lock()
- defer b.mu.Unlock()
- if len(b.backlog) == 0 {
- select {
- case b.c <- r:
- return
- default:
- }
- }
- b.backlog = append(b.backlog, r)
-}
-
-func (b *recvBuffer) load() {
- b.mu.Lock()
- defer b.mu.Unlock()
- if len(b.backlog) > 0 {
- select {
- case b.c <- b.backlog[0]:
- b.backlog = b.backlog[1:]
- default:
- }
- }
-}
-
-// get returns the channel that receives an item in the buffer.
-//
-// Upon receipt of an item, the caller should call load to send another
-// item onto the channel if there is any.
-func (b *recvBuffer) get() <-chan item {
- return b.c
-}
-
-// recvBufferReader implements io.Reader interface to read the data from
-// recvBuffer.
-type recvBufferReader struct {
- ctx context.Context
- goAway chan struct{}
- recv *recvBuffer
- last *bytes.Reader // Stores the remaining data in the previous calls.
- err error
-}
-
-// Read reads the next len(p) bytes from last. If last is drained, it tries to
-// read additional data from recv. It blocks if there no additional data available
-// in recv. If Read returns any non-nil error, it will continue to return that error.
-func (r *recvBufferReader) Read(p []byte) (n int, err error) {
- if r.err != nil {
- return 0, r.err
- }
- defer func() { r.err = err }()
- if r.last != nil && r.last.Len() > 0 {
- // Read remaining data left in last call.
- return r.last.Read(p)
- }
- select {
- case <-r.ctx.Done():
- return 0, ContextErr(r.ctx.Err())
- case <-r.goAway:
- return 0, ErrStreamDrain
- case i := <-r.recv.get():
- r.recv.load()
- m := i.(*recvMsg)
- if m.err != nil {
- return 0, m.err
- }
- r.last = bytes.NewReader(m.data)
- return r.last.Read(p)
- }
-}
-
-type streamState uint8
-
-const (
- streamActive streamState = iota
- streamWriteDone // EndStream sent
- streamReadDone // EndStream received
- streamDone // the entire stream is finished.
-)
-
-// Stream represents an RPC in the transport layer.
-type Stream struct {
- id uint32
- // nil for client side Stream.
- st ServerTransport
- // ctx is the associated context of the stream.
- ctx context.Context
- // cancel is always nil for client side Stream.
- cancel context.CancelFunc
- // done is closed when the final status arrives.
- done chan struct{}
- // goAway is closed when the server sent GoAways signal before this stream was initiated.
- goAway chan struct{}
- // method records the associated RPC method of the stream.
- method string
- recvCompress string
- sendCompress string
- buf *recvBuffer
- dec io.Reader
- fc *inFlow
- recvQuota uint32
- // The accumulated inbound quota pending for window update.
- updateQuota uint32
- // The handler to control the window update procedure for both this
- // particular stream and the associated transport.
- windowHandler func(int)
-
- sendQuotaPool *quotaPool
- // Close headerChan to indicate the end of reception of header metadata.
- headerChan chan struct{}
- // header caches the received header metadata.
- header metadata.MD
- // The key-value map of trailer metadata.
- trailer metadata.MD
-
- mu sync.RWMutex // guard the following
- // headerOK becomes true from the first header is about to send.
- headerOk bool
- state streamState
- // true iff headerChan is closed. Used to avoid closing headerChan
- // multiple times.
- headerDone bool
- // the status received from the server.
- statusCode codes.Code
- statusDesc string
-}
-
-// RecvCompress returns the compression algorithm applied to the inbound
-// message. It is empty string if there is no compression applied.
-func (s *Stream) RecvCompress() string {
- return s.recvCompress
-}
-
-// SetSendCompress sets the compression algorithm to the stream.
-func (s *Stream) SetSendCompress(str string) {
- s.sendCompress = str
-}
-
-// Done returns a chanel which is closed when it receives the final status
-// from the server.
-func (s *Stream) Done() <-chan struct{} {
- return s.done
-}
-
-// GoAway returns a channel which is closed when the server sent GoAways signal
-// before this stream was initiated.
-func (s *Stream) GoAway() <-chan struct{} {
- return s.goAway
-}
-
-// Header acquires the key-value pairs of header metadata once it
-// is available. It blocks until i) the metadata is ready or ii) there is no
-// header metadata or iii) the stream is cancelled/expired.
-func (s *Stream) Header() (metadata.MD, error) {
- select {
- case <-s.ctx.Done():
- return nil, ContextErr(s.ctx.Err())
- case <-s.goAway:
- return nil, ErrStreamDrain
- case <-s.headerChan:
- return s.header.Copy(), nil
- }
-}
-
-// Trailer returns the cached trailer metedata. Note that if it is not called
-// after the entire stream is done, it could return an empty MD. Client
-// side only.
-func (s *Stream) Trailer() metadata.MD {
- s.mu.RLock()
- defer s.mu.RUnlock()
- return s.trailer.Copy()
-}
-
-// ServerTransport returns the underlying ServerTransport for the stream.
-// The client side stream always returns nil.
-func (s *Stream) ServerTransport() ServerTransport {
- return s.st
-}
-
-// Context returns the context of the stream.
-func (s *Stream) Context() context.Context {
- return s.ctx
-}
-
-// TraceContext recreates the context of s with a trace.Trace.
-func (s *Stream) TraceContext(tr trace.Trace) {
- s.ctx = trace.NewContext(s.ctx, tr)
-}
-
-// Method returns the method for the stream.
-func (s *Stream) Method() string {
- return s.method
-}
-
-// StatusCode returns statusCode received from the server.
-func (s *Stream) StatusCode() codes.Code {
- return s.statusCode
-}
-
-// StatusDesc returns statusDesc received from the server.
-func (s *Stream) StatusDesc() string {
- return s.statusDesc
-}
-
-// SetHeader sets the header metadata. This can be called multiple times.
-// Server side only.
-func (s *Stream) SetHeader(md metadata.MD) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.headerOk || s.state == streamDone {
- return ErrIllegalHeaderWrite
- }
- if md.Len() == 0 {
- return nil
- }
- s.header = metadata.Join(s.header, md)
- return nil
-}
-
-// SetTrailer sets the trailer metadata which will be sent with the RPC status
-// by the server. This can be called multiple times. Server side only.
-func (s *Stream) SetTrailer(md metadata.MD) error {
- if md.Len() == 0 {
- return nil
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- s.trailer = metadata.Join(s.trailer, md)
- return nil
-}
-
-func (s *Stream) write(m recvMsg) {
- s.buf.put(&m)
-}
-
-// Read reads all the data available for this Stream from the transport and
-// passes them into the decoder, which converts them into a gRPC message stream.
-// The error is io.EOF when the stream is done or another non-nil error if
-// the stream broke.
-func (s *Stream) Read(p []byte) (n int, err error) {
- n, err = s.dec.Read(p)
- if err != nil {
- return
- }
- s.windowHandler(n)
- return
-}
-
-// The key to save transport.Stream in the context.
-type streamKey struct{}
-
-// newContextWithStream creates a new context from ctx and attaches stream
-// to it.
-func newContextWithStream(ctx context.Context, stream *Stream) context.Context {
- return context.WithValue(ctx, streamKey{}, stream)
-}
-
-// StreamFromContext returns the stream saved in ctx.
-func StreamFromContext(ctx context.Context) (s *Stream, ok bool) {
- s, ok = ctx.Value(streamKey{}).(*Stream)
- return
-}
-
-// state of transport
-type transportState int
-
-const (
- reachable transportState = iota
- unreachable
- closing
- draining
-)
-
-// NewServerTransport creates a ServerTransport with conn or non-nil error
-// if it fails.
-func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (ServerTransport, error) {
- return newHTTP2Server(conn, maxStreams, authInfo)
-}
-
-// ConnectOptions covers all relevant options for communicating with the server.
-type ConnectOptions struct {
- // UserAgent is the application user agent.
- UserAgent string
- // Dialer specifies how to dial a network address.
- Dialer func(context.Context, string) (net.Conn, error)
- // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
- PerRPCCredentials []credentials.PerRPCCredentials
- // TransportCredentials stores the Authenticator required to setup a client connection.
- TransportCredentials credentials.TransportCredentials
-}
-
-// TargetInfo contains the information of the target such as network address and metadata.
-type TargetInfo struct {
- Addr string
- Metadata interface{}
-}
-
-// NewClientTransport establishes the transport with the required ConnectOptions
-// and returns it to the caller.
-func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions) (ClientTransport, error) {
- return newHTTP2Client(ctx, target, opts)
-}
-
-// Options provides additional hints and information for message
-// transmission.
-type Options struct {
- // Last indicates whether this write is the last piece for
- // this stream.
- Last bool
-
- // Delay is a hint to the transport implementation for whether
- // the data could be buffered for a batching write. The
- // Transport implementation may ignore the hint.
- Delay bool
-}
-
-// CallHdr carries the information of a particular RPC.
-type CallHdr struct {
- // Host specifies the peer's host.
- Host string
-
- // Method specifies the operation to perform.
- Method string
-
- // RecvCompress specifies the compression algorithm applied on
- // inbound messages.
- RecvCompress string
-
- // SendCompress specifies the compression algorithm applied on
- // outbound message.
- SendCompress string
-
- // Flush indicates whether a new stream command should be sent
- // to the peer without waiting for the first data. This is
- // only a hint. The transport may modify the flush decision
- // for performance purposes.
- Flush bool
-}
-
-// ClientTransport is the common interface for all gRPC client-side transport
-// implementations.
-type ClientTransport interface {
- // Close tears down this transport. Once it returns, the transport
- // should not be accessed any more. The caller must make sure this
- // is called only once.
- Close() error
-
- // GracefulClose starts to tear down the transport. It stops accepting
- // new RPCs and wait the completion of the pending RPCs.
- GracefulClose() error
-
- // Write sends the data for the given stream. A nil stream indicates
- // the write is to be performed on the transport as a whole.
- Write(s *Stream, data []byte, opts *Options) error
-
- // NewStream creates a Stream for an RPC.
- NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
-
- // CloseStream clears the footprint of a stream when the stream is
- // not needed any more. The err indicates the error incurred when
- // CloseStream is called. Must be called when a stream is finished
- // unless the associated transport is closing.
- CloseStream(stream *Stream, err error)
-
- // Error returns a channel that is closed when some I/O error
- // happens. Typically the caller should have a goroutine to monitor
- // this in order to take action (e.g., close the current transport
- // and create a new one) in error case. It should not return nil
- // once the transport is initiated.
- Error() <-chan struct{}
-
- // GoAway returns a channel that is closed when ClientTranspor
- // receives the draining signal from the server (e.g., GOAWAY frame in
- // HTTP/2).
- GoAway() <-chan struct{}
-}
-
-// ServerTransport is the common interface for all gRPC server-side transport
-// implementations.
-//
-// Methods may be called concurrently from multiple goroutines, but
-// Write methods for a given Stream will be called serially.
-type ServerTransport interface {
- // HandleStreams receives incoming streams using the given handler.
- HandleStreams(func(*Stream))
-
- // WriteHeader sends the header metadata for the given stream.
- // WriteHeader may not be called on all streams.
- WriteHeader(s *Stream, md metadata.MD) error
-
- // Write sends the data for the given stream.
- // Write may not be called on all streams.
- Write(s *Stream, data []byte, opts *Options) error
-
- // WriteStatus sends the status of a stream to the client.
- // WriteStatus is the final call made on a stream and always
- // occurs.
- WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error
-
- // Close tears down the transport. Once it is called, the transport
- // should not be accessed any more. All the pending streams and their
- // handlers will be terminated asynchronously.
- Close() error
-
- // RemoteAddr returns the remote network address.
- RemoteAddr() net.Addr
-
- // Drain notifies the client this ServerTransport stops accepting new RPCs.
- Drain()
-}
-
-// streamErrorf creates an StreamError with the specified error code and description.
-func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
- return StreamError{
- Code: c,
- Desc: fmt.Sprintf(format, a...),
- }
-}
-
-// connectionErrorf creates an ConnectionError with the specified error description.
-func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
- return ConnectionError{
- Desc: fmt.Sprintf(format, a...),
- temp: temp,
- err: e,
- }
-}
-
-// ConnectionError is an error that results in the termination of the
-// entire connection and the retry of all the active streams.
-type ConnectionError struct {
- Desc string
- temp bool
- err error
-}
-
-func (e ConnectionError) Error() string {
- return fmt.Sprintf("connection error: desc = %q", e.Desc)
-}
-
-// Temporary indicates if this connection error is temporary or fatal.
-func (e ConnectionError) Temporary() bool {
- return e.temp
-}
-
-// Origin returns the original error of this connection error.
-func (e ConnectionError) Origin() error {
- // Never return nil error here.
- // If the original error is nil, return itself.
- if e.err == nil {
- return e
- }
- return e.err
-}
-
-var (
- // ErrConnClosing indicates that the transport is closing.
- ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
- // ErrStreamDrain indicates that the stream is rejected by the server because
- // the server stops accepting new RPCs.
- ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
-)
-
-// StreamError is an error that only affects one stream within a connection.
-type StreamError struct {
- Code codes.Code
- Desc string
-}
-
-func (e StreamError) Error() string {
- return fmt.Sprintf("stream error: code = %d desc = %q", e.Code, e.Desc)
-}
-
-// ContextErr converts the error from context package into a StreamError.
-func ContextErr(err error) StreamError {
- switch err {
- case context.DeadlineExceeded:
- return streamErrorf(codes.DeadlineExceeded, "%v", err)
- case context.Canceled:
- return streamErrorf(codes.Canceled, "%v", err)
- }
- panic(fmt.Sprintf("Unexpected error from context packet: %v", err))
-}
-
-// wait blocks until it can receive from ctx.Done, closing, or proceed.
-// If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err.
-// If it receives from done, it returns 0, io.EOF if ctx is not done; otherwise
-// it return the StreamError for ctx.Err.
-// If it receives from goAway, it returns 0, ErrStreamDrain.
-// If it receives from closing, it returns 0, ErrConnClosing.
-// If it receives from proceed, it returns the received integer, nil.
-func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-chan int) (int, error) {
- select {
- case <-ctx.Done():
- return 0, ContextErr(ctx.Err())
- case <-done:
- // User cancellation has precedence.
- select {
- case <-ctx.Done():
- return 0, ContextErr(ctx.Err())
- default:
- }
- return 0, io.EOF
- case <-goAway:
- return 0, ErrStreamDrain
- case <-closing:
- return 0, ErrConnClosing
- case i := <-proceed:
- return i, nil
- }
-}