summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/kubernetes/pkg/proxy
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/kubernetes/pkg/proxy')
-rw-r--r--vendor/k8s.io/kubernetes/pkg/proxy/doc.go18
-rw-r--r--vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/doc.go18
-rw-r--r--vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/healthcheck.go333
-rw-r--r--vendor/k8s.io/kubernetes/pkg/proxy/iptables/metrics.go50
-rw-r--r--vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go1732
-rw-r--r--vendor/k8s.io/kubernetes/pkg/proxy/types.go44
-rw-r--r--vendor/k8s.io/kubernetes/pkg/proxy/util/conntrack.go58
7 files changed, 2253 insertions, 0 deletions
diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/doc.go b/vendor/k8s.io/kubernetes/pkg/proxy/doc.go
new file mode 100644
index 000000000..3bed0fa39
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/proxy/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package proxy implements the layer-3 network proxy.
+package proxy // import "k8s.io/kubernetes/pkg/proxy"
diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/doc.go b/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/doc.go
new file mode 100644
index 000000000..0a9ea0944
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package healthcheck provides tools for serving kube-proxy healthchecks.
+package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck"
diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/healthcheck.go b/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/healthcheck.go
new file mode 100644
index 000000000..39f10f71a
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/healthcheck.go
@@ -0,0 +1,333 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package healthcheck
+
+import (
+ "fmt"
+ "net"
+ "net/http"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/golang/glog"
+ "github.com/renstrom/dedent"
+
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/clock"
+ clientv1 "k8s.io/client-go/pkg/api/v1"
+ "k8s.io/client-go/tools/record"
+ "k8s.io/kubernetes/pkg/api"
+)
+
+// Server serves one HTTP healthcheck endpoint per service, with results
+// based on the endpoints. If there are 0 endpoints for a service, it returns a
+// 503 "Service Unavailable" error (telling LBs not to use this node). If there
+// are 1 or more endpoints, it returns a 200 "OK".
+type Server interface {
+ // SyncServices makes the new set of services active. Services that are no
+ // longer listed, or whose port changed, are closed. Services that are new
+ // (or whose port changed) are opened. Services that existed and are
+ // unchanged are left alone. The map value is the healthcheck port.
+ SyncServices(newServices map[types.NamespacedName]uint16) error
+ // SyncEndpoints makes the new set of endpoints active. Endpoints for
+ // services that do not exist are dropped. The value of the map is the
+ // number of endpoints the service has on this node.
+ SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
+}
+
+// Listener abstracts opening listening sockets so Server can be tested. If the
+// Listener argument to NewServer() is nil, the real net.Listen function is used.
+type Listener interface {
+ // Listen is very much like net.Listen, except the first arg (network) is
+ // fixed to be "tcp".
+ Listen(addr string) (net.Listener, error)
+}
+
+// HTTPServerFactory abstracts HTTP server construction so Server can be
+// tested. If the HTTPServerFactory argument to NewServer() is nil, the real
+// http.Server type will be used.
+type HTTPServerFactory interface {
+ // New creates an instance of a type satisfying HTTPServer for the given
+ // address and handler. *http.Server is one such type.
+ New(addr string, handler http.Handler) HTTPServer
+}
+
+// HTTPServer is the part of an HTTP server used here; it allows for testing of Server.
+type HTTPServer interface {
+ // Serve is declared so that *http.Server satisfies this interface.
+ Serve(listener net.Listener) error
+}
+
+// NewServer builds a healthcheck server manager for the node named hostname.
+// recorder may be nil, in which case no events are emitted. A nil listener or
+// httpServerFactory is replaced by the net.Listen / http.Server backed default.
+func NewServer(hostname string, recorder record.EventRecorder, listener Listener, httpServerFactory HTTPServerFactory) Server {
+	hcs := &server{
+		hostname: hostname,
+		recorder: recorder,
+		listener: listener,
+		httpFactory: httpServerFactory,
+		services: make(map[types.NamespacedName]*hcInstance),
+	}
+	// Fall back to the real implementations for any dependency not injected.
+	if hcs.listener == nil {
+		hcs.listener = stdNetListener{}
+	}
+	if hcs.httpFactory == nil {
+		hcs.httpFactory = stdHTTPServerFactory{}
+	}
+	return hcs
+}
+
+// stdNetListener is the default Listener; it delegates to net.Listen.
+type stdNetListener struct{}
+
+// Compile-time check that stdNetListener satisfies Listener.
+var _ Listener = stdNetListener{}
+
+// Listen opens a TCP listener on addr.
+func (stdNetListener) Listen(addr string) (net.Listener, error) {
+	const network = "tcp"
+	return net.Listen(network, addr)
+}
+
+// stdHTTPServerFactory is the default HTTPServerFactory; it hands out
+// plain *http.Server values.
+type stdHTTPServerFactory struct{}
+
+// Compile-time check that stdHTTPServerFactory satisfies HTTPServerFactory.
+var _ HTTPServerFactory = stdHTTPServerFactory{}
+
+// New returns an *http.Server configured with the given address and handler.
+func (stdHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer {
+	srv := new(http.Server)
+	srv.Addr = addr
+	srv.Handler = handler
+	return srv
+}
+
+type server struct { // server is the implementation of Server returned by NewServer
+ hostname string // node name, included in healthcheck failure messages
+ recorder record.EventRecorder // can be nil
+ listener Listener // opens the per-service healthcheck sockets
+ httpFactory HTTPServerFactory // builds the per-service HTTP servers
+
+ lock sync.Mutex // held while services or its hcInstance values are read or written
+ services map[types.NamespacedName]*hcInstance // active healthchecks keyed by service
+}
+
+func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error {
+ hcs.lock.Lock()
+ defer hcs.lock.Unlock()
+
+ // Remove any that are not needed any more.
+ for nsn, svc := range hcs.services {
+ if port, found := newServices[nsn]; !found || port != svc.port {
+ glog.V(2).Infof("Closing healthcheck %q on port %d", nsn.String(), svc.port)
+ if err := svc.listener.Close(); err != nil {
+ glog.Errorf("Close(%v): %v", svc.listener.Addr(), err)
+ }
+ delete(hcs.services, nsn)
+ }
+ }
+
+ // Add any that are needed.
+ for nsn, port := range newServices {
+ if hcs.services[nsn] != nil {
+ glog.V(3).Infof("Existing healthcheck %q on port %d", nsn.String(), port)
+ continue
+ }
+
+ glog.V(2).Infof("Opening healthcheck %q on port %d", nsn.String(), port)
+ svc := &hcInstance{port: port}
+ addr := fmt.Sprintf(":%d", port)
+ svc.server = hcs.httpFactory.New(addr, hcHandler{name: nsn, hcs: hcs})
+ var err error
+ svc.listener, err = hcs.listener.Listen(addr)
+ if err != nil {
+ msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err)
+
+ if hcs.recorder != nil {
+ hcs.recorder.Eventf(
+ &clientv1.ObjectReference{
+ Kind: "Service",
+ Namespace: nsn.Namespace,
+ Name: nsn.Name,
+ UID: types.UID(nsn.String()),
+ }, api.EventTypeWarning, "FailedToStartHealthcheck", msg)
+ }
+ glog.Error(msg)
+ continue
+ }
+ hcs.services[nsn] = svc
+
+ go func(nsn types.NamespacedName, svc *hcInstance) {
+ // Serve() will exit when the listener is closed.
+ glog.V(3).Infof("Starting goroutine for healthcheck %q on port %d", nsn.String(), svc.port)
+ if err := svc.server.Serve(svc.listener); err != nil {
+ glog.V(3).Infof("Healthcheck %q closed: %v", nsn.String(), err)
+ return
+ }
+ glog.V(3).Infof("Healthcheck %q closed", nsn.String())
+ }(nsn, svc)
+ }
+ return nil
+}
+
+type hcInstance struct { // hcInstance is the state of one service's healthcheck
+ port uint16 // healthcheck port being listened on
+ listener net.Listener // closed by SyncServices to stop the healthcheck
+ server HTTPServer // serves requests arriving on listener
+ endpoints int // number of local endpoints for a service
+}
+
+type hcHandler struct { // hcHandler answers healthcheck requests for a single service
+ name types.NamespacedName // service this handler reports on
+ hcs *server // owning server; supplies the lock and the endpoint count
+}
+
+var _ http.Handler = hcHandler{} // compile-time check that hcHandler is an http.Handler
+
+// ServeHTTP reports the health of the handler's service as JSON. It answers
+// 503 "Service Unavailable" when the service has no local endpoints and
+// 200 "OK" otherwise. If the healthcheck has already been removed from the
+// server, the request is logged and nothing is written.
+func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	// Only hold the lock long enough to read the endpoint count.
+	h.hcs.lock.Lock()
+	svc, ok := h.hcs.services[h.name]
+	if !ok || svc == nil {
+		h.hcs.lock.Unlock()
+		glog.Errorf("Received request for closed healthcheck %q", h.name.String())
+		return
+	}
+	count := svc.endpoints
+	h.hcs.lock.Unlock()
+
+	resp.Header().Set("Content-Type", "application/json")
+	if count == 0 {
+		resp.WriteHeader(http.StatusServiceUnavailable)
+	} else {
+		resp.WriteHeader(http.StatusOK)
+	}
+	// The body is already fully formatted, so it is written with Fprint.
+	// Passing it to Fprintf as a format string would re-interpret any '%'
+	// contained in the namespace or name.
+	fmt.Fprint(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(`
+		{
+			"service": {
+				"namespace": %q,
+				"name": %q
+			},
+			"localEndpoints": %d
+		}
+		`, h.name.Namespace, h.name.Name, count)), "\n"))
+}
+
+// SyncEndpoints records the number of local endpoints for each known
+// healthcheck. Counts for services without a healthcheck are ignored, and
+// healthchecks missing from newEndpoints are reset to zero endpoints. The
+// returned error is always nil.
+func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
+	hcs.lock.Lock()
+	defer hcs.lock.Unlock()
+
+	for name, n := range newEndpoints {
+		inst := hcs.services[name]
+		if inst == nil {
+			glog.V(3).Infof("Not saving endpoints for unknown healthcheck %q", name.String())
+			continue
+		}
+		glog.V(3).Infof("Reporting %d endpoints for healthcheck %q", n, name.String())
+		inst.endpoints = n
+	}
+	// Anything that was not reported has no local endpoints any more.
+	for name, inst := range hcs.services {
+		if _, reported := newEndpoints[name]; reported {
+			continue
+		}
+		inst.endpoints = 0
+	}
+	return nil
+}
+
+// HealthzUpdater allows callers to update the healthz timestamp only.
+type HealthzUpdater interface {
+ UpdateTimestamp() // records the current time as the latest update
+}
+
+// HealthzServer returns 200 "OK" by default. Once the timestamp has been
+// updated, it returns 503 when more than healthTimeout has elapsed since
+// the last update.
+type HealthzServer struct {
+ listener Listener // opens the healthz socket
+ httpFactory HTTPServerFactory // builds the healthz HTTP server
+ clock clock.Clock // time source used for timestamps and staleness checks
+
+ addr string // address Run listens on
+ port int32 // NOTE(review): neither set nor read in the code visible here
+ healthTimeout time.Duration // maximum allowed age of lastUpdated
+
+ lastUpdated atomic.Value // holds a time.Time once UpdateTimestamp has been called
+}
+
+// NewDefaultHealthzServer returns a healthz http server for addr that uses the default listener, server factory and clock.
+func NewDefaultHealthzServer(addr string, healthTimeout time.Duration) *HealthzServer {
+ return newHealthzServer(nil, nil, nil, addr, healthTimeout)
+}
+
+// newHealthzServer builds a HealthzServer for addr. Any of listener,
+// httpServerFactory and c may be nil, in which case the net.Listen backed
+// listener, the http.Server factory and the real clock are used respectively.
+func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *HealthzServer {
+	hs := &HealthzServer{
+		listener: listener,
+		httpFactory: httpServerFactory,
+		clock: c,
+		addr: addr,
+		healthTimeout: healthTimeout,
+	}
+	// Substitute defaults for dependencies that were not injected.
+	if hs.listener == nil {
+		hs.listener = stdNetListener{}
+	}
+	if hs.httpFactory == nil {
+		hs.httpFactory = stdHTTPServerFactory{}
+	}
+	if hs.clock == nil {
+		hs.clock = clock.RealClock{}
+	}
+	return hs
+}
+
+// UpdateTimestamp stores the clock's current time as the lastUpdated timestamp.
+func (hs *HealthzServer) UpdateTimestamp() {
+	now := hs.clock.Now()
+	hs.lastUpdated.Store(now)
+}
+
+// Run starts the healthz http server in a background goroutine and returns.
+// The server answers on the "/healthz" path of hs.addr. If the address cannot
+// be opened, the failure is logged and no server is started.
+func (hs *HealthzServer) Run() {
+	serveMux := http.NewServeMux()
+	serveMux.Handle("/healthz", healthzHandler{hs: hs})
+	server := hs.httpFactory.New(hs.addr, serveMux)
+	listener, err := hs.listener.Listen(hs.addr)
+	if err != nil {
+		glog.Errorf("Failed to start healthz on %s: %v", hs.addr, err)
+		return
+	}
+	go func() {
+		glog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr)
+		// Serve is not expected to return; both outcomes are logged as errors.
+		if err := server.Serve(listener); err != nil {
+			glog.Errorf("Healthz closed: %v", err)
+			return
+		}
+		glog.Errorf("Unexpected healthz closed.")
+	}()
+}
+
+type healthzHandler struct { // healthzHandler serves the /healthz endpoint registered by Run
+ hs *HealthzServer // source of the clock, the timeout and the last-update timestamp
+}
+
+func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ lastUpdated := time.Time{}
+ if val := h.hs.lastUpdated.Load(); val != nil {
+ lastUpdated = val.(time.Time)
+ }
+ currentTime := h.hs.clock.Now()
+
+ resp.Header().Set("Content-Type", "application/json")
+ if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) {
+ resp.WriteHeader(http.StatusServiceUnavailable)
+ } else {
+ resp.WriteHeader(http.StatusOK)
+ }
+ fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime))
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/iptables/metrics.go b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/metrics.go
new file mode 100644
index 000000000..fabe6a595
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/metrics.go
@@ -0,0 +1,50 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package iptables
+
+import (
+ "sync"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+const kubeProxySubsystem = "kubeproxy" // Prometheus subsystem used for the metrics below
+
+var (
+ SyncProxyRulesLatency = prometheus.NewHistogram( // SyncProxyRulesLatency is a histogram of SyncProxyRules latency.
+ prometheus.HistogramOpts{
+ Subsystem: kubeProxySubsystem,
+ Name: "sync_proxy_rules_latency_microseconds",
+ Help: "SyncProxyRules latency",
+ Buckets: prometheus.ExponentialBuckets(1000, 2, 15), // 15 buckets starting at 1000 and doubling each step
+ },
+ )
+)
+
+// registerMetricsOnce ensures the metrics are registered a single time.
+var registerMetricsOnce sync.Once
+
+// RegisterMetrics registers SyncProxyRulesLatency with the default
+// Prometheus registry. It is safe to call more than once; only the first
+// call registers.
+func RegisterMetrics() {
+	registerMetricsOnce.Do(func() { prometheus.MustRegister(SyncProxyRulesLatency) })
+}
+
+// sinceInMicroseconds returns the time elapsed since start, in whole
+// microseconds (the fractional part is truncated).
+func sinceInMicroseconds(start time.Time) float64 {
+	elapsed := time.Since(start)
+	return float64(elapsed / time.Microsecond)
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go
new file mode 100644
index 000000000..9d29d7b42
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go
@@ -0,0 +1,1732 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package iptables
+
+//
+// NOTE: this needs to be tested in e2e since it uses iptables for everything.
+//
+
+import (
+ "bytes"
+ "crypto/sha256"
+ "encoding/base32"
+ "fmt"
+ "net"
+ "reflect"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/golang/glog"
+
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/sets"
+ "k8s.io/apimachinery/pkg/util/wait"
+ utilfeature "k8s.io/apiserver/pkg/util/feature"
+ clientv1 "k8s.io/client-go/pkg/api/v1"
+ "k8s.io/client-go/tools/record"
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/api/helper"
+ apiservice "k8s.io/kubernetes/pkg/api/service"
+ "k8s.io/kubernetes/pkg/features"
+ "k8s.io/kubernetes/pkg/proxy"
+ "k8s.io/kubernetes/pkg/proxy/healthcheck"
+ utilproxy "k8s.io/kubernetes/pkg/proxy/util"
+ "k8s.io/kubernetes/pkg/util/async"
+ utilexec "k8s.io/kubernetes/pkg/util/exec"
+ utiliptables "k8s.io/kubernetes/pkg/util/iptables"
+ utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
+ utilversion "k8s.io/kubernetes/pkg/util/version"
+)
+
+const (
+ // iptablesMinVersion is the minimum version of iptables for which we will use the Proxier
+ // from this package instead of the userspace Proxier. While most of the
+ // features we need were available earlier, the '-C' flag was added more
+ // recently. We use that indirectly in Ensure* functions, and if we don't
+ // have it, we have to be extra careful about the exact args we feed in being
+ // the same as the args we read back (iptables itself normalizes some args).
+ // This is the "new" Proxier, so we require "new" versions of tools.
+ iptablesMinVersion = utiliptables.MinCheckVersion
+
+ // kubeServicesChain is the services chain.
+ kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
+
+ // kubeNodePortsChain is the nodeports chain.
+ kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"
+
+ // kubePostroutingChain is the kubernetes postrouting chain.
+ kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
+
+ // KubeMarkMasqChain is the mark-for-masquerade chain.
+ KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
+
+ // KubeMarkDropChain is the mark-for-drop chain.
+ KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
+)
+
+// IPTablesVersioner can query the current iptables version.
+type IPTablesVersioner interface {
+ // GetVersion returns the iptables version as "X.Y.Z".
+ GetVersion() (string, error)
+}
+
+// KernelCompatTester tests whether the required kernel capabilities are
+// present to run the iptables proxier.
+type KernelCompatTester interface {
+ IsCompatible() error // a non-nil error makes CanUseIPTablesProxier return false
+}
+
+// CanUseIPTablesProxier returns true if we should use the iptables Proxier
+// instead of the "classic" userspace Proxier. This is determined by checking
+// the iptables version and the presence of the required kernel features.
+//
+// It returns false with a nil error when iptables is older than
+// iptablesMinVersion. It returns false with a non-nil error when a version
+// cannot be obtained or parsed, or when the kernel compatibility check fails.
+func CanUseIPTablesProxier(iptver IPTablesVersioner, kcompat KernelCompatTester) (bool, error) {
+	required, err := utilversion.ParseGeneric(iptablesMinVersion)
+	if err != nil {
+		return false, err
+	}
+
+	reported, err := iptver.GetVersion()
+	if err != nil {
+		return false, err
+	}
+	installed, err := utilversion.ParseGeneric(reported)
+	if err != nil {
+		return false, err
+	}
+
+	// Too old: not an error, the caller simply falls back.
+	if installed.LessThan(required) {
+		return false, nil
+	}
+
+	// The version is fine; make sure the kernel supports what we need.
+	if compatErr := kcompat.IsCompatible(); compatErr != nil {
+		return false, compatErr
+	}
+	return true, nil
+}
+
+// LinuxKernelCompatTester is a KernelCompatTester that probes the running
+// system's sysctls.
+type LinuxKernelCompatTester struct{}
+
+// IsCompatible returns nil when the route_localnet sysctl can be read.
+// Only its existence matters here, not its value; the Proxier sets the value
+// it needs when it is created.
+func (lkct LinuxKernelCompatTester) IsCompatible() error {
+	if _, err := utilsysctl.New().GetSysctl(sysctlRouteLocalnet); err != nil {
+		return err
+	}
+	return nil
+}
+
+const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet" // set to 1 by NewProxier
+const sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables" // NewProxier logs a warning when this is readable and not 1
+
+// serviceInfo is the internal struct for storing information about one service port.
+type serviceInfo struct {
+ clusterIP net.IP
+ port int
+ protocol api.Protocol
+ nodePort int
+ loadBalancerStatus api.LoadBalancerStatus // deep copy of the service's status
+ sessionAffinityType api.ServiceAffinity
+ stickyMaxAgeMinutes int // fixed at 180 by newServiceInfo
+ externalIPs []string // copy of the service's external IPs
+ loadBalancerSourceRanges []string // copy of the service's source ranges
+ onlyNodeLocalEndpoints bool // true when the feature gate is on and the service requests only local traffic
+ healthCheckNodePort int // left at 0 when the service has no healthcheck node port
+ // The following fields are computed and stored for performance reasons.
+ serviceNameString string
+ servicePortChainName utiliptables.Chain
+ serviceFirewallChainName utiliptables.Chain
+ serviceLBChainName utiliptables.Chain
+}
+
+// endpointsInfo is the internal struct for information about one endpoint.
+type endpointsInfo struct {
+ endpoint string // TODO: should be an endpointString type
+ isLocal bool
+ // The following fields we lazily compute and store here for performance
+ // reasons. If the protocol is the same as you expect it to be, then the
+ // chainName can be reused, otherwise it should be recomputed.
+ protocol string // protocol that chainName was last computed for
+ chainName utiliptables.Chain // cached by endpointChain
+}
+
+// IPPart returns just the IP (host) part of the endpoint.
+//
+// The endpoint is normally of the form "ip:port". It is split with
+// net.SplitHostPort so that bracketed IPv6 endpoints such as "[fd00::1]:80"
+// yield "fd00::1"; cutting at the first ':' would return "[fd00". When the
+// endpoint is not in host:port form (for example a bare IP address) it is
+// returned unchanged.
+func (e *endpointsInfo) IPPart() string {
+	host, _, err := net.SplitHostPort(e.endpoint)
+	if err != nil {
+		// Not host:port; hand back the endpoint as-is.
+		return e.endpoint
+	}
+	return host
+}
+
+// endpointChain returns the iptables chain name for this endpoint under the
+// given service name and protocol. The name is cached on the endpointsInfo
+// and recomputed only when the protocol differs from the cached one.
+func (e *endpointsInfo) endpointChain(svcNameString, protocol string) utiliptables.Chain {
+	if e.protocol == protocol {
+		// Cache hit: the stored chain name was computed for this protocol.
+		return e.chainName
+	}
+	e.protocol = protocol
+	e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.endpoint)
+	return e.chainName
+}
+
+// String renders the endpointsInfo fields in fmt's default struct format.
+func (e *endpointsInfo) String() string {
+	return fmt.Sprint(*e)
+}
+
+// newServiceInfo builds the serviceInfo for one port of a service. Slices and
+// the load balancer status are copied so later changes to the service object
+// do not affect the result. The chain names derived from the service port
+// name and protocol are precomputed.
+func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo {
+	localOnly := utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) &&
+		apiservice.RequestsOnlyLocalTraffic(service)
+
+	// Copy the slices in case the service instance changes.
+	externalIPs := make([]string, len(service.Spec.ExternalIPs))
+	copy(externalIPs, service.Spec.ExternalIPs)
+	sourceRanges := make([]string, len(service.Spec.LoadBalancerSourceRanges))
+	copy(sourceRanges, service.Spec.LoadBalancerSourceRanges)
+
+	info := &serviceInfo{
+		clusterIP: net.ParseIP(service.Spec.ClusterIP),
+		port: int(port.Port),
+		protocol: port.Protocol,
+		nodePort: int(port.NodePort),
+		// Deep-copy in case the service instance changes.
+		loadBalancerStatus: *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer),
+		sessionAffinityType: service.Spec.SessionAffinity,
+		stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API.
+		externalIPs: externalIPs,
+		loadBalancerSourceRanges: sourceRanges,
+		onlyNodeLocalEndpoints: localOnly,
+	}
+
+	if apiservice.NeedsHealthCheck(service) {
+		if hcPort := apiservice.GetServiceHealthCheckNodePort(service); hcPort != 0 {
+			info.healthCheckNodePort = int(hcPort)
+		} else {
+			glog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String())
+		}
+	}
+
+	// Precompute the names below; they are stored for performance reasons.
+	lowerProto := strings.ToLower(string(info.protocol))
+	svcName := svcPortName.String()
+	info.serviceNameString = svcName
+	info.servicePortChainName = servicePortChainName(svcName, lowerProto)
+	info.serviceFirewallChainName = serviceFirewallChainName(svcName, lowerProto)
+	info.serviceLBChainName = serviceLBChainName(svcName, lowerProto)
+
+	return info
+}
+
+type endpointsChange struct { // endpointsChange is the accumulated change for one Endpoints object
+ previous proxyEndpointsMap // state captured when the change was first recorded
+ current proxyEndpointsMap // state after the most recent update
+}
+
+type endpointsChangeMap struct { // endpointsChangeMap tracks pending endpoints changes
+ lock sync.Mutex // held by update while items is modified
+ hostname string // passed to endpointsToEndpointsMap
+ items map[types.NamespacedName]*endpointsChange // pending changes keyed by object name
+}
+
+type serviceChange struct { // serviceChange is the accumulated change for one Service object
+ previous proxyServiceMap // state captured when the change was first recorded
+ current proxyServiceMap // state after the most recent update
+}
+
+type serviceChangeMap struct { // serviceChangeMap tracks pending service changes
+ lock sync.Mutex // held by update while items is modified
+ items map[types.NamespacedName]*serviceChange // pending changes keyed by object name
+}
+
+type updateEndpointMapResult struct {
+ hcEndpoints map[types.NamespacedName]int // presumably the input for healthcheck.Server.SyncEndpoints; confirm against caller
+ staleEndpoints map[endpointServicePair]bool
+ staleServiceNames map[proxy.ServicePortName]bool
+}
+
+type updateServiceMapResult struct {
+ hcServices map[types.NamespacedName]uint16 // presumably the input for healthcheck.Server.SyncServices; confirm against caller
+ staleServices sets.String
+}
+
+type proxyServiceMap map[proxy.ServicePortName]*serviceInfo // service ports keyed by service port name
+type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo // endpoints keyed by service port name
+
+// newEndpointsChangeMap returns an empty endpointsChangeMap that converts
+// endpoints using the given hostname.
+func newEndpointsChangeMap(hostname string) endpointsChangeMap {
+	return endpointsChangeMap{
+		items: map[types.NamespacedName]*endpointsChange{},
+		hostname: hostname,
+	}
+}
+
+// update folds one endpoints change into the map. The first change seen for
+// an object records previous as the baseline; later changes only replace
+// current. If the accumulated change ends up being a no-op (previous and
+// current are deeply equal) the entry is dropped. It reports whether any
+// changes remain pending.
+func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool {
+	ecm.lock.Lock()
+	defer ecm.lock.Unlock()
+
+	key := *namespacedName
+	pending, tracked := ecm.items[key]
+	if !tracked {
+		pending = &endpointsChange{previous: endpointsToEndpointsMap(previous, ecm.hostname)}
+		ecm.items[key] = pending
+	}
+	pending.current = endpointsToEndpointsMap(current, ecm.hostname)
+	if reflect.DeepEqual(pending.previous, pending.current) {
+		delete(ecm.items, key)
+	}
+	return len(ecm.items) != 0
+}
+
+// newServiceChangeMap returns an empty serviceChangeMap.
+func newServiceChangeMap() serviceChangeMap {
+	return serviceChangeMap{
+		items: map[types.NamespacedName]*serviceChange{},
+	}
+}
+
+// update folds one service change into the map. The first change seen for an
+// object records previous as the baseline; later changes only replace
+// current. If the accumulated change ends up being a no-op (previous and
+// current are deeply equal) the entry is dropped. It reports whether any
+// changes remain pending.
+func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool {
+	scm.lock.Lock()
+	defer scm.lock.Unlock()
+
+	key := *namespacedName
+	pending, tracked := scm.items[key]
+	if !tracked {
+		pending = &serviceChange{previous: serviceToServiceMap(previous)}
+		scm.items[key] = pending
+	}
+	pending.current = serviceToServiceMap(current)
+	if reflect.DeepEqual(pending.previous, pending.current) {
+		delete(scm.items, key)
+	}
+	return len(scm.items) != 0
+}
+
+// merge copies every entry of other into sm, overwriting entries that already
+// exist. It returns the set of port names that were present in other.
+func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
+	target := *sm
+	seenPorts := sets.NewString()
+	for name, info := range other {
+		seenPorts.Insert(name.Port)
+		if _, known := target[name]; known {
+			glog.V(1).Infof("Updating existing service port %q at %s:%d/%s", name, info.clusterIP, info.port, info.protocol)
+		} else {
+			glog.V(1).Infof("Adding new service port %q at %s:%d/%s", name, info.clusterIP, info.port, info.protocol)
+		}
+		target[name] = info
+	}
+	return seenPorts
+}
+
+// unmerge removes from sm every entry of other whose port name is not in
+// existingPorts. The cluster IP of each removed UDP service port is added to
+// staleServices. Entries of other that are missing from sm are logged as
+// errors.
+func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) {
+	target := *sm
+	for name := range other {
+		if existingPorts.Has(name.Port) {
+			continue
+		}
+		info, known := target[name]
+		if !known {
+			glog.Errorf("Service port %q removed, but doesn't exists", name)
+			continue
+		}
+		glog.V(1).Infof("Removing service port %q", name)
+		if info.protocol == api.ProtocolUDP {
+			staleServices.Insert(info.clusterIP.String())
+		}
+		delete(target, name)
+	}
+}
+
+// merge copies every entry of other into em, replacing the endpoint list of
+// any service port that is already present.
+func (em proxyEndpointsMap) merge(other proxyEndpointsMap) {
+	for name, endpoints := range other {
+		em[name] = endpoints
+	}
+}
+
+// unmerge deletes from em every service port that is a key of other.
+func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) {
+	for name := range other {
+		delete(em, name)
+	}
+}
+
+// Proxier is an iptables based proxy for connections between a localhost:lport
+// and services that provide the actual backends.
+type Proxier struct {
+ // endpointsChanges and serviceChanges contain all changes to endpoints and
+ // services that happened since iptables was synced. For a single object,
+ // changes are accumulated, i.e. previous is state from before all of them,
+ // current is state after applying all of those.
+ endpointsChanges endpointsChangeMap
+ serviceChanges serviceChangeMap
+
+ mu sync.Mutex // protects the following fields
+ serviceMap proxyServiceMap
+ endpointsMap proxyEndpointsMap
+ portsMap map[localPort]closeable // local ports currently held open
+ // endpointsSynced and servicesSynced are set to true when corresponding
+ // objects are synced after startup. This is used to avoid updating iptables
+ // with some partial data after kube-proxy restart.
+ endpointsSynced bool
+ servicesSynced bool
+ initialized int32 // NOTE(review): presumably accessed via sync/atomic; confirm against its users
+ syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
+
+ // These are effectively const and do not need the mutex to be held.
+ iptables utiliptables.Interface
+ masqueradeAll bool
+ masqueradeMark string // "value/mask" string derived from the masquerade bit in NewProxier
+ exec utilexec.Interface
+ clusterCIDR string // may be empty; NewProxier then logs a warning
+ hostname string
+ nodeIP net.IP // NewProxier substitutes 127.0.0.1 when given nil
+ portMapper portOpener
+ recorder record.EventRecorder
+ healthChecker healthcheck.Server
+ healthzServer healthcheck.HealthzUpdater
+
+ // Since converting probabilities (floats) to strings is expensive
+ // and we are using only probabilities in the format of 1/n, we are
+ // precomputing some number of those and caching them for future reuse.
+ precomputedProbabilities []string
+
+ // The following buffers are used to reuse memory and avoid allocations
+ // that are significantly impacting performance.
+ iptablesData *bytes.Buffer
+ filterChains *bytes.Buffer
+ filterRules *bytes.Buffer
+ natChains *bytes.Buffer
+ natRules *bytes.Buffer
+}
+
+// localPort describes a port on this node: a human-readable description
+// plus the IP, port number and protocol.
+type localPort struct {
+	desc string
+	ip string
+	port int
+	protocol string
+}
+
+// String renders the port as `"desc" (ip:port/protocol)`.
+func (lp *localPort) String() string {
+	where := fmt.Sprintf("%s:%d/%s", lp.ip, lp.port, lp.protocol)
+	return fmt.Sprintf("%q (%s)", lp.desc, where)
+}
+
+type closeable interface { // closeable is anything that can be closed, such as a held-open local port
+ Close() error
+}
+
+// portOpener is an interface around port opening/closing.
+// It is abstracted out for testing.
+type portOpener interface {
+ OpenLocalPort(lp *localPort) (closeable, error) // the returned closeable releases the port
+}
+
+// listenPortOpener opens ports by calling bind() and listen().
+type listenPortOpener struct{}
+
+// OpenLocalPort holds the given local port open by delegating to openLocalPort.
+func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) {
+ return openLocalPort(lp)
+}
+
+// Compile-time check that *Proxier implements proxy.ProxyProvider.
+var _ proxy.ProxyProvider = (*Proxier)(nil)
+
+// NewProxier returns a new Proxier given an iptables Interface instance.
+// Because of the iptables logic, it is assumed that there is only a single Proxier active on a machine.
+// An error is returned if minSyncPeriod exceeds syncPeriod, if the route_localnet sysctl cannot be set, or if masqueradeBit is outside [0, 31].
+// Once a proxier is created, it will keep iptables up to date in the background and
+// will not terminate if a particular iptables call fails.
+func NewProxier(ipt utiliptables.Interface,
+ sysctl utilsysctl.Interface,
+ exec utilexec.Interface,
+ syncPeriod time.Duration,
+ minSyncPeriod time.Duration,
+ masqueradeAll bool,
+ masqueradeBit int,
+ clusterCIDR string,
+ hostname string,
+ nodeIP net.IP,
+ recorder record.EventRecorder,
+ healthzServer healthcheck.HealthzUpdater,
+) (*Proxier, error) {
+ // check valid user input
+ if minSyncPeriod > syncPeriod {
+ return nil, fmt.Errorf("minSyncPeriod (%v) must be <= syncPeriod (%v)", minSyncPeriod, syncPeriod)
+ }
+
+ // Set the route_localnet sysctl we need (NOTE(review): presumably so traffic to loopback addresses can be routed; confirm).
+ if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
+ return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
+ }
+
+ // Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
+ // are connected to a Linux bridge (but not SDN bridges). Until most
+ // plugins handle this, log when config is missing. A read error is ignored.
+ if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
+ glog.Warningf("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
+ }
+
+ // Generate the masquerade mark to use for SNAT rules.
+ if masqueradeBit < 0 || masqueradeBit > 31 {
+ return nil, fmt.Errorf("invalid iptables-masquerade-bit %v not in [0, 31]", masqueradeBit)
+ }
+ masqueradeValue := 1 << uint(masqueradeBit)
+ masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue) // same value used as mark and mask
+
+ if nodeIP == nil {
+ glog.Warningf("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
+ nodeIP = net.ParseIP("127.0.0.1")
+ }
+
+ if len(clusterCIDR) == 0 {
+ glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
+ }
+
+ healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
+
+ proxier := &Proxier{
+ portsMap: make(map[localPort]closeable),
+ serviceMap: make(proxyServiceMap),
+ serviceChanges: newServiceChangeMap(),
+ endpointsMap: make(proxyEndpointsMap),
+ endpointsChanges: newEndpointsChangeMap(hostname),
+ iptables: ipt,
+ masqueradeAll: masqueradeAll,
+ masqueradeMark: masqueradeMark,
+ exec: exec,
+ clusterCIDR: clusterCIDR,
+ hostname: hostname,
+ nodeIP: nodeIP,
+ portMapper: &listenPortOpener{},
+ recorder: recorder,
+ healthChecker: healthChecker,
+ healthzServer: healthzServer,
+ precomputedProbabilities: make([]string, 0, 1001),
+ iptablesData: bytes.NewBuffer(nil),
+ filterChains: bytes.NewBuffer(nil),
+ filterRules: bytes.NewBuffer(nil),
+ natChains: bytes.NewBuffer(nil),
+ natRules: bytes.NewBuffer(nil),
+ }
+ burstSyncs := 2
+ glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
+ proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) // the runner invokes syncProxyRules
+ return proxier, nil
+}
+
+// CleanupLeftovers tears down the iptables state installed by the Proxier:
+// the jumps into the services and postrouting chains, the well-known KUBE-*
+// NAT chains, every per-service / per-endpoint NAT chain, and the services
+// chain in the filter table.
+// It returns true if any step failed. Failures are logged rather than
+// returned, and the remaining steps still run.
+func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
+	// Unlink the services chain from every built-in chain that jumps to it.
+	serviceJump := []string{
+		"-m", "comment", "--comment", "kubernetes service portals",
+		"-j", string(kubeServicesChain),
+	}
+	type tableChain struct {
+		table utiliptables.Table
+		chain utiliptables.Chain
+	}
+	jumpSources := []tableChain{
+		{utiliptables.TableFilter, utiliptables.ChainInput},
+		{utiliptables.TableFilter, utiliptables.ChainOutput},
+		{utiliptables.TableNAT, utiliptables.ChainOutput},
+		{utiliptables.TableNAT, utiliptables.ChainPrerouting},
+	}
+	for _, src := range jumpSources {
+		err := ipt.DeleteRule(src.table, src.chain, serviceJump...)
+		// A rule that is already gone is not a failure.
+		if err != nil && !utiliptables.IsNotFoundError(err) {
+			glog.Errorf("Error removing pure-iptables proxy rule: %v", err)
+			encounteredError = true
+		}
+	}
+
+	// Unlink the postrouting chain.
+	postroutingJump := []string{
+		"-m", "comment", "--comment", "kubernetes postrouting rules",
+		"-j", string(kubePostroutingChain),
+	}
+	if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, postroutingJump...); err != nil && !utiliptables.IsNotFoundError(err) {
+		glog.Errorf("Error removing pure-iptables proxy rule: %v", err)
+		encounteredError = true
+	}
+
+	// Reports whether a NAT chain name belongs to a per-service or
+	// per-endpoint chain family.
+	isProxyChain := func(name string) bool {
+		for _, prefix := range []string{"KUBE-SVC-", "KUBE-SEP-", "KUBE-FW-", "KUBE-XLB-"} {
+			if strings.HasPrefix(name, prefix) {
+				return true
+			}
+		}
+		return false
+	}
+
+	// Flush and remove all of our NAT chains.
+	saved := bytes.NewBuffer(nil)
+	if err := ipt.SaveInto(utiliptables.TableNAT, saved); err != nil {
+		glog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableNAT, err)
+		encounteredError = true
+	} else {
+		existing := utiliptables.GetChainLines(utiliptables.TableNAT, saved.Bytes())
+		chainLines := bytes.NewBuffer(nil)
+		deleteLines := bytes.NewBuffer(nil)
+		writeLine(chainLines, "*nat")
+		// Start with chains we know we need to remove.
+		for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
+			if line, found := existing[chain]; found {
+				writeLine(chainLines, line)                 // flush
+				writeLine(deleteLines, "-X", string(chain)) // delete
+			}
+		}
+		// Hunt for service and endpoint chains.
+		for chain, line := range existing {
+			name := string(chain)
+			if isProxyChain(name) {
+				writeLine(chainLines, line)       // flush
+				writeLine(deleteLines, "-X", name) // delete
+			}
+		}
+		writeLine(deleteLines, "COMMIT")
+		payload := append(chainLines.Bytes(), deleteLines.Bytes()...)
+		// Write it.
+		if err := ipt.Restore(utiliptables.TableNAT, payload, utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil {
+			glog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableNAT, err)
+			encounteredError = true
+		}
+	}
+
+	// Flush and remove the services chain from the filter table.
+	filterLines := bytes.NewBuffer(nil)
+	writeLine(filterLines, "*filter")
+	writeLine(filterLines, fmt.Sprintf(":%s - [0:0]", kubeServicesChain))
+	writeLine(filterLines, fmt.Sprintf("-X %s", kubeServicesChain))
+	writeLine(filterLines, "COMMIT")
+	// Write it.
+	if err := ipt.Restore(utiliptables.TableFilter, filterLines.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil {
+		glog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableFilter, err)
+		encounteredError = true
+	}
+	return encounteredError
+}
+
+// computeProbability formats 1/n with five digits after the decimal point.
+func computeProbability(n int) string {
+	share := 1.0 / float64(n)
+	return fmt.Sprintf("%0.5f", share)
+}
+
+// precomputeProbabilities extends the cached probability strings so that
+// every index up to and including numberOfPrecomputed is populated.
+// Index 0 is a placeholder, since 1/0 is not a usable probability.
+// This assumes proxier.mu is held.
+func (proxier *Proxier) precomputeProbabilities(numberOfPrecomputed int) {
+	cache := proxier.precomputedProbabilities
+	if len(cache) == 0 {
+		cache = append(cache, "<bad value>")
+	}
+	for next := len(cache); next <= numberOfPrecomputed; next++ {
+		cache = append(cache, computeProbability(next))
+	}
+	proxier.precomputedProbabilities = cache
+}
+
+// probability returns the cached string for 1/n, growing the cache first
+// when n is beyond what has been precomputed.
+// This assumes proxier.mu is held.
+func (proxier *Proxier) probability(n int) string {
+	if n < len(proxier.precomputedProbabilities) {
+		return proxier.precomputedProbabilities[n]
+	}
+	proxier.precomputeProbabilities(n)
+	return proxier.precomputedProbabilities[n]
+}
+
+// Sync is called to synchronize the proxier state to iptables as soon as possible.
+// It only asks the sync runner to run; it does not call syncProxyRules itself.
+func (proxier *Proxier) Sync() {
+ proxier.syncRunner.Run()
+}
+
+// SyncLoop hands control to the sync runner's loop with a stop channel that
+// never fires. It is expected to run as a goroutine or as the main loop of
+// the app, and it does not return.
+func (proxier *Proxier) SyncLoop() {
+	// Stamp healthz up front in case Sync() never succeeds.
+	if hz := proxier.healthzServer; hz != nil {
+		hz.UpdateTimestamp()
+	}
+	proxier.syncRunner.Loop(wait.NeverStop)
+}
+
+// setInitialized atomically stores the initialized flag as 1 (true) or 0 (false).
+func (proxier *Proxier) setInitialized(value bool) {
+	flag := int32(0)
+	if value {
+		flag = 1
+	}
+	atomic.StoreInt32(&proxier.initialized, flag)
+}
+
+// isInitialized atomically reads the initialized flag and reports whether it is set.
+func (proxier *Proxier) isInitialized() bool {
+	flag := atomic.LoadInt32(&proxier.initialized)
+	return flag > 0
+}
+
+// OnServiceAdd records a newly added service in the pending service changes.
+// A sync is requested only when the change tracker reports a change and the
+// proxier is initialized.
+func (proxier *Proxier) OnServiceAdd(service *api.Service) {
+	key := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
+	changed := proxier.serviceChanges.update(&key, nil, service)
+	if !changed || !proxier.isInitialized() {
+		return
+	}
+	proxier.syncRunner.Run()
+}
+
+// OnServiceUpdate records a service modification (old and new object) in the
+// pending service changes. A sync is requested only when the change tracker
+// reports a change and the proxier is initialized.
+func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
+	key := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
+	changed := proxier.serviceChanges.update(&key, oldService, service)
+	if !changed || !proxier.isInitialized() {
+		return
+	}
+	proxier.syncRunner.Run()
+}
+
+// OnServiceDelete records a service removal in the pending service changes.
+// A sync is requested only when the change tracker reports a change and the
+// proxier is initialized.
+func (proxier *Proxier) OnServiceDelete(service *api.Service) {
+	key := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
+	changed := proxier.serviceChanges.update(&key, service, nil)
+	if !changed || !proxier.isInitialized() {
+		return
+	}
+	proxier.syncRunner.Run()
+}
+
+// OnServiceSynced marks services as synced, refreshes the initialized flag
+// (set once both services and endpoints are synced), and then runs a sync
+// directly.
+func (proxier *Proxier) OnServiceSynced() {
+	// The lock must be released before syncProxyRules, which takes it itself.
+	func() {
+		proxier.mu.Lock()
+		defer proxier.mu.Unlock()
+		proxier.servicesSynced = true
+		ready := proxier.servicesSynced && proxier.endpointsSynced
+		proxier.setInitialized(ready)
+	}()
+
+	// Sync unconditionally - this is called once per lifetime.
+	proxier.syncProxyRules()
+}
+
+// shouldSkipService reports whether the service must not be proxied: either
+// its cluster IP is not set, or it is of type ExternalName. The reason is
+// logged at verbosity 3.
+func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool {
+	switch {
+	case !helper.IsServiceIPSet(service):
+		// ClusterIP is "None" or empty.
+		glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
+		return true
+	case service.Spec.Type == api.ServiceTypeExternalName:
+		// ExternalName services are not proxied even with a ClusterIP set.
+		glog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName)
+		return true
+	}
+	return false
+}
+
+// updateServiceMap applies every pending change to <serviceMap> in place and
+// then empties the <changes> map. The result carries the stale services
+// collected while unmerging, plus a health-check node port for every service
+// that has a non-zero one.
+func updateServiceMap(
+	serviceMap proxyServiceMap,
+	changes *serviceChangeMap) (result updateServiceMapResult) {
+	result.staleServices = sets.NewString()
+
+	// Apply and clear the pending changes while holding the changes lock.
+	applyPending := func() {
+		changes.lock.Lock()
+		defer changes.lock.Unlock()
+		for _, pending := range changes.items {
+			kept := serviceMap.merge(pending.current)
+			serviceMap.unmerge(pending.previous, kept, result.staleServices)
+		}
+		changes.items = make(map[types.NamespacedName]*serviceChange)
+	}
+	applyPending()
+
+	// TODO: If this will appear to be computationally expensive, consider
+	// computing this incrementally similarly to serviceMap.
+	healthPorts := make(map[types.NamespacedName]uint16)
+	for portName, info := range serviceMap {
+		if port := info.healthCheckNodePort; port != 0 {
+			healthPorts[portName.NamespacedName] = uint16(port)
+		}
+	}
+	result.hcServices = healthPorts
+
+	return result
+}
+
+// OnEndpointsAdd records a newly added Endpoints object in the pending
+// endpoints changes. A sync is requested only when the change tracker reports
+// a change and the proxier is initialized.
+func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
+	key := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
+	changed := proxier.endpointsChanges.update(&key, nil, endpoints)
+	if !changed || !proxier.isInitialized() {
+		return
+	}
+	proxier.syncRunner.Run()
+}
+
+// OnEndpointsUpdate records an Endpoints modification (old and new object) in
+// the pending endpoints changes. A sync is requested only when the change
+// tracker reports a change and the proxier is initialized.
+func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
+	key := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
+	changed := proxier.endpointsChanges.update(&key, oldEndpoints, endpoints)
+	if !changed || !proxier.isInitialized() {
+		return
+	}
+	proxier.syncRunner.Run()
+}
+
+// OnEndpointsDelete records an Endpoints removal in the pending endpoints
+// changes. A sync is requested only when the change tracker reports a change
+// and the proxier is initialized.
+func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
+	key := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
+	changed := proxier.endpointsChanges.update(&key, endpoints, nil)
+	if !changed || !proxier.isInitialized() {
+		return
+	}
+	proxier.syncRunner.Run()
+}
+
+// OnEndpointsSynced marks endpoints as synced, refreshes the initialized flag
+// (set once both services and endpoints are synced), and then runs a sync
+// directly.
+func (proxier *Proxier) OnEndpointsSynced() {
+	// The lock must be released before syncProxyRules, which takes it itself.
+	func() {
+		proxier.mu.Lock()
+		defer proxier.mu.Unlock()
+		proxier.endpointsSynced = true
+		ready := proxier.servicesSynced && proxier.endpointsSynced
+		proxier.setInitialized(ready)
+	}()
+
+	// Sync unconditionally - this is called once per lifetime.
+	proxier.syncProxyRules()
+}
+
+// updateEndpointsMap applies every pending change to <endpointsMap> in place
+// and then empties the <changes> map. The result carries the stale endpoints
+// and stale service names detected along the way. When the
+// ExternalTrafficLocalOnly feature gate is enabled it also carries, per
+// service, the number of distinct local endpoint IPs.
+// The hostname parameter is not used by this function.
+func updateEndpointsMap(
+	endpointsMap proxyEndpointsMap,
+	changes *endpointsChangeMap,
+	hostname string) (result updateEndpointMapResult) {
+	result.staleEndpoints = make(map[endpointServicePair]bool)
+	result.staleServiceNames = make(map[proxy.ServicePortName]bool)
+
+	// Apply and clear the pending changes while holding the changes lock.
+	applyPending := func() {
+		changes.lock.Lock()
+		defer changes.lock.Unlock()
+		for _, pending := range changes.items {
+			endpointsMap.unmerge(pending.previous)
+			endpointsMap.merge(pending.current)
+			detectStaleConnections(pending.previous, pending.current, result.staleEndpoints, result.staleServiceNames)
+		}
+		changes.items = make(map[types.NamespacedName]*endpointsChange)
+	}
+	applyPending()
+
+	if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
+		return result
+	}
+
+	// TODO: If this will appear to be computationally expensive, consider
+	// computing this incrementally similarly to endpointsMap.
+	localCounts := make(map[types.NamespacedName]int)
+	for nsn, ips := range getLocalIPs(endpointsMap) {
+		localCounts[nsn] = len(ips)
+	}
+	result.hcEndpoints = localCounts
+
+	return result
+}
+
+// detectStaleConnections compares the old and new endpoint maps and records
+// its findings in the two output maps, both of which are modified in place:
+//   - <staleEndpoints> gets every old endpoint with no equal entry in the new
+//     list for the same service port;
+//   - <staleServiceNames> gets every service port that went from zero
+//     endpoints to at least one.
+func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) {
+	for svcPortName, oldList := range oldEndpointsMap {
+		newList := newEndpointsMap[svcPortName]
+	nextOld:
+		for _, oldEp := range oldList {
+			for _, newEp := range newList {
+				if *newEp == *oldEp {
+					// Still present, so not stale.
+					continue nextOld
+				}
+			}
+			glog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, oldEp.endpoint)
+			staleEndpoints[endpointServicePair{endpoint: oldEp.endpoint, servicePortName: svcPortName}] = true
+		}
+	}
+
+	for svcPortName, newList := range newEndpointsMap {
+		if len(newList) == 0 || len(oldEndpointsMap[svcPortName]) != 0 {
+			continue
+		}
+		// For a UDP service whose backends change from 0 to non-0, there may
+		// exist a conntrack entry that could blackhole traffic to the service.
+		staleServiceNames[svcPortName] = true
+	}
+}
+
+// getLocalIPs collects, per service (namespace/name), the set of IPs of the
+// endpoints flagged as local. Services without any local endpoint get no
+// entry in the returned map.
+func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
+	byService := make(map[types.NamespacedName]sets.String)
+	for svcPortName, epList := range endpointsMap {
+		nsn := svcPortName.NamespacedName
+		for _, ep := range epList {
+			if !ep.isLocal {
+				continue
+			}
+			ips := byService[nsn]
+			if ips == nil {
+				ips = sets.NewString()
+				byService[nsn] = ips
+			}
+			ips.Insert(ep.IPPart()) // just the IP part
+		}
+	}
+	return byService
+}
+
+// endpointsToEndpointsMap translates a single Endpoints object into a
+// proxyEndpointsMap keyed by service port name. It is used for incremental
+// updates of the endpoints map. A nil input yields a nil map.
+// Ports equal to 0 and addresses with an empty IP are skipped with a warning.
+//
+// NOTE: endpoints object should NOT be modified.
+func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEndpointsMap {
+	if endpoints == nil {
+		return nil
+	}
+
+	owner := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
+	result := make(proxyEndpointsMap)
+	// We need to build a map of portname -> all ip:ports for that
+	// portname. Explode Endpoints.Subsets[*] into this structure.
+	for s := range endpoints.Subsets {
+		subset := &endpoints.Subsets[s]
+		for p := range subset.Ports {
+			port := &subset.Ports[p]
+			if port.Port == 0 {
+				glog.Warningf("ignoring invalid endpoint port %s", port.Name)
+				continue
+			}
+			svcPortName := proxy.ServicePortName{
+				NamespacedName: owner,
+				Port:           port.Name,
+			}
+			portString := strconv.Itoa(int(port.Port))
+			for a := range subset.Addresses {
+				addr := &subset.Addresses[a]
+				if addr.IP == "" {
+					glog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name)
+					continue
+				}
+				// An endpoint is local when it is pinned to this node's hostname.
+				local := addr.NodeName != nil && *addr.NodeName == hostname
+				result[svcPortName] = append(result[svcPortName], &endpointsInfo{
+					endpoint: net.JoinHostPort(addr.IP, portString),
+					isLocal:  local,
+				})
+			}
+			if glog.V(3) {
+				listed := []string{}
+				for _, ep := range result[svcPortName] {
+					listed = append(listed, ep.endpoint)
+				}
+				glog.Infof("Setting endpoints for %q to %+v", svcPortName, listed)
+			}
+		}
+	}
+	return result
+}
+
+// serviceToServiceMap translates a single Service object into a
+// proxyServiceMap with one entry per service port. It returns nil for a nil
+// service and for services that shouldSkipService rejects.
+//
+// NOTE: service object should NOT be modified.
+func serviceToServiceMap(service *api.Service) proxyServiceMap {
+	if service == nil {
+		return nil
+	}
+	owner := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
+	if shouldSkipService(owner, service) {
+		return nil
+	}
+
+	result := make(proxyServiceMap)
+	ports := service.Spec.Ports
+	for idx := range ports {
+		port := &ports[idx]
+		key := proxy.ServicePortName{NamespacedName: owner, Port: port.Name}
+		result[key] = newServiceInfo(key, port, service)
+	}
+	return result
+}
+
+// portProtoHash derives a 16-character identifier from a ServicePortName and
+// protocol: the two strings are concatenated, hashed with sha256, encoded as
+// base32 and truncated to 16 chars. We do this because IPTables Chain Names
+// must be <= 28 chars long, and the longer they are the harder they are to
+// read.
+func portProtoHash(servicePortName string, protocol string) string {
+	digest := sha256.Sum256([]byte(servicePortName + protocol))
+	return base32.StdEncoding.EncodeToString(digest[:])[:16]
+}
+
+// servicePortChainName returns the iptables chain for a service port:
+// the prefix "KUBE-SVC-" followed by portProtoHash of the ServicePortName
+// and protocol.
+func servicePortChainName(servicePortName string, protocol string) utiliptables.Chain {
+	suffix := portProtoHash(servicePortName, protocol)
+	return utiliptables.Chain("KUBE-SVC-" + suffix)
+}
+
+// serviceFirewallChainName returns the iptables firewall chain for a service
+// port: the prefix "KUBE-FW-" followed by portProtoHash of the
+// ServicePortName and protocol.
+func serviceFirewallChainName(servicePortName string, protocol string) utiliptables.Chain {
+	suffix := portProtoHash(servicePortName, protocol)
+	return utiliptables.Chain("KUBE-FW-" + suffix)
+}
+
+// serviceLBChainName returns the iptables load-balancer chain for a service
+// port: the prefix "KUBE-XLB-" followed by portProtoHash of the
+// ServicePortName and protocol. The hash keeps the name short because
+// IPTables Chain Names must be <= 28 chars long, and the longer they are the
+// harder they are to read.
+func serviceLBChainName(servicePortName string, protocol string) utiliptables.Chain {
+	suffix := portProtoHash(servicePortName, protocol)
+	return utiliptables.Chain("KUBE-XLB-" + suffix)
+}
+
+// servicePortEndpointChainName is the per-endpoint counterpart of
+// servicePortChainName: the endpoint is appended to the hashed input and the
+// resulting chain carries the prefix "KUBE-SEP-".
+func servicePortEndpointChainName(servicePortName string, protocol string, endpoint string) utiliptables.Chain {
+	digest := sha256.Sum256([]byte(servicePortName + protocol + endpoint))
+	suffix := base32.StdEncoding.EncodeToString(digest[:])[:16]
+	return utiliptables.Chain("KUBE-SEP-" + suffix)
+}
+
+// endpointServicePair ties one endpoint to the service port it backs. It is
+// used as a map key when collecting stale endpoints.
+type endpointServicePair struct {
+ // endpoint is the endpoint address string (built elsewhere with
+ // net.JoinHostPort, i.e. "host:port").
+ endpoint string
+ // servicePortName identifies the service port the endpoint belongs to.
+ servicePortName proxy.ServicePortName
+}
+
+// IPPart returns only the host portion of the endpoint string.
+//
+// Endpoints are built with net.JoinHostPort, so an IPv6 endpoint looks like
+// "[::1]:80". Splitting at the first ":" would return "[" for such a value,
+// so the endpoint is parsed with net.SplitHostPort instead, which also strips
+// the brackets. When the endpoint cannot be parsed as host:port (for example
+// it carries no port), it is returned unchanged.
+func (esp *endpointServicePair) IPPart() string {
+	if host, _, err := net.SplitHostPort(esp.endpoint); err == nil {
+		return host
+	}
+	return esp.endpoint
+}
+
+// noConnectionToDelete is the text searched for in a conntrack error message.
+// An error containing it is not logged as a failure when deleting endpoint
+// connections.
+const noConnectionToDelete = "0 flow entries have been deleted"
+
+// deleteEndpointConnections flushes conntrack entries that point at removed
+// UDP endpoints. After a UDP endpoint has been removed, we must flush any
+// pending conntrack entries to it, or else we risk sending more traffic to
+// it, all of which will be lost (because UDP).
+//
+// Only pairs whose service port is present in proxier.serviceMap with
+// protocol UDP are handled; everything else is skipped. Deletion is best
+// effort: failures are logged and the loop carries on.
+// This assumes the proxier mutex is held.
+func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) {
+	for epSvcPair := range connectionMap {
+		svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]
+		if !ok || svcInfo.protocol != api.ProtocolUDP {
+			continue
+		}
+		// Use the shared helper rather than slicing at strings.Index, which
+		// panics with a slice bound of -1 when the endpoint has no ":".
+		endpointIP := epSvcPair.IPPart()
+		clusterIP := svcInfo.clusterIP.String()
+		glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", clusterIP, endpointIP)
+		err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "--orig-dst", clusterIP, "--dst-nat", endpointIP, "-p", "udp")
+		// conntrack reporting that nothing matched is not a failure.
+		if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
+			// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
+			// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
+			// is expensive to baby sit all udp connections to kubernetes services.
+			glog.Errorf("conntrack return with error: %v", err)
+		}
+	}
+}
+
+// This is where all of the iptables-save/restore calls happen.
+// The only other iptables rules are those that are setup in iptablesInit()
+// This assumes proxier.mu is NOT held
+func (proxier *Proxier) syncProxyRules() {
+ proxier.mu.Lock()
+ defer proxier.mu.Unlock()
+
+ start := time.Now()
+ defer func() {
+ SyncProxyRulesLatency.Observe(sinceInMicroseconds(start))
+ glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
+ }()
+ // don't sync rules till we've received services and endpoints
+ if !proxier.endpointsSynced || !proxier.servicesSynced {
+ glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
+ return
+ }
+
+ // We assume that if this was called, we really want to sync them,
+ // even if nothing changed in the meantime. In other words, callers are
+ // responsible for detecting no-op changes and not calling this function.
+ serviceUpdateResult := updateServiceMap(
+ proxier.serviceMap, &proxier.serviceChanges)
+ endpointUpdateResult := updateEndpointsMap(
+ proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
+
+ staleServices := serviceUpdateResult.staleServices
+ // merge stale services gathered from updateEndpointsMap
+ for svcPortName := range endpointUpdateResult.staleServiceNames {
+ if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP {
+ glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String())
+ staleServices.Insert(svcInfo.clusterIP.String())
+ }
+ }
+
+ glog.V(3).Infof("Syncing iptables rules")
+
+ // Create and link the kube services chain.
+ {
+ tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT}
+ for _, table := range tablesNeedServicesChain {
+ if _, err := proxier.iptables.EnsureChain(table, kubeServicesChain); err != nil {
+ glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, kubeServicesChain, err)
+ return
+ }
+ }
+
+ tableChainsNeedJumpServices := []struct {
+ table utiliptables.Table
+ chain utiliptables.Chain
+ }{
+ {utiliptables.TableFilter, utiliptables.ChainInput},
+ {utiliptables.TableFilter, utiliptables.ChainOutput},
+ {utiliptables.TableNAT, utiliptables.ChainOutput},
+ {utiliptables.TableNAT, utiliptables.ChainPrerouting},
+ }
+ comment := "kubernetes service portals"
+ args := []string{"-m", "comment", "--comment", comment, "-j", string(kubeServicesChain)}
+ for _, tc := range tableChainsNeedJumpServices {
+ if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil {
+ glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeServicesChain, err)
+ return
+ }
+ }
+ }
+
+ // Create and link the kube postrouting chain.
+ {
+ if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, kubePostroutingChain); err != nil {
+ glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubePostroutingChain, err)
+ return
+ }
+
+ comment := "kubernetes postrouting rules"
+ args := []string{"-m", "comment", "--comment", comment, "-j", string(kubePostroutingChain)}
+ if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
+ glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, err)
+ return
+ }
+ }
+
+ //
+ // Below this point we will not return until we try to write the iptables rules.
+ //
+
+ // Get iptables-save output so we can check for existing chains and rules.
+ // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
+ existingFilterChains := make(map[utiliptables.Chain]string)
+ proxier.iptablesData.Reset()
+ err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.iptablesData)
+ if err != nil { // if we failed to get any rules
+ glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
+ } else { // otherwise parse the output
+ existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.iptablesData.Bytes())
+ }
+
+ existingNATChains := make(map[utiliptables.Chain]string)
+ proxier.iptablesData.Reset()
+ err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
+ if err != nil { // if we failed to get any rules
+ glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
+ } else { // otherwise parse the output
+ existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
+ }
+
+ // Reset all buffers used later.
+ // This is to avoid memory reallocations and thus improve performance.
+ proxier.filterChains.Reset()
+ proxier.filterRules.Reset()
+ proxier.natChains.Reset()
+ proxier.natRules.Reset()
+
+ // Write table headers.
+ writeLine(proxier.filterChains, "*filter")
+ writeLine(proxier.natChains, "*nat")
+
+ // Make sure we keep stats for the top-level chains, if they existed
+ // (which most should have because we created them above).
+ if chain, ok := existingFilterChains[kubeServicesChain]; ok {
+ writeLine(proxier.filterChains, chain)
+ } else {
+ writeLine(proxier.filterChains, utiliptables.MakeChainLine(kubeServicesChain))
+ }
+ if chain, ok := existingNATChains[kubeServicesChain]; ok {
+ writeLine(proxier.natChains, chain)
+ } else {
+ writeLine(proxier.natChains, utiliptables.MakeChainLine(kubeServicesChain))
+ }
+ if chain, ok := existingNATChains[kubeNodePortsChain]; ok {
+ writeLine(proxier.natChains, chain)
+ } else {
+ writeLine(proxier.natChains, utiliptables.MakeChainLine(kubeNodePortsChain))
+ }
+ if chain, ok := existingNATChains[kubePostroutingChain]; ok {
+ writeLine(proxier.natChains, chain)
+ } else {
+ writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain))
+ }
+ if chain, ok := existingNATChains[KubeMarkMasqChain]; ok {
+ writeLine(proxier.natChains, chain)
+ } else {
+ writeLine(proxier.natChains, utiliptables.MakeChainLine(KubeMarkMasqChain))
+ }
+
+ // Install the kubernetes-specific postrouting rules. We use a whole chain for
+ // this so that it is easier to flush and change, for example if the mark
+ // value should ever change.
+ writeLine(proxier.natRules, []string{
+ "-A", string(kubePostroutingChain),
+ "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
+ "-m", "mark", "--mark", proxier.masqueradeMark,
+ "-j", "MASQUERADE",
+ }...)
+
+ // Install the kubernetes-specific masquerade mark rule. We use a whole chain for
+ // this so that it is easier to flush and change, for example if the mark
+ // value should ever change.
+ writeLine(proxier.natRules, []string{
+ "-A", string(KubeMarkMasqChain),
+ "-j", "MARK", "--set-xmark", proxier.masqueradeMark,
+ }...)
+
+ // Accumulate NAT chains to keep.
+ activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
+
+ // Accumulate the set of local ports that we will be holding open once this update is complete
+ replacementPortsMap := map[localPort]closeable{}
+
+ // We create these slices once here to avoid memory reallocations
+ // in every loop. Note that to reuse the memory, instead of doing:
+ // slice = <some new slice>
+ // you should always do one of the below:
+ // slice = slice[:0] // and then append to it
+ // slice = append(slice[:0], ...)
+ endpoints := make([]*endpointsInfo, 0)
+ endpointChains := make([]utiliptables.Chain, 0)
+ // To avoid growing this slice, we arbitrarily set its size to 64,
+ // there is never more than that many arguments for a single line.
+ // Note that even if we go over 64, it will still be correct - it
+ // is just for efficiency, not correctness.
+ args := make([]string, 64)
+
+ // Build rules for each service.
+ var svcNameString string
+ for svcName, svcInfo := range proxier.serviceMap {
+ protocol := strings.ToLower(string(svcInfo.protocol))
+ svcNameString = svcInfo.serviceNameString
+
+ // Create the per-service chain, retaining counters if possible.
+ svcChain := svcInfo.servicePortChainName
+ if chain, ok := existingNATChains[svcChain]; ok {
+ writeLine(proxier.natChains, chain)
+ } else {
+ writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
+ }
+ activeNATChains[svcChain] = true
+
+ svcXlbChain := svcInfo.serviceLBChainName
+ if svcInfo.onlyNodeLocalEndpoints {
+ // Only for services request OnlyLocal traffic
+ // create the per-service LB chain, retaining counters if possible.
+ if lbChain, ok := existingNATChains[svcXlbChain]; ok {
+ writeLine(proxier.natChains, lbChain)
+ } else {
+ writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain))
+ }
+ activeNATChains[svcXlbChain] = true
+ } else if activeNATChains[svcXlbChain] {
+ // Cleanup the previously created XLB chain for this service
+ delete(activeNATChains, svcXlbChain)
+ }
+
+ // Capture the clusterIP.
+ args = append(args[:0],
+ "-A", string(kubeServicesChain),
+ "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
+ "-m", protocol, "-p", protocol,
+ "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
+ "--dport", strconv.Itoa(svcInfo.port),
+ )
+ if proxier.masqueradeAll {
+ writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+ } else if len(proxier.clusterCIDR) > 0 {
+ // This masquerades off-cluster traffic to a service VIP. The idea
+ // is that you can establish a static route for your Service range,
+ // routing to any node, and that node will bridge into the Service
+ // for you. Since that might bounce off-node, we masquerade here.
+ // If/when we support "Local" policy for VIPs, we should update this.
+ writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
+ }
+ writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
+
+ // Capture externalIPs.
+ for _, externalIP := range svcInfo.externalIPs {
+ // If the "external" IP happens to be an IP that is local to this
+ // machine, hold the local port open so no other process can open it
+ // (because the socket might open but it would never work).
+ if local, err := isLocalIP(externalIP); err != nil {
+ glog.Errorf("can't determine if IP is local, assuming not: %v", err)
+ } else if local {
+ lp := localPort{
+ desc: "externalIP for " + svcNameString,
+ ip: externalIP,
+ port: svcInfo.port,
+ protocol: protocol,
+ }
+ if proxier.portsMap[lp] != nil {
+ glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
+ replacementPortsMap[lp] = proxier.portsMap[lp]
+ } else {
+ socket, err := proxier.portMapper.OpenLocalPort(&lp)
+ if err != nil {
+ msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)
+
+ proxier.recorder.Eventf(
+ &clientv1.ObjectReference{
+ Kind: "Node",
+ Name: proxier.hostname,
+ UID: types.UID(proxier.hostname),
+ Namespace: "",
+ }, api.EventTypeWarning, err.Error(), msg)
+ glog.Error(msg)
+ continue
+ }
+ replacementPortsMap[lp] = socket
+ }
+ } // We're holding the port, so it's OK to install iptables rules.
+ args = append(args[:0],
+ "-A", string(kubeServicesChain),
+ "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
+ "-m", protocol, "-p", protocol,
+ "-d", fmt.Sprintf("%s/32", externalIP),
+ "--dport", strconv.Itoa(svcInfo.port),
+ )
+ // We have to SNAT packets to external IPs.
+ writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+
+ // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
+ // nor from a local process to be forwarded to the service.
+ // This rule roughly translates to "all traffic from off-machine".
+ // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
+ externalTrafficOnlyArgs := append(args,
+ "-m", "physdev", "!", "--physdev-is-in",
+ "-m", "addrtype", "!", "--src-type", "LOCAL")
+ writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...)
+ dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
+ // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
+ // This covers cases like GCE load-balancers which get added to the local routing table.
+ writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...)
+
+ // If the service has no endpoints then reject packets coming via externalIP
+ // Install ICMP Reject rule in filter table for destination=externalIP and dport=svcport
+ if len(proxier.endpointsMap[svcName]) == 0 {
+ writeLine(proxier.filterRules,
+ "-A", string(kubeServicesChain),
+ "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
+ "-m", protocol, "-p", protocol,
+ "-d", fmt.Sprintf("%s/32", externalIP),
+ "--dport", strconv.Itoa(svcInfo.port),
+ "-j", "REJECT",
+ )
+ }
+ }
+
+ // Capture load-balancer ingress.
+ fwChain := svcInfo.serviceFirewallChainName
+ for _, ingress := range svcInfo.loadBalancerStatus.Ingress {
+ if ingress.IP != "" {
+ // create service firewall chain
+ if chain, ok := existingNATChains[fwChain]; ok {
+ writeLine(proxier.natChains, chain)
+ } else {
+ writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain))
+ }
+ activeNATChains[fwChain] = true
+ // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
+ // This currently works for loadbalancers that preserves source ips.
+ // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
+
+ args = append(args[:0],
+ "-A", string(kubeServicesChain),
+ "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
+ "-m", protocol, "-p", protocol,
+ "-d", fmt.Sprintf("%s/32", ingress.IP),
+ "--dport", strconv.Itoa(svcInfo.port),
+ )
+ // jump to service firewall chain
+ writeLine(proxier.natRules, append(args, "-j", string(fwChain))...)
+
+ args = append(args[:0],
+ "-A", string(fwChain),
+ "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
+ )
+
+ // Each source match rule in the FW chain may jump to either the SVC or the XLB chain
+ chosenChain := svcXlbChain
+ // If we are proxying globally, we need to masquerade in case we cross nodes.
+ // If we are proxying only locally, we can retain the source IP.
+ if !svcInfo.onlyNodeLocalEndpoints {
+ writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+ chosenChain = svcChain
+ }
+
+ if len(svcInfo.loadBalancerSourceRanges) == 0 {
+ // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain
+ writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...)
+ } else {
+ // firewall filter based on each source range
+ allowFromNode := false
+ for _, src := range svcInfo.loadBalancerSourceRanges {
+ writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...)
+ // ignore error because it has been validated
+ _, cidr, _ := net.ParseCIDR(src)
+ if cidr.Contains(proxier.nodeIP) {
+ allowFromNode = true
+ }
+ }
+ // generally, ip route rule was added to intercept request to loadbalancer vip from the
+ // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
+ // Need to add the following rule to allow request on host.
+ if allowFromNode {
+ writeLine(proxier.natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...)
+ }
+ }
+
+ // If the packet was able to reach the end of firewall chain, then it did not get DNATed.
+ // It means the packet cannot go thru the firewall, then mark it for DROP
+ writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
+ }
+ }
+
+ // Capture nodeports. If we had more than 2 rules it might be
+ // worthwhile to make a new per-service chain for nodeport rules, but
+ // with just 2 rules it ends up being a waste and a cognitive burden.
+ if svcInfo.nodePort != 0 {
+ // Hold the local port open so no other process can open it
+ // (because the socket might open but it would never work).
+ lp := localPort{
+ desc: "nodePort for " + svcNameString,
+ ip: "",
+ port: svcInfo.nodePort,
+ protocol: protocol,
+ }
+ if proxier.portsMap[lp] != nil {
+ glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
+ replacementPortsMap[lp] = proxier.portsMap[lp]
+ } else {
+ socket, err := proxier.portMapper.OpenLocalPort(&lp)
+ if err != nil {
+ glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
+ continue
+ }
+ if lp.protocol == "udp" {
+ proxier.clearUDPConntrackForPort(lp.port)
+ }
+ replacementPortsMap[lp] = socket
+ } // We're holding the port, so it's OK to install iptables rules.
+
+ args = append(args[:0],
+ "-A", string(kubeNodePortsChain),
+ "-m", "comment", "--comment", svcNameString,
+ "-m", protocol, "-p", protocol,
+ "--dport", strconv.Itoa(svcInfo.nodePort),
+ )
+ if !svcInfo.onlyNodeLocalEndpoints {
+ // Nodeports need SNAT, unless they're local.
+ writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+ // Jump to the service chain.
+ writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
+ } else {
+ // TODO: Make all nodePorts jump to the firewall chain.
+ // Currently we only create it for loadbalancers (#33586).
+ writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...)
+ }
+
+ // If the service has no endpoints then reject packets. The filter
+ // table doesn't currently have the same per-service structure that
+ // the nat table does, so we just stick this into the kube-services
+ // chain.
+ if len(proxier.endpointsMap[svcName]) == 0 {
+ writeLine(proxier.filterRules,
+ "-A", string(kubeServicesChain),
+ "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
+ "-m", "addrtype", "--dst-type", "LOCAL",
+ "-m", protocol, "-p", protocol,
+ "--dport", strconv.Itoa(svcInfo.nodePort),
+ "-j", "REJECT",
+ )
+ }
+ }
+
+ // If the service has no endpoints then reject packets.
+ if len(proxier.endpointsMap[svcName]) == 0 {
+ writeLine(proxier.filterRules,
+ "-A", string(kubeServicesChain),
+ "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
+ "-m", protocol, "-p", protocol,
+ "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
+ "--dport", strconv.Itoa(svcInfo.port),
+ "-j", "REJECT",
+ )
+ continue
+ }
+
+ // From here on, we assume there are active endpoints.
+
+ // Generate the per-endpoint chains. We do this in multiple passes so we
+ // can group rules together.
+ // These two slices parallel each other - keep in sync
+ endpoints = endpoints[:0]
+ endpointChains = endpointChains[:0]
+ var endpointChain utiliptables.Chain
+ for _, ep := range proxier.endpointsMap[svcName] {
+ endpoints = append(endpoints, ep)
+ endpointChain = ep.endpointChain(svcNameString, protocol)
+ endpointChains = append(endpointChains, endpointChain)
+
+ // Create the endpoint chain, retaining counters if possible.
+ if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok {
+ writeLine(proxier.natChains, chain)
+ } else {
+ writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
+ }
+ activeNATChains[endpointChain] = true
+ }
+
+ // First write session affinity rules, if applicable.
+ if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
+ for _, endpointChain := range endpointChains {
+ writeLine(proxier.natRules,
+ "-A", string(svcChain),
+ "-m", "comment", "--comment", svcNameString,
+ "-m", "recent", "--name", string(endpointChain),
+ "--rcheck", "--seconds", strconv.Itoa(svcInfo.stickyMaxAgeMinutes*60), "--reap",
+ "-j", string(endpointChain))
+ }
+ }
+
+ // Now write loadbalancing & DNAT rules.
+ n := len(endpointChains)
+ for i, endpointChain := range endpointChains {
+ // Balancing rules in the per-service chain.
+ args = append(args[:0], []string{
+ "-A", string(svcChain),
+ "-m", "comment", "--comment", svcNameString,
+ }...)
+ if i < (n - 1) {
+ // Each rule is a probabilistic match.
+ args = append(args,
+ "-m", "statistic",
+ "--mode", "random",
+ "--probability", proxier.probability(n-i))
+ }
+ // The final (or only if n == 1) rule is a guaranteed match.
+ args = append(args, "-j", string(endpointChain))
+ writeLine(proxier.natRules, args...)
+
+ // Rules in the per-endpoint chain.
+ args = append(args[:0],
+ "-A", string(endpointChain),
+ "-m", "comment", "--comment", svcNameString,
+ )
+ // Handle traffic that loops back to the originator with SNAT.
+ writeLine(proxier.natRules, append(args,
+ "-s", fmt.Sprintf("%s/32", endpoints[i].IPPart()),
+ "-j", string(KubeMarkMasqChain))...)
+ // Update client-affinity lists.
+ if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
+ args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
+ }
+ // DNAT to final destination.
+ args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].endpoint)
+ writeLine(proxier.natRules, args...)
+ }
+
+ // The logic below this applies only if this service is marked as OnlyLocal
+ if !svcInfo.onlyNodeLocalEndpoints {
+ continue
+ }
+
+ // Now write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic.
+ // TODO - This logic may be combinable with the block above that creates the svc balancer chain
+ localEndpoints := make([]*endpointsInfo, 0)
+ localEndpointChains := make([]utiliptables.Chain, 0)
+ for i := range endpointChains {
+ if endpoints[i].isLocal {
+ // These slices parallel each other; must be kept in sync
+ localEndpoints = append(localEndpoints, endpoints[i])
+ localEndpointChains = append(localEndpointChains, endpointChains[i])
+ }
+ }
+ // First rule in the chain redirects all pod -> external VIP traffic to the
+ // Service's ClusterIP instead. This happens whether or not we have local
+ // endpoints; only if clusterCIDR is specified
+ if len(proxier.clusterCIDR) > 0 {
+ args = append(args[:0],
+ "-A", string(svcXlbChain),
+ "-m", "comment", "--comment",
+ `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`,
+ "-s", proxier.clusterCIDR,
+ "-j", string(svcChain),
+ )
+ writeLine(proxier.natRules, args...)
+ }
+
+ numLocalEndpoints := len(localEndpointChains)
+ if numLocalEndpoints == 0 {
+ // Blackhole all traffic since there are no local endpoints
+ args = append(args[:0],
+ "-A", string(svcXlbChain),
+ "-m", "comment", "--comment",
+ fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
+ "-j",
+ string(KubeMarkDropChain),
+ )
+ writeLine(proxier.natRules, args...)
+ } else {
+ // Setup probability filter rules only over local endpoints
+ for i, endpointChain := range localEndpointChains {
+ // Balancing rules in the per-service chain.
+ args = append(args[:0],
+ "-A", string(svcXlbChain),
+ "-m", "comment", "--comment",
+ fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcNameString),
+ )
+ if i < (numLocalEndpoints - 1) {
+ // Each rule is a probabilistic match.
+ args = append(args,
+ "-m", "statistic",
+ "--mode", "random",
+ "--probability", proxier.probability(numLocalEndpoints-i))
+ }
+ // The final (or only if n == 1) rule is a guaranteed match.
+ args = append(args, "-j", string(endpointChain))
+ writeLine(proxier.natRules, args...)
+ }
+ }
+ }
+
+ // Delete chains no longer in use.
+ for chain := range existingNATChains {
+ if !activeNATChains[chain] {
+ chainString := string(chain)
+ if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
+ // Ignore chains that aren't ours.
+ continue
+ }
+ // We must (as per iptables) write a chain-line for it, which has
+ // the nice effect of flushing the chain. Then we can remove the
+ // chain.
+ writeLine(proxier.natChains, existingNATChains[chain])
+ writeLine(proxier.natRules, "-X", chainString)
+ }
+ }
+
+ // Finally, tail-call to the nodeports chain. This needs to be after all
+ // other service portal rules.
+ writeLine(proxier.natRules,
+ "-A", string(kubeServicesChain),
+ "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
+ "-m", "addrtype", "--dst-type", "LOCAL",
+ "-j", string(kubeNodePortsChain))
+
+ // Write the end-of-table markers.
+ writeLine(proxier.filterRules, "COMMIT")
+ writeLine(proxier.natRules, "COMMIT")
+
+ // Sync rules.
+ // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table
+ proxier.iptablesData.Reset()
+ proxier.iptablesData.Write(proxier.filterChains.Bytes())
+ proxier.iptablesData.Write(proxier.filterRules.Bytes())
+ proxier.iptablesData.Write(proxier.natChains.Bytes())
+ proxier.iptablesData.Write(proxier.natRules.Bytes())
+
+ glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
+ err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
+ if err != nil {
+ glog.Errorf("Failed to execute iptables-restore: %v", err)
+ glog.V(2).Infof("Rules:\n%s", proxier.iptablesData.Bytes())
+ // Revert new local ports.
+ revertPorts(replacementPortsMap, proxier.portsMap)
+ return
+ }
+
+ // Close old local ports and save new ones.
+ for k, v := range proxier.portsMap {
+ if replacementPortsMap[k] == nil {
+ v.Close()
+ }
+ }
+ proxier.portsMap = replacementPortsMap
+
+ // Update healthz timestamp.
+ if proxier.healthzServer != nil {
+ proxier.healthzServer.UpdateTimestamp()
+ }
+
+ // Update healthchecks. The endpoints list might include services that are
+ // not "OnlyLocal", but the services list will not, and the healthChecker
+ // will just drop those endpoints.
+ if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil {
+ glog.Errorf("Error syncing healtcheck services: %v", err)
+ }
+ if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
+ glog.Errorf("Error syncing healthcheck endoints: %v", err)
+ }
+
+ // Finish housekeeping.
+ // TODO: these and clearUDPConntrackForPort() could be made more consistent.
+ utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
+ proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints)
+}
+
+ // clearUDPConntrackForPort deletes the UDP conntrack entries whose destination
+ // port is the given port. A port that is not greater than zero is rejected with
+ // an error log and nothing is deleted.
+ //
+ // When a packet arrives, it will not go through the NAT table again, because it
+ // is not "the first" packet. The solution is clearing the conntrack. Known issues:
+ // https://github.com/docker/docker/issues/8795
+ // https://github.com/kubernetes/kubernetes/issues/31983
+ func (proxier *Proxier) clearUDPConntrackForPort(port int) {
+ 	// Validate first so the "Deleting ..." message is only logged when a
+ 	// deletion is actually attempted.
+ 	if port <= 0 {
+ 		glog.Errorf("Wrong port number. The port number must be greater than zero")
+ 		return
+ 	}
+
+ 	glog.V(2).Infof("Deleting conntrack entries for udp connections")
+ 	err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "-p", "udp", "--dport", strconv.Itoa(port))
+ 	// An error whose text contains noConnectionToDelete is not logged.
+ 	if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
+ 		glog.Errorf("conntrack return with error: %v", err)
+ 	}
+ }
+
+ // writeLine appends words to buf as a single line: the words are separated by
+ // one space each and the line is terminated with a newline. Nothing is written
+ // when no words are given.
+ func writeLine(buf *bytes.Buffer, words ...string) {
+ 	// strings.Join is avoided here for performance reasons.
+ 	last := len(words) - 1
+ 	for idx, word := range words {
+ 		buf.WriteString(word)
+ 		if idx == last {
+ 			buf.WriteByte('\n')
+ 			continue
+ 		}
+ 		buf.WriteByte(' ')
+ 	}
+ }
+
+ // isLocalIP reports whether ip equals one of the addresses returned by
+ // net.InterfaceAddrs. It returns an error if the interface addresses cannot be
+ // listed or if one of them cannot be parsed as a CIDR. An ip that does not
+ // parse is never reported as local.
+ func isLocalIP(ip string) (bool, error) {
+ 	addrs, err := net.InterfaceAddrs()
+ 	if err != nil {
+ 		return false, err
+ 	}
+ 	// Parse the candidate once instead of once per interface address. An
+ 	// unparsable ip yields a nil net.IP, which Equal never matches against a
+ 	// parsed interface address.
+ 	target := net.ParseIP(ip)
+ 	for _, addr := range addrs {
+ 		intf, _, err := net.ParseCIDR(addr.String())
+ 		if err != nil {
+ 			return false, err
+ 		}
+ 		if target.Equal(intf) {
+ 			return true, nil
+ 		}
+ 	}
+ 	return false, nil
+ }
+
+ // openLocalPort opens a listening TCP socket or a UDP socket on the ip and port
+ // described by lp and returns it so the caller can hold it open. It returns an
+ // error if the socket cannot be opened or if lp.protocol is neither "tcp" nor
+ // "udp".
+ func openLocalPort(lp *localPort) (closeable, error) {
+ 	// For ports on node IPs, the actual port is opened and held, even though
+ 	// iptables is used to redirect the traffic. Holding it ensures a) that it
+ 	// is safe to use that port and b) that (a) stays true. The risk is that
+ 	// some process on the node (e.g. sshd or kubelet) is using a port and the
+ 	// same port is given out to a Service. That would be bad because iptables
+ 	// would silently claim the traffic but the process would never know.
+ 	// NOTE: A real listen()ing socket should not be needed - bind() should be
+ 	// enough, but there seems to be no way to e2e test without it. Tools like
+ 	// 'ss' and 'netstat' do not show sockets that are bind()ed but not
+ 	// listen()ed, and at least the default debian netcat has no way to avoid
+ 	// about 10 seconds of retries.
+ 	hostPort := net.JoinHostPort(lp.ip, strconv.Itoa(lp.port))
+
+ 	var held closeable
+ 	switch proto := lp.protocol; proto {
+ 	case "tcp":
+ 		ln, err := net.Listen("tcp", hostPort)
+ 		if err != nil {
+ 			return nil, err
+ 		}
+ 		held = ln
+ 	case "udp":
+ 		udpAddr, err := net.ResolveUDPAddr("udp", hostPort)
+ 		if err != nil {
+ 			return nil, err
+ 		}
+ 		udpConn, err := net.ListenUDP("udp", udpAddr)
+ 		if err != nil {
+ 			return nil, err
+ 		}
+ 		held = udpConn
+ 	default:
+ 		return nil, fmt.Errorf("unknown protocol %q", proto)
+ 	}
+
+ 	glog.V(2).Infof("Opened local port %s", lp.String())
+ 	return held, nil
+ }
+
+ // revertPorts closes every port of replacementPortsMap that has no non-nil
+ // entry in originalPortsMap. In other words, it only closes the ports that
+ // were opened in this sync and leaves the previously open ones untouched.
+ func revertPorts(replacementPortsMap, originalPortsMap map[localPort]closeable) {
+ 	for port, socket := range replacementPortsMap {
+ 		if originalPortsMap[port] != nil {
+ 			// Already open before this update; keep it.
+ 			continue
+ 		}
+ 		glog.V(2).Infof("Closing local port %s after iptables-restore failure", port.String())
+ 		socket.Close()
+ 	}
+ }
diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/types.go b/vendor/k8s.io/kubernetes/pkg/proxy/types.go
new file mode 100644
index 000000000..578baff69
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/proxy/types.go
@@ -0,0 +1,44 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package proxy
+
+import (
+ "fmt"
+
+ "k8s.io/apimachinery/pkg/types"
+)
+
+ // ProxyProvider is the interface provided by proxier implementations.
+ // It offers an on-demand synchronization (Sync) and a periodic loop (SyncLoop).
+ type ProxyProvider interface {
+ // Sync immediately synchronizes the ProxyProvider's current state to iptables.
+ Sync()
+ // SyncLoop runs periodic work.
+ // This is expected to run as a goroutine or as the main loop of the app.
+ // It does not return, so nothing placed after a direct call to it will run.
+ SyncLoop()
+ }
+
+ // ServicePortName carries a namespace + name + portname. This is the unique
+ // identifier for a load-balanced service.
+ type ServicePortName struct {
+ // NamespacedName is embedded and carries the namespace + name part.
+ types.NamespacedName
+ // Port is the portname part.
+ Port string
+ }
+
+ // String returns the string form of the embedded NamespacedName and the Port,
+ // joined by a colon.
+ func (spn ServicePortName) String() string {
+ 	namespaced := spn.NamespacedName.String()
+ 	return fmt.Sprintf("%s:%s", namespaced, spn.Port)
+ }
diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/util/conntrack.go b/vendor/k8s.io/kubernetes/pkg/proxy/util/conntrack.go
new file mode 100644
index 000000000..436045ecb
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/proxy/util/conntrack.go
@@ -0,0 +1,58 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+ "fmt"
+ "strings"
+
+ "k8s.io/kubernetes/pkg/util/exec"
+
+ "github.com/golang/glog"
+)
+
+ // Utilities for dealing with conntrack
+
+ // noConnectionToDelete is the text searched for in an error returned by
+ // ExecConntrackTool; DeleteServiceConnections does not log errors that
+ // contain it.
+ const noConnectionToDelete = "0 flow entries have been deleted"
+
+ // DeleteServiceConnections uses the conntrack tool to delete the conntrack
+ // entries for the UDP connections specified by the given service IPs. Deletion
+ // is best effort: a failure is logged and the remaining IPs are still handled.
+ func DeleteServiceConnections(execer exec.Interface, svcIPs []string) {
+ 	for _, svcIP := range svcIPs {
+ 		glog.V(2).Infof("Deleting connection tracking state for service IP %s", svcIP)
+ 		err := ExecConntrackTool(execer, "-D", "--orig-dst", svcIP, "-p", "udp")
+ 		if err == nil || strings.Contains(err.Error(), noConnectionToDelete) {
+ 			// Either the call succeeded or its error contains the
+ 			// noConnectionToDelete text; neither case is logged.
+ 			continue
+ 		}
+ 		// TODO: Better handling for deletion failure. When a failure occurs, stale udp connections may not get flushed.
+ 		// These stale udp connections will keep black-holing traffic. This is a best effort operation for now, since it
+ 		// is expensive to baby-sit all udp connections to kubernetes services.
+ 		glog.Errorf("conntrack returned error: %v", err)
+ 	}
+ }
+
+ // ExecConntrackTool executes the conntrack tool using the given parameters.
+ // The binary is located through execer.LookPath. An error is returned when the
+ // binary cannot be found or when the command fails; in the latter case the
+ // error text includes the command's combined output.
+ func ExecConntrackTool(execer exec.Interface, parameters ...string) error {
+ 	binary, lookErr := execer.LookPath("conntrack")
+ 	if lookErr != nil {
+ 		return fmt.Errorf("error looking for path of conntrack: %v", lookErr)
+ 	}
+
+ 	cmd := execer.Command(binary, parameters...)
+ 	if out, runErr := cmd.CombinedOutput(); runErr != nil {
+ 		return fmt.Errorf("conntrack command returned: %q, error message: %s", string(out), runErr)
+ 	}
+ 	return nil
+ }