Diffstat (limited to 'vendor/k8s.io/kubernetes/pkg/proxy')
-rw-r--r--  vendor/k8s.io/kubernetes/pkg/proxy/doc.go                      |   18
-rw-r--r--  vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/doc.go          |   18
-rw-r--r--  vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/healthcheck.go  |  333
-rw-r--r--  vendor/k8s.io/kubernetes/pkg/proxy/iptables/metrics.go         |   50
-rw-r--r--  vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go         | 1732
-rw-r--r--  vendor/k8s.io/kubernetes/pkg/proxy/types.go                    |   44
-rw-r--r--  vendor/k8s.io/kubernetes/pkg/proxy/util/conntrack.go           |   58
7 files changed, 2253 insertions, 0 deletions
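For orientation before the diff body: a minimal usage sketch of the healthcheck server this change vendors in. The node name, service name, and port below are placeholders, and kube-proxy performs this wiring itself inside the iptables Proxier further down; this is illustrative only, not part of the vendored change.

package main

import (
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/proxy/healthcheck"
)

func main() {
	// Passing nil for the recorder, listener and HTTP-server factory selects
	// the real net/http-backed defaults.
	hcs := healthcheck.NewServer("node-1", nil, nil, nil)

	svc := types.NamespacedName{Namespace: "default", Name: "my-lb-service"}

	// Serve a per-service healthcheck on the service's healthCheckNodePort.
	_ = hcs.SyncServices(map[types.NamespacedName]uint16{svc: 30303})

	// Report the number of endpoints local to this node: 0 makes the handler
	// answer 503, anything greater answers 200, with a small JSON body either way.
	_ = hcs.SyncEndpoints(map[types.NamespacedName]int{svc: 2})
}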
diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/doc.go b/vendor/k8s.io/kubernetes/pkg/proxy/doc.go new file mode 100644 index 000000000..3bed0fa39 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/proxy/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package proxy implements the layer-3 network proxy. +package proxy // import "k8s.io/kubernetes/pkg/proxy" diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/doc.go b/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/doc.go new file mode 100644 index 000000000..0a9ea0944 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package healthcheck provides tools for serving kube-proxy healthchecks. +package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck" diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/healthcheck.go b/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/healthcheck.go new file mode 100644 index 000000000..39f10f71a --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/healthcheck.go @@ -0,0 +1,333 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthcheck + +import ( + "fmt" + "net" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/golang/glog" + "github.com/renstrom/dedent" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/clock" + clientv1 "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/api" +) + +// Server serves HTTP endpoints for each service name, with results +// based on the endpoints. If there are 0 endpoints for a service, it returns a +// 503 "Service Unavailable" error (telling LBs not to use this node). If there +// are 1 or more endpoints, it returns a 200 "OK". +type Server interface { + // Make the new set of services be active. Services that were open before + // will be closed. 
Services that are new will be opened. Service that + // existed and are in the new set will be left alone. The value of the map + // is the healthcheck-port to listen on. + SyncServices(newServices map[types.NamespacedName]uint16) error + // Make the new set of endpoints be active. Endpoints for services that do + // not exist will be dropped. The value of the map is the number of + // endpoints the service has on this node. + SyncEndpoints(newEndpoints map[types.NamespacedName]int) error +} + +// Listener allows for testing of Server. If the Listener argument +// to NewServer() is nil, the real net.Listen function will be used. +type Listener interface { + // Listen is very much like net.Listen, except the first arg (network) is + // fixed to be "tcp". + Listen(addr string) (net.Listener, error) +} + +// HTTPServerFactory allows for testing of Server. If the +// HTTPServerFactory argument to NewServer() is nil, the real +// http.Server type will be used. +type HTTPServerFactory interface { + // New creates an instance of a type satisfying HTTPServer. This is + // designed to include http.Server. + New(addr string, handler http.Handler) HTTPServer +} + +// HTTPServer allows for testing of Server. +type HTTPServer interface { + // Server is designed so that http.Server satifies this interface, + Serve(listener net.Listener) error +} + +// NewServer allocates a new healthcheck server manager. If either +// of the injected arguments are nil, defaults will be used. +func NewServer(hostname string, recorder record.EventRecorder, listener Listener, httpServerFactory HTTPServerFactory) Server { + if listener == nil { + listener = stdNetListener{} + } + if httpServerFactory == nil { + httpServerFactory = stdHTTPServerFactory{} + } + return &server{ + hostname: hostname, + recorder: recorder, + listener: listener, + httpFactory: httpServerFactory, + services: map[types.NamespacedName]*hcInstance{}, + } +} + +// Implement Listener in terms of net.Listen. +type stdNetListener struct{} + +func (stdNetListener) Listen(addr string) (net.Listener, error) { + return net.Listen("tcp", addr) +} + +var _ Listener = stdNetListener{} + +// Implement HTTPServerFactory in terms of http.Server. +type stdHTTPServerFactory struct{} + +func (stdHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer { + return &http.Server{ + Addr: addr, + Handler: handler, + } +} + +var _ HTTPServerFactory = stdHTTPServerFactory{} + +type server struct { + hostname string + recorder record.EventRecorder // can be nil + listener Listener + httpFactory HTTPServerFactory + + lock sync.Mutex + services map[types.NamespacedName]*hcInstance +} + +func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error { + hcs.lock.Lock() + defer hcs.lock.Unlock() + + // Remove any that are not needed any more. + for nsn, svc := range hcs.services { + if port, found := newServices[nsn]; !found || port != svc.port { + glog.V(2).Infof("Closing healthcheck %q on port %d", nsn.String(), svc.port) + if err := svc.listener.Close(); err != nil { + glog.Errorf("Close(%v): %v", svc.listener.Addr(), err) + } + delete(hcs.services, nsn) + } + } + + // Add any that are needed. 
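	// (A service whose healthcheck port changed was closed by the loop above, so it
	// no longer appears in hcs.services and is simply re-opened here on its new port.)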
+ for nsn, port := range newServices { + if hcs.services[nsn] != nil { + glog.V(3).Infof("Existing healthcheck %q on port %d", nsn.String(), port) + continue + } + + glog.V(2).Infof("Opening healthcheck %q on port %d", nsn.String(), port) + svc := &hcInstance{port: port} + addr := fmt.Sprintf(":%d", port) + svc.server = hcs.httpFactory.New(addr, hcHandler{name: nsn, hcs: hcs}) + var err error + svc.listener, err = hcs.listener.Listen(addr) + if err != nil { + msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err) + + if hcs.recorder != nil { + hcs.recorder.Eventf( + &clientv1.ObjectReference{ + Kind: "Service", + Namespace: nsn.Namespace, + Name: nsn.Name, + UID: types.UID(nsn.String()), + }, api.EventTypeWarning, "FailedToStartHealthcheck", msg) + } + glog.Error(msg) + continue + } + hcs.services[nsn] = svc + + go func(nsn types.NamespacedName, svc *hcInstance) { + // Serve() will exit when the listener is closed. + glog.V(3).Infof("Starting goroutine for healthcheck %q on port %d", nsn.String(), svc.port) + if err := svc.server.Serve(svc.listener); err != nil { + glog.V(3).Infof("Healthcheck %q closed: %v", nsn.String(), err) + return + } + glog.V(3).Infof("Healthcheck %q closed", nsn.String()) + }(nsn, svc) + } + return nil +} + +type hcInstance struct { + port uint16 + listener net.Listener + server HTTPServer + endpoints int // number of local endpoints for a service +} + +type hcHandler struct { + name types.NamespacedName + hcs *server +} + +var _ http.Handler = hcHandler{} + +func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + h.hcs.lock.Lock() + svc, ok := h.hcs.services[h.name] + if !ok || svc == nil { + h.hcs.lock.Unlock() + glog.Errorf("Received request for closed healthcheck %q", h.name.String()) + return + } + count := svc.endpoints + h.hcs.lock.Unlock() + + resp.Header().Set("Content-Type", "application/json") + if count == 0 { + resp.WriteHeader(http.StatusServiceUnavailable) + } else { + resp.WriteHeader(http.StatusOK) + } + fmt.Fprintf(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(` + { + "service": { + "namespace": %q, + "name": %q + }, + "localEndpoints": %d + } + `, h.name.Namespace, h.name.Name, count)), "\n")) +} + +func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error { + hcs.lock.Lock() + defer hcs.lock.Unlock() + + for nsn, count := range newEndpoints { + if hcs.services[nsn] == nil { + glog.V(3).Infof("Not saving endpoints for unknown healthcheck %q", nsn.String()) + continue + } + glog.V(3).Infof("Reporting %d endpoints for healthcheck %q", count, nsn.String()) + hcs.services[nsn].endpoints = count + } + for nsn, hci := range hcs.services { + if _, found := newEndpoints[nsn]; !found { + hci.endpoints = 0 + } + } + return nil +} + +// HealthzUpdater allows callers to update healthz timestamp only. +type HealthzUpdater interface { + UpdateTimestamp() +} + +// HealthzServer returns 200 "OK" by default. Once timestamp has been +// updated, it verifies we don't exceed max no respond duration since +// last update. +type HealthzServer struct { + listener Listener + httpFactory HTTPServerFactory + clock clock.Clock + + addr string + port int32 + healthTimeout time.Duration + + lastUpdated atomic.Value +} + +// NewDefaultHealthzServer returns a default healthz http server. 
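// Illustrative use (address and timeout are assumed values, not taken from this change):
//   hs := NewDefaultHealthzServer("0.0.0.0:10256", 2*syncPeriod)
//   hs.Run()
//   // ...and call hs.UpdateTimestamp() after each successful proxy rule sync.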
+func NewDefaultHealthzServer(addr string, healthTimeout time.Duration) *HealthzServer { + return newHealthzServer(nil, nil, nil, addr, healthTimeout) +} + +func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *HealthzServer { + if listener == nil { + listener = stdNetListener{} + } + if httpServerFactory == nil { + httpServerFactory = stdHTTPServerFactory{} + } + if c == nil { + c = clock.RealClock{} + } + return &HealthzServer{ + listener: listener, + httpFactory: httpServerFactory, + clock: c, + addr: addr, + healthTimeout: healthTimeout, + } +} + +// UpdateTimestamp updates the lastUpdated timestamp. +func (hs *HealthzServer) UpdateTimestamp() { + hs.lastUpdated.Store(hs.clock.Now()) +} + +// Run starts the healthz http server and returns. +func (hs *HealthzServer) Run() { + serveMux := http.NewServeMux() + serveMux.Handle("/healthz", healthzHandler{hs: hs}) + server := hs.httpFactory.New(hs.addr, serveMux) + listener, err := hs.listener.Listen(hs.addr) + if err != nil { + glog.Errorf("Failed to start healthz on %s: %v", hs.addr, err) + return + } + go func() { + glog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr) + if err := server.Serve(listener); err != nil { + glog.Errorf("Healhz closed: %v", err) + return + } + glog.Errorf("Unexpected healhz closed.") + }() +} + +type healthzHandler struct { + hs *HealthzServer +} + +func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + lastUpdated := time.Time{} + if val := h.hs.lastUpdated.Load(); val != nil { + lastUpdated = val.(time.Time) + } + currentTime := h.hs.clock.Now() + + resp.Header().Set("Content-Type", "application/json") + if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) { + resp.WriteHeader(http.StatusServiceUnavailable) + } else { + resp.WriteHeader(http.StatusOK) + } + fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime)) +} diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/iptables/metrics.go b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/metrics.go new file mode 100644 index 000000000..fabe6a595 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/metrics.go @@ -0,0 +1,50 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package iptables + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +const kubeProxySubsystem = "kubeproxy" + +var ( + SyncProxyRulesLatency = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Subsystem: kubeProxySubsystem, + Name: "sync_proxy_rules_latency_microseconds", + Help: "SyncProxyRules latency", + Buckets: prometheus.ExponentialBuckets(1000, 2, 15), + }, + ) +) + +var registerMetricsOnce sync.Once + +func RegisterMetrics() { + registerMetricsOnce.Do(func() { + prometheus.MustRegister(SyncProxyRulesLatency) + }) +} + +// Gets the time since the specified start in microseconds. 
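// For example (illustrative), a rule sync that took 87ms is observed as 87000.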
+func sinceInMicroseconds(start time.Time) float64 { + return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) +} diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go new file mode 100644 index 000000000..9d29d7b42 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go @@ -0,0 +1,1732 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package iptables + +// +// NOTE: this needs to be tested in e2e since it uses iptables for everything. +// + +import ( + "bytes" + "crypto/sha256" + "encoding/base32" + "fmt" + "net" + "reflect" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/golang/glog" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" + clientv1 "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/helper" + apiservice "k8s.io/kubernetes/pkg/api/service" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/healthcheck" + utilproxy "k8s.io/kubernetes/pkg/proxy/util" + "k8s.io/kubernetes/pkg/util/async" + utilexec "k8s.io/kubernetes/pkg/util/exec" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" + utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" + utilversion "k8s.io/kubernetes/pkg/util/version" +) + +const ( + // iptablesMinVersion is the minimum version of iptables for which we will use the Proxier + // from this package instead of the userspace Proxier. While most of the + // features we need were available earlier, the '-C' flag was added more + // recently. We use that indirectly in Ensure* functions, and if we don't + // have it, we have to be extra careful about the exact args we feed in being + // the same as the args we read back (iptables itself normalizes some args). + // This is the "new" Proxier, so we require "new" versions of tools. + iptablesMinVersion = utiliptables.MinCheckVersion + + // the services chain + kubeServicesChain utiliptables.Chain = "KUBE-SERVICES" + + // the nodeports chain + kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS" + + // the kubernetes postrouting chain + kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING" + + // the mark-for-masquerade chain + KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ" + + // the mark-for-drop chain + KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP" +) + +// IPTablesVersioner can query the current iptables version. +type IPTablesVersioner interface { + // returns "X.Y.Z" + GetVersion() (string, error) +} + +// KernelCompatTester tests whether the required kernel capabilities are +// present to run the iptables proxier. +type KernelCompatTester interface { + IsCompatible() error +} + +// CanUseIPTablesProxier returns true if we should use the iptables Proxier +// instead of the "classic" userspace Proxier. 
This is determined by checking +// the iptables version and for the existence of kernel features. It may return +// an error if it fails to get the iptables version without error, in which +// case it will also return false. +func CanUseIPTablesProxier(iptver IPTablesVersioner, kcompat KernelCompatTester) (bool, error) { + minVersion, err := utilversion.ParseGeneric(iptablesMinVersion) + if err != nil { + return false, err + } + versionString, err := iptver.GetVersion() + if err != nil { + return false, err + } + version, err := utilversion.ParseGeneric(versionString) + if err != nil { + return false, err + } + if version.LessThan(minVersion) { + return false, nil + } + + // Check that the kernel supports what we need. + if err := kcompat.IsCompatible(); err != nil { + return false, err + } + return true, nil +} + +type LinuxKernelCompatTester struct{} + +func (lkct LinuxKernelCompatTester) IsCompatible() error { + // Check for the required sysctls. We don't care about the value, just + // that it exists. If this Proxier is chosen, we'll initialize it as we + // need. + _, err := utilsysctl.New().GetSysctl(sysctlRouteLocalnet) + return err +} + +const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet" +const sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables" + +// internal struct for string service information +type serviceInfo struct { + clusterIP net.IP + port int + protocol api.Protocol + nodePort int + loadBalancerStatus api.LoadBalancerStatus + sessionAffinityType api.ServiceAffinity + stickyMaxAgeMinutes int + externalIPs []string + loadBalancerSourceRanges []string + onlyNodeLocalEndpoints bool + healthCheckNodePort int + // The following fields are computed and stored for performance reasons. + serviceNameString string + servicePortChainName utiliptables.Chain + serviceFirewallChainName utiliptables.Chain + serviceLBChainName utiliptables.Chain +} + +// internal struct for endpoints information +type endpointsInfo struct { + endpoint string // TODO: should be an endpointString type + isLocal bool + // The following fields we lazily compute and store here for performance + // reasons. If the protocol is the same as you expect it to be, then the + // chainName can be reused, otherwise it should be recomputed. + protocol string + chainName utiliptables.Chain +} + +// Returns just the IP part of the endpoint. +func (e *endpointsInfo) IPPart() string { + if index := strings.Index(e.endpoint, ":"); index != -1 { + return e.endpoint[0:index] + } + return e.endpoint +} + +// Returns the endpoint chain name for a given endpointsInfo. 
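// The name has the form "KUBE-SEP-" plus the first 16 characters of a base32-encoded
// sha256 hash of service name, protocol and endpoint (see servicePortEndpointChainName
// below), e.g. "KUBE-SEP-ABCDEFGHIJKLMNOP" (made-up hash, for illustration only).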
+func (e *endpointsInfo) endpointChain(svcNameString, protocol string) utiliptables.Chain { + if e.protocol != protocol { + e.protocol = protocol + e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.endpoint) + } + return e.chainName +} + +func (e *endpointsInfo) String() string { + return fmt.Sprintf("%v", *e) +} + +// returns a new serviceInfo struct +func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo { + onlyNodeLocalEndpoints := false + if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) && + apiservice.RequestsOnlyLocalTraffic(service) { + onlyNodeLocalEndpoints = true + } + info := &serviceInfo{ + clusterIP: net.ParseIP(service.Spec.ClusterIP), + port: int(port.Port), + protocol: port.Protocol, + nodePort: int(port.NodePort), + // Deep-copy in case the service instance changes + loadBalancerStatus: *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer), + sessionAffinityType: service.Spec.SessionAffinity, + stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API. + externalIPs: make([]string, len(service.Spec.ExternalIPs)), + loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)), + onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, + } + copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) + copy(info.externalIPs, service.Spec.ExternalIPs) + + if apiservice.NeedsHealthCheck(service) { + p := apiservice.GetServiceHealthCheckNodePort(service) + if p == 0 { + glog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String()) + } else { + info.healthCheckNodePort = int(p) + } + } + + // Store the following for performance reasons. + protocol := strings.ToLower(string(info.protocol)) + info.serviceNameString = svcPortName.String() + info.servicePortChainName = servicePortChainName(info.serviceNameString, protocol) + info.serviceFirewallChainName = serviceFirewallChainName(info.serviceNameString, protocol) + info.serviceLBChainName = serviceLBChainName(info.serviceNameString, protocol) + + return info +} + +type endpointsChange struct { + previous proxyEndpointsMap + current proxyEndpointsMap +} + +type endpointsChangeMap struct { + lock sync.Mutex + hostname string + items map[types.NamespacedName]*endpointsChange +} + +type serviceChange struct { + previous proxyServiceMap + current proxyServiceMap +} + +type serviceChangeMap struct { + lock sync.Mutex + items map[types.NamespacedName]*serviceChange +} + +type updateEndpointMapResult struct { + hcEndpoints map[types.NamespacedName]int + staleEndpoints map[endpointServicePair]bool + staleServiceNames map[proxy.ServicePortName]bool +} + +type updateServiceMapResult struct { + hcServices map[types.NamespacedName]uint16 + staleServices sets.String +} + +type proxyServiceMap map[proxy.ServicePortName]*serviceInfo +type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo + +func newEndpointsChangeMap(hostname string) endpointsChangeMap { + return endpointsChangeMap{ + hostname: hostname, + items: make(map[types.NamespacedName]*endpointsChange), + } +} + +func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool { + ecm.lock.Lock() + defer ecm.lock.Unlock() + + change, exists := ecm.items[*namespacedName] + if !exists { + change = &endpointsChange{} + change.previous = endpointsToEndpointsMap(previous, ecm.hostname) + ecm.items[*namespacedName] = change + } + change.current = 
endpointsToEndpointsMap(current, ecm.hostname) + if reflect.DeepEqual(change.previous, change.current) { + delete(ecm.items, *namespacedName) + } + return len(ecm.items) > 0 +} + +func newServiceChangeMap() serviceChangeMap { + return serviceChangeMap{ + items: make(map[types.NamespacedName]*serviceChange), + } +} + +func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool { + scm.lock.Lock() + defer scm.lock.Unlock() + + change, exists := scm.items[*namespacedName] + if !exists { + change = &serviceChange{} + change.previous = serviceToServiceMap(previous) + scm.items[*namespacedName] = change + } + change.current = serviceToServiceMap(current) + if reflect.DeepEqual(change.previous, change.current) { + delete(scm.items, *namespacedName) + } + return len(scm.items) > 0 +} + +func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String { + existingPorts := sets.NewString() + for svcPortName, info := range other { + existingPorts.Insert(svcPortName.Port) + _, exists := (*sm)[svcPortName] + if !exists { + glog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) + } else { + glog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) + } + (*sm)[svcPortName] = info + } + return existingPorts +} + +func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) { + for svcPortName := range other { + if existingPorts.Has(svcPortName.Port) { + continue + } + info, exists := (*sm)[svcPortName] + if exists { + glog.V(1).Infof("Removing service port %q", svcPortName) + if info.protocol == api.ProtocolUDP { + staleServices.Insert(info.clusterIP.String()) + } + delete(*sm, svcPortName) + } else { + glog.Errorf("Service port %q removed, but doesn't exists", svcPortName) + } + } +} + +func (em proxyEndpointsMap) merge(other proxyEndpointsMap) { + for svcPortName := range other { + em[svcPortName] = other[svcPortName] + } +} + +func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) { + for svcPortName := range other { + delete(em, svcPortName) + } +} + +// Proxier is an iptables based proxy for connections between a localhost:lport +// and services that provide the actual backends. +type Proxier struct { + // endpointsChanges and serviceChanges contains all changes to endpoints and + // services that happened since iptables was synced. For a single object, + // changes are accumulated, i.e. previous is state from before all of them, + // current is state after applying all of those. + endpointsChanges endpointsChangeMap + serviceChanges serviceChangeMap + + mu sync.Mutex // protects the following fields + serviceMap proxyServiceMap + endpointsMap proxyEndpointsMap + portsMap map[localPort]closeable + // endpointsSynced and servicesSynced are set to true when corresponding + // objects are synced after startup. This is used to avoid updating iptables + // with some partial data after kube-proxy restart. + endpointsSynced bool + servicesSynced bool + initialized int32 + syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules + + // These are effectively const and do not need the mutex to be held. 
+ iptables utiliptables.Interface + masqueradeAll bool + masqueradeMark string + exec utilexec.Interface + clusterCIDR string + hostname string + nodeIP net.IP + portMapper portOpener + recorder record.EventRecorder + healthChecker healthcheck.Server + healthzServer healthcheck.HealthzUpdater + + // Since converting probabilities (floats) to strings is expensive + // and we are using only probabilities in the format of 1/n, we are + // precomputing some number of those and cache for future reuse. + precomputedProbabilities []string + + // The following buffers are used to reuse memory and avoid allocations + // that are significantly impacting performance. + iptablesData *bytes.Buffer + filterChains *bytes.Buffer + filterRules *bytes.Buffer + natChains *bytes.Buffer + natRules *bytes.Buffer +} + +type localPort struct { + desc string + ip string + port int + protocol string +} + +func (lp *localPort) String() string { + return fmt.Sprintf("%q (%s:%d/%s)", lp.desc, lp.ip, lp.port, lp.protocol) +} + +type closeable interface { + Close() error +} + +// portOpener is an interface around port opening/closing. +// Abstracted out for testing. +type portOpener interface { + OpenLocalPort(lp *localPort) (closeable, error) +} + +// listenPortOpener opens ports by calling bind() and listen(). +type listenPortOpener struct{} + +// OpenLocalPort holds the given local port open. +func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) { + return openLocalPort(lp) +} + +// Proxier implements ProxyProvider +var _ proxy.ProxyProvider = &Proxier{} + +// NewProxier returns a new Proxier given an iptables Interface instance. +// Because of the iptables logic, it is assumed that there is only a single Proxier active on a machine. +// An error will be returned if iptables fails to update or acquire the initial lock. +// Once a proxier is created, it will keep iptables up to date in the background and +// will not terminate if a particular iptables call fails. +func NewProxier(ipt utiliptables.Interface, + sysctl utilsysctl.Interface, + exec utilexec.Interface, + syncPeriod time.Duration, + minSyncPeriod time.Duration, + masqueradeAll bool, + masqueradeBit int, + clusterCIDR string, + hostname string, + nodeIP net.IP, + recorder record.EventRecorder, + healthzServer healthcheck.HealthzUpdater, +) (*Proxier, error) { + // check valid user input + if minSyncPeriod > syncPeriod { + return nil, fmt.Errorf("minSyncPeriod (%v) must be <= syncPeriod (%v)", minSyncPeriod, syncPeriod) + } + + // Set the route_localnet sysctl we need for + if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil { + return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err) + } + + // Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers + // are connected to a Linux bridge (but not SDN bridges). Until most + // plugins handle this, log when config is missing + if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 { + glog.Warningf("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended") + } + + // Generate the masquerade mark to use for SNAT rules. 
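	// Illustrative values (assumed, not from the vendored source): the common default of
	// masqueradeBit=14 gives masqueradeValue=0x4000, which the %#08x format below renders
	// as the mark "0x004000/0x004000".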
+ if masqueradeBit < 0 || masqueradeBit > 31 { + return nil, fmt.Errorf("invalid iptables-masquerade-bit %v not in [0, 31]", masqueradeBit) + } + masqueradeValue := 1 << uint(masqueradeBit) + masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue) + + if nodeIP == nil { + glog.Warningf("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP") + nodeIP = net.ParseIP("127.0.0.1") + } + + if len(clusterCIDR) == 0 { + glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic") + } + + healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps + + proxier := &Proxier{ + portsMap: make(map[localPort]closeable), + serviceMap: make(proxyServiceMap), + serviceChanges: newServiceChangeMap(), + endpointsMap: make(proxyEndpointsMap), + endpointsChanges: newEndpointsChangeMap(hostname), + iptables: ipt, + masqueradeAll: masqueradeAll, + masqueradeMark: masqueradeMark, + exec: exec, + clusterCIDR: clusterCIDR, + hostname: hostname, + nodeIP: nodeIP, + portMapper: &listenPortOpener{}, + recorder: recorder, + healthChecker: healthChecker, + healthzServer: healthzServer, + precomputedProbabilities: make([]string, 0, 1001), + iptablesData: bytes.NewBuffer(nil), + filterChains: bytes.NewBuffer(nil), + filterRules: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), + } + burstSyncs := 2 + glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) + proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) + return proxier, nil +} + +// CleanupLeftovers removes all iptables rules and chains created by the Proxier +// It returns true if an error was encountered. Errors are logged. +func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { + // Unlink the services chain. + args := []string{ + "-m", "comment", "--comment", "kubernetes service portals", + "-j", string(kubeServicesChain), + } + tableChainsWithJumpServices := []struct { + table utiliptables.Table + chain utiliptables.Chain + }{ + {utiliptables.TableFilter, utiliptables.ChainInput}, + {utiliptables.TableFilter, utiliptables.ChainOutput}, + {utiliptables.TableNAT, utiliptables.ChainOutput}, + {utiliptables.TableNAT, utiliptables.ChainPrerouting}, + } + for _, tc := range tableChainsWithJumpServices { + if err := ipt.DeleteRule(tc.table, tc.chain, args...); err != nil { + if !utiliptables.IsNotFoundError(err) { + glog.Errorf("Error removing pure-iptables proxy rule: %v", err) + encounteredError = true + } + } + } + + // Unlink the postrouting chain. + args = []string{ + "-m", "comment", "--comment", "kubernetes postrouting rules", + "-j", string(kubePostroutingChain), + } + if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { + if !utiliptables.IsNotFoundError(err) { + glog.Errorf("Error removing pure-iptables proxy rule: %v", err) + encounteredError = true + } + } + + // Flush and remove all of our chains. 
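	// Illustrative shape of the iptables-restore input assembled below, assuming the
	// nat table contained only KUBE-SERVICES:
	//   *nat
	//   :KUBE-SERVICES - [0:0]
	//   -X KUBE-SERVICES
	//   COMMIT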
+ iptablesData := bytes.NewBuffer(nil) + if err := ipt.SaveInto(utiliptables.TableNAT, iptablesData); err != nil { + glog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableNAT, err) + encounteredError = true + } else { + existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesData.Bytes()) + natChains := bytes.NewBuffer(nil) + natRules := bytes.NewBuffer(nil) + writeLine(natChains, "*nat") + // Start with chains we know we need to remove. + for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} { + if _, found := existingNATChains[chain]; found { + chainString := string(chain) + writeLine(natChains, existingNATChains[chain]) // flush + writeLine(natRules, "-X", chainString) // delete + } + } + // Hunt for service and endpoint chains. + for chain := range existingNATChains { + chainString := string(chain) + if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") || strings.HasPrefix(chainString, "KUBE-FW-") || strings.HasPrefix(chainString, "KUBE-XLB-") { + writeLine(natChains, existingNATChains[chain]) // flush + writeLine(natRules, "-X", chainString) // delete + } + } + writeLine(natRules, "COMMIT") + natLines := append(natChains.Bytes(), natRules.Bytes()...) + // Write it. + err = ipt.Restore(utiliptables.TableNAT, natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) + if err != nil { + glog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableNAT, err) + encounteredError = true + } + } + { + filterBuf := bytes.NewBuffer(nil) + writeLine(filterBuf, "*filter") + writeLine(filterBuf, fmt.Sprintf(":%s - [0:0]", kubeServicesChain)) + writeLine(filterBuf, fmt.Sprintf("-X %s", kubeServicesChain)) + writeLine(filterBuf, "COMMIT") + // Write it. + if err := ipt.Restore(utiliptables.TableFilter, filterBuf.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil { + glog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableFilter, err) + encounteredError = true + } + } + return encounteredError +} + +func computeProbability(n int) string { + return fmt.Sprintf("%0.5f", 1.0/float64(n)) +} + +// This assumes proxier.mu is held +func (proxier *Proxier) precomputeProbabilities(numberOfPrecomputed int) { + if len(proxier.precomputedProbabilities) == 0 { + proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, "<bad value>") + } + for i := len(proxier.precomputedProbabilities); i <= numberOfPrecomputed; i++ { + proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, computeProbability(i)) + } +} + +// This assumes proxier.mu is held +func (proxier *Proxier) probability(n int) string { + if n >= len(proxier.precomputedProbabilities) { + proxier.precomputeProbabilities(n) + } + return proxier.precomputedProbabilities[n] +} + +// Sync is called to synchronize the proxier state to iptables as soon as possible. +func (proxier *Proxier) Sync() { + proxier.syncRunner.Run() +} + +// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. +func (proxier *Proxier) SyncLoop() { + // Update healthz timestamp at beginning in case Sync() never succeeds. 
+ if proxier.healthzServer != nil { + proxier.healthzServer.UpdateTimestamp() + } + proxier.syncRunner.Loop(wait.NeverStop) +} + +func (proxier *Proxier) setInitialized(value bool) { + var initialized int32 + if value { + initialized = 1 + } + atomic.StoreInt32(&proxier.initialized, initialized) +} + +func (proxier *Proxier) isInitialized() bool { + return atomic.LoadInt32(&proxier.initialized) > 0 +} + +func (proxier *Proxier) OnServiceAdd(service *api.Service) { + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() { + proxier.syncRunner.Run() + } +} + +func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() { + proxier.syncRunner.Run() + } +} + +func (proxier *Proxier) OnServiceDelete(service *api.Service) { + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() { + proxier.syncRunner.Run() + } +} + +func (proxier *Proxier) OnServiceSynced() { + proxier.mu.Lock() + proxier.servicesSynced = true + proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) + proxier.mu.Unlock() + + // Sync unconditionally - this is called once per lifetime. + proxier.syncProxyRules() +} + +func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool { + // if ClusterIP is "None" or empty, skip proxying + if !helper.IsServiceIPSet(service) { + glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) + return true + } + // Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied + if service.Spec.Type == api.ServiceTypeExternalName { + glog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName) + return true + } + return false +} + +// <serviceMap> is updated by this function (based on the given changes). +// <changes> map is cleared after applying them. +func updateServiceMap( + serviceMap proxyServiceMap, + changes *serviceChangeMap) (result updateServiceMapResult) { + result.staleServices = sets.NewString() + + func() { + changes.lock.Lock() + defer changes.lock.Unlock() + for _, change := range changes.items { + existingPorts := serviceMap.merge(change.current) + serviceMap.unmerge(change.previous, existingPorts, result.staleServices) + } + changes.items = make(map[types.NamespacedName]*serviceChange) + }() + + // TODO: If this will appear to be computationally expensive, consider + // computing this incrementally similarly to serviceMap. 
+ result.hcServices = make(map[types.NamespacedName]uint16) + for svcPortName, info := range serviceMap { + if info.healthCheckNodePort != 0 { + result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort) + } + } + + return result +} + +func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() { + proxier.syncRunner.Run() + } +} + +func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() { + proxier.syncRunner.Run() + } +} + +func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() { + proxier.syncRunner.Run() + } +} + +func (proxier *Proxier) OnEndpointsSynced() { + proxier.mu.Lock() + proxier.endpointsSynced = true + proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) + proxier.mu.Unlock() + + // Sync unconditionally - this is called once per lifetime. + proxier.syncProxyRules() +} + +// <endpointsMap> is updated by this function (based on the given changes). +// <changes> map is cleared after applying them. +func updateEndpointsMap( + endpointsMap proxyEndpointsMap, + changes *endpointsChangeMap, + hostname string) (result updateEndpointMapResult) { + result.staleEndpoints = make(map[endpointServicePair]bool) + result.staleServiceNames = make(map[proxy.ServicePortName]bool) + + func() { + changes.lock.Lock() + defer changes.lock.Unlock() + for _, change := range changes.items { + endpointsMap.unmerge(change.previous) + endpointsMap.merge(change.current) + detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames) + } + changes.items = make(map[types.NamespacedName]*endpointsChange) + }() + + if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { + return + } + + // TODO: If this will appear to be computationally expensive, consider + // computing this incrementally similarly to endpointsMap. + result.hcEndpoints = make(map[types.NamespacedName]int) + localIPs := getLocalIPs(endpointsMap) + for nsn, ips := range localIPs { + result.hcEndpoints[nsn] = len(ips) + } + + return result +} + +// <staleEndpoints> and <staleServices> are modified by this function with detected stale connections. +func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) { + for svcPortName, epList := range oldEndpointsMap { + for _, ep := range epList { + stale := true + for i := range newEndpointsMap[svcPortName] { + if *newEndpointsMap[svcPortName][i] == *ep { + stale = false + break + } + } + if stale { + glog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.endpoint) + staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPortName}] = true + } + } + } + + for svcPortName, epList := range newEndpointsMap { + // For udp service, if its backend changes from 0 to non-0. 
There may exist a conntrack entry that could blackhole traffic to the service. + if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 { + staleServiceNames[svcPortName] = true + } + } +} + +func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String { + localIPs := make(map[types.NamespacedName]sets.String) + for svcPortName := range endpointsMap { + for _, ep := range endpointsMap[svcPortName] { + if ep.isLocal { + nsn := svcPortName.NamespacedName + if localIPs[nsn] == nil { + localIPs[nsn] = sets.NewString() + } + localIPs[nsn].Insert(ep.IPPart()) // just the IP part + } + } + } + return localIPs +} + +// Translates single Endpoints object to proxyEndpointsMap. +// This function is used for incremental updated of endpointsMap. +// +// NOTE: endpoints object should NOT be modified. +func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEndpointsMap { + if endpoints == nil { + return nil + } + + endpointsMap := make(proxyEndpointsMap) + // We need to build a map of portname -> all ip:ports for that + // portname. Explode Endpoints.Subsets[*] into this structure. + for i := range endpoints.Subsets { + ss := &endpoints.Subsets[i] + for i := range ss.Ports { + port := &ss.Ports[i] + if port.Port == 0 { + glog.Warningf("ignoring invalid endpoint port %s", port.Name) + continue + } + svcPortName := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, + Port: port.Name, + } + for i := range ss.Addresses { + addr := &ss.Addresses[i] + if addr.IP == "" { + glog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name) + continue + } + epInfo := &endpointsInfo{ + endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))), + isLocal: addr.NodeName != nil && *addr.NodeName == hostname, + } + endpointsMap[svcPortName] = append(endpointsMap[svcPortName], epInfo) + } + if glog.V(3) { + newEPList := []string{} + for _, ep := range endpointsMap[svcPortName] { + newEPList = append(newEPList, ep.endpoint) + } + glog.Infof("Setting endpoints for %q to %+v", svcPortName, newEPList) + } + } + } + return endpointsMap +} + +// Translates single Service object to proxyServiceMap. +// +// NOTE: service object should NOT be modified. +func serviceToServiceMap(service *api.Service) proxyServiceMap { + if service == nil { + return nil + } + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if shouldSkipService(svcName, service) { + return nil + } + + serviceMap := make(proxyServiceMap) + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} + serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service) + } + return serviceMap +} + +// portProtoHash takes the ServicePortName and protocol for a service +// returns the associated 16 character hash. This is computed by hashing (sha256) +// then encoding to base32 and truncating to 16 chars. We do this because IPTables +// Chain Names must be <= 28 chars long, and the longer they are the harder they are to read. +func portProtoHash(servicePortName string, protocol string) string { + hash := sha256.Sum256([]byte(servicePortName + protocol)) + encoded := base32.StdEncoding.EncodeToString(hash[:]) + return encoded[:16] +} + +// servicePortChainName takes the ServicePortName for a service and +// returns the associated iptables chain. 
This is computed by hashing (sha256) +// then encoding to base32 and truncating with the prefix "KUBE-SVC-". +func servicePortChainName(servicePortName string, protocol string) utiliptables.Chain { + return utiliptables.Chain("KUBE-SVC-" + portProtoHash(servicePortName, protocol)) +} + +// serviceFirewallChainName takes the ServicePortName for a service and +// returns the associated iptables chain. This is computed by hashing (sha256) +// then encoding to base32 and truncating with the prefix "KUBE-FW-". +func serviceFirewallChainName(servicePortName string, protocol string) utiliptables.Chain { + return utiliptables.Chain("KUBE-FW-" + portProtoHash(servicePortName, protocol)) +} + +// serviceLBPortChainName takes the ServicePortName for a service and +// returns the associated iptables chain. This is computed by hashing (sha256) +// then encoding to base32 and truncating with the prefix "KUBE-XLB-". We do +// this because IPTables Chain Names must be <= 28 chars long, and the longer +// they are the harder they are to read. +func serviceLBChainName(servicePortName string, protocol string) utiliptables.Chain { + return utiliptables.Chain("KUBE-XLB-" + portProtoHash(servicePortName, protocol)) +} + +// This is the same as servicePortChainName but with the endpoint included. +func servicePortEndpointChainName(servicePortName string, protocol string, endpoint string) utiliptables.Chain { + hash := sha256.Sum256([]byte(servicePortName + protocol + endpoint)) + encoded := base32.StdEncoding.EncodeToString(hash[:]) + return utiliptables.Chain("KUBE-SEP-" + encoded[:16]) +} + +type endpointServicePair struct { + endpoint string + servicePortName proxy.ServicePortName +} + +func (esp *endpointServicePair) IPPart() string { + if index := strings.Index(esp.endpoint, ":"); index != -1 { + return esp.endpoint[0:index] + } + return esp.endpoint +} + +const noConnectionToDelete = "0 flow entries have been deleted" + +// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we +// risk sending more traffic to it, all of which will be lost (because UDP). +// This assumes the proxier mutex is held +func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) { + for epSvcPair := range connectionMap { + if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP { + endpointIP := epSvcPair.endpoint[0:strings.Index(epSvcPair.endpoint, ":")] + glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP) + err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp") + if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { + // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. + // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it + // is expensive to baby sit all udp connections to kubernetes services. + glog.Errorf("conntrack return with error: %v", err) + } + } + } +} + +// This is where all of the iptables-save/restore calls happen. 
+// The only other iptables rules are those that are setup in iptablesInit() +// This assumes proxier.mu is NOT held +func (proxier *Proxier) syncProxyRules() { + proxier.mu.Lock() + defer proxier.mu.Unlock() + + start := time.Now() + defer func() { + SyncProxyRulesLatency.Observe(sinceInMicroseconds(start)) + glog.V(4).Infof("syncProxyRules took %v", time.Since(start)) + }() + // don't sync rules till we've received services and endpoints + if !proxier.endpointsSynced || !proxier.servicesSynced { + glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") + return + } + + // We assume that if this was called, we really want to sync them, + // even if nothing changed in the meantime. In other words, callers are + // responsible for detecting no-op changes and not calling this function. + serviceUpdateResult := updateServiceMap( + proxier.serviceMap, &proxier.serviceChanges) + endpointUpdateResult := updateEndpointsMap( + proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) + + staleServices := serviceUpdateResult.staleServices + // merge stale services gathered from updateEndpointsMap + for svcPortName := range endpointUpdateResult.staleServiceNames { + if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP { + glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String()) + staleServices.Insert(svcInfo.clusterIP.String()) + } + } + + glog.V(3).Infof("Syncing iptables rules") + + // Create and link the kube services chain. + { + tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT} + for _, table := range tablesNeedServicesChain { + if _, err := proxier.iptables.EnsureChain(table, kubeServicesChain); err != nil { + glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, kubeServicesChain, err) + return + } + } + + tableChainsNeedJumpServices := []struct { + table utiliptables.Table + chain utiliptables.Chain + }{ + {utiliptables.TableFilter, utiliptables.ChainInput}, + {utiliptables.TableFilter, utiliptables.ChainOutput}, + {utiliptables.TableNAT, utiliptables.ChainOutput}, + {utiliptables.TableNAT, utiliptables.ChainPrerouting}, + } + comment := "kubernetes service portals" + args := []string{"-m", "comment", "--comment", comment, "-j", string(kubeServicesChain)} + for _, tc := range tableChainsNeedJumpServices { + if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil { + glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeServicesChain, err) + return + } + } + } + + // Create and link the kube postrouting chain. + { + if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, kubePostroutingChain); err != nil { + glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubePostroutingChain, err) + return + } + + comment := "kubernetes postrouting rules" + args := []string{"-m", "comment", "--comment", comment, "-j", string(kubePostroutingChain)} + if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { + glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, err) + return + } + } + + // + // Below this point we will not return until we try to write the iptables rules. 
+ // + + // Get iptables-save output so we can check for existing chains and rules. + // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore + existingFilterChains := make(map[utiliptables.Chain]string) + proxier.iptablesData.Reset() + err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.iptablesData) + if err != nil { // if we failed to get any rules + glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) + } else { // otherwise parse the output + existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.iptablesData.Bytes()) + } + + existingNATChains := make(map[utiliptables.Chain]string) + proxier.iptablesData.Reset() + err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData) + if err != nil { // if we failed to get any rules + glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) + } else { // otherwise parse the output + existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes()) + } + + // Reset all buffers used later. + // This is to avoid memory reallocations and thus improve performance. + proxier.filterChains.Reset() + proxier.filterRules.Reset() + proxier.natChains.Reset() + proxier.natRules.Reset() + + // Write table headers. + writeLine(proxier.filterChains, "*filter") + writeLine(proxier.natChains, "*nat") + + // Make sure we keep stats for the top-level chains, if they existed + // (which most should have because we created them above). + if chain, ok := existingFilterChains[kubeServicesChain]; ok { + writeLine(proxier.filterChains, chain) + } else { + writeLine(proxier.filterChains, utiliptables.MakeChainLine(kubeServicesChain)) + } + if chain, ok := existingNATChains[kubeServicesChain]; ok { + writeLine(proxier.natChains, chain) + } else { + writeLine(proxier.natChains, utiliptables.MakeChainLine(kubeServicesChain)) + } + if chain, ok := existingNATChains[kubeNodePortsChain]; ok { + writeLine(proxier.natChains, chain) + } else { + writeLine(proxier.natChains, utiliptables.MakeChainLine(kubeNodePortsChain)) + } + if chain, ok := existingNATChains[kubePostroutingChain]; ok { + writeLine(proxier.natChains, chain) + } else { + writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain)) + } + if chain, ok := existingNATChains[KubeMarkMasqChain]; ok { + writeLine(proxier.natChains, chain) + } else { + writeLine(proxier.natChains, utiliptables.MakeChainLine(KubeMarkMasqChain)) + } + + // Install the kubernetes-specific postrouting rules. We use a whole chain for + // this so that it is easier to flush and change, for example if the mark + // value should ever change. + writeLine(proxier.natRules, []string{ + "-A", string(kubePostroutingChain), + "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`, + "-m", "mark", "--mark", proxier.masqueradeMark, + "-j", "MASQUERADE", + }...) + + // Install the kubernetes-specific masquerade mark rule. We use a whole chain for + // this so that it is easier to flush and change, for example if the mark + // value should ever change. + writeLine(proxier.natRules, []string{ + "-A", string(KubeMarkMasqChain), + "-j", "MARK", "--set-xmark", proxier.masqueradeMark, + }...) + + // Accumulate NAT chains to keep. 
+ activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set + + // Accumulate the set of local ports that we will be holding open once this update is complete + replacementPortsMap := map[localPort]closeable{} + + // We are creating those slices ones here to avoid memory reallocations + // in every loop. Note that reuse the memory, instead of doing: + // slice = <some new slice> + // you should always do one of the below: + // slice = slice[:0] // and then append to it + // slice = append(slice[:0], ...) + endpoints := make([]*endpointsInfo, 0) + endpointChains := make([]utiliptables.Chain, 0) + // To avoid growing this slice, we arbitrarily set its size to 64, + // there is never more than that many arguments for a single line. + // Note that even if we go over 64, it will still be correct - it + // is just for efficiency, not correctness. + args := make([]string, 64) + + // Build rules for each service. + var svcNameString string + for svcName, svcInfo := range proxier.serviceMap { + protocol := strings.ToLower(string(svcInfo.protocol)) + svcNameString = svcInfo.serviceNameString + + // Create the per-service chain, retaining counters if possible. + svcChain := svcInfo.servicePortChainName + if chain, ok := existingNATChains[svcChain]; ok { + writeLine(proxier.natChains, chain) + } else { + writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain)) + } + activeNATChains[svcChain] = true + + svcXlbChain := svcInfo.serviceLBChainName + if svcInfo.onlyNodeLocalEndpoints { + // Only for services request OnlyLocal traffic + // create the per-service LB chain, retaining counters if possible. + if lbChain, ok := existingNATChains[svcXlbChain]; ok { + writeLine(proxier.natChains, lbChain) + } else { + writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain)) + } + activeNATChains[svcXlbChain] = true + } else if activeNATChains[svcXlbChain] { + // Cleanup the previously created XLB chain for this service + delete(activeNATChains, svcXlbChain) + } + + // Capture the clusterIP. + args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString), + "-m", protocol, "-p", protocol, + "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), + "--dport", strconv.Itoa(svcInfo.port), + ) + if proxier.masqueradeAll { + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + } else if len(proxier.clusterCIDR) > 0 { + // This masquerades off-cluster traffic to a service VIP. The idea + // is that you can establish a static route for your Service range, + // routing to any node, and that node will bridge into the Service + // for you. Since that might bounce off-node, we masquerade here. + // If/when we support "Local" policy for VIPs, we should update this. + writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) + } + writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) + + // Capture externalIPs. + for _, externalIP := range svcInfo.externalIPs { + // If the "external" IP happens to be an IP that is local to this + // machine, hold the local port open so no other process can open it + // (because the socket might open but it would never work). 
+ if local, err := isLocalIP(externalIP); err != nil { + glog.Errorf("can't determine if IP is local, assuming not: %v", err) + } else if local { + lp := localPort{ + desc: "externalIP for " + svcNameString, + ip: externalIP, + port: svcInfo.port, + protocol: protocol, + } + if proxier.portsMap[lp] != nil { + glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) + replacementPortsMap[lp] = proxier.portsMap[lp] + } else { + socket, err := proxier.portMapper.OpenLocalPort(&lp) + if err != nil { + msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err) + + proxier.recorder.Eventf( + &clientv1.ObjectReference{ + Kind: "Node", + Name: proxier.hostname, + UID: types.UID(proxier.hostname), + Namespace: "", + }, api.EventTypeWarning, err.Error(), msg) + glog.Error(msg) + continue + } + replacementPortsMap[lp] = socket + } + } // We're holding the port, so it's OK to install iptables rules. + args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString), + "-m", protocol, "-p", protocol, + "-d", fmt.Sprintf("%s/32", externalIP), + "--dport", strconv.Itoa(svcInfo.port), + ) + // We have to SNAT packets to external IPs. + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + + // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) + // nor from a local process to be forwarded to the service. + // This rule roughly translates to "all traffic from off-machine". + // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later. + externalTrafficOnlyArgs := append(args, + "-m", "physdev", "!", "--physdev-is-in", + "-m", "addrtype", "!", "--src-type", "LOCAL") + writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...) + dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") + // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. + // This covers cases like GCE load-balancers which get added to the local routing table. + writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...) + + // If the service has no endpoints then reject packets coming via externalIP + // Install ICMP Reject rule in filter table for destination=externalIP and dport=svcport + if len(proxier.endpointsMap[svcName]) == 0 { + writeLine(proxier.filterRules, + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), + "-m", protocol, "-p", protocol, + "-d", fmt.Sprintf("%s/32", externalIP), + "--dport", strconv.Itoa(svcInfo.port), + "-j", "REJECT", + ) + } + } + + // Capture load-balancer ingress. + fwChain := svcInfo.serviceFirewallChainName + for _, ingress := range svcInfo.loadBalancerStatus.Ingress { + if ingress.IP != "" { + // create service firewall chain + if chain, ok := existingNATChains[fwChain]; ok { + writeLine(proxier.natChains, chain) + } else { + writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain)) + } + activeNATChains[fwChain] = true + // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. + // This currently works for loadbalancers that preserves source ips. + // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. 
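+ // Traffic to the ingress IP is first sent to the per-service KUBE-FW- chain,
+ // which applies the source-range filter before handing off to the KUBE-SVC-
+ // or KUBE-XLB- chain below.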
+ + args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), + "-m", protocol, "-p", protocol, + "-d", fmt.Sprintf("%s/32", ingress.IP), + "--dport", strconv.Itoa(svcInfo.port), + ) + // jump to service firewall chain + writeLine(proxier.natRules, append(args, "-j", string(fwChain))...) + + args = append(args[:0], + "-A", string(fwChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), + ) + + // Each source match rule in the FW chain may jump to either the SVC or the XLB chain + chosenChain := svcXlbChain + // If we are proxying globally, we need to masquerade in case we cross nodes. + // If we are proxying only locally, we can retain the source IP. + if !svcInfo.onlyNodeLocalEndpoints { + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + chosenChain = svcChain + } + + if len(svcInfo.loadBalancerSourceRanges) == 0 { + // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain + writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...) + } else { + // firewall filter based on each source range + allowFromNode := false + for _, src := range svcInfo.loadBalancerSourceRanges { + writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...) + // ignore error because it has been validated + _, cidr, _ := net.ParseCIDR(src) + if cidr.Contains(proxier.nodeIP) { + allowFromNode = true + } + } + // generally, ip route rule was added to intercept request to loadbalancer vip from the + // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. + // Need to add the following rule to allow request on host. + if allowFromNode { + writeLine(proxier.natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...) + } + } + + // If the packet was able to reach the end of firewall chain, then it did not get DNATed. + // It means the packet cannot go thru the firewall, then mark it for DROP + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) + } + } + + // Capture nodeports. If we had more than 2 rules it might be + // worthwhile to make a new per-service chain for nodeport rules, but + // with just 2 rules it ends up being a waste and a cognitive burden. + if svcInfo.nodePort != 0 { + // Hold the local port open so no other process can open it + // (because the socket might open but it would never work). + lp := localPort{ + desc: "nodePort for " + svcNameString, + ip: "", + port: svcInfo.nodePort, + protocol: protocol, + } + if proxier.portsMap[lp] != nil { + glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) + replacementPortsMap[lp] = proxier.portsMap[lp] + } else { + socket, err := proxier.portMapper.OpenLocalPort(&lp) + if err != nil { + glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) + continue + } + if lp.protocol == "udp" { + proxier.clearUDPConntrackForPort(lp.port) + } + replacementPortsMap[lp] = socket + } // We're holding the port, so it's OK to install iptables rules. + + args = append(args[:0], + "-A", string(kubeNodePortsChain), + "-m", "comment", "--comment", svcNameString, + "-m", protocol, "-p", protocol, + "--dport", strconv.Itoa(svcInfo.nodePort), + ) + if !svcInfo.onlyNodeLocalEndpoints { + // Nodeports need SNAT, unless they're local. + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) 
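+ // The mark set by KUBE-MARK-MASQ is matched later by the KUBE-POSTROUTING
+ // rule installed above, which performs the actual MASQUERADE.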
+ // Jump to the service chain. + writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) + } else { + // TODO: Make all nodePorts jump to the firewall chain. + // Currently we only create it for loadbalancers (#33586). + writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...) + } + + // If the service has no endpoints then reject packets. The filter + // table doesn't currently have the same per-service structure that + // the nat table does, so we just stick this into the kube-services + // chain. + if len(proxier.endpointsMap[svcName]) == 0 { + writeLine(proxier.filterRules, + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), + "-m", "addrtype", "--dst-type", "LOCAL", + "-m", protocol, "-p", protocol, + "--dport", strconv.Itoa(svcInfo.nodePort), + "-j", "REJECT", + ) + } + } + + // If the service has no endpoints then reject packets. + if len(proxier.endpointsMap[svcName]) == 0 { + writeLine(proxier.filterRules, + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), + "-m", protocol, "-p", protocol, + "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), + "--dport", strconv.Itoa(svcInfo.port), + "-j", "REJECT", + ) + continue + } + + // From here on, we assume there are active endpoints. + + // Generate the per-endpoint chains. We do this in multiple passes so we + // can group rules together. + // These two slices parallel each other - keep in sync + endpoints = endpoints[:0] + endpointChains = endpointChains[:0] + var endpointChain utiliptables.Chain + for _, ep := range proxier.endpointsMap[svcName] { + endpoints = append(endpoints, ep) + endpointChain = ep.endpointChain(svcNameString, protocol) + endpointChains = append(endpointChains, endpointChain) + + // Create the endpoint chain, retaining counters if possible. + if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok { + writeLine(proxier.natChains, chain) + } else { + writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain)) + } + activeNATChains[endpointChain] = true + } + + // First write session affinity rules, if applicable. + if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { + for _, endpointChain := range endpointChains { + writeLine(proxier.natRules, + "-A", string(svcChain), + "-m", "comment", "--comment", svcNameString, + "-m", "recent", "--name", string(endpointChain), + "--rcheck", "--seconds", strconv.Itoa(svcInfo.stickyMaxAgeMinutes*60), "--reap", + "-j", string(endpointChain)) + } + } + + // Now write loadbalancing & DNAT rules. + n := len(endpointChains) + for i, endpointChain := range endpointChains { + // Balancing rules in the per-service chain. + args = append(args[:0], []string{ + "-A", string(svcChain), + "-m", "comment", "--comment", svcNameString, + }...) + if i < (n - 1) { + // Each rule is a probabilistic match. + args = append(args, + "-m", "statistic", + "--mode", "random", + "--probability", proxier.probability(n-i)) + } + // The final (or only if n == 1) rule is a guaranteed match. + args = append(args, "-j", string(endpointChain)) + writeLine(proxier.natRules, args...) + + // Rules in the per-endpoint chain. + args = append(args[:0], + "-A", string(endpointChain), + "-m", "comment", "--comment", svcNameString, + ) + // Handle traffic that loops back to the originator with SNAT. 
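+ // Without this, a pod that reaches itself through its own Service would see
+ // its own IP as the source and answer directly, bypassing the DNAT reversal.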
+ writeLine(proxier.natRules, append(args, + "-s", fmt.Sprintf("%s/32", endpoints[i].IPPart()), + "-j", string(KubeMarkMasqChain))...) + // Update client-affinity lists. + if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { + args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") + } + // DNAT to final destination. + args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].endpoint) + writeLine(proxier.natRules, args...) + } + + // The logic below this applies only if this service is marked as OnlyLocal + if !svcInfo.onlyNodeLocalEndpoints { + continue + } + + // Now write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic. + // TODO - This logic may be combinable with the block above that creates the svc balancer chain + localEndpoints := make([]*endpointsInfo, 0) + localEndpointChains := make([]utiliptables.Chain, 0) + for i := range endpointChains { + if endpoints[i].isLocal { + // These slices parallel each other; must be kept in sync + localEndpoints = append(localEndpoints, endpoints[i]) + localEndpointChains = append(localEndpointChains, endpointChains[i]) + } + } + // First rule in the chain redirects all pod -> external VIP traffic to the + // Service's ClusterIP instead. This happens whether or not we have local + // endpoints; only if clusterCIDR is specified + if len(proxier.clusterCIDR) > 0 { + args = append(args[:0], + "-A", string(svcXlbChain), + "-m", "comment", "--comment", + `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`, + "-s", proxier.clusterCIDR, + "-j", string(svcChain), + ) + writeLine(proxier.natRules, args...) + } + + numLocalEndpoints := len(localEndpointChains) + if numLocalEndpoints == 0 { + // Blackhole all traffic since there are no local endpoints + args = append(args[:0], + "-A", string(svcXlbChain), + "-m", "comment", "--comment", + fmt.Sprintf(`"%s has no local endpoints"`, svcNameString), + "-j", + string(KubeMarkDropChain), + ) + writeLine(proxier.natRules, args...) + } else { + // Setup probability filter rules only over local endpoints + for i, endpointChain := range localEndpointChains { + // Balancing rules in the per-service chain. + args = append(args[:0], + "-A", string(svcXlbChain), + "-m", "comment", "--comment", + fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcNameString), + ) + if i < (numLocalEndpoints - 1) { + // Each rule is a probabilistic match. + args = append(args, + "-m", "statistic", + "--mode", "random", + "--probability", proxier.probability(numLocalEndpoints-i)) + } + // The final (or only if n == 1) rule is a guaranteed match. + args = append(args, "-j", string(endpointChain)) + writeLine(proxier.natRules, args...) + } + } + } + + // Delete chains no longer in use. + for chain := range existingNATChains { + if !activeNATChains[chain] { + chainString := string(chain) + if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") { + // Ignore chains that aren't ours. + continue + } + // We must (as per iptables) write a chain-line for it, which has + // the nice effect of flushing the chain. Then we can remove the + // chain. + writeLine(proxier.natChains, existingNATChains[chain]) + writeLine(proxier.natRules, "-X", chainString) + } + } + + // Finally, tail-call to the nodeports chain. This needs to be after all + // other service portal rules. 
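+ // The addrtype --dst-type LOCAL match below limits the nodeports jump to
+ // packets addressed to one of this node's own IP addresses.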
+ writeLine(proxier.natRules,
+ "-A", string(kubeServicesChain),
+ "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
+ "-m", "addrtype", "--dst-type", "LOCAL",
+ "-j", string(kubeNodePortsChain))
+
+ // Write the end-of-table markers.
+ writeLine(proxier.filterRules, "COMMIT")
+ writeLine(proxier.natRules, "COMMIT")
+
+ // Sync rules.
+ // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table
+ proxier.iptablesData.Reset()
+ proxier.iptablesData.Write(proxier.filterChains.Bytes())
+ proxier.iptablesData.Write(proxier.filterRules.Bytes())
+ proxier.iptablesData.Write(proxier.natChains.Bytes())
+ proxier.iptablesData.Write(proxier.natRules.Bytes())
+
+ glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
+ err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
+ if err != nil {
+ glog.Errorf("Failed to execute iptables-restore: %v", err)
+ glog.V(2).Infof("Rules:\n%s", proxier.iptablesData.Bytes())
+ // Revert new local ports.
+ revertPorts(replacementPortsMap, proxier.portsMap)
+ return
+ }
+
+ // Close old local ports and save new ones.
+ for k, v := range proxier.portsMap {
+ if replacementPortsMap[k] == nil {
+ v.Close()
+ }
+ }
+ proxier.portsMap = replacementPortsMap
+
+ // Update healthz timestamp.
+ if proxier.healthzServer != nil {
+ proxier.healthzServer.UpdateTimestamp()
+ }
+
+ // Update healthchecks. The endpoints list might include services that are
+ // not "OnlyLocal", but the services list will not, and the healthChecker
+ // will just drop those endpoints.
+ if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil {
+ glog.Errorf("Error syncing healthcheck services: %v", err)
+ }
+ if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
+ glog.Errorf("Error syncing healthcheck endpoints: %v", err)
+ }
+
+ // Finish housekeeping.
+ // TODO: these and clearUDPConntrackForPort() could be made more consistent.
+ utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
+ proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints)
+}
+
+// Clear UDP conntrack for port or all conntrack entries when port equals zero.
+// When a packet arrives, it will not go through the NAT table again, because it is not "the first" packet.
+// The solution is clearing the conntrack. Known issues:
+// https://github.com/docker/docker/issues/8795
+// https://github.com/kubernetes/kubernetes/issues/31983
+func (proxier *Proxier) clearUDPConntrackForPort(port int) {
+ glog.V(2).Infof("Deleting conntrack entries for udp connections")
+ if port > 0 {
+ err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "-p", "udp", "--dport", strconv.Itoa(port))
+ if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
+ glog.Errorf("conntrack returned error: %v", err)
+ }
+ } else {
+ glog.Errorf("Wrong port number. The port number must be greater than zero")
+ }
+}
+
+// Join all words with spaces, terminate with a newline, and write to buf.
+func writeLine(buf *bytes.Buffer, words ...string) {
+ // We avoid strings.Join for performance reasons.
+ for i := range words { + buf.WriteString(words[i]) + if i < len(words)-1 { + buf.WriteByte(' ') + } else { + buf.WriteByte('\n') + } + } +} + +func isLocalIP(ip string) (bool, error) { + addrs, err := net.InterfaceAddrs() + if err != nil { + return false, err + } + for i := range addrs { + intf, _, err := net.ParseCIDR(addrs[i].String()) + if err != nil { + return false, err + } + if net.ParseIP(ip).Equal(intf) { + return true, nil + } + } + return false, nil +} + +func openLocalPort(lp *localPort) (closeable, error) { + // For ports on node IPs, open the actual port and hold it, even though we + // use iptables to redirect traffic. + // This ensures a) that it's safe to use that port and b) that (a) stays + // true. The risk is that some process on the node (e.g. sshd or kubelet) + // is using a port and we give that same port out to a Service. That would + // be bad because iptables would silently claim the traffic but the process + // would never know. + // NOTE: We should not need to have a real listen()ing socket - bind() + // should be enough, but I can't figure out a way to e2e test without + // it. Tools like 'ss' and 'netstat' do not show sockets that are + // bind()ed but not listen()ed, and at least the default debian netcat + // has no way to avoid about 10 seconds of retries. + var socket closeable + switch lp.protocol { + case "tcp": + listener, err := net.Listen("tcp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port))) + if err != nil { + return nil, err + } + socket = listener + case "udp": + addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port))) + if err != nil { + return nil, err + } + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, err + } + socket = conn + default: + return nil, fmt.Errorf("unknown protocol %q", lp.protocol) + } + glog.V(2).Infof("Opened local port %s", lp.String()) + return socket, nil +} + +// revertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only +// closes the ports opened in this sync. +func revertPorts(replacementPortsMap, originalPortsMap map[localPort]closeable) { + for k, v := range replacementPortsMap { + // Only close newly opened local ports - leave ones that were open before this update + if originalPortsMap[k] == nil { + glog.V(2).Infof("Closing local port %s after iptables-restore failure", k.String()) + v.Close() + } + } +} diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/types.go b/vendor/k8s.io/kubernetes/pkg/proxy/types.go new file mode 100644 index 000000000..578baff69 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/proxy/types.go @@ -0,0 +1,44 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/types" +) + +// ProxyProvider is the interface provided by proxier implementations. +type ProxyProvider interface { + // Sync immediately synchronizes the ProxyProvider's current state to iptables. + Sync() + // SyncLoop runs periodic work. 
+ // This is expected to run as a goroutine or as the main loop of the app.
+ // It does not return.
+ SyncLoop()
+}
+
+// ServicePortName carries a namespace + name + portname. This is the unique
+// identifier for a load-balanced service.
+type ServicePortName struct {
+ types.NamespacedName
+ Port string
+}
+
+func (spn ServicePortName) String() string {
+ return fmt.Sprintf("%s:%s", spn.NamespacedName.String(), spn.Port)
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/util/conntrack.go b/vendor/k8s.io/kubernetes/pkg/proxy/util/conntrack.go
new file mode 100644
index 000000000..436045ecb
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/proxy/util/conntrack.go
@@ -0,0 +1,58 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+ "fmt"
+ "strings"
+
+ "k8s.io/kubernetes/pkg/util/exec"
+
+ "github.com/golang/glog"
+)
+
+// Utilities for dealing with conntrack
+
+const noConnectionToDelete = "0 flow entries have been deleted"
+
+// DeleteServiceConnections uses the conntrack tool to delete the conntrack entries
+// for the UDP connections specified by the given service IPs
+func DeleteServiceConnections(execer exec.Interface, svcIPs []string) {
+ for _, ip := range svcIPs {
+ glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip)
+ err := ExecConntrackTool(execer, "-D", "--orig-dst", ip, "-p", "udp")
+ if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
+ // TODO: Better handling for deletion failure. When failures occur, stale UDP connections may not get flushed.
+ // These stale UDP connections will keep black-holing traffic. Making this a best-effort operation for now, since it
+ // is expensive to baby-sit all udp connections to kubernetes services.
+ glog.Errorf("conntrack returned error: %v", err)
+ }
+ }
+}
+
+// ExecConntrackTool executes the conntrack tool using the given parameters
+func ExecConntrackTool(execer exec.Interface, parameters ...string) error {
+ conntrackPath, err := execer.LookPath("conntrack")
+ if err != nil {
+ return fmt.Errorf("error looking for path of conntrack: %v", err)
+ }
+ output, err := execer.Command(conntrackPath, parameters...).CombinedOutput()
+ if err != nil {
+ return fmt.Errorf("conntrack command returned: %q, error message: %s", string(output), err)
+ }
+ return nil
+}
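
The endpoint balancing rules written by syncProxyRules above rely on the iptables statistic match: with n endpoint chains, the i-th rule is given probability 1/(n-i), so the first rule matches 1/3 of packets (for n=3), the second matches 1/2 of what remains, and the last catches the rest, which works out to an even split. The Go sketch below is illustrative only and is not part of the vendored code; the chain names, the comment string, and the five-decimal probability formatting are assumptions made for this example.

// prob_sketch.go - illustrative only; not part of the vendored proxier.
package main

import (
	"bytes"
	"fmt"
	"strconv"
)

// writeRule joins words with spaces and terminates with a newline, mirroring
// the proxier's writeLine helper.
func writeRule(buf *bytes.Buffer, words ...string) {
	for i, w := range words {
		buf.WriteString(w)
		if i < len(words)-1 {
			buf.WriteByte(' ')
		} else {
			buf.WriteByte('\n')
		}
	}
}

// probability formats 1/remaining as a statistic-match probability: rule i of
// n matches 1/(n-i) of the packets that reach it, spreading traffic evenly.
func probability(remaining int) string {
	return strconv.FormatFloat(1.0/float64(remaining), 'f', 5, 64)
}

func main() {
	nat := &bytes.Buffer{}
	// Hypothetical chain names, for illustration only.
	endpointChains := []string{"KUBE-SEP-AAA", "KUBE-SEP-BBB", "KUBE-SEP-CCC"}
	n := len(endpointChains)
	for i, chain := range endpointChains {
		args := []string{"-A", "KUBE-SVC-EXAMPLE", "-m", "comment", "--comment", `"default/example:http"`}
		if i < n-1 {
			// Probabilistic match for all but the last rule.
			args = append(args, "-m", "statistic", "--mode", "random", "--probability", probability(n-i))
		}
		// The last rule is an unconditional jump.
		args = append(args, "-j", chain)
		writeRule(nat, args...)
	}
	fmt.Print(nat.String())
}

Running it prints three -A rules in roughly the shape of the per-service balancing rules fed to iptables-restore above, with probabilities 0.33333, 0.50000, and an unconditional final jump.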