diff options
Diffstat (limited to 'vendor/k8s.io/client-go/rest/watch')
-rw-r--r-- | vendor/k8s.io/client-go/rest/watch/decoder.go | 72 | ||||
-rw-r--r-- | vendor/k8s.io/client-go/rest/watch/encoder.go | 56 |
2 files changed, 128 insertions, 0 deletions
diff --git a/vendor/k8s.io/client-go/rest/watch/decoder.go b/vendor/k8s.io/client-go/rest/watch/decoder.go new file mode 100644 index 000000000..73bb63add --- /dev/null +++ b/vendor/k8s.io/client-go/rest/watch/decoder.go @@ -0,0 +1,72 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package versioned + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer/streaming" + "k8s.io/apimachinery/pkg/watch" +) + +// Decoder implements the watch.Decoder interface for io.ReadClosers that +// have contents which consist of a series of watchEvent objects encoded +// with the given streaming decoder. The internal objects will be then +// decoded by the embedded decoder. +type Decoder struct { + decoder streaming.Decoder + embeddedDecoder runtime.Decoder +} + +// NewDecoder creates an Decoder for the given writer and codec. +func NewDecoder(decoder streaming.Decoder, embeddedDecoder runtime.Decoder) *Decoder { + return &Decoder{ + decoder: decoder, + embeddedDecoder: embeddedDecoder, + } +} + +// Decode blocks until it can return the next object in the reader. Returns an error +// if the reader is closed or an object can't be decoded. +func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) { + var got metav1.WatchEvent + res, _, err := d.decoder.Decode(nil, &got) + if err != nil { + return "", nil, err + } + if res != &got { + return "", nil, fmt.Errorf("unable to decode to metav1.Event") + } + switch got.Type { + case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error): + default: + return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type) + } + + obj, err := runtime.Decode(d.embeddedDecoder, got.Object.Raw) + if err != nil { + return "", nil, fmt.Errorf("unable to decode watch event: %v", err) + } + return watch.EventType(got.Type), obj, nil +} + +// Close closes the underlying r. +func (d *Decoder) Close() { + d.decoder.Close() +} diff --git a/vendor/k8s.io/client-go/rest/watch/encoder.go b/vendor/k8s.io/client-go/rest/watch/encoder.go new file mode 100644 index 000000000..e55aa12d9 --- /dev/null +++ b/vendor/k8s.io/client-go/rest/watch/encoder.go @@ -0,0 +1,56 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package versioned + +import ( + "encoding/json" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer/streaming" + "k8s.io/apimachinery/pkg/watch" +) + +// Encoder serializes watch.Events into io.Writer. The internal objects +// are encoded using embedded encoder, and the outer Event is serialized +// using encoder. +// TODO: this type is only used by tests +type Encoder struct { + encoder streaming.Encoder + embeddedEncoder runtime.Encoder +} + +func NewEncoder(encoder streaming.Encoder, embeddedEncoder runtime.Encoder) *Encoder { + return &Encoder{ + encoder: encoder, + embeddedEncoder: embeddedEncoder, + } +} + +// Encode writes an event to the writer. Returns an error +// if the writer is closed or an object can't be encoded. +func (e *Encoder) Encode(event *watch.Event) error { + data, err := runtime.Encode(e.embeddedEncoder, event.Object) + if err != nil { + return err + } + // FIXME: get rid of json.RawMessage. + return e.encoder.Encode(&metav1.WatchEvent{ + Type: string(event.Type), + Object: runtime.RawExtension{Raw: json.RawMessage(data)}, + }) +} |