Diffstat (limited to 'vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go')
-rw-r--r-- | vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go | 1732 |
1 file changed, 0 insertions, 1732 deletions
diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go deleted file mode 100644 index 9d29d7b42..000000000 --- a/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go +++ /dev/null @@ -1,1732 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package iptables - -// -// NOTE: this needs to be tested in e2e since it uses iptables for everything. -// - -import ( - "bytes" - "crypto/sha256" - "encoding/base32" - "fmt" - "net" - "reflect" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/golang/glog" - - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" - utilfeature "k8s.io/apiserver/pkg/util/feature" - clientv1 "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/record" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/helper" - apiservice "k8s.io/kubernetes/pkg/api/service" - "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/pkg/proxy" - "k8s.io/kubernetes/pkg/proxy/healthcheck" - utilproxy "k8s.io/kubernetes/pkg/proxy/util" - "k8s.io/kubernetes/pkg/util/async" - utilexec "k8s.io/kubernetes/pkg/util/exec" - utiliptables "k8s.io/kubernetes/pkg/util/iptables" - utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" - utilversion "k8s.io/kubernetes/pkg/util/version" -) - -const ( - // iptablesMinVersion is the minimum version of iptables for which we will use the Proxier - // from this package instead of the userspace Proxier. While most of the - // features we need were available earlier, the '-C' flag was added more - // recently. We use that indirectly in Ensure* functions, and if we don't - // have it, we have to be extra careful about the exact args we feed in being - // the same as the args we read back (iptables itself normalizes some args). - // This is the "new" Proxier, so we require "new" versions of tools. - iptablesMinVersion = utiliptables.MinCheckVersion - - // the services chain - kubeServicesChain utiliptables.Chain = "KUBE-SERVICES" - - // the nodeports chain - kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS" - - // the kubernetes postrouting chain - kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING" - - // the mark-for-masquerade chain - KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ" - - // the mark-for-drop chain - KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP" -) - -// IPTablesVersioner can query the current iptables version. -type IPTablesVersioner interface { - // returns "X.Y.Z" - GetVersion() (string, error) -} - -// KernelCompatTester tests whether the required kernel capabilities are -// present to run the iptables proxier. -type KernelCompatTester interface { - IsCompatible() error -} - -// CanUseIPTablesProxier returns true if we should use the iptables Proxier -// instead of the "classic" userspace Proxier. This is determined by checking -// the iptables version and for the existence of kernel features. 
It may return -// an error if it fails to get the iptables version without error, in which -// case it will also return false. -func CanUseIPTablesProxier(iptver IPTablesVersioner, kcompat KernelCompatTester) (bool, error) { - minVersion, err := utilversion.ParseGeneric(iptablesMinVersion) - if err != nil { - return false, err - } - versionString, err := iptver.GetVersion() - if err != nil { - return false, err - } - version, err := utilversion.ParseGeneric(versionString) - if err != nil { - return false, err - } - if version.LessThan(minVersion) { - return false, nil - } - - // Check that the kernel supports what we need. - if err := kcompat.IsCompatible(); err != nil { - return false, err - } - return true, nil -} - -type LinuxKernelCompatTester struct{} - -func (lkct LinuxKernelCompatTester) IsCompatible() error { - // Check for the required sysctls. We don't care about the value, just - // that it exists. If this Proxier is chosen, we'll initialize it as we - // need. - _, err := utilsysctl.New().GetSysctl(sysctlRouteLocalnet) - return err -} - -const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet" -const sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables" - -// internal struct for string service information -type serviceInfo struct { - clusterIP net.IP - port int - protocol api.Protocol - nodePort int - loadBalancerStatus api.LoadBalancerStatus - sessionAffinityType api.ServiceAffinity - stickyMaxAgeMinutes int - externalIPs []string - loadBalancerSourceRanges []string - onlyNodeLocalEndpoints bool - healthCheckNodePort int - // The following fields are computed and stored for performance reasons. - serviceNameString string - servicePortChainName utiliptables.Chain - serviceFirewallChainName utiliptables.Chain - serviceLBChainName utiliptables.Chain -} - -// internal struct for endpoints information -type endpointsInfo struct { - endpoint string // TODO: should be an endpointString type - isLocal bool - // The following fields we lazily compute and store here for performance - // reasons. If the protocol is the same as you expect it to be, then the - // chainName can be reused, otherwise it should be recomputed. - protocol string - chainName utiliptables.Chain -} - -// Returns just the IP part of the endpoint. -func (e *endpointsInfo) IPPart() string { - if index := strings.Index(e.endpoint, ":"); index != -1 { - return e.endpoint[0:index] - } - return e.endpoint -} - -// Returns the endpoint chain name for a given endpointsInfo. 
-func (e *endpointsInfo) endpointChain(svcNameString, protocol string) utiliptables.Chain { - if e.protocol != protocol { - e.protocol = protocol - e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.endpoint) - } - return e.chainName -} - -func (e *endpointsInfo) String() string { - return fmt.Sprintf("%v", *e) -} - -// returns a new serviceInfo struct -func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo { - onlyNodeLocalEndpoints := false - if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) && - apiservice.RequestsOnlyLocalTraffic(service) { - onlyNodeLocalEndpoints = true - } - info := &serviceInfo{ - clusterIP: net.ParseIP(service.Spec.ClusterIP), - port: int(port.Port), - protocol: port.Protocol, - nodePort: int(port.NodePort), - // Deep-copy in case the service instance changes - loadBalancerStatus: *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer), - sessionAffinityType: service.Spec.SessionAffinity, - stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API. - externalIPs: make([]string, len(service.Spec.ExternalIPs)), - loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)), - onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, - } - copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) - copy(info.externalIPs, service.Spec.ExternalIPs) - - if apiservice.NeedsHealthCheck(service) { - p := apiservice.GetServiceHealthCheckNodePort(service) - if p == 0 { - glog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String()) - } else { - info.healthCheckNodePort = int(p) - } - } - - // Store the following for performance reasons. - protocol := strings.ToLower(string(info.protocol)) - info.serviceNameString = svcPortName.String() - info.servicePortChainName = servicePortChainName(info.serviceNameString, protocol) - info.serviceFirewallChainName = serviceFirewallChainName(info.serviceNameString, protocol) - info.serviceLBChainName = serviceLBChainName(info.serviceNameString, protocol) - - return info -} - -type endpointsChange struct { - previous proxyEndpointsMap - current proxyEndpointsMap -} - -type endpointsChangeMap struct { - lock sync.Mutex - hostname string - items map[types.NamespacedName]*endpointsChange -} - -type serviceChange struct { - previous proxyServiceMap - current proxyServiceMap -} - -type serviceChangeMap struct { - lock sync.Mutex - items map[types.NamespacedName]*serviceChange -} - -type updateEndpointMapResult struct { - hcEndpoints map[types.NamespacedName]int - staleEndpoints map[endpointServicePair]bool - staleServiceNames map[proxy.ServicePortName]bool -} - -type updateServiceMapResult struct { - hcServices map[types.NamespacedName]uint16 - staleServices sets.String -} - -type proxyServiceMap map[proxy.ServicePortName]*serviceInfo -type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo - -func newEndpointsChangeMap(hostname string) endpointsChangeMap { - return endpointsChangeMap{ - hostname: hostname, - items: make(map[types.NamespacedName]*endpointsChange), - } -} - -func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool { - ecm.lock.Lock() - defer ecm.lock.Unlock() - - change, exists := ecm.items[*namespacedName] - if !exists { - change = &endpointsChange{} - change.previous = endpointsToEndpointsMap(previous, ecm.hostname) - ecm.items[*namespacedName] = change - } - change.current = 
endpointsToEndpointsMap(current, ecm.hostname) - if reflect.DeepEqual(change.previous, change.current) { - delete(ecm.items, *namespacedName) - } - return len(ecm.items) > 0 -} - -func newServiceChangeMap() serviceChangeMap { - return serviceChangeMap{ - items: make(map[types.NamespacedName]*serviceChange), - } -} - -func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool { - scm.lock.Lock() - defer scm.lock.Unlock() - - change, exists := scm.items[*namespacedName] - if !exists { - change = &serviceChange{} - change.previous = serviceToServiceMap(previous) - scm.items[*namespacedName] = change - } - change.current = serviceToServiceMap(current) - if reflect.DeepEqual(change.previous, change.current) { - delete(scm.items, *namespacedName) - } - return len(scm.items) > 0 -} - -func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String { - existingPorts := sets.NewString() - for svcPortName, info := range other { - existingPorts.Insert(svcPortName.Port) - _, exists := (*sm)[svcPortName] - if !exists { - glog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) - } else { - glog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) - } - (*sm)[svcPortName] = info - } - return existingPorts -} - -func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) { - for svcPortName := range other { - if existingPorts.Has(svcPortName.Port) { - continue - } - info, exists := (*sm)[svcPortName] - if exists { - glog.V(1).Infof("Removing service port %q", svcPortName) - if info.protocol == api.ProtocolUDP { - staleServices.Insert(info.clusterIP.String()) - } - delete(*sm, svcPortName) - } else { - glog.Errorf("Service port %q removed, but doesn't exists", svcPortName) - } - } -} - -func (em proxyEndpointsMap) merge(other proxyEndpointsMap) { - for svcPortName := range other { - em[svcPortName] = other[svcPortName] - } -} - -func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) { - for svcPortName := range other { - delete(em, svcPortName) - } -} - -// Proxier is an iptables based proxy for connections between a localhost:lport -// and services that provide the actual backends. -type Proxier struct { - // endpointsChanges and serviceChanges contains all changes to endpoints and - // services that happened since iptables was synced. For a single object, - // changes are accumulated, i.e. previous is state from before all of them, - // current is state after applying all of those. - endpointsChanges endpointsChangeMap - serviceChanges serviceChangeMap - - mu sync.Mutex // protects the following fields - serviceMap proxyServiceMap - endpointsMap proxyEndpointsMap - portsMap map[localPort]closeable - // endpointsSynced and servicesSynced are set to true when corresponding - // objects are synced after startup. This is used to avoid updating iptables - // with some partial data after kube-proxy restart. - endpointsSynced bool - servicesSynced bool - initialized int32 - syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules - - // These are effectively const and do not need the mutex to be held. 
- iptables utiliptables.Interface - masqueradeAll bool - masqueradeMark string - exec utilexec.Interface - clusterCIDR string - hostname string - nodeIP net.IP - portMapper portOpener - recorder record.EventRecorder - healthChecker healthcheck.Server - healthzServer healthcheck.HealthzUpdater - - // Since converting probabilities (floats) to strings is expensive - // and we are using only probabilities in the format of 1/n, we are - // precomputing some number of those and cache for future reuse. - precomputedProbabilities []string - - // The following buffers are used to reuse memory and avoid allocations - // that are significantly impacting performance. - iptablesData *bytes.Buffer - filterChains *bytes.Buffer - filterRules *bytes.Buffer - natChains *bytes.Buffer - natRules *bytes.Buffer -} - -type localPort struct { - desc string - ip string - port int - protocol string -} - -func (lp *localPort) String() string { - return fmt.Sprintf("%q (%s:%d/%s)", lp.desc, lp.ip, lp.port, lp.protocol) -} - -type closeable interface { - Close() error -} - -// portOpener is an interface around port opening/closing. -// Abstracted out for testing. -type portOpener interface { - OpenLocalPort(lp *localPort) (closeable, error) -} - -// listenPortOpener opens ports by calling bind() and listen(). -type listenPortOpener struct{} - -// OpenLocalPort holds the given local port open. -func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) { - return openLocalPort(lp) -} - -// Proxier implements ProxyProvider -var _ proxy.ProxyProvider = &Proxier{} - -// NewProxier returns a new Proxier given an iptables Interface instance. -// Because of the iptables logic, it is assumed that there is only a single Proxier active on a machine. -// An error will be returned if iptables fails to update or acquire the initial lock. -// Once a proxier is created, it will keep iptables up to date in the background and -// will not terminate if a particular iptables call fails. -func NewProxier(ipt utiliptables.Interface, - sysctl utilsysctl.Interface, - exec utilexec.Interface, - syncPeriod time.Duration, - minSyncPeriod time.Duration, - masqueradeAll bool, - masqueradeBit int, - clusterCIDR string, - hostname string, - nodeIP net.IP, - recorder record.EventRecorder, - healthzServer healthcheck.HealthzUpdater, -) (*Proxier, error) { - // check valid user input - if minSyncPeriod > syncPeriod { - return nil, fmt.Errorf("minSyncPeriod (%v) must be <= syncPeriod (%v)", minSyncPeriod, syncPeriod) - } - - // Set the route_localnet sysctl we need for - if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil { - return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err) - } - - // Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers - // are connected to a Linux bridge (but not SDN bridges). Until most - // plugins handle this, log when config is missing - if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 { - glog.Warningf("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended") - } - - // Generate the masquerade mark to use for SNAT rules. 
- if masqueradeBit < 0 || masqueradeBit > 31 { - return nil, fmt.Errorf("invalid iptables-masquerade-bit %v not in [0, 31]", masqueradeBit) - } - masqueradeValue := 1 << uint(masqueradeBit) - masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue) - - if nodeIP == nil { - glog.Warningf("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP") - nodeIP = net.ParseIP("127.0.0.1") - } - - if len(clusterCIDR) == 0 { - glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic") - } - - healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps - - proxier := &Proxier{ - portsMap: make(map[localPort]closeable), - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(hostname), - iptables: ipt, - masqueradeAll: masqueradeAll, - masqueradeMark: masqueradeMark, - exec: exec, - clusterCIDR: clusterCIDR, - hostname: hostname, - nodeIP: nodeIP, - portMapper: &listenPortOpener{}, - recorder: recorder, - healthChecker: healthChecker, - healthzServer: healthzServer, - precomputedProbabilities: make([]string, 0, 1001), - iptablesData: bytes.NewBuffer(nil), - filterChains: bytes.NewBuffer(nil), - filterRules: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - } - burstSyncs := 2 - glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) - proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) - return proxier, nil -} - -// CleanupLeftovers removes all iptables rules and chains created by the Proxier -// It returns true if an error was encountered. Errors are logged. -func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { - // Unlink the services chain. - args := []string{ - "-m", "comment", "--comment", "kubernetes service portals", - "-j", string(kubeServicesChain), - } - tableChainsWithJumpServices := []struct { - table utiliptables.Table - chain utiliptables.Chain - }{ - {utiliptables.TableFilter, utiliptables.ChainInput}, - {utiliptables.TableFilter, utiliptables.ChainOutput}, - {utiliptables.TableNAT, utiliptables.ChainOutput}, - {utiliptables.TableNAT, utiliptables.ChainPrerouting}, - } - for _, tc := range tableChainsWithJumpServices { - if err := ipt.DeleteRule(tc.table, tc.chain, args...); err != nil { - if !utiliptables.IsNotFoundError(err) { - glog.Errorf("Error removing pure-iptables proxy rule: %v", err) - encounteredError = true - } - } - } - - // Unlink the postrouting chain. - args = []string{ - "-m", "comment", "--comment", "kubernetes postrouting rules", - "-j", string(kubePostroutingChain), - } - if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { - if !utiliptables.IsNotFoundError(err) { - glog.Errorf("Error removing pure-iptables proxy rule: %v", err) - encounteredError = true - } - } - - // Flush and remove all of our chains. 
- iptablesData := bytes.NewBuffer(nil) - if err := ipt.SaveInto(utiliptables.TableNAT, iptablesData); err != nil { - glog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableNAT, err) - encounteredError = true - } else { - existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesData.Bytes()) - natChains := bytes.NewBuffer(nil) - natRules := bytes.NewBuffer(nil) - writeLine(natChains, "*nat") - // Start with chains we know we need to remove. - for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} { - if _, found := existingNATChains[chain]; found { - chainString := string(chain) - writeLine(natChains, existingNATChains[chain]) // flush - writeLine(natRules, "-X", chainString) // delete - } - } - // Hunt for service and endpoint chains. - for chain := range existingNATChains { - chainString := string(chain) - if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") || strings.HasPrefix(chainString, "KUBE-FW-") || strings.HasPrefix(chainString, "KUBE-XLB-") { - writeLine(natChains, existingNATChains[chain]) // flush - writeLine(natRules, "-X", chainString) // delete - } - } - writeLine(natRules, "COMMIT") - natLines := append(natChains.Bytes(), natRules.Bytes()...) - // Write it. - err = ipt.Restore(utiliptables.TableNAT, natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) - if err != nil { - glog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableNAT, err) - encounteredError = true - } - } - { - filterBuf := bytes.NewBuffer(nil) - writeLine(filterBuf, "*filter") - writeLine(filterBuf, fmt.Sprintf(":%s - [0:0]", kubeServicesChain)) - writeLine(filterBuf, fmt.Sprintf("-X %s", kubeServicesChain)) - writeLine(filterBuf, "COMMIT") - // Write it. - if err := ipt.Restore(utiliptables.TableFilter, filterBuf.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil { - glog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableFilter, err) - encounteredError = true - } - } - return encounteredError -} - -func computeProbability(n int) string { - return fmt.Sprintf("%0.5f", 1.0/float64(n)) -} - -// This assumes proxier.mu is held -func (proxier *Proxier) precomputeProbabilities(numberOfPrecomputed int) { - if len(proxier.precomputedProbabilities) == 0 { - proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, "<bad value>") - } - for i := len(proxier.precomputedProbabilities); i <= numberOfPrecomputed; i++ { - proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, computeProbability(i)) - } -} - -// This assumes proxier.mu is held -func (proxier *Proxier) probability(n int) string { - if n >= len(proxier.precomputedProbabilities) { - proxier.precomputeProbabilities(n) - } - return proxier.precomputedProbabilities[n] -} - -// Sync is called to synchronize the proxier state to iptables as soon as possible. -func (proxier *Proxier) Sync() { - proxier.syncRunner.Run() -} - -// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. -func (proxier *Proxier) SyncLoop() { - // Update healthz timestamp at beginning in case Sync() never succeeds. 
- if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() - } - proxier.syncRunner.Loop(wait.NeverStop) -} - -func (proxier *Proxier) setInitialized(value bool) { - var initialized int32 - if value { - initialized = 1 - } - atomic.StoreInt32(&proxier.initialized, initialized) -} - -func (proxier *Proxier) isInitialized() bool { - return atomic.LoadInt32(&proxier.initialized) > 0 -} - -func (proxier *Proxier) OnServiceAdd(service *api.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() { - proxier.syncRunner.Run() - } -} - -func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() { - proxier.syncRunner.Run() - } -} - -func (proxier *Proxier) OnServiceDelete(service *api.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() { - proxier.syncRunner.Run() - } -} - -func (proxier *Proxier) OnServiceSynced() { - proxier.mu.Lock() - proxier.servicesSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) - proxier.mu.Unlock() - - // Sync unconditionally - this is called once per lifetime. - proxier.syncProxyRules() -} - -func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool { - // if ClusterIP is "None" or empty, skip proxying - if !helper.IsServiceIPSet(service) { - glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) - return true - } - // Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied - if service.Spec.Type == api.ServiceTypeExternalName { - glog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName) - return true - } - return false -} - -// <serviceMap> is updated by this function (based on the given changes). -// <changes> map is cleared after applying them. -func updateServiceMap( - serviceMap proxyServiceMap, - changes *serviceChangeMap) (result updateServiceMapResult) { - result.staleServices = sets.NewString() - - func() { - changes.lock.Lock() - defer changes.lock.Unlock() - for _, change := range changes.items { - existingPorts := serviceMap.merge(change.current) - serviceMap.unmerge(change.previous, existingPorts, result.staleServices) - } - changes.items = make(map[types.NamespacedName]*serviceChange) - }() - - // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to serviceMap. 
- result.hcServices = make(map[types.NamespacedName]uint16) - for svcPortName, info := range serviceMap { - if info.healthCheckNodePort != 0 { - result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort) - } - } - - return result -} - -func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() { - proxier.syncRunner.Run() - } -} - -func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() { - proxier.syncRunner.Run() - } -} - -func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() { - proxier.syncRunner.Run() - } -} - -func (proxier *Proxier) OnEndpointsSynced() { - proxier.mu.Lock() - proxier.endpointsSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) - proxier.mu.Unlock() - - // Sync unconditionally - this is called once per lifetime. - proxier.syncProxyRules() -} - -// <endpointsMap> is updated by this function (based on the given changes). -// <changes> map is cleared after applying them. -func updateEndpointsMap( - endpointsMap proxyEndpointsMap, - changes *endpointsChangeMap, - hostname string) (result updateEndpointMapResult) { - result.staleEndpoints = make(map[endpointServicePair]bool) - result.staleServiceNames = make(map[proxy.ServicePortName]bool) - - func() { - changes.lock.Lock() - defer changes.lock.Unlock() - for _, change := range changes.items { - endpointsMap.unmerge(change.previous) - endpointsMap.merge(change.current) - detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames) - } - changes.items = make(map[types.NamespacedName]*endpointsChange) - }() - - if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { - return - } - - // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to endpointsMap. - result.hcEndpoints = make(map[types.NamespacedName]int) - localIPs := getLocalIPs(endpointsMap) - for nsn, ips := range localIPs { - result.hcEndpoints[nsn] = len(ips) - } - - return result -} - -// <staleEndpoints> and <staleServices> are modified by this function with detected stale connections. -func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) { - for svcPortName, epList := range oldEndpointsMap { - for _, ep := range epList { - stale := true - for i := range newEndpointsMap[svcPortName] { - if *newEndpointsMap[svcPortName][i] == *ep { - stale = false - break - } - } - if stale { - glog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.endpoint) - staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPortName}] = true - } - } - } - - for svcPortName, epList := range newEndpointsMap { - // For udp service, if its backend changes from 0 to non-0. 
There may exist a conntrack entry that could blackhole traffic to the service. - if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 { - staleServiceNames[svcPortName] = true - } - } -} - -func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String { - localIPs := make(map[types.NamespacedName]sets.String) - for svcPortName := range endpointsMap { - for _, ep := range endpointsMap[svcPortName] { - if ep.isLocal { - nsn := svcPortName.NamespacedName - if localIPs[nsn] == nil { - localIPs[nsn] = sets.NewString() - } - localIPs[nsn].Insert(ep.IPPart()) // just the IP part - } - } - } - return localIPs -} - -// Translates single Endpoints object to proxyEndpointsMap. -// This function is used for incremental updated of endpointsMap. -// -// NOTE: endpoints object should NOT be modified. -func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEndpointsMap { - if endpoints == nil { - return nil - } - - endpointsMap := make(proxyEndpointsMap) - // We need to build a map of portname -> all ip:ports for that - // portname. Explode Endpoints.Subsets[*] into this structure. - for i := range endpoints.Subsets { - ss := &endpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - if port.Port == 0 { - glog.Warningf("ignoring invalid endpoint port %s", port.Name) - continue - } - svcPortName := proxy.ServicePortName{ - NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, - Port: port.Name, - } - for i := range ss.Addresses { - addr := &ss.Addresses[i] - if addr.IP == "" { - glog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name) - continue - } - epInfo := &endpointsInfo{ - endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))), - isLocal: addr.NodeName != nil && *addr.NodeName == hostname, - } - endpointsMap[svcPortName] = append(endpointsMap[svcPortName], epInfo) - } - if glog.V(3) { - newEPList := []string{} - for _, ep := range endpointsMap[svcPortName] { - newEPList = append(newEPList, ep.endpoint) - } - glog.Infof("Setting endpoints for %q to %+v", svcPortName, newEPList) - } - } - } - return endpointsMap -} - -// Translates single Service object to proxyServiceMap. -// -// NOTE: service object should NOT be modified. -func serviceToServiceMap(service *api.Service) proxyServiceMap { - if service == nil { - return nil - } - svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if shouldSkipService(svcName, service) { - return nil - } - - serviceMap := make(proxyServiceMap) - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} - serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service) - } - return serviceMap -} - -// portProtoHash takes the ServicePortName and protocol for a service -// returns the associated 16 character hash. This is computed by hashing (sha256) -// then encoding to base32 and truncating to 16 chars. We do this because IPTables -// Chain Names must be <= 28 chars long, and the longer they are the harder they are to read. -func portProtoHash(servicePortName string, protocol string) string { - hash := sha256.Sum256([]byte(servicePortName + protocol)) - encoded := base32.StdEncoding.EncodeToString(hash[:]) - return encoded[:16] -} - -// servicePortChainName takes the ServicePortName for a service and -// returns the associated iptables chain. 
This is computed by hashing (sha256) -// then encoding to base32 and truncating with the prefix "KUBE-SVC-". -func servicePortChainName(servicePortName string, protocol string) utiliptables.Chain { - return utiliptables.Chain("KUBE-SVC-" + portProtoHash(servicePortName, protocol)) -} - -// serviceFirewallChainName takes the ServicePortName for a service and -// returns the associated iptables chain. This is computed by hashing (sha256) -// then encoding to base32 and truncating with the prefix "KUBE-FW-". -func serviceFirewallChainName(servicePortName string, protocol string) utiliptables.Chain { - return utiliptables.Chain("KUBE-FW-" + portProtoHash(servicePortName, protocol)) -} - -// serviceLBPortChainName takes the ServicePortName for a service and -// returns the associated iptables chain. This is computed by hashing (sha256) -// then encoding to base32 and truncating with the prefix "KUBE-XLB-". We do -// this because IPTables Chain Names must be <= 28 chars long, and the longer -// they are the harder they are to read. -func serviceLBChainName(servicePortName string, protocol string) utiliptables.Chain { - return utiliptables.Chain("KUBE-XLB-" + portProtoHash(servicePortName, protocol)) -} - -// This is the same as servicePortChainName but with the endpoint included. -func servicePortEndpointChainName(servicePortName string, protocol string, endpoint string) utiliptables.Chain { - hash := sha256.Sum256([]byte(servicePortName + protocol + endpoint)) - encoded := base32.StdEncoding.EncodeToString(hash[:]) - return utiliptables.Chain("KUBE-SEP-" + encoded[:16]) -} - -type endpointServicePair struct { - endpoint string - servicePortName proxy.ServicePortName -} - -func (esp *endpointServicePair) IPPart() string { - if index := strings.Index(esp.endpoint, ":"); index != -1 { - return esp.endpoint[0:index] - } - return esp.endpoint -} - -const noConnectionToDelete = "0 flow entries have been deleted" - -// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we -// risk sending more traffic to it, all of which will be lost (because UDP). -// This assumes the proxier mutex is held -func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) { - for epSvcPair := range connectionMap { - if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP { - endpointIP := epSvcPair.endpoint[0:strings.Index(epSvcPair.endpoint, ":")] - glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP) - err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp") - if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { - // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. - // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it - // is expensive to baby sit all udp connections to kubernetes services. - glog.Errorf("conntrack return with error: %v", err) - } - } - } -} - -// This is where all of the iptables-save/restore calls happen. 
-// The only other iptables rules are those that are setup in iptablesInit() -// This assumes proxier.mu is NOT held -func (proxier *Proxier) syncProxyRules() { - proxier.mu.Lock() - defer proxier.mu.Unlock() - - start := time.Now() - defer func() { - SyncProxyRulesLatency.Observe(sinceInMicroseconds(start)) - glog.V(4).Infof("syncProxyRules took %v", time.Since(start)) - }() - // don't sync rules till we've received services and endpoints - if !proxier.endpointsSynced || !proxier.servicesSynced { - glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") - return - } - - // We assume that if this was called, we really want to sync them, - // even if nothing changed in the meantime. In other words, callers are - // responsible for detecting no-op changes and not calling this function. - serviceUpdateResult := updateServiceMap( - proxier.serviceMap, &proxier.serviceChanges) - endpointUpdateResult := updateEndpointsMap( - proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) - - staleServices := serviceUpdateResult.staleServices - // merge stale services gathered from updateEndpointsMap - for svcPortName := range endpointUpdateResult.staleServiceNames { - if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP { - glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String()) - staleServices.Insert(svcInfo.clusterIP.String()) - } - } - - glog.V(3).Infof("Syncing iptables rules") - - // Create and link the kube services chain. - { - tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT} - for _, table := range tablesNeedServicesChain { - if _, err := proxier.iptables.EnsureChain(table, kubeServicesChain); err != nil { - glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, kubeServicesChain, err) - return - } - } - - tableChainsNeedJumpServices := []struct { - table utiliptables.Table - chain utiliptables.Chain - }{ - {utiliptables.TableFilter, utiliptables.ChainInput}, - {utiliptables.TableFilter, utiliptables.ChainOutput}, - {utiliptables.TableNAT, utiliptables.ChainOutput}, - {utiliptables.TableNAT, utiliptables.ChainPrerouting}, - } - comment := "kubernetes service portals" - args := []string{"-m", "comment", "--comment", comment, "-j", string(kubeServicesChain)} - for _, tc := range tableChainsNeedJumpServices { - if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil { - glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeServicesChain, err) - return - } - } - } - - // Create and link the kube postrouting chain. - { - if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, kubePostroutingChain); err != nil { - glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubePostroutingChain, err) - return - } - - comment := "kubernetes postrouting rules" - args := []string{"-m", "comment", "--comment", comment, "-j", string(kubePostroutingChain)} - if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { - glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, err) - return - } - } - - // - // Below this point we will not return until we try to write the iptables rules. 
- // - - // Get iptables-save output so we can check for existing chains and rules. - // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore - existingFilterChains := make(map[utiliptables.Chain]string) - proxier.iptablesData.Reset() - err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.iptablesData) - if err != nil { // if we failed to get any rules - glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) - } else { // otherwise parse the output - existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.iptablesData.Bytes()) - } - - existingNATChains := make(map[utiliptables.Chain]string) - proxier.iptablesData.Reset() - err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData) - if err != nil { // if we failed to get any rules - glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) - } else { // otherwise parse the output - existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes()) - } - - // Reset all buffers used later. - // This is to avoid memory reallocations and thus improve performance. - proxier.filterChains.Reset() - proxier.filterRules.Reset() - proxier.natChains.Reset() - proxier.natRules.Reset() - - // Write table headers. - writeLine(proxier.filterChains, "*filter") - writeLine(proxier.natChains, "*nat") - - // Make sure we keep stats for the top-level chains, if they existed - // (which most should have because we created them above). - if chain, ok := existingFilterChains[kubeServicesChain]; ok { - writeLine(proxier.filterChains, chain) - } else { - writeLine(proxier.filterChains, utiliptables.MakeChainLine(kubeServicesChain)) - } - if chain, ok := existingNATChains[kubeServicesChain]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(kubeServicesChain)) - } - if chain, ok := existingNATChains[kubeNodePortsChain]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(kubeNodePortsChain)) - } - if chain, ok := existingNATChains[kubePostroutingChain]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain)) - } - if chain, ok := existingNATChains[KubeMarkMasqChain]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(KubeMarkMasqChain)) - } - - // Install the kubernetes-specific postrouting rules. We use a whole chain for - // this so that it is easier to flush and change, for example if the mark - // value should ever change. - writeLine(proxier.natRules, []string{ - "-A", string(kubePostroutingChain), - "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`, - "-m", "mark", "--mark", proxier.masqueradeMark, - "-j", "MASQUERADE", - }...) - - // Install the kubernetes-specific masquerade mark rule. We use a whole chain for - // this so that it is easier to flush and change, for example if the mark - // value should ever change. - writeLine(proxier.natRules, []string{ - "-A", string(KubeMarkMasqChain), - "-j", "MARK", "--set-xmark", proxier.masqueradeMark, - }...) - - // Accumulate NAT chains to keep. 
- activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set - - // Accumulate the set of local ports that we will be holding open once this update is complete - replacementPortsMap := map[localPort]closeable{} - - // We are creating those slices ones here to avoid memory reallocations - // in every loop. Note that reuse the memory, instead of doing: - // slice = <some new slice> - // you should always do one of the below: - // slice = slice[:0] // and then append to it - // slice = append(slice[:0], ...) - endpoints := make([]*endpointsInfo, 0) - endpointChains := make([]utiliptables.Chain, 0) - // To avoid growing this slice, we arbitrarily set its size to 64, - // there is never more than that many arguments for a single line. - // Note that even if we go over 64, it will still be correct - it - // is just for efficiency, not correctness. - args := make([]string, 64) - - // Build rules for each service. - var svcNameString string - for svcName, svcInfo := range proxier.serviceMap { - protocol := strings.ToLower(string(svcInfo.protocol)) - svcNameString = svcInfo.serviceNameString - - // Create the per-service chain, retaining counters if possible. - svcChain := svcInfo.servicePortChainName - if chain, ok := existingNATChains[svcChain]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain)) - } - activeNATChains[svcChain] = true - - svcXlbChain := svcInfo.serviceLBChainName - if svcInfo.onlyNodeLocalEndpoints { - // Only for services request OnlyLocal traffic - // create the per-service LB chain, retaining counters if possible. - if lbChain, ok := existingNATChains[svcXlbChain]; ok { - writeLine(proxier.natChains, lbChain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain)) - } - activeNATChains[svcXlbChain] = true - } else if activeNATChains[svcXlbChain] { - // Cleanup the previously created XLB chain for this service - delete(activeNATChains, svcXlbChain) - } - - // Capture the clusterIP. - args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString), - "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), - "--dport", strconv.Itoa(svcInfo.port), - ) - if proxier.masqueradeAll { - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - } else if len(proxier.clusterCIDR) > 0 { - // This masquerades off-cluster traffic to a service VIP. The idea - // is that you can establish a static route for your Service range, - // routing to any node, and that node will bridge into the Service - // for you. Since that might bounce off-node, we masquerade here. - // If/when we support "Local" policy for VIPs, we should update this. - writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) - } - writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) - - // Capture externalIPs. - for _, externalIP := range svcInfo.externalIPs { - // If the "external" IP happens to be an IP that is local to this - // machine, hold the local port open so no other process can open it - // (because the socket might open but it would never work). 
- if local, err := isLocalIP(externalIP); err != nil { - glog.Errorf("can't determine if IP is local, assuming not: %v", err) - } else if local { - lp := localPort{ - desc: "externalIP for " + svcNameString, - ip: externalIP, - port: svcInfo.port, - protocol: protocol, - } - if proxier.portsMap[lp] != nil { - glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) - replacementPortsMap[lp] = proxier.portsMap[lp] - } else { - socket, err := proxier.portMapper.OpenLocalPort(&lp) - if err != nil { - msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err) - - proxier.recorder.Eventf( - &clientv1.ObjectReference{ - Kind: "Node", - Name: proxier.hostname, - UID: types.UID(proxier.hostname), - Namespace: "", - }, api.EventTypeWarning, err.Error(), msg) - glog.Error(msg) - continue - } - replacementPortsMap[lp] = socket - } - } // We're holding the port, so it's OK to install iptables rules. - args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString), - "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", externalIP), - "--dport", strconv.Itoa(svcInfo.port), - ) - // We have to SNAT packets to external IPs. - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - - // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) - // nor from a local process to be forwarded to the service. - // This rule roughly translates to "all traffic from off-machine". - // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later. - externalTrafficOnlyArgs := append(args, - "-m", "physdev", "!", "--physdev-is-in", - "-m", "addrtype", "!", "--src-type", "LOCAL") - writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...) - dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") - // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. - // This covers cases like GCE load-balancers which get added to the local routing table. - writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...) - - // If the service has no endpoints then reject packets coming via externalIP - // Install ICMP Reject rule in filter table for destination=externalIP and dport=svcport - if len(proxier.endpointsMap[svcName]) == 0 { - writeLine(proxier.filterRules, - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), - "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", externalIP), - "--dport", strconv.Itoa(svcInfo.port), - "-j", "REJECT", - ) - } - } - - // Capture load-balancer ingress. - fwChain := svcInfo.serviceFirewallChainName - for _, ingress := range svcInfo.loadBalancerStatus.Ingress { - if ingress.IP != "" { - // create service firewall chain - if chain, ok := existingNATChains[fwChain]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain)) - } - activeNATChains[fwChain] = true - // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. - // This currently works for loadbalancers that preserves source ips. - // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. 
- - args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), - "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", ingress.IP), - "--dport", strconv.Itoa(svcInfo.port), - ) - // jump to service firewall chain - writeLine(proxier.natRules, append(args, "-j", string(fwChain))...) - - args = append(args[:0], - "-A", string(fwChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), - ) - - // Each source match rule in the FW chain may jump to either the SVC or the XLB chain - chosenChain := svcXlbChain - // If we are proxying globally, we need to masquerade in case we cross nodes. - // If we are proxying only locally, we can retain the source IP. - if !svcInfo.onlyNodeLocalEndpoints { - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - chosenChain = svcChain - } - - if len(svcInfo.loadBalancerSourceRanges) == 0 { - // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain - writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...) - } else { - // firewall filter based on each source range - allowFromNode := false - for _, src := range svcInfo.loadBalancerSourceRanges { - writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...) - // ignore error because it has been validated - _, cidr, _ := net.ParseCIDR(src) - if cidr.Contains(proxier.nodeIP) { - allowFromNode = true - } - } - // generally, ip route rule was added to intercept request to loadbalancer vip from the - // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. - // Need to add the following rule to allow request on host. - if allowFromNode { - writeLine(proxier.natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...) - } - } - - // If the packet was able to reach the end of firewall chain, then it did not get DNATed. - // It means the packet cannot go thru the firewall, then mark it for DROP - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) - } - } - - // Capture nodeports. If we had more than 2 rules it might be - // worthwhile to make a new per-service chain for nodeport rules, but - // with just 2 rules it ends up being a waste and a cognitive burden. - if svcInfo.nodePort != 0 { - // Hold the local port open so no other process can open it - // (because the socket might open but it would never work). - lp := localPort{ - desc: "nodePort for " + svcNameString, - ip: "", - port: svcInfo.nodePort, - protocol: protocol, - } - if proxier.portsMap[lp] != nil { - glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) - replacementPortsMap[lp] = proxier.portsMap[lp] - } else { - socket, err := proxier.portMapper.OpenLocalPort(&lp) - if err != nil { - glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) - continue - } - if lp.protocol == "udp" { - proxier.clearUDPConntrackForPort(lp.port) - } - replacementPortsMap[lp] = socket - } // We're holding the port, so it's OK to install iptables rules. - - args = append(args[:0], - "-A", string(kubeNodePortsChain), - "-m", "comment", "--comment", svcNameString, - "-m", protocol, "-p", protocol, - "--dport", strconv.Itoa(svcInfo.nodePort), - ) - if !svcInfo.onlyNodeLocalEndpoints { - // Nodeports need SNAT, unless they're local. - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) 
- // Jump to the service chain. - writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) - } else { - // TODO: Make all nodePorts jump to the firewall chain. - // Currently we only create it for loadbalancers (#33586). - writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...) - } - - // If the service has no endpoints then reject packets. The filter - // table doesn't currently have the same per-service structure that - // the nat table does, so we just stick this into the kube-services - // chain. - if len(proxier.endpointsMap[svcName]) == 0 { - writeLine(proxier.filterRules, - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), - "-m", "addrtype", "--dst-type", "LOCAL", - "-m", protocol, "-p", protocol, - "--dport", strconv.Itoa(svcInfo.nodePort), - "-j", "REJECT", - ) - } - } - - // If the service has no endpoints then reject packets. - if len(proxier.endpointsMap[svcName]) == 0 { - writeLine(proxier.filterRules, - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), - "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), - "--dport", strconv.Itoa(svcInfo.port), - "-j", "REJECT", - ) - continue - } - - // From here on, we assume there are active endpoints. - - // Generate the per-endpoint chains. We do this in multiple passes so we - // can group rules together. - // These two slices parallel each other - keep in sync - endpoints = endpoints[:0] - endpointChains = endpointChains[:0] - var endpointChain utiliptables.Chain - for _, ep := range proxier.endpointsMap[svcName] { - endpoints = append(endpoints, ep) - endpointChain = ep.endpointChain(svcNameString, protocol) - endpointChains = append(endpointChains, endpointChain) - - // Create the endpoint chain, retaining counters if possible. - if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain)) - } - activeNATChains[endpointChain] = true - } - - // First write session affinity rules, if applicable. - if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { - for _, endpointChain := range endpointChains { - writeLine(proxier.natRules, - "-A", string(svcChain), - "-m", "comment", "--comment", svcNameString, - "-m", "recent", "--name", string(endpointChain), - "--rcheck", "--seconds", strconv.Itoa(svcInfo.stickyMaxAgeMinutes*60), "--reap", - "-j", string(endpointChain)) - } - } - - // Now write loadbalancing & DNAT rules. - n := len(endpointChains) - for i, endpointChain := range endpointChains { - // Balancing rules in the per-service chain. - args = append(args[:0], []string{ - "-A", string(svcChain), - "-m", "comment", "--comment", svcNameString, - }...) - if i < (n - 1) { - // Each rule is a probabilistic match. - args = append(args, - "-m", "statistic", - "--mode", "random", - "--probability", proxier.probability(n-i)) - } - // The final (or only if n == 1) rule is a guaranteed match. - args = append(args, "-j", string(endpointChain)) - writeLine(proxier.natRules, args...) - - // Rules in the per-endpoint chain. - args = append(args[:0], - "-A", string(endpointChain), - "-m", "comment", "--comment", svcNameString, - ) - // Handle traffic that loops back to the originator with SNAT. 
- writeLine(proxier.natRules, append(args, - "-s", fmt.Sprintf("%s/32", endpoints[i].IPPart()), - "-j", string(KubeMarkMasqChain))...) - // Update client-affinity lists. - if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { - args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") - } - // DNAT to final destination. - args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].endpoint) - writeLine(proxier.natRules, args...) - } - - // The logic below this applies only if this service is marked as OnlyLocal - if !svcInfo.onlyNodeLocalEndpoints { - continue - } - - // Now write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic. - // TODO - This logic may be combinable with the block above that creates the svc balancer chain - localEndpoints := make([]*endpointsInfo, 0) - localEndpointChains := make([]utiliptables.Chain, 0) - for i := range endpointChains { - if endpoints[i].isLocal { - // These slices parallel each other; must be kept in sync - localEndpoints = append(localEndpoints, endpoints[i]) - localEndpointChains = append(localEndpointChains, endpointChains[i]) - } - } - // First rule in the chain redirects all pod -> external VIP traffic to the - // Service's ClusterIP instead. This happens whether or not we have local - // endpoints; only if clusterCIDR is specified - if len(proxier.clusterCIDR) > 0 { - args = append(args[:0], - "-A", string(svcXlbChain), - "-m", "comment", "--comment", - `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`, - "-s", proxier.clusterCIDR, - "-j", string(svcChain), - ) - writeLine(proxier.natRules, args...) - } - - numLocalEndpoints := len(localEndpointChains) - if numLocalEndpoints == 0 { - // Blackhole all traffic since there are no local endpoints - args = append(args[:0], - "-A", string(svcXlbChain), - "-m", "comment", "--comment", - fmt.Sprintf(`"%s has no local endpoints"`, svcNameString), - "-j", - string(KubeMarkDropChain), - ) - writeLine(proxier.natRules, args...) - } else { - // Setup probability filter rules only over local endpoints - for i, endpointChain := range localEndpointChains { - // Balancing rules in the per-service chain. - args = append(args[:0], - "-A", string(svcXlbChain), - "-m", "comment", "--comment", - fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcNameString), - ) - if i < (numLocalEndpoints - 1) { - // Each rule is a probabilistic match. - args = append(args, - "-m", "statistic", - "--mode", "random", - "--probability", proxier.probability(numLocalEndpoints-i)) - } - // The final (or only if n == 1) rule is a guaranteed match. - args = append(args, "-j", string(endpointChain)) - writeLine(proxier.natRules, args...) - } - } - } - - // Delete chains no longer in use. - for chain := range existingNATChains { - if !activeNATChains[chain] { - chainString := string(chain) - if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") { - // Ignore chains that aren't ours. - continue - } - // We must (as per iptables) write a chain-line for it, which has - // the nice effect of flushing the chain. Then we can remove the - // chain. - writeLine(proxier.natChains, existingNATChains[chain]) - writeLine(proxier.natRules, "-X", chainString) - } - } - - // Finally, tail-call to the nodeports chain. This needs to be after all - // other service portal rules. 
- writeLine(proxier.natRules,
- "-A", string(kubeServicesChain),
- "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
- "-m", "addrtype", "--dst-type", "LOCAL",
- "-j", string(kubeNodePortsChain))
-
- // Write the end-of-table markers.
- writeLine(proxier.filterRules, "COMMIT")
- writeLine(proxier.natRules, "COMMIT")
-
- // Sync rules.
- // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table
- proxier.iptablesData.Reset()
- proxier.iptablesData.Write(proxier.filterChains.Bytes())
- proxier.iptablesData.Write(proxier.filterRules.Bytes())
- proxier.iptablesData.Write(proxier.natChains.Bytes())
- proxier.iptablesData.Write(proxier.natRules.Bytes())
-
- glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
- err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
- if err != nil {
- glog.Errorf("Failed to execute iptables-restore: %v", err)
- glog.V(2).Infof("Rules:\n%s", proxier.iptablesData.Bytes())
- // Revert new local ports.
- revertPorts(replacementPortsMap, proxier.portsMap)
- return
- }
-
- // Close old local ports and save new ones.
- for k, v := range proxier.portsMap {
- if replacementPortsMap[k] == nil {
- v.Close()
- }
- }
- proxier.portsMap = replacementPortsMap
-
- // Update healthz timestamp.
- if proxier.healthzServer != nil {
- proxier.healthzServer.UpdateTimestamp()
- }
-
- // Update healthchecks. The endpoints list might include services that are
- // not "OnlyLocal", but the services list will not, and the healthChecker
- // will just drop those endpoints.
- if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil {
- glog.Errorf("Error syncing healthcheck services: %v", err)
- }
- if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
- glog.Errorf("Error syncing healthcheck endpoints: %v", err)
- }
-
- // Finish housekeeping.
- // TODO: these and clearUDPConntrackForPort() could be made more consistent.
- utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
- proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints)
-}
-
-// clearUDPConntrackForPort clears conntrack entries for UDP connections to the given port.
-// When a packet arrives on an existing connection, it does not go through the NAT table again, because it is not "the first" packet.
-// The solution is to clear the stale conntrack entries. Known issues:
-// https://github.com/docker/docker/issues/8795
-// https://github.com/kubernetes/kubernetes/issues/31983
-func (proxier *Proxier) clearUDPConntrackForPort(port int) {
- glog.V(2).Infof("Deleting conntrack entries for udp connections")
- if port > 0 {
- err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "-p", "udp", "--dport", strconv.Itoa(port))
- if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
- glog.Errorf("conntrack returned error: %v", err)
- }
- } else {
- glog.Errorf("Wrong port number. The port number must be greater than zero")
- }
-}
-
-// Join all words with spaces, terminate with a newline, and write to buf.
-func writeLine(buf *bytes.Buffer, words ...string) {
- // We avoid strings.Join for performance reasons.
- for i := range words {
- buf.WriteString(words[i])
- if i < len(words)-1 {
- buf.WriteByte(' ')
- } else {
- buf.WriteByte('\n')
- }
- }
-}
-
-func isLocalIP(ip string) (bool, error) {
- addrs, err := net.InterfaceAddrs()
- if err != nil {
- return false, err
- }
- for i := range addrs {
- intf, _, err := net.ParseCIDR(addrs[i].String())
- if err != nil {
- return false, err
- }
- if net.ParseIP(ip).Equal(intf) {
- return true, nil
- }
- }
- return false, nil
-}
-
-func openLocalPort(lp *localPort) (closeable, error) {
- // For ports on node IPs, open the actual port and hold it, even though we
- // use iptables to redirect traffic.
- // This ensures a) that it's safe to use that port and b) that (a) stays
- // true. The risk is that some process on the node (e.g. sshd or kubelet)
- // is using a port and we give that same port out to a Service. That would
- // be bad because iptables would silently claim the traffic but the process
- // would never know.
- // NOTE: We should not need to have a real listen()ing socket - bind()
- // should be enough, but I can't figure out a way to e2e test without
- // it. Tools like 'ss' and 'netstat' do not show sockets that are
- // bind()ed but not listen()ed, and at least the default Debian netcat
- // has no way to avoid about 10 seconds of retries.
- var socket closeable
- switch lp.protocol {
- case "tcp":
- listener, err := net.Listen("tcp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port)))
- if err != nil {
- return nil, err
- }
- socket = listener
- case "udp":
- addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port)))
- if err != nil {
- return nil, err
- }
- conn, err := net.ListenUDP("udp", addr)
- if err != nil {
- return nil, err
- }
- socket = conn
- default:
- return nil, fmt.Errorf("unknown protocol %q", lp.protocol)
- }
- glog.V(2).Infof("Opened local port %s", lp.String())
- return socket, nil
-}
-
-// revertPorts closes the ports that are in replacementPortsMap but not in originalPortsMap. In other words, it only
-// closes the ports opened in this sync.
-func revertPorts(replacementPortsMap, originalPortsMap map[localPort]closeable) {
- for k, v := range replacementPortsMap {
- // Only close newly opened local ports - leave ones that were open before this update
- if originalPortsMap[k] == nil {
- glog.V(2).Infof("Closing local port %s after iptables-restore failure", k.String())
- v.Close()
- }
- }
-}
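The per-service and per-XLB balancing rules in the deleted code above rely on the iptables "statistic" match: rule i (counting from 0) is given probability 1/(n-i), and the last rule matches unconditionally, so each of the n endpoint chains is selected with an overall probability of 1/n. The following minimal sketch illustrates that arithmetic; it is not the vendored implementation, and the probability helper, chain names, and endpoint addresses are stand-ins chosen for the example.

package main

import "fmt"

// probability mimics a helper that formats 1/n for the iptables
// "statistic" match (the real proxier caches strings like this).
func probability(n int) string {
	return fmt.Sprintf("%0.5f", 1.0/float64(n))
}

func main() {
	// Hypothetical endpoints of one service.
	endpoints := []string{"10.0.1.5:8080", "10.0.2.7:8080", "10.0.3.9:8080"}
	n := len(endpoints)
	for i, ep := range endpoints {
		if i < n-1 {
			// Rule i is reached only if no earlier rule matched, which happens
			// with probability (n-i)/n; it then matches with probability
			// 1/(n-i), so endpoint i is chosen with overall probability 1/n.
			fmt.Printf("-A KUBE-SVC-X -m statistic --mode random --probability %s -j KUBE-SEP-%d  # -> %s\n",
				probability(n-i), i, ep)
		} else {
			// The final rule is a guaranteed match for the remaining traffic.
			fmt.Printf("-A KUBE-SVC-X -j KUBE-SEP-%d  # -> %s\n", i, ep)
		}
	}
}

Running the sketch prints three rules with probabilities 0.33333, 0.50000, and an unconditional jump, which is the same shape of output the proxier writes into the per-service NAT chains before handing the whole buffer to iptables-restore.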