summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/client-go/rest/request.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/client-go/rest/request.go')
-rw-r--r--vendor/k8s.io/client-go/rest/request.go271
1 files changed, 77 insertions, 194 deletions
diff --git a/vendor/k8s.io/client-go/rest/request.go b/vendor/k8s.io/client-go/rest/request.go
index cfb4511ba..6ca9e0197 100644
--- a/vendor/k8s.io/client-go/rest/request.go
+++ b/vendor/k8s.io/client-go/rest/request.go
@@ -33,27 +33,20 @@ import (
"time"
"github.com/golang/glog"
+ "golang.org/x/net/http2"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/fields"
- "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/util/net"
- "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
- "k8s.io/client-go/pkg/api/v1"
restclientwatch "k8s.io/client-go/rest/watch"
"k8s.io/client-go/tools/metrics"
"k8s.io/client-go/util/flowcontrol"
)
var (
- // specialParams lists parameters that are handled specially and which users of Request
- // are therefore not allowed to set manually.
- specialParams = sets.NewString("timeout")
-
// longThrottleLatency defines threshold for logging requests. All requests being
// throttle for more than longThrottleLatency will be logged.
longThrottleLatency = 50 * time.Millisecond
@@ -119,7 +112,7 @@ type Request struct {
}
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
-func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter) *Request {
+func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter, timeout time.Duration) *Request {
if backoff == nil {
glog.V(2).Infof("Not implementing request backoff strategy.")
backoff = &NoBackoff{}
@@ -138,6 +131,7 @@ func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPa
serializers: serializers,
backoffMgr: backoff,
throttle: throttle,
+ timeout: timeout,
}
switch {
case len(content.AcceptContentTypes) > 0:
@@ -186,6 +180,24 @@ func (r *Request) Resource(resource string) *Request {
return r
}
+// BackOff sets the request's backoff manager to the one specified,
+// or defaults to the stub implementation if nil is provided
+func (r *Request) BackOff(manager BackoffManager) *Request {
+ if manager == nil {
+ r.backoffMgr = &NoBackoff{}
+ return r
+ }
+
+ r.backoffMgr = manager
+ return r
+}
+
+// Throttle receives a rate-limiter and sets or replaces an existing request limiter
+func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
+ r.throttle = limiter
+ return r
+}
+
// SubResource sets a sub-resource path which can be multiple segments segment after the resource
// name but before the suffix.
func (r *Request) SubResource(subresources ...string) *Request {
@@ -269,7 +281,7 @@ func (r *Request) AbsPath(segments ...string) *Request {
}
// RequestURI overwrites existing path and parameters with the value of the provided server relative
-// URI. Some parameters (those in specialParameters) cannot be overwritten.
+// URI.
func (r *Request) RequestURI(uri string) *Request {
if r.err != nil {
return r
@@ -291,143 +303,6 @@ func (r *Request) RequestURI(uri string) *Request {
return r
}
-const (
- // A constant that clients can use to refer in a field selector to the object name field.
- // Will be automatically emitted as the correct name for the API version.
- nodeUnschedulable = "spec.unschedulable"
- objectNameField = "metadata.name"
- podHost = "spec.nodeName"
- podStatus = "status.phase"
- secretType = "type"
-
- eventReason = "reason"
- eventSource = "source"
- eventType = "type"
- eventInvolvedKind = "involvedObject.kind"
- eventInvolvedNamespace = "involvedObject.namespace"
- eventInvolvedName = "involvedObject.name"
- eventInvolvedUID = "involvedObject.uid"
- eventInvolvedAPIVersion = "involvedObject.apiVersion"
- eventInvolvedResourceVersion = "involvedObject.resourceVersion"
- eventInvolvedFieldPath = "involvedObject.fieldPath"
-)
-
-type clientFieldNameToAPIVersionFieldName map[string]string
-
-func (c clientFieldNameToAPIVersionFieldName) filterField(field, value string) (newField, newValue string, err error) {
- newFieldName, ok := c[field]
- if !ok {
- return "", "", fmt.Errorf("%v - %v - no field mapping defined", field, value)
- }
- return newFieldName, value, nil
-}
-
-type resourceTypeToFieldMapping map[string]clientFieldNameToAPIVersionFieldName
-
-func (r resourceTypeToFieldMapping) filterField(resourceType, field, value string) (newField, newValue string, err error) {
- fMapping, ok := r[resourceType]
- if !ok {
- return "", "", fmt.Errorf("%v - %v - %v - no field mapping defined", resourceType, field, value)
- }
- return fMapping.filterField(field, value)
-}
-
-type versionToResourceToFieldMapping map[schema.GroupVersion]resourceTypeToFieldMapping
-
-// filterField transforms the given field/value selector for the given groupVersion and resource
-func (v versionToResourceToFieldMapping) filterField(groupVersion *schema.GroupVersion, resourceType, field, value string) (newField, newValue string, err error) {
- rMapping, ok := v[*groupVersion]
- if !ok {
- // no groupVersion overrides registered, default to identity mapping
- return field, value, nil
- }
- newField, newValue, err = rMapping.filterField(resourceType, field, value)
- if err != nil {
- // no groupVersionResource overrides registered, default to identity mapping
- return field, value, nil
- }
- return newField, newValue, nil
-}
-
-var fieldMappings = versionToResourceToFieldMapping{
- v1.SchemeGroupVersion: resourceTypeToFieldMapping{
- "nodes": clientFieldNameToAPIVersionFieldName{
- objectNameField: objectNameField,
- nodeUnschedulable: nodeUnschedulable,
- },
- "pods": clientFieldNameToAPIVersionFieldName{
- objectNameField: objectNameField,
- podHost: podHost,
- podStatus: podStatus,
- },
- "secrets": clientFieldNameToAPIVersionFieldName{
- secretType: secretType,
- },
- "serviceAccounts": clientFieldNameToAPIVersionFieldName{
- objectNameField: objectNameField,
- },
- "endpoints": clientFieldNameToAPIVersionFieldName{
- objectNameField: objectNameField,
- },
- "events": clientFieldNameToAPIVersionFieldName{
- objectNameField: objectNameField,
- eventReason: eventReason,
- eventSource: eventSource,
- eventType: eventType,
- eventInvolvedKind: eventInvolvedKind,
- eventInvolvedNamespace: eventInvolvedNamespace,
- eventInvolvedName: eventInvolvedName,
- eventInvolvedUID: eventInvolvedUID,
- eventInvolvedAPIVersion: eventInvolvedAPIVersion,
- eventInvolvedResourceVersion: eventInvolvedResourceVersion,
- eventInvolvedFieldPath: eventInvolvedFieldPath,
- },
- },
-}
-
-// FieldsSelectorParam adds the given selector as a query parameter with the name paramName.
-func (r *Request) FieldsSelectorParam(s fields.Selector) *Request {
- if r.err != nil {
- return r
- }
- if s == nil {
- return r
- }
- if s.Empty() {
- return r
- }
- s2, err := s.Transform(func(field, value string) (newField, newValue string, err error) {
- return fieldMappings.filterField(r.content.GroupVersion, r.resource, field, value)
- })
- if err != nil {
- r.err = err
- return r
- }
- return r.setParam(metav1.FieldSelectorQueryParam(r.content.GroupVersion.String()), s2.String())
-}
-
-// LabelsSelectorParam adds the given selector as a query parameter
-func (r *Request) LabelsSelectorParam(s labels.Selector) *Request {
- if r.err != nil {
- return r
- }
- if s == nil {
- return r
- }
- if s.Empty() {
- return r
- }
- return r.setParam(metav1.LabelSelectorQueryParam(r.content.GroupVersion.String()), s.String())
-}
-
-// UintParam creates a query parameter with the given value.
-func (r *Request) UintParam(paramName string, u uint64) *Request {
- if r.err != nil {
- return r
- }
- return r.setParam(paramName, strconv.FormatUint(u, 10))
-}
-
// Param creates a query parameter with the given string value.
func (r *Request) Param(paramName, s string) *Request {
if r.err != nil {
@@ -439,6 +314,8 @@ func (r *Request) Param(paramName, s string) *Request {
// VersionedParams will take the provided object, serialize it to a map[string][]string using the
// implicit RESTClient API version and the default parameter codec, and then add those as parameters
// to the request. Use this to provide versioned query parameters from client libraries.
+// VersionedParams will not write query parameters that have omitempty set and are empty. If a
+// parameter has already been set it is appended to (Params and VersionedParams are additive).
func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
if r.err != nil {
return r
@@ -449,52 +326,15 @@ func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCod
return r
}
for k, v := range params {
- for _, value := range v {
- // TODO: Move it to setParam method, once we get rid of
- // FieldSelectorParam & LabelSelectorParam methods.
- if k == metav1.LabelSelectorQueryParam(r.content.GroupVersion.String()) && value == "" {
- // Don't set an empty selector for backward compatibility.
- // Since there is no way to get the difference between empty
- // and unspecified string, we don't set it to avoid having
- // labelSelector= param in every request.
- continue
- }
- if k == metav1.FieldSelectorQueryParam(r.content.GroupVersion.String()) {
- if len(value) == 0 {
- // Don't set an empty selector for backward compatibility.
- // Since there is no way to get the difference between empty
- // and unspecified string, we don't set it to avoid having
- // fieldSelector= param in every request.
- continue
- }
- // TODO: Filtering should be handled somewhere else.
- selector, err := fields.ParseSelector(value)
- if err != nil {
- r.err = fmt.Errorf("unparsable field selector: %v", err)
- return r
- }
- filteredSelector, err := selector.Transform(
- func(field, value string) (newField, newValue string, err error) {
- return fieldMappings.filterField(r.content.GroupVersion, r.resource, field, value)
- })
- if err != nil {
- r.err = fmt.Errorf("untransformable field selector: %v", err)
- return r
- }
- value = filteredSelector.String()
- }
-
- r.setParam(k, value)
+ if r.params == nil {
+ r.params = make(url.Values)
}
+ r.params[k] = append(r.params[k], v...)
}
return r
}
func (r *Request) setParam(paramName, value string) *Request {
- if specialParams.Has(paramName) {
- r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName)
- return r
- }
if r.params == nil {
r.params = make(url.Values)
}
@@ -502,11 +342,14 @@ func (r *Request) setParam(paramName, value string) *Request {
return r
}
-func (r *Request) SetHeader(key, value string) *Request {
+func (r *Request) SetHeader(key string, values ...string) *Request {
if r.headers == nil {
r.headers = http.Header{}
}
- r.headers.Set(key, value)
+ r.headers.Del(key)
+ for _, value := range values {
+ r.headers.Add(key, value)
+ }
return r
}
@@ -609,7 +452,7 @@ func (r *Request) URL() *url.URL {
// finalURLTemplate is similar to URL(), but will make all specific parameter values equal
// - instead of name or namespace, "{name}" and "{namespace}" will be used, and all query
// parameters will be reset. This creates a copy of the request so as not to change the
-// underyling object. This means some useful request info (like the types of field
+// underlying object. This means some useful request info (like the types of field
// selectors in use) will be lost.
// TODO: preserve field selector keys
func (r Request) finalURLTemplate() url.URL {
@@ -921,8 +764,29 @@ func (r *Request) DoRaw() ([]byte, error) {
func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
var body []byte
if resp.Body != nil {
- if data, err := ioutil.ReadAll(resp.Body); err == nil {
+ data, err := ioutil.ReadAll(resp.Body)
+ switch err.(type) {
+ case nil:
body = data
+ case http2.StreamError:
+ // This is trying to catch the scenario that the server may close the connection when sending the
+ // response body. This can be caused by server timeout due to a slow network connection.
+ // TODO: Add test for this. Steps may be:
+ // 1. client-go (or kubectl) sends a GET request.
+ // 2. Apiserver sends back the headers and then part of the body
+ // 3. Apiserver closes connection.
+ // 4. client-go should catch this and return an error.
+ glog.V(2).Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
+ streamErr := fmt.Errorf("Stream error %#v when reading response body, may be caused by closed connection. Please retry.", err)
+ return Result{
+ err: streamErr,
+ }
+ default:
+ glog.Errorf("Unexpected error when reading response body: %#v", err)
+ unexpectedErr := fmt.Errorf("Unexpected error %#v when reading response body. Please retry.", err)
+ return Result{
+ err: unexpectedErr,
+ }
}
}
@@ -978,6 +842,25 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
}
}
+// truncateBody decides if the body should be truncated, based on the glog Verbosity.
+func truncateBody(body string) string {
+ max := 0
+ switch {
+ case bool(glog.V(10)):
+ return body
+ case bool(glog.V(9)):
+ max = 10240
+ case bool(glog.V(8)):
+ max = 1024
+ }
+
+ if len(body) <= max {
+ return body
+ }
+
+ return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
+}
+
// glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against
// allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
// whether the body is printable.
@@ -986,9 +869,9 @@ func glogBody(prefix string, body []byte) {
if bytes.IndexFunc(body, func(r rune) bool {
return r < 0x0a
}) != -1 {
- glog.Infof("%s:\n%s", prefix, hex.Dump(body))
+ glog.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
} else {
- glog.Infof("%s: %s", prefix, string(body))
+ glog.Infof("%s: %s", prefix, truncateBody(string(body)))
}
}
}
@@ -1069,7 +952,7 @@ func isTextResponse(resp *http.Response) bool {
func checkWait(resp *http.Response) (int, bool) {
switch r := resp.StatusCode; {
// any 500 error code and 429 can trigger a wait
- case r == errors.StatusTooManyRequests, r >= 500:
+ case r == http.StatusTooManyRequests, r >= 500:
default:
return 0, false
}