diff options
author | Matthew Heon <matthew.heon@gmail.com> | 2018-01-04 16:56:51 -0500 |
---|---|---|
committer | Atomic Bot <atomic-devel@projectatomic.io> | 2018-01-08 20:48:31 +0000 |
commit | 21881679099491049db367504d09f9a48d3e5fa7 (patch) | |
tree | 2e9595d44d510bed28730893a9fd3b969a2c8bb4 | |
parent | 5c5c024e80a9c78e94f8d3d7d13755b27dd9a8bf (diff) | |
download | podman-21881679099491049db367504d09f9a48d3e5fa7.tar.gz podman-21881679099491049db367504d09f9a48d3e5fa7.tar.bz2 podman-21881679099491049db367504d09f9a48d3e5fa7.zip |
Remove vendored files unnecessary after Kube hostport removal
Signed-off-by: Matthew Heon <matthew.heon@gmail.com>
Closes: #189
Approved by: mheon
25 files changed, 10 insertions, 5249 deletions
diff --git a/libpod/runtime.go b/libpod/runtime.go index 2a659a1e3..aed6acd86 100644 --- a/libpod/runtime.go +++ b/libpod/runtime.go @@ -20,16 +20,16 @@ type RuntimeOption func(*Runtime) error // Runtime is the core libpod runtime type Runtime struct { - config *RuntimeConfig - state State - store storage.Store - storageService *storageService - imageContext *types.SystemContext - ociRuntime *OCIRuntime - lockDir string - netPlugin ocicni.CNIPlugin - valid bool - lock sync.RWMutex + config *RuntimeConfig + state State + store storage.Store + storageService *storageService + imageContext *types.SystemContext + ociRuntime *OCIRuntime + lockDir string + netPlugin ocicni.CNIPlugin + valid bool + lock sync.RWMutex } // RuntimeConfig contains configuration options used to set up the runtime diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/network/hostport/fake_iptables.go b/vendor/k8s.io/kubernetes/pkg/kubelet/network/hostport/fake_iptables.go deleted file mode 100644 index 42f7c6811..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/network/hostport/fake_iptables.go +++ /dev/null @@ -1,331 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package hostport - -import ( - "bytes" - "fmt" - "net" - "strings" - - utiliptables "k8s.io/kubernetes/pkg/util/iptables" -) - -type fakeChain struct { - name utiliptables.Chain - rules []string -} - -type fakeTable struct { - name utiliptables.Table - chains map[string]*fakeChain -} - -type fakeIPTables struct { - tables map[string]*fakeTable -} - -func NewFakeIPTables() *fakeIPTables { - return &fakeIPTables{ - tables: make(map[string]*fakeTable, 0), - } -} - -func (f *fakeIPTables) GetVersion() (string, error) { - return "1.4.21", nil -} - -func (f *fakeIPTables) getTable(tableName utiliptables.Table) (*fakeTable, error) { - table, ok := f.tables[string(tableName)] - if !ok { - return nil, fmt.Errorf("Table %s does not exist", tableName) - } - return table, nil -} - -func (f *fakeIPTables) getChain(tableName utiliptables.Table, chainName utiliptables.Chain) (*fakeTable, *fakeChain, error) { - table, err := f.getTable(tableName) - if err != nil { - return nil, nil, err - } - - chain, ok := table.chains[string(chainName)] - if !ok { - return table, nil, fmt.Errorf("Chain %s/%s does not exist", tableName, chainName) - } - - return table, chain, nil -} - -func (f *fakeIPTables) ensureChain(tableName utiliptables.Table, chainName utiliptables.Chain) (bool, *fakeChain) { - table, chain, err := f.getChain(tableName, chainName) - if err != nil { - // either table or table+chain don't exist yet - if table == nil { - table = &fakeTable{ - name: tableName, - chains: make(map[string]*fakeChain), - } - f.tables[string(tableName)] = table - } - chain := &fakeChain{ - name: chainName, - rules: make([]string, 0), - } - table.chains[string(chainName)] = chain - return false, chain - } - return true, chain -} - -func (f *fakeIPTables) EnsureChain(tableName utiliptables.Table, chainName utiliptables.Chain) (bool, error) { - existed, _ := f.ensureChain(tableName, chainName) - return existed, nil -} - -func (f *fakeIPTables) FlushChain(tableName utiliptables.Table, chainName utiliptables.Chain) error { - _, chain, err := f.getChain(tableName, chainName) - if err != nil { - return err - } - chain.rules = make([]string, 0) - return nil -} - -func (f *fakeIPTables) DeleteChain(tableName utiliptables.Table, chainName utiliptables.Chain) error { - table, _, err := f.getChain(tableName, chainName) - if err != nil { - return err - } - delete(table.chains, string(chainName)) - return nil -} - -// Returns index of rule in array; < 0 if rule is not found -func findRule(chain *fakeChain, rule string) int { - for i, candidate := range chain.rules { - if rule == candidate { - return i - } - } - return -1 -} - -func (f *fakeIPTables) ensureRule(position utiliptables.RulePosition, tableName utiliptables.Table, chainName utiliptables.Chain, rule string) (bool, error) { - _, chain, err := f.getChain(tableName, chainName) - if err != nil { - _, chain = f.ensureChain(tableName, chainName) - } - - rule, err = normalizeRule(rule) - if err != nil { - return false, err - } - ruleIdx := findRule(chain, rule) - if ruleIdx >= 0 { - return true, nil - } - - if position == utiliptables.Prepend { - chain.rules = append([]string{rule}, chain.rules...) - } else if position == utiliptables.Append { - chain.rules = append(chain.rules, rule) - } else { - return false, fmt.Errorf("Unknown position argument %q", position) - } - - return false, nil -} - -func normalizeRule(rule string) (string, error) { - normalized := "" - remaining := strings.TrimSpace(rule) - for { - var end int - - if strings.HasPrefix(remaining, "--to-destination=") { - remaining = strings.Replace(remaining, "=", " ", 1) - } - - if remaining[0] == '"' { - end = strings.Index(remaining[1:], "\"") - if end < 0 { - return "", fmt.Errorf("Invalid rule syntax: mismatched quotes") - } - end += 2 - } else { - end = strings.Index(remaining, " ") - if end < 0 { - end = len(remaining) - } - } - arg := remaining[:end] - - // Normalize un-prefixed IP addresses like iptables does - if net.ParseIP(arg) != nil { - arg = arg + "/32" - } - - if len(normalized) > 0 { - normalized += " " - } - normalized += strings.TrimSpace(arg) - if len(remaining) == end { - break - } - remaining = remaining[end+1:] - } - return normalized, nil -} - -func (f *fakeIPTables) EnsureRule(position utiliptables.RulePosition, tableName utiliptables.Table, chainName utiliptables.Chain, args ...string) (bool, error) { - ruleArgs := make([]string, 0) - for _, arg := range args { - // quote args with internal spaces (like comments) - if strings.Index(arg, " ") >= 0 { - arg = fmt.Sprintf("\"%s\"", arg) - } - ruleArgs = append(ruleArgs, arg) - } - return f.ensureRule(position, tableName, chainName, strings.Join(ruleArgs, " ")) -} - -func (f *fakeIPTables) DeleteRule(tableName utiliptables.Table, chainName utiliptables.Chain, args ...string) error { - _, chain, err := f.getChain(tableName, chainName) - if err == nil { - rule := strings.Join(args, " ") - ruleIdx := findRule(chain, rule) - if ruleIdx < 0 { - return nil - } - chain.rules = append(chain.rules[:ruleIdx], chain.rules[ruleIdx+1:]...) - } - return nil -} - -func (f *fakeIPTables) IsIpv6() bool { - return false -} - -func saveChain(chain *fakeChain, data *bytes.Buffer) { - for _, rule := range chain.rules { - data.WriteString(fmt.Sprintf("-A %s %s\n", chain.name, rule)) - } -} - -func (f *fakeIPTables) SaveInto(tableName utiliptables.Table, buffer *bytes.Buffer) error { - table, err := f.getTable(tableName) - if err != nil { - return err - } - - buffer.WriteString(fmt.Sprintf("*%s\n", table.name)) - - rules := bytes.NewBuffer(nil) - for _, chain := range table.chains { - buffer.WriteString(fmt.Sprintf(":%s - [0:0]\n", string(chain.name))) - saveChain(chain, rules) - } - buffer.Write(rules.Bytes()) - buffer.WriteString("COMMIT\n") - return nil -} - -func (f *fakeIPTables) restore(restoreTableName utiliptables.Table, data []byte, flush utiliptables.FlushFlag) error { - buf := bytes.NewBuffer(data) - var tableName utiliptables.Table - for { - line, err := buf.ReadString('\n') - if err != nil { - break - } - if line[0] == '#' { - continue - } - - line = strings.TrimSuffix(line, "\n") - if strings.HasPrefix(line, "*") { - tableName = utiliptables.Table(line[1:]) - } - if tableName != "" { - if restoreTableName != "" && restoreTableName != tableName { - continue - } - if strings.HasPrefix(line, ":") { - chainName := utiliptables.Chain(strings.Split(line[1:], " ")[0]) - if flush == utiliptables.FlushTables { - table, chain, _ := f.getChain(tableName, chainName) - if chain != nil { - delete(table.chains, string(chainName)) - } - } - _, _ = f.ensureChain(tableName, chainName) - } else if strings.HasPrefix(line, "-A") { - parts := strings.Split(line, " ") - if len(parts) < 3 { - return fmt.Errorf("Invalid iptables rule '%s'", line) - } - chainName := utiliptables.Chain(parts[1]) - rule := strings.TrimPrefix(line, fmt.Sprintf("-A %s ", chainName)) - _, err := f.ensureRule(utiliptables.Append, tableName, chainName, rule) - if err != nil { - return err - } - } else if strings.HasPrefix(line, "-I") { - parts := strings.Split(line, " ") - if len(parts) < 3 { - return fmt.Errorf("Invalid iptables rule '%s'", line) - } - chainName := utiliptables.Chain(parts[1]) - rule := strings.TrimPrefix(line, fmt.Sprintf("-I %s ", chainName)) - _, err := f.ensureRule(utiliptables.Prepend, tableName, chainName, rule) - if err != nil { - return err - } - } else if strings.HasPrefix(line, "-X") { - parts := strings.Split(line, " ") - if len(parts) < 2 { - return fmt.Errorf("Invalid iptables rule '%s'", line) - } - if err := f.DeleteChain(tableName, utiliptables.Chain(parts[1])); err != nil { - return err - } - } else if line == "COMMIT" { - if restoreTableName == tableName { - return nil - } - tableName = "" - } - } - } - - return nil -} - -func (f *fakeIPTables) Restore(tableName utiliptables.Table, data []byte, flush utiliptables.FlushFlag, counters utiliptables.RestoreCountersFlag) error { - return f.restore(tableName, data, flush) -} - -func (f *fakeIPTables) RestoreAll(data []byte, flush utiliptables.FlushFlag, counters utiliptables.RestoreCountersFlag) error { - return f.restore("", data, flush) -} - -func (f *fakeIPTables) AddReloadFunc(reloadFunc func()) { -} - -func (f *fakeIPTables) Destroy() { -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/network/hostport/hostport.go b/vendor/k8s.io/kubernetes/pkg/kubelet/network/hostport/hostport.go deleted file mode 100644 index c14c750ba..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/network/hostport/hostport.go +++ /dev/null @@ -1,200 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package hostport - -import ( - "fmt" - "net" - "strings" - - "github.com/golang/glog" - - "k8s.io/kubernetes/pkg/api/v1" - utiliptables "k8s.io/kubernetes/pkg/util/iptables" -) - -const ( - // the hostport chain - kubeHostportsChain utiliptables.Chain = "KUBE-HOSTPORTS" - // prefix for hostport chains - kubeHostportChainPrefix string = "KUBE-HP-" -) - -// PortMapping represents a network port in a container -type PortMapping struct { - Name string - HostPort int32 - ContainerPort int32 - Protocol v1.Protocol - HostIP string -} - -// PodPortMapping represents a pod's network state and associated container port mappings -type PodPortMapping struct { - Namespace string - Name string - PortMappings []*PortMapping - HostNetwork bool - IP net.IP -} - -// ConstructPodPortMapping creates a PodPortMapping from the ports specified in the pod's -// containers. -func ConstructPodPortMapping(pod *v1.Pod, podIP net.IP) *PodPortMapping { - portMappings := make([]*PortMapping, 0) - for _, c := range pod.Spec.Containers { - for _, port := range c.Ports { - portMappings = append(portMappings, &PortMapping{ - Name: port.Name, - HostPort: port.HostPort, - ContainerPort: port.ContainerPort, - Protocol: port.Protocol, - HostIP: port.HostIP, - }) - } - } - - return &PodPortMapping{ - Namespace: pod.Namespace, - Name: pod.Name, - PortMappings: portMappings, - HostNetwork: pod.Spec.HostNetwork, - IP: podIP, - } -} - -type hostport struct { - port int32 - protocol string -} - -type hostportOpener func(*hostport) (closeable, error) - -type closeable interface { - Close() error -} - -func openLocalPort(hp *hostport) (closeable, error) { - // For ports on node IPs, open the actual port and hold it, even though we - // use iptables to redirect traffic. - // This ensures a) that it's safe to use that port and b) that (a) stays - // true. The risk is that some process on the node (e.g. sshd or kubelet) - // is using a port and we give that same port out to a Service. That would - // be bad because iptables would silently claim the traffic but the process - // would never know. - // NOTE: We should not need to have a real listen()ing socket - bind() - // should be enough, but I can't figure out a way to e2e test without - // it. Tools like 'ss' and 'netstat' do not show sockets that are - // bind()ed but not listen()ed, and at least the default debian netcat - // has no way to avoid about 10 seconds of retries. - var socket closeable - switch hp.protocol { - case "tcp": - listener, err := net.Listen("tcp", fmt.Sprintf(":%d", hp.port)) - if err != nil { - return nil, err - } - socket = listener - case "udp": - addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", hp.port)) - if err != nil { - return nil, err - } - conn, err := net.ListenUDP("udp", addr) - if err != nil { - return nil, err - } - socket = conn - default: - return nil, fmt.Errorf("unknown protocol %q", hp.protocol) - } - glog.V(3).Infof("Opened local port %s", hp.String()) - return socket, nil -} - -// openHostports opens all given hostports using the given hostportOpener -// If encounter any error, clean up and return the error -// If all ports are opened successfully, return the hostport and socket mapping -// TODO: move openHostports and closeHostports into a common struct -func openHostports(portOpener hostportOpener, podPortMapping *PodPortMapping) (map[hostport]closeable, error) { - var retErr error - ports := make(map[hostport]closeable) - for _, pm := range podPortMapping.PortMappings { - if pm.HostPort <= 0 { - continue - } - hp := portMappingToHostport(pm) - socket, err := portOpener(&hp) - if err != nil { - retErr = fmt.Errorf("cannot open hostport %d for pod %s: %v", pm.HostPort, getPodFullName(podPortMapping), err) - break - } - ports[hp] = socket - } - - // If encounter any error, close all hostports that just got opened. - if retErr != nil { - for hp, socket := range ports { - if err := socket.Close(); err != nil { - glog.Errorf("Cannot clean up hostport %d for pod %s: %v", hp.port, getPodFullName(podPortMapping), err) - } - } - return nil, retErr - } - return ports, nil -} - -// portMappingToHostport creates hostport structure based on input portmapping -func portMappingToHostport(portMapping *PortMapping) hostport { - return hostport{ - port: portMapping.HostPort, - protocol: strings.ToLower(string(portMapping.Protocol)), - } -} - -// ensureKubeHostportChains ensures the KUBE-HOSTPORTS chain is setup correctly -func ensureKubeHostportChains(iptables utiliptables.Interface, natInterfaceName string) error { - glog.V(4).Info("Ensuring kubelet hostport chains") - // Ensure kubeHostportChain - if _, err := iptables.EnsureChain(utiliptables.TableNAT, kubeHostportsChain); err != nil { - return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubeHostportsChain, err) - } - tableChainsNeedJumpServices := []struct { - table utiliptables.Table - chain utiliptables.Chain - }{ - {utiliptables.TableNAT, utiliptables.ChainOutput}, - {utiliptables.TableNAT, utiliptables.ChainPrerouting}, - } - args := []string{"-m", "comment", "--comment", "kube hostport portals", - "-m", "addrtype", "--dst-type", "LOCAL", - "-j", string(kubeHostportsChain)} - for _, tc := range tableChainsNeedJumpServices { - // KUBE-HOSTPORTS chain needs to be appended to the system chains. - // This ensures KUBE-SERVICES chain gets processed first. - // Since rules in KUBE-HOSTPORTS chain matches broader cases, allow the more specific rules to be processed first. - if _, err := iptables.EnsureRule(utiliptables.Append, tc.table, tc.chain, args...); err != nil { - return fmt.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeHostportsChain, err) - } - } - // Need to SNAT traffic from localhost - args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", natInterfaceName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"} - if _, err := iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { - return fmt.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err) - } - return nil -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/network/hostport/hostport_manager.go b/vendor/k8s.io/kubernetes/pkg/kubelet/network/hostport/hostport_manager.go deleted file mode 100644 index 1499ff9c6..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/network/hostport/hostport_manager.go +++ /dev/null @@ -1,329 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package hostport - -import ( - "bytes" - "crypto/sha256" - "encoding/base32" - "fmt" - "strings" - "sync" - - "github.com/golang/glog" - utilerrors "k8s.io/apimachinery/pkg/util/errors" - iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" - utildbus "k8s.io/kubernetes/pkg/util/dbus" - utilexec "k8s.io/kubernetes/pkg/util/exec" - utiliptables "k8s.io/kubernetes/pkg/util/iptables" -) - -// HostPortManager is an interface for adding and removing hostport for a given pod sandbox. -type HostPortManager interface { - // Add implements port mappings. - // id should be a unique identifier for a pod, e.g. podSandboxID. - // podPortMapping is the associated port mapping information for the pod. - // natInterfaceName is the interface that localhost used to talk to the given pod. - Add(id string, podPortMapping *PodPortMapping, natInterfaceName string) error - // Remove cleans up matching port mappings - // Remove must be able to clean up port mappings without pod IP - Remove(id string, podPortMapping *PodPortMapping) error -} - -type hostportManager struct { - hostPortMap map[hostport]closeable - iptables utiliptables.Interface - portOpener hostportOpener - mu sync.Mutex -} - -func NewHostportManager() HostPortManager { - iptInterface := utiliptables.New(utilexec.New(), utildbus.New(), utiliptables.ProtocolIpv4) - return &hostportManager{ - hostPortMap: make(map[hostport]closeable), - iptables: iptInterface, - portOpener: openLocalPort, - } -} - -func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInterfaceName string) (err error) { - if podPortMapping == nil || podPortMapping.HostNetwork { - return nil - } - podFullName := getPodFullName(podPortMapping) - - // skip if there is no hostport needed - hostportMappings := gatherHostportMappings(podPortMapping) - if len(hostportMappings) == 0 { - return nil - } - - if podPortMapping.IP.To4() == nil { - return fmt.Errorf("invalid or missing IP of pod %s", podFullName) - } - podIP := podPortMapping.IP.String() - - if err = ensureKubeHostportChains(hm.iptables, natInterfaceName); err != nil { - return err - } - - // Ensure atomicity for port opening and iptables operations - hm.mu.Lock() - defer hm.mu.Unlock() - - // try to open hostports - ports, err := openHostports(hm.portOpener, podPortMapping) - if err != nil { - return err - } - for hostport, socket := range ports { - hm.hostPortMap[hostport] = socket - } - - natChains := bytes.NewBuffer(nil) - natRules := bytes.NewBuffer(nil) - writeLine(natChains, "*nat") - - existingChains, existingRules, err := getExistingHostportIPTablesRules(hm.iptables) - if err != nil { - // clean up opened host port if encounter any error - return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)}) - } - - newChains := []utiliptables.Chain{} - for _, pm := range hostportMappings { - protocol := strings.ToLower(string(pm.Protocol)) - chain := getHostportChain(id, pm) - newChains = append(newChains, chain) - - // Add new hostport chain - writeLine(natChains, utiliptables.MakeChainLine(chain)) - - // Prepend the new chain to KUBE-HOSTPORTS - // This avoids any leaking iptables rule that takes up the same port - writeLine(natRules, "-I", string(kubeHostportsChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort), - "-m", protocol, "-p", protocol, "--dport", fmt.Sprintf("%d", pm.HostPort), - "-j", string(chain), - ) - - // SNAT if the traffic comes from the pod itself - writeLine(natRules, "-A", string(chain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort), - "-s", podIP, - "-j", string(iptablesproxy.KubeMarkMasqChain)) - - // DNAT to the podIP:containerPort - writeLine(natRules, "-A", string(chain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort), - "-m", protocol, "-p", protocol, - "-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", podIP, pm.ContainerPort)) - } - - // getHostportChain should be able to provide unique hostport chain name using hash - // if there is a chain conflict or multiple Adds have been triggered for a single pod, - // filtering should be able to avoid further problem - filterChains(existingChains, newChains) - existingRules = filterRules(existingRules, newChains) - - for _, chain := range existingChains { - writeLine(natChains, chain) - } - for _, rule := range existingRules { - writeLine(natRules, rule) - } - writeLine(natRules, "COMMIT") - - if err = hm.syncIPTables(append(natChains.Bytes(), natRules.Bytes()...)); err != nil { - // clean up opened host port if encounter any error - return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)}) - } - return nil -} - -func (hm *hostportManager) Remove(id string, podPortMapping *PodPortMapping) (err error) { - if podPortMapping == nil || podPortMapping.HostNetwork { - return nil - } - - hostportMappings := gatherHostportMappings(podPortMapping) - if len(hostportMappings) <= 0 { - return nil - } - - // Ensure atomicity for port closing and iptables operations - hm.mu.Lock() - defer hm.mu.Unlock() - - var existingChains map[utiliptables.Chain]string - var existingRules []string - existingChains, existingRules, err = getExistingHostportIPTablesRules(hm.iptables) - if err != nil { - return err - } - - // Gather target hostport chains for removal - chainsToRemove := []utiliptables.Chain{} - for _, pm := range hostportMappings { - chainsToRemove = append(chainsToRemove, getHostportChain(id, pm)) - - // To preserve backward compatibility for k8s 1.5 or earlier. - // Need to remove hostport chains added by hostportSyncer if there is any - // TODO: remove this in 1.7 - chainsToRemove = append(chainsToRemove, hostportChainName(pm, getPodFullName(podPortMapping))) - } - - // remove rules that consists of target chains - remainingRules := filterRules(existingRules, chainsToRemove) - - // gather target hostport chains that exists in iptables-save result - existingChainsToRemove := []utiliptables.Chain{} - for _, chain := range chainsToRemove { - if _, ok := existingChains[chain]; ok { - existingChainsToRemove = append(existingChainsToRemove, chain) - } - } - - natChains := bytes.NewBuffer(nil) - natRules := bytes.NewBuffer(nil) - writeLine(natChains, "*nat") - for _, chain := range existingChains { - writeLine(natChains, chain) - } - for _, rule := range remainingRules { - writeLine(natRules, rule) - } - for _, chain := range existingChainsToRemove { - writeLine(natRules, "-X", string(chain)) - } - writeLine(natRules, "COMMIT") - - if err = hm.syncIPTables(append(natChains.Bytes(), natRules.Bytes()...)); err != nil { - return err - } - - // clean up opened pod host ports - return hm.closeHostports(hostportMappings) -} - -// syncIPTables executes iptables-restore with given lines -func (hm *hostportManager) syncIPTables(lines []byte) error { - glog.V(3).Infof("Restoring iptables rules: %s", lines) - err := hm.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) - if err != nil { - return fmt.Errorf("Failed to execute iptables-restore: %v", err) - } - return nil -} - -// closeHostports tries to close all the listed host ports -// TODO: move closeHostports and openHostports into a common struct -func (hm *hostportManager) closeHostports(hostportMappings []*PortMapping) error { - errList := []error{} - for _, pm := range hostportMappings { - hp := portMappingToHostport(pm) - if socket, ok := hm.hostPortMap[hp]; ok { - glog.V(2).Infof("Closing host port %s", hp.String()) - if err := socket.Close(); err != nil { - errList = append(errList, fmt.Errorf("failed to close host port %s: %v", hp.String(), err)) - continue - } - delete(hm.hostPortMap, hp) - } - } - return utilerrors.NewAggregate(errList) -} - -// getHostportChain takes id, hostport and protocol for a pod and returns associated iptables chain. -// This is computed by hashing (sha256) then encoding to base32 and truncating with the prefix -// "KUBE-HP-". We do this because IPTables Chain Names must be <= 28 chars long, and the longer -// they are the harder they are to read. -// WARNING: Please do not change this function. Otherwise, HostportManager may not be able to -// identify existing iptables chains. -func getHostportChain(id string, pm *PortMapping) utiliptables.Chain { - hash := sha256.Sum256([]byte(id + string(pm.HostPort) + string(pm.Protocol))) - encoded := base32.StdEncoding.EncodeToString(hash[:]) - return utiliptables.Chain(kubeHostportChainPrefix + encoded[:16]) -} - -// gatherHostportMappings returns all the PortMappings which has hostport for a pod -func gatherHostportMappings(podPortMapping *PodPortMapping) []*PortMapping { - mappings := []*PortMapping{} - for _, pm := range podPortMapping.PortMappings { - if pm.HostPort <= 0 { - continue - } - mappings = append(mappings, pm) - } - return mappings -} - -// getExistingHostportIPTablesRules retrieves raw data from iptables-save, parse it, -// return all the hostport related chains and rules -func getExistingHostportIPTablesRules(iptables utiliptables.Interface) (map[utiliptables.Chain]string, []string, error) { - iptablesData := bytes.NewBuffer(nil) - err := iptables.SaveInto(utiliptables.TableNAT, iptablesData) - if err != nil { // if we failed to get any rules - return nil, nil, fmt.Errorf("failed to execute iptables-save: %v", err) - } - existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesData.Bytes()) - - existingHostportChains := make(map[utiliptables.Chain]string) - existingHostportRules := []string{} - - for chain := range existingNATChains { - if strings.HasPrefix(string(chain), string(kubeHostportsChain)) || strings.HasPrefix(string(chain), kubeHostportChainPrefix) { - existingHostportChains[chain] = existingNATChains[chain] - } - } - - for _, line := range strings.Split(string(iptablesData.Bytes()), "\n") { - if strings.HasPrefix(line, fmt.Sprintf("-A %s", kubeHostportChainPrefix)) || - strings.HasPrefix(line, fmt.Sprintf("-A %s", string(kubeHostportsChain))) { - existingHostportRules = append(existingHostportRules, line) - } - } - return existingHostportChains, existingHostportRules, nil -} - -// filterRules filters input rules with input chains. Rules that did not involve any filter chain will be returned. -// The order of the input rules is important and is preserved. -func filterRules(rules []string, filters []utiliptables.Chain) []string { - filtered := []string{} - for _, rule := range rules { - skip := false - for _, filter := range filters { - if strings.Contains(rule, string(filter)) { - skip = true - break - } - } - if !skip { - filtered = append(filtered, rule) - } - } - return filtered -} - -// filterChains deletes all entries of filter chains from chain map -func filterChains(chains map[utiliptables.Chain]string, filterChains []utiliptables.Chain) { - for _, chain := range filterChains { - if _, ok := chains[chain]; ok { - delete(chains, chain) - } - } -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/network/hostport/hostport_syncer.go b/vendor/k8s.io/kubernetes/pkg/kubelet/network/hostport/hostport_syncer.go deleted file mode 100644 index d1c577dbd..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/network/hostport/hostport_syncer.go +++ /dev/null @@ -1,306 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package hostport - -import ( - "bytes" - "crypto/sha256" - "encoding/base32" - "fmt" - "strings" - "time" - - "github.com/golang/glog" - - iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" - utildbus "k8s.io/kubernetes/pkg/util/dbus" - utilexec "k8s.io/kubernetes/pkg/util/exec" - utiliptables "k8s.io/kubernetes/pkg/util/iptables" -) - -// HostportSyncer takes a list of PodPortMappings and implements hostport all at once -type HostportSyncer interface { - // SyncHostports gathers all hostports on node and setup iptables rules to enable them. - // On each invocation existing ports are synced and stale rules are deleted. - SyncHostports(natInterfaceName string, activePodPortMappings []*PodPortMapping) error - // OpenPodHostportsAndSync opens hostports for a new PodPortMapping, gathers all hostports on - // node, sets up iptables rules enable them. On each invocation existing ports are synced and stale rules are deleted. - // 'newPortMapping' must also be present in 'activePodPortMappings'. - OpenPodHostportsAndSync(newPortMapping *PodPortMapping, natInterfaceName string, activePodPortMappings []*PodPortMapping) error -} - -type hostportSyncer struct { - hostPortMap map[hostport]closeable - iptables utiliptables.Interface - portOpener hostportOpener -} - -func NewHostportSyncer() HostportSyncer { - iptInterface := utiliptables.New(utilexec.New(), utildbus.New(), utiliptables.ProtocolIpv4) - return &hostportSyncer{ - hostPortMap: make(map[hostport]closeable), - iptables: iptInterface, - portOpener: openLocalPort, - } -} - -type targetPod struct { - podFullName string - podIP string -} - -func (hp *hostport) String() string { - return fmt.Sprintf("%s:%d", hp.protocol, hp.port) -} - -//openPodHostports opens all hostport for pod and returns the map of hostport and socket -func (h *hostportSyncer) openHostports(podHostportMapping *PodPortMapping) error { - var retErr error - ports := make(map[hostport]closeable) - for _, port := range podHostportMapping.PortMappings { - if port.HostPort <= 0 { - // Assume hostport is not specified in this portmapping. So skip - continue - } - hp := hostport{ - port: port.HostPort, - protocol: strings.ToLower(string(port.Protocol)), - } - socket, err := h.portOpener(&hp) - if err != nil { - retErr = fmt.Errorf("cannot open hostport %d for pod %s: %v", port.HostPort, getPodFullName(podHostportMapping), err) - break - } - ports[hp] = socket - } - - // If encounter any error, close all hostports that just got opened. - if retErr != nil { - for hp, socket := range ports { - if err := socket.Close(); err != nil { - glog.Errorf("Cannot clean up hostport %d for pod %s: %v", hp.port, getPodFullName(podHostportMapping), err) - } - } - return retErr - } - - for hostPort, socket := range ports { - h.hostPortMap[hostPort] = socket - } - - return nil -} - -func getPodFullName(pod *PodPortMapping) string { - // Use underscore as the delimiter because it is not allowed in pod name - // (DNS subdomain format), while allowed in the container name format. - return pod.Name + "_" + pod.Namespace -} - -// gatherAllHostports returns all hostports that should be presented on node, -// given the list of pods running on that node and ignoring host network -// pods (which don't need hostport <-> container port mapping). -func gatherAllHostports(activePodPortMappings []*PodPortMapping) (map[*PortMapping]targetPod, error) { - podHostportMap := make(map[*PortMapping]targetPod) - for _, pm := range activePodPortMappings { - if pm.IP.To4() == nil { - return nil, fmt.Errorf("Invalid or missing pod %s IP", getPodFullName(pm)) - } - // should not handle hostports for hostnetwork pods - if pm.HostNetwork { - continue - } - - for _, port := range pm.PortMappings { - if port.HostPort != 0 { - podHostportMap[port] = targetPod{podFullName: getPodFullName(pm), podIP: pm.IP.String()} - } - } - } - return podHostportMap, nil -} - -// Join all words with spaces, terminate with newline and write to buf. -func writeLine(buf *bytes.Buffer, words ...string) { - buf.WriteString(strings.Join(words, " ") + "\n") -} - -//hostportChainName takes containerPort for a pod and returns associated iptables chain. -// This is computed by hashing (sha256) -// then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do -// this because IPTables Chain Names must be <= 28 chars long, and the longer -// they are the harder they are to read. -func hostportChainName(pm *PortMapping, podFullName string) utiliptables.Chain { - hash := sha256.Sum256([]byte(string(pm.HostPort) + string(pm.Protocol) + podFullName)) - encoded := base32.StdEncoding.EncodeToString(hash[:]) - return utiliptables.Chain(kubeHostportChainPrefix + encoded[:16]) -} - -// OpenPodHostportsAndSync opens hostports for a new PodPortMapping, gathers all hostports on -// node, sets up iptables rules enable them. And finally clean up stale hostports. -// 'newPortMapping' must also be present in 'activePodPortMappings'. -func (h *hostportSyncer) OpenPodHostportsAndSync(newPortMapping *PodPortMapping, natInterfaceName string, activePodPortMappings []*PodPortMapping) error { - // try to open pod host port if specified - if err := h.openHostports(newPortMapping); err != nil { - return err - } - - // Add the new pod to active pods if it's not present. - var found bool - for _, pm := range activePodPortMappings { - if pm.Namespace == newPortMapping.Namespace && pm.Name == newPortMapping.Name { - found = true - break - } - } - if !found { - activePodPortMappings = append(activePodPortMappings, newPortMapping) - } - - return h.SyncHostports(natInterfaceName, activePodPortMappings) -} - -// SyncHostports gathers all hostports on node and setup iptables rules enable them. And finally clean up stale hostports -func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMappings []*PodPortMapping) error { - start := time.Now() - defer func() { - glog.V(4).Infof("syncHostportsRules took %v", time.Since(start)) - }() - - hostportPodMap, err := gatherAllHostports(activePodPortMappings) - if err != nil { - return err - } - - // Ensure KUBE-HOSTPORTS chains - ensureKubeHostportChains(h.iptables, natInterfaceName) - - // Get iptables-save output so we can check for existing chains and rules. - // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore - existingNATChains := make(map[utiliptables.Chain]string) - iptablesData := bytes.NewBuffer(nil) - err = h.iptables.SaveInto(utiliptables.TableNAT, iptablesData) - if err != nil { // if we failed to get any rules - glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) - } else { // otherwise parse the output - existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesData.Bytes()) - } - - natChains := bytes.NewBuffer(nil) - natRules := bytes.NewBuffer(nil) - writeLine(natChains, "*nat") - // Make sure we keep stats for the top-level chains, if they existed - // (which most should have because we created them above). - if chain, ok := existingNATChains[kubeHostportsChain]; ok { - writeLine(natChains, chain) - } else { - writeLine(natChains, utiliptables.MakeChainLine(kubeHostportsChain)) - } - - // Accumulate NAT chains to keep. - activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set - - for port, target := range hostportPodMap { - protocol := strings.ToLower(string(port.Protocol)) - hostportChain := hostportChainName(port, target.podFullName) - if chain, ok := existingNATChains[hostportChain]; ok { - writeLine(natChains, chain) - } else { - writeLine(natChains, utiliptables.MakeChainLine(hostportChain)) - } - - activeNATChains[hostportChain] = true - - // Redirect to hostport chain - args := []string{ - "-A", string(kubeHostportsChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, port.HostPort), - "-m", protocol, "-p", protocol, - "--dport", fmt.Sprintf("%d", port.HostPort), - "-j", string(hostportChain), - } - writeLine(natRules, args...) - - // Assuming kubelet is syncing iptables KUBE-MARK-MASQ chain - // If the request comes from the pod that is serving the hostport, then SNAT - args = []string{ - "-A", string(hostportChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, port.HostPort), - "-s", target.podIP, "-j", string(iptablesproxy.KubeMarkMasqChain), - } - writeLine(natRules, args...) - - // Create hostport chain to DNAT traffic to final destination - // IPTables will maintained the stats for this chain - args = []string{ - "-A", string(hostportChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, port.HostPort), - "-m", protocol, "-p", protocol, - "-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", target.podIP, port.ContainerPort), - } - writeLine(natRules, args...) - } - - // Delete chains no longer in use. - for chain := range existingNATChains { - if !activeNATChains[chain] { - chainString := string(chain) - if !strings.HasPrefix(chainString, kubeHostportChainPrefix) { - // Ignore chains that aren't ours. - continue - } - // We must (as per iptables) write a chain-line for it, which has - // the nice effect of flushing the chain. Then we can remove the - // chain. - writeLine(natChains, existingNATChains[chain]) - writeLine(natRules, "-X", chainString) - } - } - writeLine(natRules, "COMMIT") - - natLines := append(natChains.Bytes(), natRules.Bytes()...) - glog.V(3).Infof("Restoring iptables rules: %s", natLines) - err = h.iptables.RestoreAll(natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) - if err != nil { - return fmt.Errorf("Failed to execute iptables-restore: %v", err) - } - - h.cleanupHostportMap(hostportPodMap) - return nil -} - -// cleanupHostportMap closes obsolete hostports -func (h *hostportSyncer) cleanupHostportMap(containerPortMap map[*PortMapping]targetPod) { - // compute hostports that are supposed to be open - currentHostports := make(map[hostport]bool) - for containerPort := range containerPortMap { - hp := hostport{ - port: containerPort.HostPort, - protocol: strings.ToLower(string(containerPort.Protocol)), - } - currentHostports[hp] = true - } - - // close and delete obsolete hostports - for hp, socket := range h.hostPortMap { - if _, ok := currentHostports[hp]; !ok { - socket.Close() - glog.V(3).Infof("Closed local port %s", hp.String()) - delete(h.hostPortMap, hp) - } - } -} diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/doc.go b/vendor/k8s.io/kubernetes/pkg/proxy/doc.go deleted file mode 100644 index 3bed0fa39..000000000 --- a/vendor/k8s.io/kubernetes/pkg/proxy/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package proxy implements the layer-3 network proxy. -package proxy // import "k8s.io/kubernetes/pkg/proxy" diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/doc.go b/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/doc.go deleted file mode 100644 index 0a9ea0944..000000000 --- a/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package healthcheck provides tools for serving kube-proxy healthchecks. -package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck" diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/healthcheck.go b/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/healthcheck.go deleted file mode 100644 index 39f10f71a..000000000 --- a/vendor/k8s.io/kubernetes/pkg/proxy/healthcheck/healthcheck.go +++ /dev/null @@ -1,333 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package healthcheck - -import ( - "fmt" - "net" - "net/http" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/golang/glog" - "github.com/renstrom/dedent" - - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/clock" - clientv1 "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/record" - "k8s.io/kubernetes/pkg/api" -) - -// Server serves HTTP endpoints for each service name, with results -// based on the endpoints. If there are 0 endpoints for a service, it returns a -// 503 "Service Unavailable" error (telling LBs not to use this node). If there -// are 1 or more endpoints, it returns a 200 "OK". -type Server interface { - // Make the new set of services be active. Services that were open before - // will be closed. Services that are new will be opened. Service that - // existed and are in the new set will be left alone. The value of the map - // is the healthcheck-port to listen on. - SyncServices(newServices map[types.NamespacedName]uint16) error - // Make the new set of endpoints be active. Endpoints for services that do - // not exist will be dropped. The value of the map is the number of - // endpoints the service has on this node. - SyncEndpoints(newEndpoints map[types.NamespacedName]int) error -} - -// Listener allows for testing of Server. If the Listener argument -// to NewServer() is nil, the real net.Listen function will be used. -type Listener interface { - // Listen is very much like net.Listen, except the first arg (network) is - // fixed to be "tcp". - Listen(addr string) (net.Listener, error) -} - -// HTTPServerFactory allows for testing of Server. If the -// HTTPServerFactory argument to NewServer() is nil, the real -// http.Server type will be used. -type HTTPServerFactory interface { - // New creates an instance of a type satisfying HTTPServer. This is - // designed to include http.Server. - New(addr string, handler http.Handler) HTTPServer -} - -// HTTPServer allows for testing of Server. -type HTTPServer interface { - // Server is designed so that http.Server satifies this interface, - Serve(listener net.Listener) error -} - -// NewServer allocates a new healthcheck server manager. If either -// of the injected arguments are nil, defaults will be used. -func NewServer(hostname string, recorder record.EventRecorder, listener Listener, httpServerFactory HTTPServerFactory) Server { - if listener == nil { - listener = stdNetListener{} - } - if httpServerFactory == nil { - httpServerFactory = stdHTTPServerFactory{} - } - return &server{ - hostname: hostname, - recorder: recorder, - listener: listener, - httpFactory: httpServerFactory, - services: map[types.NamespacedName]*hcInstance{}, - } -} - -// Implement Listener in terms of net.Listen. -type stdNetListener struct{} - -func (stdNetListener) Listen(addr string) (net.Listener, error) { - return net.Listen("tcp", addr) -} - -var _ Listener = stdNetListener{} - -// Implement HTTPServerFactory in terms of http.Server. -type stdHTTPServerFactory struct{} - -func (stdHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer { - return &http.Server{ - Addr: addr, - Handler: handler, - } -} - -var _ HTTPServerFactory = stdHTTPServerFactory{} - -type server struct { - hostname string - recorder record.EventRecorder // can be nil - listener Listener - httpFactory HTTPServerFactory - - lock sync.Mutex - services map[types.NamespacedName]*hcInstance -} - -func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error { - hcs.lock.Lock() - defer hcs.lock.Unlock() - - // Remove any that are not needed any more. - for nsn, svc := range hcs.services { - if port, found := newServices[nsn]; !found || port != svc.port { - glog.V(2).Infof("Closing healthcheck %q on port %d", nsn.String(), svc.port) - if err := svc.listener.Close(); err != nil { - glog.Errorf("Close(%v): %v", svc.listener.Addr(), err) - } - delete(hcs.services, nsn) - } - } - - // Add any that are needed. - for nsn, port := range newServices { - if hcs.services[nsn] != nil { - glog.V(3).Infof("Existing healthcheck %q on port %d", nsn.String(), port) - continue - } - - glog.V(2).Infof("Opening healthcheck %q on port %d", nsn.String(), port) - svc := &hcInstance{port: port} - addr := fmt.Sprintf(":%d", port) - svc.server = hcs.httpFactory.New(addr, hcHandler{name: nsn, hcs: hcs}) - var err error - svc.listener, err = hcs.listener.Listen(addr) - if err != nil { - msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err) - - if hcs.recorder != nil { - hcs.recorder.Eventf( - &clientv1.ObjectReference{ - Kind: "Service", - Namespace: nsn.Namespace, - Name: nsn.Name, - UID: types.UID(nsn.String()), - }, api.EventTypeWarning, "FailedToStartHealthcheck", msg) - } - glog.Error(msg) - continue - } - hcs.services[nsn] = svc - - go func(nsn types.NamespacedName, svc *hcInstance) { - // Serve() will exit when the listener is closed. - glog.V(3).Infof("Starting goroutine for healthcheck %q on port %d", nsn.String(), svc.port) - if err := svc.server.Serve(svc.listener); err != nil { - glog.V(3).Infof("Healthcheck %q closed: %v", nsn.String(), err) - return - } - glog.V(3).Infof("Healthcheck %q closed", nsn.String()) - }(nsn, svc) - } - return nil -} - -type hcInstance struct { - port uint16 - listener net.Listener - server HTTPServer - endpoints int // number of local endpoints for a service -} - -type hcHandler struct { - name types.NamespacedName - hcs *server -} - -var _ http.Handler = hcHandler{} - -func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { - h.hcs.lock.Lock() - svc, ok := h.hcs.services[h.name] - if !ok || svc == nil { - h.hcs.lock.Unlock() - glog.Errorf("Received request for closed healthcheck %q", h.name.String()) - return - } - count := svc.endpoints - h.hcs.lock.Unlock() - - resp.Header().Set("Content-Type", "application/json") - if count == 0 { - resp.WriteHeader(http.StatusServiceUnavailable) - } else { - resp.WriteHeader(http.StatusOK) - } - fmt.Fprintf(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(` - { - "service": { - "namespace": %q, - "name": %q - }, - "localEndpoints": %d - } - `, h.name.Namespace, h.name.Name, count)), "\n")) -} - -func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error { - hcs.lock.Lock() - defer hcs.lock.Unlock() - - for nsn, count := range newEndpoints { - if hcs.services[nsn] == nil { - glog.V(3).Infof("Not saving endpoints for unknown healthcheck %q", nsn.String()) - continue - } - glog.V(3).Infof("Reporting %d endpoints for healthcheck %q", count, nsn.String()) - hcs.services[nsn].endpoints = count - } - for nsn, hci := range hcs.services { - if _, found := newEndpoints[nsn]; !found { - hci.endpoints = 0 - } - } - return nil -} - -// HealthzUpdater allows callers to update healthz timestamp only. -type HealthzUpdater interface { - UpdateTimestamp() -} - -// HealthzServer returns 200 "OK" by default. Once timestamp has been -// updated, it verifies we don't exceed max no respond duration since -// last update. -type HealthzServer struct { - listener Listener - httpFactory HTTPServerFactory - clock clock.Clock - - addr string - port int32 - healthTimeout time.Duration - - lastUpdated atomic.Value -} - -// NewDefaultHealthzServer returns a default healthz http server. -func NewDefaultHealthzServer(addr string, healthTimeout time.Duration) *HealthzServer { - return newHealthzServer(nil, nil, nil, addr, healthTimeout) -} - -func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *HealthzServer { - if listener == nil { - listener = stdNetListener{} - } - if httpServerFactory == nil { - httpServerFactory = stdHTTPServerFactory{} - } - if c == nil { - c = clock.RealClock{} - } - return &HealthzServer{ - listener: listener, - httpFactory: httpServerFactory, - clock: c, - addr: addr, - healthTimeout: healthTimeout, - } -} - -// UpdateTimestamp updates the lastUpdated timestamp. -func (hs *HealthzServer) UpdateTimestamp() { - hs.lastUpdated.Store(hs.clock.Now()) -} - -// Run starts the healthz http server and returns. -func (hs *HealthzServer) Run() { - serveMux := http.NewServeMux() - serveMux.Handle("/healthz", healthzHandler{hs: hs}) - server := hs.httpFactory.New(hs.addr, serveMux) - listener, err := hs.listener.Listen(hs.addr) - if err != nil { - glog.Errorf("Failed to start healthz on %s: %v", hs.addr, err) - return - } - go func() { - glog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr) - if err := server.Serve(listener); err != nil { - glog.Errorf("Healhz closed: %v", err) - return - } - glog.Errorf("Unexpected healhz closed.") - }() -} - -type healthzHandler struct { - hs *HealthzServer -} - -func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { - lastUpdated := time.Time{} - if val := h.hs.lastUpdated.Load(); val != nil { - lastUpdated = val.(time.Time) - } - currentTime := h.hs.clock.Now() - - resp.Header().Set("Content-Type", "application/json") - if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) { - resp.WriteHeader(http.StatusServiceUnavailable) - } else { - resp.WriteHeader(http.StatusOK) - } - fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime)) -} diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/iptables/metrics.go b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/metrics.go deleted file mode 100644 index fabe6a595..000000000 --- a/vendor/k8s.io/kubernetes/pkg/proxy/iptables/metrics.go +++ /dev/null @@ -1,50 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package iptables - -import ( - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" -) - -const kubeProxySubsystem = "kubeproxy" - -var ( - SyncProxyRulesLatency = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Subsystem: kubeProxySubsystem, - Name: "sync_proxy_rules_latency_microseconds", - Help: "SyncProxyRules latency", - Buckets: prometheus.ExponentialBuckets(1000, 2, 15), - }, - ) -) - -var registerMetricsOnce sync.Once - -func RegisterMetrics() { - registerMetricsOnce.Do(func() { - prometheus.MustRegister(SyncProxyRulesLatency) - }) -} - -// Gets the time since the specified start in microseconds. -func sinceInMicroseconds(start time.Time) float64 { - return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) -} diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go deleted file mode 100644 index 9d29d7b42..000000000 --- a/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go +++ /dev/null @@ -1,1732 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package iptables - -// -// NOTE: this needs to be tested in e2e since it uses iptables for everything. -// - -import ( - "bytes" - "crypto/sha256" - "encoding/base32" - "fmt" - "net" - "reflect" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/golang/glog" - - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" - utilfeature "k8s.io/apiserver/pkg/util/feature" - clientv1 "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/record" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/helper" - apiservice "k8s.io/kubernetes/pkg/api/service" - "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/pkg/proxy" - "k8s.io/kubernetes/pkg/proxy/healthcheck" - utilproxy "k8s.io/kubernetes/pkg/proxy/util" - "k8s.io/kubernetes/pkg/util/async" - utilexec "k8s.io/kubernetes/pkg/util/exec" - utiliptables "k8s.io/kubernetes/pkg/util/iptables" - utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" - utilversion "k8s.io/kubernetes/pkg/util/version" -) - -const ( - // iptablesMinVersion is the minimum version of iptables for which we will use the Proxier - // from this package instead of the userspace Proxier. While most of the - // features we need were available earlier, the '-C' flag was added more - // recently. We use that indirectly in Ensure* functions, and if we don't - // have it, we have to be extra careful about the exact args we feed in being - // the same as the args we read back (iptables itself normalizes some args). - // This is the "new" Proxier, so we require "new" versions of tools. - iptablesMinVersion = utiliptables.MinCheckVersion - - // the services chain - kubeServicesChain utiliptables.Chain = "KUBE-SERVICES" - - // the nodeports chain - kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS" - - // the kubernetes postrouting chain - kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING" - - // the mark-for-masquerade chain - KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ" - - // the mark-for-drop chain - KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP" -) - -// IPTablesVersioner can query the current iptables version. -type IPTablesVersioner interface { - // returns "X.Y.Z" - GetVersion() (string, error) -} - -// KernelCompatTester tests whether the required kernel capabilities are -// present to run the iptables proxier. -type KernelCompatTester interface { - IsCompatible() error -} - -// CanUseIPTablesProxier returns true if we should use the iptables Proxier -// instead of the "classic" userspace Proxier. This is determined by checking -// the iptables version and for the existence of kernel features. It may return -// an error if it fails to get the iptables version without error, in which -// case it will also return false. -func CanUseIPTablesProxier(iptver IPTablesVersioner, kcompat KernelCompatTester) (bool, error) { - minVersion, err := utilversion.ParseGeneric(iptablesMinVersion) - if err != nil { - return false, err - } - versionString, err := iptver.GetVersion() - if err != nil { - return false, err - } - version, err := utilversion.ParseGeneric(versionString) - if err != nil { - return false, err - } - if version.LessThan(minVersion) { - return false, nil - } - - // Check that the kernel supports what we need. - if err := kcompat.IsCompatible(); err != nil { - return false, err - } - return true, nil -} - -type LinuxKernelCompatTester struct{} - -func (lkct LinuxKernelCompatTester) IsCompatible() error { - // Check for the required sysctls. We don't care about the value, just - // that it exists. If this Proxier is chosen, we'll initialize it as we - // need. - _, err := utilsysctl.New().GetSysctl(sysctlRouteLocalnet) - return err -} - -const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet" -const sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables" - -// internal struct for string service information -type serviceInfo struct { - clusterIP net.IP - port int - protocol api.Protocol - nodePort int - loadBalancerStatus api.LoadBalancerStatus - sessionAffinityType api.ServiceAffinity - stickyMaxAgeMinutes int - externalIPs []string - loadBalancerSourceRanges []string - onlyNodeLocalEndpoints bool - healthCheckNodePort int - // The following fields are computed and stored for performance reasons. - serviceNameString string - servicePortChainName utiliptables.Chain - serviceFirewallChainName utiliptables.Chain - serviceLBChainName utiliptables.Chain -} - -// internal struct for endpoints information -type endpointsInfo struct { - endpoint string // TODO: should be an endpointString type - isLocal bool - // The following fields we lazily compute and store here for performance - // reasons. If the protocol is the same as you expect it to be, then the - // chainName can be reused, otherwise it should be recomputed. - protocol string - chainName utiliptables.Chain -} - -// Returns just the IP part of the endpoint. -func (e *endpointsInfo) IPPart() string { - if index := strings.Index(e.endpoint, ":"); index != -1 { - return e.endpoint[0:index] - } - return e.endpoint -} - -// Returns the endpoint chain name for a given endpointsInfo. -func (e *endpointsInfo) endpointChain(svcNameString, protocol string) utiliptables.Chain { - if e.protocol != protocol { - e.protocol = protocol - e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.endpoint) - } - return e.chainName -} - -func (e *endpointsInfo) String() string { - return fmt.Sprintf("%v", *e) -} - -// returns a new serviceInfo struct -func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo { - onlyNodeLocalEndpoints := false - if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) && - apiservice.RequestsOnlyLocalTraffic(service) { - onlyNodeLocalEndpoints = true - } - info := &serviceInfo{ - clusterIP: net.ParseIP(service.Spec.ClusterIP), - port: int(port.Port), - protocol: port.Protocol, - nodePort: int(port.NodePort), - // Deep-copy in case the service instance changes - loadBalancerStatus: *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer), - sessionAffinityType: service.Spec.SessionAffinity, - stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API. - externalIPs: make([]string, len(service.Spec.ExternalIPs)), - loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)), - onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, - } - copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) - copy(info.externalIPs, service.Spec.ExternalIPs) - - if apiservice.NeedsHealthCheck(service) { - p := apiservice.GetServiceHealthCheckNodePort(service) - if p == 0 { - glog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String()) - } else { - info.healthCheckNodePort = int(p) - } - } - - // Store the following for performance reasons. - protocol := strings.ToLower(string(info.protocol)) - info.serviceNameString = svcPortName.String() - info.servicePortChainName = servicePortChainName(info.serviceNameString, protocol) - info.serviceFirewallChainName = serviceFirewallChainName(info.serviceNameString, protocol) - info.serviceLBChainName = serviceLBChainName(info.serviceNameString, protocol) - - return info -} - -type endpointsChange struct { - previous proxyEndpointsMap - current proxyEndpointsMap -} - -type endpointsChangeMap struct { - lock sync.Mutex - hostname string - items map[types.NamespacedName]*endpointsChange -} - -type serviceChange struct { - previous proxyServiceMap - current proxyServiceMap -} - -type serviceChangeMap struct { - lock sync.Mutex - items map[types.NamespacedName]*serviceChange -} - -type updateEndpointMapResult struct { - hcEndpoints map[types.NamespacedName]int - staleEndpoints map[endpointServicePair]bool - staleServiceNames map[proxy.ServicePortName]bool -} - -type updateServiceMapResult struct { - hcServices map[types.NamespacedName]uint16 - staleServices sets.String -} - -type proxyServiceMap map[proxy.ServicePortName]*serviceInfo -type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo - -func newEndpointsChangeMap(hostname string) endpointsChangeMap { - return endpointsChangeMap{ - hostname: hostname, - items: make(map[types.NamespacedName]*endpointsChange), - } -} - -func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool { - ecm.lock.Lock() - defer ecm.lock.Unlock() - - change, exists := ecm.items[*namespacedName] - if !exists { - change = &endpointsChange{} - change.previous = endpointsToEndpointsMap(previous, ecm.hostname) - ecm.items[*namespacedName] = change - } - change.current = endpointsToEndpointsMap(current, ecm.hostname) - if reflect.DeepEqual(change.previous, change.current) { - delete(ecm.items, *namespacedName) - } - return len(ecm.items) > 0 -} - -func newServiceChangeMap() serviceChangeMap { - return serviceChangeMap{ - items: make(map[types.NamespacedName]*serviceChange), - } -} - -func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool { - scm.lock.Lock() - defer scm.lock.Unlock() - - change, exists := scm.items[*namespacedName] - if !exists { - change = &serviceChange{} - change.previous = serviceToServiceMap(previous) - scm.items[*namespacedName] = change - } - change.current = serviceToServiceMap(current) - if reflect.DeepEqual(change.previous, change.current) { - delete(scm.items, *namespacedName) - } - return len(scm.items) > 0 -} - -func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String { - existingPorts := sets.NewString() - for svcPortName, info := range other { - existingPorts.Insert(svcPortName.Port) - _, exists := (*sm)[svcPortName] - if !exists { - glog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) - } else { - glog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) - } - (*sm)[svcPortName] = info - } - return existingPorts -} - -func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) { - for svcPortName := range other { - if existingPorts.Has(svcPortName.Port) { - continue - } - info, exists := (*sm)[svcPortName] - if exists { - glog.V(1).Infof("Removing service port %q", svcPortName) - if info.protocol == api.ProtocolUDP { - staleServices.Insert(info.clusterIP.String()) - } - delete(*sm, svcPortName) - } else { - glog.Errorf("Service port %q removed, but doesn't exists", svcPortName) - } - } -} - -func (em proxyEndpointsMap) merge(other proxyEndpointsMap) { - for svcPortName := range other { - em[svcPortName] = other[svcPortName] - } -} - -func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) { - for svcPortName := range other { - delete(em, svcPortName) - } -} - -// Proxier is an iptables based proxy for connections between a localhost:lport -// and services that provide the actual backends. -type Proxier struct { - // endpointsChanges and serviceChanges contains all changes to endpoints and - // services that happened since iptables was synced. For a single object, - // changes are accumulated, i.e. previous is state from before all of them, - // current is state after applying all of those. - endpointsChanges endpointsChangeMap - serviceChanges serviceChangeMap - - mu sync.Mutex // protects the following fields - serviceMap proxyServiceMap - endpointsMap proxyEndpointsMap - portsMap map[localPort]closeable - // endpointsSynced and servicesSynced are set to true when corresponding - // objects are synced after startup. This is used to avoid updating iptables - // with some partial data after kube-proxy restart. - endpointsSynced bool - servicesSynced bool - initialized int32 - syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules - - // These are effectively const and do not need the mutex to be held. - iptables utiliptables.Interface - masqueradeAll bool - masqueradeMark string - exec utilexec.Interface - clusterCIDR string - hostname string - nodeIP net.IP - portMapper portOpener - recorder record.EventRecorder - healthChecker healthcheck.Server - healthzServer healthcheck.HealthzUpdater - - // Since converting probabilities (floats) to strings is expensive - // and we are using only probabilities in the format of 1/n, we are - // precomputing some number of those and cache for future reuse. - precomputedProbabilities []string - - // The following buffers are used to reuse memory and avoid allocations - // that are significantly impacting performance. - iptablesData *bytes.Buffer - filterChains *bytes.Buffer - filterRules *bytes.Buffer - natChains *bytes.Buffer - natRules *bytes.Buffer -} - -type localPort struct { - desc string - ip string - port int - protocol string -} - -func (lp *localPort) String() string { - return fmt.Sprintf("%q (%s:%d/%s)", lp.desc, lp.ip, lp.port, lp.protocol) -} - -type closeable interface { - Close() error -} - -// portOpener is an interface around port opening/closing. -// Abstracted out for testing. -type portOpener interface { - OpenLocalPort(lp *localPort) (closeable, error) -} - -// listenPortOpener opens ports by calling bind() and listen(). -type listenPortOpener struct{} - -// OpenLocalPort holds the given local port open. -func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) { - return openLocalPort(lp) -} - -// Proxier implements ProxyProvider -var _ proxy.ProxyProvider = &Proxier{} - -// NewProxier returns a new Proxier given an iptables Interface instance. -// Because of the iptables logic, it is assumed that there is only a single Proxier active on a machine. -// An error will be returned if iptables fails to update or acquire the initial lock. -// Once a proxier is created, it will keep iptables up to date in the background and -// will not terminate if a particular iptables call fails. -func NewProxier(ipt utiliptables.Interface, - sysctl utilsysctl.Interface, - exec utilexec.Interface, - syncPeriod time.Duration, - minSyncPeriod time.Duration, - masqueradeAll bool, - masqueradeBit int, - clusterCIDR string, - hostname string, - nodeIP net.IP, - recorder record.EventRecorder, - healthzServer healthcheck.HealthzUpdater, -) (*Proxier, error) { - // check valid user input - if minSyncPeriod > syncPeriod { - return nil, fmt.Errorf("minSyncPeriod (%v) must be <= syncPeriod (%v)", minSyncPeriod, syncPeriod) - } - - // Set the route_localnet sysctl we need for - if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil { - return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err) - } - - // Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers - // are connected to a Linux bridge (but not SDN bridges). Until most - // plugins handle this, log when config is missing - if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 { - glog.Warningf("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended") - } - - // Generate the masquerade mark to use for SNAT rules. - if masqueradeBit < 0 || masqueradeBit > 31 { - return nil, fmt.Errorf("invalid iptables-masquerade-bit %v not in [0, 31]", masqueradeBit) - } - masqueradeValue := 1 << uint(masqueradeBit) - masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue) - - if nodeIP == nil { - glog.Warningf("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP") - nodeIP = net.ParseIP("127.0.0.1") - } - - if len(clusterCIDR) == 0 { - glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic") - } - - healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps - - proxier := &Proxier{ - portsMap: make(map[localPort]closeable), - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(hostname), - iptables: ipt, - masqueradeAll: masqueradeAll, - masqueradeMark: masqueradeMark, - exec: exec, - clusterCIDR: clusterCIDR, - hostname: hostname, - nodeIP: nodeIP, - portMapper: &listenPortOpener{}, - recorder: recorder, - healthChecker: healthChecker, - healthzServer: healthzServer, - precomputedProbabilities: make([]string, 0, 1001), - iptablesData: bytes.NewBuffer(nil), - filterChains: bytes.NewBuffer(nil), - filterRules: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - } - burstSyncs := 2 - glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) - proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) - return proxier, nil -} - -// CleanupLeftovers removes all iptables rules and chains created by the Proxier -// It returns true if an error was encountered. Errors are logged. -func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { - // Unlink the services chain. - args := []string{ - "-m", "comment", "--comment", "kubernetes service portals", - "-j", string(kubeServicesChain), - } - tableChainsWithJumpServices := []struct { - table utiliptables.Table - chain utiliptables.Chain - }{ - {utiliptables.TableFilter, utiliptables.ChainInput}, - {utiliptables.TableFilter, utiliptables.ChainOutput}, - {utiliptables.TableNAT, utiliptables.ChainOutput}, - {utiliptables.TableNAT, utiliptables.ChainPrerouting}, - } - for _, tc := range tableChainsWithJumpServices { - if err := ipt.DeleteRule(tc.table, tc.chain, args...); err != nil { - if !utiliptables.IsNotFoundError(err) { - glog.Errorf("Error removing pure-iptables proxy rule: %v", err) - encounteredError = true - } - } - } - - // Unlink the postrouting chain. - args = []string{ - "-m", "comment", "--comment", "kubernetes postrouting rules", - "-j", string(kubePostroutingChain), - } - if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { - if !utiliptables.IsNotFoundError(err) { - glog.Errorf("Error removing pure-iptables proxy rule: %v", err) - encounteredError = true - } - } - - // Flush and remove all of our chains. - iptablesData := bytes.NewBuffer(nil) - if err := ipt.SaveInto(utiliptables.TableNAT, iptablesData); err != nil { - glog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableNAT, err) - encounteredError = true - } else { - existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesData.Bytes()) - natChains := bytes.NewBuffer(nil) - natRules := bytes.NewBuffer(nil) - writeLine(natChains, "*nat") - // Start with chains we know we need to remove. - for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} { - if _, found := existingNATChains[chain]; found { - chainString := string(chain) - writeLine(natChains, existingNATChains[chain]) // flush - writeLine(natRules, "-X", chainString) // delete - } - } - // Hunt for service and endpoint chains. - for chain := range existingNATChains { - chainString := string(chain) - if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") || strings.HasPrefix(chainString, "KUBE-FW-") || strings.HasPrefix(chainString, "KUBE-XLB-") { - writeLine(natChains, existingNATChains[chain]) // flush - writeLine(natRules, "-X", chainString) // delete - } - } - writeLine(natRules, "COMMIT") - natLines := append(natChains.Bytes(), natRules.Bytes()...) - // Write it. - err = ipt.Restore(utiliptables.TableNAT, natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) - if err != nil { - glog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableNAT, err) - encounteredError = true - } - } - { - filterBuf := bytes.NewBuffer(nil) - writeLine(filterBuf, "*filter") - writeLine(filterBuf, fmt.Sprintf(":%s - [0:0]", kubeServicesChain)) - writeLine(filterBuf, fmt.Sprintf("-X %s", kubeServicesChain)) - writeLine(filterBuf, "COMMIT") - // Write it. - if err := ipt.Restore(utiliptables.TableFilter, filterBuf.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil { - glog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableFilter, err) - encounteredError = true - } - } - return encounteredError -} - -func computeProbability(n int) string { - return fmt.Sprintf("%0.5f", 1.0/float64(n)) -} - -// This assumes proxier.mu is held -func (proxier *Proxier) precomputeProbabilities(numberOfPrecomputed int) { - if len(proxier.precomputedProbabilities) == 0 { - proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, "<bad value>") - } - for i := len(proxier.precomputedProbabilities); i <= numberOfPrecomputed; i++ { - proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, computeProbability(i)) - } -} - -// This assumes proxier.mu is held -func (proxier *Proxier) probability(n int) string { - if n >= len(proxier.precomputedProbabilities) { - proxier.precomputeProbabilities(n) - } - return proxier.precomputedProbabilities[n] -} - -// Sync is called to synchronize the proxier state to iptables as soon as possible. -func (proxier *Proxier) Sync() { - proxier.syncRunner.Run() -} - -// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. -func (proxier *Proxier) SyncLoop() { - // Update healthz timestamp at beginning in case Sync() never succeeds. - if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() - } - proxier.syncRunner.Loop(wait.NeverStop) -} - -func (proxier *Proxier) setInitialized(value bool) { - var initialized int32 - if value { - initialized = 1 - } - atomic.StoreInt32(&proxier.initialized, initialized) -} - -func (proxier *Proxier) isInitialized() bool { - return atomic.LoadInt32(&proxier.initialized) > 0 -} - -func (proxier *Proxier) OnServiceAdd(service *api.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() { - proxier.syncRunner.Run() - } -} - -func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() { - proxier.syncRunner.Run() - } -} - -func (proxier *Proxier) OnServiceDelete(service *api.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() { - proxier.syncRunner.Run() - } -} - -func (proxier *Proxier) OnServiceSynced() { - proxier.mu.Lock() - proxier.servicesSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) - proxier.mu.Unlock() - - // Sync unconditionally - this is called once per lifetime. - proxier.syncProxyRules() -} - -func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool { - // if ClusterIP is "None" or empty, skip proxying - if !helper.IsServiceIPSet(service) { - glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) - return true - } - // Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied - if service.Spec.Type == api.ServiceTypeExternalName { - glog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName) - return true - } - return false -} - -// <serviceMap> is updated by this function (based on the given changes). -// <changes> map is cleared after applying them. -func updateServiceMap( - serviceMap proxyServiceMap, - changes *serviceChangeMap) (result updateServiceMapResult) { - result.staleServices = sets.NewString() - - func() { - changes.lock.Lock() - defer changes.lock.Unlock() - for _, change := range changes.items { - existingPorts := serviceMap.merge(change.current) - serviceMap.unmerge(change.previous, existingPorts, result.staleServices) - } - changes.items = make(map[types.NamespacedName]*serviceChange) - }() - - // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to serviceMap. - result.hcServices = make(map[types.NamespacedName]uint16) - for svcPortName, info := range serviceMap { - if info.healthCheckNodePort != 0 { - result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort) - } - } - - return result -} - -func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() { - proxier.syncRunner.Run() - } -} - -func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() { - proxier.syncRunner.Run() - } -} - -func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() { - proxier.syncRunner.Run() - } -} - -func (proxier *Proxier) OnEndpointsSynced() { - proxier.mu.Lock() - proxier.endpointsSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) - proxier.mu.Unlock() - - // Sync unconditionally - this is called once per lifetime. - proxier.syncProxyRules() -} - -// <endpointsMap> is updated by this function (based on the given changes). -// <changes> map is cleared after applying them. -func updateEndpointsMap( - endpointsMap proxyEndpointsMap, - changes *endpointsChangeMap, - hostname string) (result updateEndpointMapResult) { - result.staleEndpoints = make(map[endpointServicePair]bool) - result.staleServiceNames = make(map[proxy.ServicePortName]bool) - - func() { - changes.lock.Lock() - defer changes.lock.Unlock() - for _, change := range changes.items { - endpointsMap.unmerge(change.previous) - endpointsMap.merge(change.current) - detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames) - } - changes.items = make(map[types.NamespacedName]*endpointsChange) - }() - - if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { - return - } - - // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to endpointsMap. - result.hcEndpoints = make(map[types.NamespacedName]int) - localIPs := getLocalIPs(endpointsMap) - for nsn, ips := range localIPs { - result.hcEndpoints[nsn] = len(ips) - } - - return result -} - -// <staleEndpoints> and <staleServices> are modified by this function with detected stale connections. -func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) { - for svcPortName, epList := range oldEndpointsMap { - for _, ep := range epList { - stale := true - for i := range newEndpointsMap[svcPortName] { - if *newEndpointsMap[svcPortName][i] == *ep { - stale = false - break - } - } - if stale { - glog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.endpoint) - staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPortName}] = true - } - } - } - - for svcPortName, epList := range newEndpointsMap { - // For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service. - if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 { - staleServiceNames[svcPortName] = true - } - } -} - -func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String { - localIPs := make(map[types.NamespacedName]sets.String) - for svcPortName := range endpointsMap { - for _, ep := range endpointsMap[svcPortName] { - if ep.isLocal { - nsn := svcPortName.NamespacedName - if localIPs[nsn] == nil { - localIPs[nsn] = sets.NewString() - } - localIPs[nsn].Insert(ep.IPPart()) // just the IP part - } - } - } - return localIPs -} - -// Translates single Endpoints object to proxyEndpointsMap. -// This function is used for incremental updated of endpointsMap. -// -// NOTE: endpoints object should NOT be modified. -func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEndpointsMap { - if endpoints == nil { - return nil - } - - endpointsMap := make(proxyEndpointsMap) - // We need to build a map of portname -> all ip:ports for that - // portname. Explode Endpoints.Subsets[*] into this structure. - for i := range endpoints.Subsets { - ss := &endpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - if port.Port == 0 { - glog.Warningf("ignoring invalid endpoint port %s", port.Name) - continue - } - svcPortName := proxy.ServicePortName{ - NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, - Port: port.Name, - } - for i := range ss.Addresses { - addr := &ss.Addresses[i] - if addr.IP == "" { - glog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name) - continue - } - epInfo := &endpointsInfo{ - endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))), - isLocal: addr.NodeName != nil && *addr.NodeName == hostname, - } - endpointsMap[svcPortName] = append(endpointsMap[svcPortName], epInfo) - } - if glog.V(3) { - newEPList := []string{} - for _, ep := range endpointsMap[svcPortName] { - newEPList = append(newEPList, ep.endpoint) - } - glog.Infof("Setting endpoints for %q to %+v", svcPortName, newEPList) - } - } - } - return endpointsMap -} - -// Translates single Service object to proxyServiceMap. -// -// NOTE: service object should NOT be modified. -func serviceToServiceMap(service *api.Service) proxyServiceMap { - if service == nil { - return nil - } - svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if shouldSkipService(svcName, service) { - return nil - } - - serviceMap := make(proxyServiceMap) - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} - serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service) - } - return serviceMap -} - -// portProtoHash takes the ServicePortName and protocol for a service -// returns the associated 16 character hash. This is computed by hashing (sha256) -// then encoding to base32 and truncating to 16 chars. We do this because IPTables -// Chain Names must be <= 28 chars long, and the longer they are the harder they are to read. -func portProtoHash(servicePortName string, protocol string) string { - hash := sha256.Sum256([]byte(servicePortName + protocol)) - encoded := base32.StdEncoding.EncodeToString(hash[:]) - return encoded[:16] -} - -// servicePortChainName takes the ServicePortName for a service and -// returns the associated iptables chain. This is computed by hashing (sha256) -// then encoding to base32 and truncating with the prefix "KUBE-SVC-". -func servicePortChainName(servicePortName string, protocol string) utiliptables.Chain { - return utiliptables.Chain("KUBE-SVC-" + portProtoHash(servicePortName, protocol)) -} - -// serviceFirewallChainName takes the ServicePortName for a service and -// returns the associated iptables chain. This is computed by hashing (sha256) -// then encoding to base32 and truncating with the prefix "KUBE-FW-". -func serviceFirewallChainName(servicePortName string, protocol string) utiliptables.Chain { - return utiliptables.Chain("KUBE-FW-" + portProtoHash(servicePortName, protocol)) -} - -// serviceLBPortChainName takes the ServicePortName for a service and -// returns the associated iptables chain. This is computed by hashing (sha256) -// then encoding to base32 and truncating with the prefix "KUBE-XLB-". We do -// this because IPTables Chain Names must be <= 28 chars long, and the longer -// they are the harder they are to read. -func serviceLBChainName(servicePortName string, protocol string) utiliptables.Chain { - return utiliptables.Chain("KUBE-XLB-" + portProtoHash(servicePortName, protocol)) -} - -// This is the same as servicePortChainName but with the endpoint included. -func servicePortEndpointChainName(servicePortName string, protocol string, endpoint string) utiliptables.Chain { - hash := sha256.Sum256([]byte(servicePortName + protocol + endpoint)) - encoded := base32.StdEncoding.EncodeToString(hash[:]) - return utiliptables.Chain("KUBE-SEP-" + encoded[:16]) -} - -type endpointServicePair struct { - endpoint string - servicePortName proxy.ServicePortName -} - -func (esp *endpointServicePair) IPPart() string { - if index := strings.Index(esp.endpoint, ":"); index != -1 { - return esp.endpoint[0:index] - } - return esp.endpoint -} - -const noConnectionToDelete = "0 flow entries have been deleted" - -// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we -// risk sending more traffic to it, all of which will be lost (because UDP). -// This assumes the proxier mutex is held -func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) { - for epSvcPair := range connectionMap { - if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP { - endpointIP := epSvcPair.endpoint[0:strings.Index(epSvcPair.endpoint, ":")] - glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP) - err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp") - if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { - // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. - // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it - // is expensive to baby sit all udp connections to kubernetes services. - glog.Errorf("conntrack return with error: %v", err) - } - } - } -} - -// This is where all of the iptables-save/restore calls happen. -// The only other iptables rules are those that are setup in iptablesInit() -// This assumes proxier.mu is NOT held -func (proxier *Proxier) syncProxyRules() { - proxier.mu.Lock() - defer proxier.mu.Unlock() - - start := time.Now() - defer func() { - SyncProxyRulesLatency.Observe(sinceInMicroseconds(start)) - glog.V(4).Infof("syncProxyRules took %v", time.Since(start)) - }() - // don't sync rules till we've received services and endpoints - if !proxier.endpointsSynced || !proxier.servicesSynced { - glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") - return - } - - // We assume that if this was called, we really want to sync them, - // even if nothing changed in the meantime. In other words, callers are - // responsible for detecting no-op changes and not calling this function. - serviceUpdateResult := updateServiceMap( - proxier.serviceMap, &proxier.serviceChanges) - endpointUpdateResult := updateEndpointsMap( - proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) - - staleServices := serviceUpdateResult.staleServices - // merge stale services gathered from updateEndpointsMap - for svcPortName := range endpointUpdateResult.staleServiceNames { - if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP { - glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String()) - staleServices.Insert(svcInfo.clusterIP.String()) - } - } - - glog.V(3).Infof("Syncing iptables rules") - - // Create and link the kube services chain. - { - tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT} - for _, table := range tablesNeedServicesChain { - if _, err := proxier.iptables.EnsureChain(table, kubeServicesChain); err != nil { - glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, kubeServicesChain, err) - return - } - } - - tableChainsNeedJumpServices := []struct { - table utiliptables.Table - chain utiliptables.Chain - }{ - {utiliptables.TableFilter, utiliptables.ChainInput}, - {utiliptables.TableFilter, utiliptables.ChainOutput}, - {utiliptables.TableNAT, utiliptables.ChainOutput}, - {utiliptables.TableNAT, utiliptables.ChainPrerouting}, - } - comment := "kubernetes service portals" - args := []string{"-m", "comment", "--comment", comment, "-j", string(kubeServicesChain)} - for _, tc := range tableChainsNeedJumpServices { - if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil { - glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeServicesChain, err) - return - } - } - } - - // Create and link the kube postrouting chain. - { - if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, kubePostroutingChain); err != nil { - glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubePostroutingChain, err) - return - } - - comment := "kubernetes postrouting rules" - args := []string{"-m", "comment", "--comment", comment, "-j", string(kubePostroutingChain)} - if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { - glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, err) - return - } - } - - // - // Below this point we will not return until we try to write the iptables rules. - // - - // Get iptables-save output so we can check for existing chains and rules. - // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore - existingFilterChains := make(map[utiliptables.Chain]string) - proxier.iptablesData.Reset() - err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.iptablesData) - if err != nil { // if we failed to get any rules - glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) - } else { // otherwise parse the output - existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.iptablesData.Bytes()) - } - - existingNATChains := make(map[utiliptables.Chain]string) - proxier.iptablesData.Reset() - err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData) - if err != nil { // if we failed to get any rules - glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) - } else { // otherwise parse the output - existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes()) - } - - // Reset all buffers used later. - // This is to avoid memory reallocations and thus improve performance. - proxier.filterChains.Reset() - proxier.filterRules.Reset() - proxier.natChains.Reset() - proxier.natRules.Reset() - - // Write table headers. - writeLine(proxier.filterChains, "*filter") - writeLine(proxier.natChains, "*nat") - - // Make sure we keep stats for the top-level chains, if they existed - // (which most should have because we created them above). - if chain, ok := existingFilterChains[kubeServicesChain]; ok { - writeLine(proxier.filterChains, chain) - } else { - writeLine(proxier.filterChains, utiliptables.MakeChainLine(kubeServicesChain)) - } - if chain, ok := existingNATChains[kubeServicesChain]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(kubeServicesChain)) - } - if chain, ok := existingNATChains[kubeNodePortsChain]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(kubeNodePortsChain)) - } - if chain, ok := existingNATChains[kubePostroutingChain]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain)) - } - if chain, ok := existingNATChains[KubeMarkMasqChain]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(KubeMarkMasqChain)) - } - - // Install the kubernetes-specific postrouting rules. We use a whole chain for - // this so that it is easier to flush and change, for example if the mark - // value should ever change. - writeLine(proxier.natRules, []string{ - "-A", string(kubePostroutingChain), - "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`, - "-m", "mark", "--mark", proxier.masqueradeMark, - "-j", "MASQUERADE", - }...) - - // Install the kubernetes-specific masquerade mark rule. We use a whole chain for - // this so that it is easier to flush and change, for example if the mark - // value should ever change. - writeLine(proxier.natRules, []string{ - "-A", string(KubeMarkMasqChain), - "-j", "MARK", "--set-xmark", proxier.masqueradeMark, - }...) - - // Accumulate NAT chains to keep. - activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set - - // Accumulate the set of local ports that we will be holding open once this update is complete - replacementPortsMap := map[localPort]closeable{} - - // We are creating those slices ones here to avoid memory reallocations - // in every loop. Note that reuse the memory, instead of doing: - // slice = <some new slice> - // you should always do one of the below: - // slice = slice[:0] // and then append to it - // slice = append(slice[:0], ...) - endpoints := make([]*endpointsInfo, 0) - endpointChains := make([]utiliptables.Chain, 0) - // To avoid growing this slice, we arbitrarily set its size to 64, - // there is never more than that many arguments for a single line. - // Note that even if we go over 64, it will still be correct - it - // is just for efficiency, not correctness. - args := make([]string, 64) - - // Build rules for each service. - var svcNameString string - for svcName, svcInfo := range proxier.serviceMap { - protocol := strings.ToLower(string(svcInfo.protocol)) - svcNameString = svcInfo.serviceNameString - - // Create the per-service chain, retaining counters if possible. - svcChain := svcInfo.servicePortChainName - if chain, ok := existingNATChains[svcChain]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain)) - } - activeNATChains[svcChain] = true - - svcXlbChain := svcInfo.serviceLBChainName - if svcInfo.onlyNodeLocalEndpoints { - // Only for services request OnlyLocal traffic - // create the per-service LB chain, retaining counters if possible. - if lbChain, ok := existingNATChains[svcXlbChain]; ok { - writeLine(proxier.natChains, lbChain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain)) - } - activeNATChains[svcXlbChain] = true - } else if activeNATChains[svcXlbChain] { - // Cleanup the previously created XLB chain for this service - delete(activeNATChains, svcXlbChain) - } - - // Capture the clusterIP. - args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString), - "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), - "--dport", strconv.Itoa(svcInfo.port), - ) - if proxier.masqueradeAll { - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - } else if len(proxier.clusterCIDR) > 0 { - // This masquerades off-cluster traffic to a service VIP. The idea - // is that you can establish a static route for your Service range, - // routing to any node, and that node will bridge into the Service - // for you. Since that might bounce off-node, we masquerade here. - // If/when we support "Local" policy for VIPs, we should update this. - writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) - } - writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) - - // Capture externalIPs. - for _, externalIP := range svcInfo.externalIPs { - // If the "external" IP happens to be an IP that is local to this - // machine, hold the local port open so no other process can open it - // (because the socket might open but it would never work). - if local, err := isLocalIP(externalIP); err != nil { - glog.Errorf("can't determine if IP is local, assuming not: %v", err) - } else if local { - lp := localPort{ - desc: "externalIP for " + svcNameString, - ip: externalIP, - port: svcInfo.port, - protocol: protocol, - } - if proxier.portsMap[lp] != nil { - glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) - replacementPortsMap[lp] = proxier.portsMap[lp] - } else { - socket, err := proxier.portMapper.OpenLocalPort(&lp) - if err != nil { - msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err) - - proxier.recorder.Eventf( - &clientv1.ObjectReference{ - Kind: "Node", - Name: proxier.hostname, - UID: types.UID(proxier.hostname), - Namespace: "", - }, api.EventTypeWarning, err.Error(), msg) - glog.Error(msg) - continue - } - replacementPortsMap[lp] = socket - } - } // We're holding the port, so it's OK to install iptables rules. - args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString), - "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", externalIP), - "--dport", strconv.Itoa(svcInfo.port), - ) - // We have to SNAT packets to external IPs. - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - - // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) - // nor from a local process to be forwarded to the service. - // This rule roughly translates to "all traffic from off-machine". - // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later. - externalTrafficOnlyArgs := append(args, - "-m", "physdev", "!", "--physdev-is-in", - "-m", "addrtype", "!", "--src-type", "LOCAL") - writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...) - dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") - // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. - // This covers cases like GCE load-balancers which get added to the local routing table. - writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...) - - // If the service has no endpoints then reject packets coming via externalIP - // Install ICMP Reject rule in filter table for destination=externalIP and dport=svcport - if len(proxier.endpointsMap[svcName]) == 0 { - writeLine(proxier.filterRules, - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), - "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", externalIP), - "--dport", strconv.Itoa(svcInfo.port), - "-j", "REJECT", - ) - } - } - - // Capture load-balancer ingress. - fwChain := svcInfo.serviceFirewallChainName - for _, ingress := range svcInfo.loadBalancerStatus.Ingress { - if ingress.IP != "" { - // create service firewall chain - if chain, ok := existingNATChains[fwChain]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain)) - } - activeNATChains[fwChain] = true - // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. - // This currently works for loadbalancers that preserves source ips. - // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. - - args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), - "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", ingress.IP), - "--dport", strconv.Itoa(svcInfo.port), - ) - // jump to service firewall chain - writeLine(proxier.natRules, append(args, "-j", string(fwChain))...) - - args = append(args[:0], - "-A", string(fwChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), - ) - - // Each source match rule in the FW chain may jump to either the SVC or the XLB chain - chosenChain := svcXlbChain - // If we are proxying globally, we need to masquerade in case we cross nodes. - // If we are proxying only locally, we can retain the source IP. - if !svcInfo.onlyNodeLocalEndpoints { - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - chosenChain = svcChain - } - - if len(svcInfo.loadBalancerSourceRanges) == 0 { - // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain - writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...) - } else { - // firewall filter based on each source range - allowFromNode := false - for _, src := range svcInfo.loadBalancerSourceRanges { - writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...) - // ignore error because it has been validated - _, cidr, _ := net.ParseCIDR(src) - if cidr.Contains(proxier.nodeIP) { - allowFromNode = true - } - } - // generally, ip route rule was added to intercept request to loadbalancer vip from the - // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. - // Need to add the following rule to allow request on host. - if allowFromNode { - writeLine(proxier.natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...) - } - } - - // If the packet was able to reach the end of firewall chain, then it did not get DNATed. - // It means the packet cannot go thru the firewall, then mark it for DROP - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) - } - } - - // Capture nodeports. If we had more than 2 rules it might be - // worthwhile to make a new per-service chain for nodeport rules, but - // with just 2 rules it ends up being a waste and a cognitive burden. - if svcInfo.nodePort != 0 { - // Hold the local port open so no other process can open it - // (because the socket might open but it would never work). - lp := localPort{ - desc: "nodePort for " + svcNameString, - ip: "", - port: svcInfo.nodePort, - protocol: protocol, - } - if proxier.portsMap[lp] != nil { - glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) - replacementPortsMap[lp] = proxier.portsMap[lp] - } else { - socket, err := proxier.portMapper.OpenLocalPort(&lp) - if err != nil { - glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) - continue - } - if lp.protocol == "udp" { - proxier.clearUDPConntrackForPort(lp.port) - } - replacementPortsMap[lp] = socket - } // We're holding the port, so it's OK to install iptables rules. - - args = append(args[:0], - "-A", string(kubeNodePortsChain), - "-m", "comment", "--comment", svcNameString, - "-m", protocol, "-p", protocol, - "--dport", strconv.Itoa(svcInfo.nodePort), - ) - if !svcInfo.onlyNodeLocalEndpoints { - // Nodeports need SNAT, unless they're local. - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - // Jump to the service chain. - writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) - } else { - // TODO: Make all nodePorts jump to the firewall chain. - // Currently we only create it for loadbalancers (#33586). - writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...) - } - - // If the service has no endpoints then reject packets. The filter - // table doesn't currently have the same per-service structure that - // the nat table does, so we just stick this into the kube-services - // chain. - if len(proxier.endpointsMap[svcName]) == 0 { - writeLine(proxier.filterRules, - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), - "-m", "addrtype", "--dst-type", "LOCAL", - "-m", protocol, "-p", protocol, - "--dport", strconv.Itoa(svcInfo.nodePort), - "-j", "REJECT", - ) - } - } - - // If the service has no endpoints then reject packets. - if len(proxier.endpointsMap[svcName]) == 0 { - writeLine(proxier.filterRules, - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), - "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), - "--dport", strconv.Itoa(svcInfo.port), - "-j", "REJECT", - ) - continue - } - - // From here on, we assume there are active endpoints. - - // Generate the per-endpoint chains. We do this in multiple passes so we - // can group rules together. - // These two slices parallel each other - keep in sync - endpoints = endpoints[:0] - endpointChains = endpointChains[:0] - var endpointChain utiliptables.Chain - for _, ep := range proxier.endpointsMap[svcName] { - endpoints = append(endpoints, ep) - endpointChain = ep.endpointChain(svcNameString, protocol) - endpointChains = append(endpointChains, endpointChain) - - // Create the endpoint chain, retaining counters if possible. - if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain)) - } - activeNATChains[endpointChain] = true - } - - // First write session affinity rules, if applicable. - if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { - for _, endpointChain := range endpointChains { - writeLine(proxier.natRules, - "-A", string(svcChain), - "-m", "comment", "--comment", svcNameString, - "-m", "recent", "--name", string(endpointChain), - "--rcheck", "--seconds", strconv.Itoa(svcInfo.stickyMaxAgeMinutes*60), "--reap", - "-j", string(endpointChain)) - } - } - - // Now write loadbalancing & DNAT rules. - n := len(endpointChains) - for i, endpointChain := range endpointChains { - // Balancing rules in the per-service chain. - args = append(args[:0], []string{ - "-A", string(svcChain), - "-m", "comment", "--comment", svcNameString, - }...) - if i < (n - 1) { - // Each rule is a probabilistic match. - args = append(args, - "-m", "statistic", - "--mode", "random", - "--probability", proxier.probability(n-i)) - } - // The final (or only if n == 1) rule is a guaranteed match. - args = append(args, "-j", string(endpointChain)) - writeLine(proxier.natRules, args...) - - // Rules in the per-endpoint chain. - args = append(args[:0], - "-A", string(endpointChain), - "-m", "comment", "--comment", svcNameString, - ) - // Handle traffic that loops back to the originator with SNAT. - writeLine(proxier.natRules, append(args, - "-s", fmt.Sprintf("%s/32", endpoints[i].IPPart()), - "-j", string(KubeMarkMasqChain))...) - // Update client-affinity lists. - if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { - args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") - } - // DNAT to final destination. - args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].endpoint) - writeLine(proxier.natRules, args...) - } - - // The logic below this applies only if this service is marked as OnlyLocal - if !svcInfo.onlyNodeLocalEndpoints { - continue - } - - // Now write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic. - // TODO - This logic may be combinable with the block above that creates the svc balancer chain - localEndpoints := make([]*endpointsInfo, 0) - localEndpointChains := make([]utiliptables.Chain, 0) - for i := range endpointChains { - if endpoints[i].isLocal { - // These slices parallel each other; must be kept in sync - localEndpoints = append(localEndpoints, endpoints[i]) - localEndpointChains = append(localEndpointChains, endpointChains[i]) - } - } - // First rule in the chain redirects all pod -> external VIP traffic to the - // Service's ClusterIP instead. This happens whether or not we have local - // endpoints; only if clusterCIDR is specified - if len(proxier.clusterCIDR) > 0 { - args = append(args[:0], - "-A", string(svcXlbChain), - "-m", "comment", "--comment", - `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`, - "-s", proxier.clusterCIDR, - "-j", string(svcChain), - ) - writeLine(proxier.natRules, args...) - } - - numLocalEndpoints := len(localEndpointChains) - if numLocalEndpoints == 0 { - // Blackhole all traffic since there are no local endpoints - args = append(args[:0], - "-A", string(svcXlbChain), - "-m", "comment", "--comment", - fmt.Sprintf(`"%s has no local endpoints"`, svcNameString), - "-j", - string(KubeMarkDropChain), - ) - writeLine(proxier.natRules, args...) - } else { - // Setup probability filter rules only over local endpoints - for i, endpointChain := range localEndpointChains { - // Balancing rules in the per-service chain. - args = append(args[:0], - "-A", string(svcXlbChain), - "-m", "comment", "--comment", - fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcNameString), - ) - if i < (numLocalEndpoints - 1) { - // Each rule is a probabilistic match. - args = append(args, - "-m", "statistic", - "--mode", "random", - "--probability", proxier.probability(numLocalEndpoints-i)) - } - // The final (or only if n == 1) rule is a guaranteed match. - args = append(args, "-j", string(endpointChain)) - writeLine(proxier.natRules, args...) - } - } - } - - // Delete chains no longer in use. - for chain := range existingNATChains { - if !activeNATChains[chain] { - chainString := string(chain) - if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") { - // Ignore chains that aren't ours. - continue - } - // We must (as per iptables) write a chain-line for it, which has - // the nice effect of flushing the chain. Then we can remove the - // chain. - writeLine(proxier.natChains, existingNATChains[chain]) - writeLine(proxier.natRules, "-X", chainString) - } - } - - // Finally, tail-call to the nodeports chain. This needs to be after all - // other service portal rules. - writeLine(proxier.natRules, - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, - "-m", "addrtype", "--dst-type", "LOCAL", - "-j", string(kubeNodePortsChain)) - - // Write the end-of-table markers. - writeLine(proxier.filterRules, "COMMIT") - writeLine(proxier.natRules, "COMMIT") - - // Sync rules. - // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table - proxier.iptablesData.Reset() - proxier.iptablesData.Write(proxier.filterChains.Bytes()) - proxier.iptablesData.Write(proxier.filterRules.Bytes()) - proxier.iptablesData.Write(proxier.natChains.Bytes()) - proxier.iptablesData.Write(proxier.natRules.Bytes()) - - glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes()) - err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters) - if err != nil { - glog.Errorf("Failed to execute iptables-restore: %v", err) - glog.V(2).Infof("Rules:\n%s", proxier.iptablesData.Bytes()) - // Revert new local ports. - revertPorts(replacementPortsMap, proxier.portsMap) - return - } - - // Close old local ports and save new ones. - for k, v := range proxier.portsMap { - if replacementPortsMap[k] == nil { - v.Close() - } - } - proxier.portsMap = replacementPortsMap - - // Update healthz timestamp. - if proxier.healthzServer != nil { - proxier.healthzServer.UpdateTimestamp() - } - - // Update healthchecks. The endpoints list might include services that are - // not "OnlyLocal", but the services list will not, and the healthChecker - // will just drop those endpoints. - if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil { - glog.Errorf("Error syncing healtcheck services: %v", err) - } - if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil { - glog.Errorf("Error syncing healthcheck endoints: %v", err) - } - - // Finish housekeeping. - // TODO: these and clearUDPConntrackForPort() could be made more consistent. - utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List()) - proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints) -} - -// Clear UDP conntrack for port or all conntrack entries when port equal zero. -// When a packet arrives, it will not go through NAT table again, because it is not "the first" packet. -// The solution is clearing the conntrack. Known issus: -// https://github.com/docker/docker/issues/8795 -// https://github.com/kubernetes/kubernetes/issues/31983 -func (proxier *Proxier) clearUDPConntrackForPort(port int) { - glog.V(2).Infof("Deleting conntrack entries for udp connections") - if port > 0 { - err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "-p", "udp", "--dport", strconv.Itoa(port)) - if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { - glog.Errorf("conntrack return with error: %v", err) - } - } else { - glog.Errorf("Wrong port number. The port number must be greater than zero") - } -} - -// Join all words with spaces, terminate with newline and write to buf. -func writeLine(buf *bytes.Buffer, words ...string) { - // We avoid strings.Join for performance reasons. - for i := range words { - buf.WriteString(words[i]) - if i < len(words)-1 { - buf.WriteByte(' ') - } else { - buf.WriteByte('\n') - } - } -} - -func isLocalIP(ip string) (bool, error) { - addrs, err := net.InterfaceAddrs() - if err != nil { - return false, err - } - for i := range addrs { - intf, _, err := net.ParseCIDR(addrs[i].String()) - if err != nil { - return false, err - } - if net.ParseIP(ip).Equal(intf) { - return true, nil - } - } - return false, nil -} - -func openLocalPort(lp *localPort) (closeable, error) { - // For ports on node IPs, open the actual port and hold it, even though we - // use iptables to redirect traffic. - // This ensures a) that it's safe to use that port and b) that (a) stays - // true. The risk is that some process on the node (e.g. sshd or kubelet) - // is using a port and we give that same port out to a Service. That would - // be bad because iptables would silently claim the traffic but the process - // would never know. - // NOTE: We should not need to have a real listen()ing socket - bind() - // should be enough, but I can't figure out a way to e2e test without - // it. Tools like 'ss' and 'netstat' do not show sockets that are - // bind()ed but not listen()ed, and at least the default debian netcat - // has no way to avoid about 10 seconds of retries. - var socket closeable - switch lp.protocol { - case "tcp": - listener, err := net.Listen("tcp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port))) - if err != nil { - return nil, err - } - socket = listener - case "udp": - addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port))) - if err != nil { - return nil, err - } - conn, err := net.ListenUDP("udp", addr) - if err != nil { - return nil, err - } - socket = conn - default: - return nil, fmt.Errorf("unknown protocol %q", lp.protocol) - } - glog.V(2).Infof("Opened local port %s", lp.String()) - return socket, nil -} - -// revertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only -// closes the ports opened in this sync. -func revertPorts(replacementPortsMap, originalPortsMap map[localPort]closeable) { - for k, v := range replacementPortsMap { - // Only close newly opened local ports - leave ones that were open before this update - if originalPortsMap[k] == nil { - glog.V(2).Infof("Closing local port %s after iptables-restore failure", k.String()) - v.Close() - } - } -} diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/types.go b/vendor/k8s.io/kubernetes/pkg/proxy/types.go deleted file mode 100644 index 578baff69..000000000 --- a/vendor/k8s.io/kubernetes/pkg/proxy/types.go +++ /dev/null @@ -1,44 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package proxy - -import ( - "fmt" - - "k8s.io/apimachinery/pkg/types" -) - -// ProxyProvider is the interface provided by proxier implementations. -type ProxyProvider interface { - // Sync immediately synchronizes the ProxyProvider's current state to iptables. - Sync() - // SyncLoop runs periodic work. - // This is expected to run as a goroutine or as the main loop of the app. - // It does not return. - SyncLoop() -} - -// ServicePortName carries a namespace + name + portname. This is the unique -// identfier for a load-balanced service. -type ServicePortName struct { - types.NamespacedName - Port string -} - -func (spn ServicePortName) String() string { - return fmt.Sprintf("%s:%s", spn.NamespacedName.String(), spn.Port) -} diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/util/conntrack.go b/vendor/k8s.io/kubernetes/pkg/proxy/util/conntrack.go deleted file mode 100644 index 436045ecb..000000000 --- a/vendor/k8s.io/kubernetes/pkg/proxy/util/conntrack.go +++ /dev/null @@ -1,58 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "fmt" - "strings" - - "k8s.io/kubernetes/pkg/util/exec" - - "github.com/golang/glog" -) - -// Utilities for dealing with conntrack - -const noConnectionToDelete = "0 flow entries have been deleted" - -// DeleteServiceConnection uses the conntrack tool to delete the conntrack entries -// for the UDP connections specified by the given service IPs -func DeleteServiceConnections(execer exec.Interface, svcIPs []string) { - for _, ip := range svcIPs { - glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip) - err := ExecConntrackTool(execer, "-D", "--orig-dst", ip, "-p", "udp") - if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { - // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. - // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it - // is expensive to baby-sit all udp connections to kubernetes services. - glog.Errorf("conntrack returned error: %v", err) - } - } -} - -// ExecConntrackTool executes the conntrack tool using the given parameters -func ExecConntrackTool(execer exec.Interface, parameters ...string) error { - conntrackPath, err := execer.LookPath("conntrack") - if err != nil { - return fmt.Errorf("error looking for path of conntrack: %v", err) - } - output, err := execer.Command(conntrackPath, parameters...).CombinedOutput() - if err != nil { - return fmt.Errorf("conntrack command returned: %q, error message: %s", string(output), err) - } - return nil -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go b/vendor/k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go deleted file mode 100644 index da6fc2a4f..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go +++ /dev/null @@ -1,239 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package async - -import ( - "fmt" - "sync" - "time" - - "k8s.io/client-go/util/flowcontrol" - - "github.com/golang/glog" -) - -// BoundedFrequencyRunner manages runs of a user-provided function. -// See NewBoundedFrequencyRunner for examples. -type BoundedFrequencyRunner struct { - name string // the name of this instance - minInterval time.Duration // the min time between runs, modulo bursts - maxInterval time.Duration // the max time between runs - - run chan struct{} // try an async run - - mu sync.Mutex // guards runs of fn and all mutations - fn func() // function to run - lastRun time.Time // time of last run - timer timer // timer for deferred runs - limiter rateLimiter // rate limiter for on-demand runs -} - -// designed so that flowcontrol.RateLimiter satisfies -type rateLimiter interface { - TryAccept() bool - Stop() -} - -type nullLimiter struct{} - -func (nullLimiter) TryAccept() bool { - return true -} - -func (nullLimiter) Stop() {} - -var _ rateLimiter = nullLimiter{} - -// for testing -type timer interface { - // C returns the timer's selectable channel. - C() <-chan time.Time - - // See time.Timer.Reset. - Reset(d time.Duration) bool - - // See time.Timer.Stop. - Stop() bool - - // See time.Now. - Now() time.Time - - // See time.Since. - Since(t time.Time) time.Duration - - // See time.Sleep. - Sleep(d time.Duration) -} - -// implement our timer in terms of std time.Timer. -type realTimer struct { - *time.Timer -} - -func (rt realTimer) C() <-chan time.Time { - return rt.Timer.C -} - -func (rt realTimer) Now() time.Time { - return time.Now() -} - -func (rt realTimer) Since(t time.Time) time.Duration { - return time.Since(t) -} - -func (rt realTimer) Sleep(d time.Duration) { - time.Sleep(d) -} - -var _ timer = realTimer{} - -// NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance, -// which will manage runs of the specified function. -// -// All runs will be async to the caller of BoundedFrequencyRunner.Run, but -// multiple runs are serialized. If the function needs to hold locks, it must -// take them internally. -// -// Runs of the funtion will have at least minInterval between them (from -// completion to next start), except that up to bursts may be allowed. Burst -// runs are "accumulated" over time, one per minInterval up to burstRuns total. -// This can be used, for example, to mitigate the impact of expensive operations -// being called in response to user-initiated operations. Run requests that -// would violate the minInterval are coallesced and run at the next opportunity. -// -// The function will be run at least once per maxInterval. For example, this can -// force periodic refreshes of state in the absence of anyone calling Run. -// -// Examples: -// -// NewBoundedFrequencyRunner("name", fn, time.Second, 5*time.Second, 1) -// - fn will have at least 1 second between runs -// - fn will have no more than 5 seconds between runs -// -// NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3) -// - fn will have at least 3 seconds between runs, with up to 3 burst runs -// - fn will have no more than 10 seconds between runs -// -// The maxInterval must be greater than or equal to the minInterval, If the -// caller passes a maxInterval less than minInterval, this function will panic. -func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner { - timer := realTimer{Timer: time.NewTimer(0)} // will tick immediately - <-timer.C() // consume the first tick - return construct(name, fn, minInterval, maxInterval, burstRuns, timer) -} - -// Make an instance with dependencies injected. -func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner { - if maxInterval < minInterval { - panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, minInterval, maxInterval)) - } - if timer == nil { - panic(fmt.Sprintf("%s: timer must be non-nil", name)) - } - - bfr := &BoundedFrequencyRunner{ - name: name, - fn: fn, - minInterval: minInterval, - maxInterval: maxInterval, - run: make(chan struct{}, 1), - timer: timer, - } - if minInterval == 0 { - bfr.limiter = nullLimiter{} - } else { - // allow burst updates in short succession - qps := float32(time.Second) / float32(minInterval) - bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer) - } - return bfr -} - -// Loop handles the periodic timer and run requests. This is expected to be -// called as a goroutine. -func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) { - glog.V(3).Infof("%s Loop running", bfr.name) - bfr.timer.Reset(bfr.maxInterval) - for { - select { - case <-stop: - bfr.stop() - glog.V(3).Infof("%s Loop stopping", bfr.name) - return - case <-bfr.timer.C(): - bfr.tryRun() - case <-bfr.run: - bfr.tryRun() - } - } -} - -// Run the function as soon as possible. If this is called while Loop is not -// running, the call may be deferred indefinitely. -// If there is already a queued request to call the underlying function, it -// may be dropped - it is just guaranteed that we will try calling the -// underlying function as soon as possible starting from now. -func (bfr *BoundedFrequencyRunner) Run() { - // If it takes a lot of time to run the underlying function, noone is really - // processing elements from <run> channel. So to avoid blocking here on the - // putting element to it, we simply skip it if there is already an element - // in it. - select { - case bfr.run <- struct{}{}: - default: - } -} - -// assumes the lock is not held -func (bfr *BoundedFrequencyRunner) stop() { - bfr.mu.Lock() - defer bfr.mu.Unlock() - bfr.limiter.Stop() - bfr.timer.Stop() -} - -// assumes the lock is not held -func (bfr *BoundedFrequencyRunner) tryRun() { - bfr.mu.Lock() - defer bfr.mu.Unlock() - - if bfr.limiter.TryAccept() { - // We're allowed to run the function right now. - bfr.fn() - bfr.lastRun = bfr.timer.Now() - bfr.timer.Stop() - bfr.timer.Reset(bfr.maxInterval) - glog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval) - return - } - - // It can't run right now, figure out when it can run next. - - elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run - nextPossible := bfr.minInterval - elapsed // time to next possible run - nextScheduled := bfr.maxInterval - elapsed // time to next periodic run - glog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled) - - if nextPossible < nextScheduled { - // Set the timer for ASAP, but don't drain here. Assuming Loop is running, - // it might get a delivery in the mean time, but that is OK. - bfr.timer.Stop() - bfr.timer.Reset(nextPossible) - glog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible) - } -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/async/runner.go b/vendor/k8s.io/kubernetes/pkg/util/async/runner.go deleted file mode 100644 index 924f1d168..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/async/runner.go +++ /dev/null @@ -1,58 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package async - -import ( - "sync" -) - -// Runner is an abstraction to make it easy to start and stop groups of things that can be -// described by a single function which waits on a channel close to exit. -type Runner struct { - lock sync.Mutex - loopFuncs []func(stop chan struct{}) - stop *chan struct{} -} - -// NewRunner makes a runner for the given function(s). The function(s) should loop until -// the channel is closed. -func NewRunner(f ...func(stop chan struct{})) *Runner { - return &Runner{loopFuncs: f} -} - -// Start begins running. -func (r *Runner) Start() { - r.lock.Lock() - defer r.lock.Unlock() - if r.stop == nil { - c := make(chan struct{}) - r.stop = &c - for i := range r.loopFuncs { - go r.loopFuncs[i](*r.stop) - } - } -} - -// Stop stops running. -func (r *Runner) Stop() { - r.lock.Lock() - defer r.lock.Unlock() - if r.stop != nil { - close(*r.stop) - r.stop = nil - } -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/dbus/dbus.go b/vendor/k8s.io/kubernetes/pkg/util/dbus/dbus.go deleted file mode 100644 index 702d16e5d..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/dbus/dbus.go +++ /dev/null @@ -1,133 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dbus - -import ( - godbus "github.com/godbus/dbus" -) - -// Interface is an interface that presents a subset of the godbus/dbus API. Use this -// when you want to inject fakeable/mockable D-Bus behavior. -type Interface interface { - // SystemBus returns a connection to the system bus, connecting to it - // first if necessary - SystemBus() (Connection, error) - // SessionBus returns a connection to the session bus, connecting to it - // first if necessary - SessionBus() (Connection, error) -} - -// Connection represents a D-Bus connection -type Connection interface { - // Returns an Object representing the bus itself - BusObject() Object - - // Object creates a representation of a remote D-Bus object - Object(name, path string) Object - - // Signal registers or unregisters a channel to receive D-Bus signals - Signal(ch chan<- *godbus.Signal) -} - -// Object represents a remote D-Bus object -type Object interface { - // Call synchronously calls a D-Bus method - Call(method string, flags godbus.Flags, args ...interface{}) Call -} - -// Call represents a pending or completed D-Bus method call -type Call interface { - // Store returns a completed call's return values, or an error - Store(retvalues ...interface{}) error -} - -// Implements Interface in terms of actually talking to D-Bus -type dbusImpl struct { - systemBus *connImpl - sessionBus *connImpl -} - -// Implements Connection as a godbus.Conn -type connImpl struct { - conn *godbus.Conn -} - -// Implements Object as a godbus.Object -type objectImpl struct { - object godbus.BusObject -} - -// Implements Call as a godbus.Call -type callImpl struct { - call *godbus.Call -} - -// New returns a new Interface which will use godbus to talk to D-Bus -func New() Interface { - return &dbusImpl{} -} - -// SystemBus is part of Interface -func (db *dbusImpl) SystemBus() (Connection, error) { - if db.systemBus == nil { - bus, err := godbus.SystemBus() - if err != nil { - return nil, err - } - db.systemBus = &connImpl{bus} - } - - return db.systemBus, nil -} - -// SessionBus is part of Interface -func (db *dbusImpl) SessionBus() (Connection, error) { - if db.sessionBus == nil { - bus, err := godbus.SessionBus() - if err != nil { - return nil, err - } - db.sessionBus = &connImpl{bus} - } - - return db.sessionBus, nil -} - -// BusObject is part of the Connection interface -func (conn *connImpl) BusObject() Object { - return &objectImpl{conn.conn.BusObject()} -} - -// Object is part of the Connection interface -func (conn *connImpl) Object(name, path string) Object { - return &objectImpl{conn.conn.Object(name, godbus.ObjectPath(path))} -} - -// Signal is part of the Connection interface -func (conn *connImpl) Signal(ch chan<- *godbus.Signal) { - conn.conn.Signal(ch) -} - -// Call is part of the Object interface -func (obj *objectImpl) Call(method string, flags godbus.Flags, args ...interface{}) Call { - return &callImpl{obj.object.Call(method, flags, args...)} -} - -// Store is part of the Call interface -func (call *callImpl) Store(retvalues ...interface{}) error { - return call.call.Store(retvalues...) -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/dbus/doc.go b/vendor/k8s.io/kubernetes/pkg/util/dbus/doc.go deleted file mode 100644 index b07da628d..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/dbus/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package dbus provides an injectable interface and implementations for D-Bus communication -package dbus // import "k8s.io/kubernetes/pkg/util/dbus" diff --git a/vendor/k8s.io/kubernetes/pkg/util/dbus/fake_dbus.go b/vendor/k8s.io/kubernetes/pkg/util/dbus/fake_dbus.go deleted file mode 100644 index 44131272e..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/dbus/fake_dbus.go +++ /dev/null @@ -1,135 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dbus - -import ( - "fmt" - - godbus "github.com/godbus/dbus" -) - -// DBusFake is a simple fake Interface type. -type DBusFake struct { - systemBus *DBusFakeConnection - sessionBus *DBusFakeConnection -} - -// DBusFakeConnection represents a fake D-Bus connection -type DBusFakeConnection struct { - busObject *fakeObject - objects map[string]*fakeObject - signalHandlers []chan<- *godbus.Signal -} - -// DBusFakeHandler is used to handle fake D-Bus method calls -type DBusFakeHandler func(method string, args ...interface{}) ([]interface{}, error) - -type fakeObject struct { - handler DBusFakeHandler -} - -type fakeCall struct { - ret []interface{} - err error -} - -// NewFake returns a new Interface which will fake talking to D-Bus -func NewFake(systemBus *DBusFakeConnection, sessionBus *DBusFakeConnection) *DBusFake { - return &DBusFake{systemBus, sessionBus} -} - -func NewFakeConnection() *DBusFakeConnection { - return &DBusFakeConnection{ - objects: make(map[string]*fakeObject), - } -} - -// SystemBus is part of Interface -func (db *DBusFake) SystemBus() (Connection, error) { - if db.systemBus != nil { - return db.systemBus, nil - } else { - return nil, fmt.Errorf("DBus is not running") - } -} - -// SessionBus is part of Interface -func (db *DBusFake) SessionBus() (Connection, error) { - if db.sessionBus != nil { - return db.sessionBus, nil - } else { - return nil, fmt.Errorf("DBus is not running") - } -} - -// BusObject is part of the Connection interface -func (conn *DBusFakeConnection) BusObject() Object { - return conn.busObject -} - -// Object is part of the Connection interface -func (conn *DBusFakeConnection) Object(name, path string) Object { - return conn.objects[name+path] -} - -// Signal is part of the Connection interface -func (conn *DBusFakeConnection) Signal(ch chan<- *godbus.Signal) { - for i := range conn.signalHandlers { - if conn.signalHandlers[i] == ch { - conn.signalHandlers = append(conn.signalHandlers[:i], conn.signalHandlers[i+1:]...) - return - } - } - conn.signalHandlers = append(conn.signalHandlers, ch) -} - -// SetBusObject sets the handler for the BusObject of conn -func (conn *DBusFakeConnection) SetBusObject(handler DBusFakeHandler) { - conn.busObject = &fakeObject{handler} -} - -// AddObject adds a handler for the Object at name and path -func (conn *DBusFakeConnection) AddObject(name, path string, handler DBusFakeHandler) { - conn.objects[name+path] = &fakeObject{handler} -} - -// EmitSignal emits a signal on conn -func (conn *DBusFakeConnection) EmitSignal(name, path, iface, signal string, args ...interface{}) { - sig := &godbus.Signal{ - Sender: name, - Path: godbus.ObjectPath(path), - Name: iface + "." + signal, - Body: args, - } - for _, ch := range conn.signalHandlers { - ch <- sig - } -} - -// Call is part of the Object interface -func (obj *fakeObject) Call(method string, flags godbus.Flags, args ...interface{}) Call { - ret, err := obj.handler(method, args...) - return &fakeCall{ret, err} -} - -// Store is part of the Call interface -func (call *fakeCall) Store(retvalues ...interface{}) error { - if call.err != nil { - return call.err - } - return godbus.Store(call.ret, retvalues...) -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/iptables/doc.go b/vendor/k8s.io/kubernetes/pkg/util/iptables/doc.go deleted file mode 100644 index f26498293..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/iptables/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package iptables provides an interface and implementations for running iptables commands. -package iptables // import "k8s.io/kubernetes/pkg/util/iptables" diff --git a/vendor/k8s.io/kubernetes/pkg/util/iptables/iptables.go b/vendor/k8s.io/kubernetes/pkg/util/iptables/iptables.go deleted file mode 100644 index b6c08fa37..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/iptables/iptables.go +++ /dev/null @@ -1,652 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package iptables - -import ( - "bytes" - "fmt" - "regexp" - "strings" - "sync" - - godbus "github.com/godbus/dbus" - "github.com/golang/glog" - "k8s.io/apimachinery/pkg/util/sets" - utildbus "k8s.io/kubernetes/pkg/util/dbus" - utilexec "k8s.io/kubernetes/pkg/util/exec" - utilversion "k8s.io/kubernetes/pkg/util/version" -) - -type RulePosition string - -const ( - Prepend RulePosition = "-I" - Append RulePosition = "-A" -) - -// An injectable interface for running iptables commands. Implementations must be goroutine-safe. -type Interface interface { - // GetVersion returns the "X.Y.Z" version string for iptables. - GetVersion() (string, error) - // EnsureChain checks if the specified chain exists and, if not, creates it. If the chain existed, return true. - EnsureChain(table Table, chain Chain) (bool, error) - // FlushChain clears the specified chain. If the chain did not exist, return error. - FlushChain(table Table, chain Chain) error - // DeleteChain deletes the specified chain. If the chain did not exist, return error. - DeleteChain(table Table, chain Chain) error - // EnsureRule checks if the specified rule is present and, if not, creates it. If the rule existed, return true. - EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error) - // DeleteRule checks if the specified rule is present and, if so, deletes it. - DeleteRule(table Table, chain Chain, args ...string) error - // IsIpv6 returns true if this is managing ipv6 tables - IsIpv6() bool - // SaveInto calls `iptables-save` for table and stores result in a given buffer. - SaveInto(table Table, buffer *bytes.Buffer) error - // Restore runs `iptables-restore` passing data through []byte. - // table is the Table to restore - // data should be formatted like the output of SaveInto() - // flush sets the presence of the "--noflush" flag. see: FlushFlag - // counters sets the "--counters" flag. see: RestoreCountersFlag - Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error - // RestoreAll is the same as Restore except that no table is specified. - RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error - // AddReloadFunc adds a function to call on iptables reload - AddReloadFunc(reloadFunc func()) - // Destroy cleans up resources used by the Interface - Destroy() -} - -type Protocol byte - -const ( - ProtocolIpv4 Protocol = iota + 1 - ProtocolIpv6 -) - -type Table string - -const ( - TableNAT Table = "nat" - TableFilter Table = "filter" -) - -type Chain string - -const ( - ChainPostrouting Chain = "POSTROUTING" - ChainPrerouting Chain = "PREROUTING" - ChainOutput Chain = "OUTPUT" - ChainInput Chain = "INPUT" -) - -const ( - cmdIPTablesSave string = "iptables-save" - cmdIPTablesRestore string = "iptables-restore" - cmdIPTables string = "iptables" - cmdIp6tables string = "ip6tables" -) - -// Option flag for Restore -type RestoreCountersFlag bool - -const RestoreCounters RestoreCountersFlag = true -const NoRestoreCounters RestoreCountersFlag = false - -// Option flag for Flush -type FlushFlag bool - -const FlushTables FlushFlag = true -const NoFlushTables FlushFlag = false - -// Versions of iptables less than this do not support the -C / --check flag -// (test whether a rule exists). -const MinCheckVersion = "1.4.11" - -// Minimum iptables versions supporting the -w and -w2 flags -const MinWaitVersion = "1.4.20" -const MinWait2Version = "1.4.22" - -const LockfilePath16x = "/run/xtables.lock" - -// runner implements Interface in terms of exec("iptables"). -type runner struct { - mu sync.Mutex - exec utilexec.Interface - dbus utildbus.Interface - protocol Protocol - hasCheck bool - waitFlag []string - restoreWaitFlag []string - lockfilePath string - - reloadFuncs []func() - signal chan *godbus.Signal -} - -// newInternal returns a new Interface which will exec iptables, and allows the -// caller to change the iptables-restore lockfile path -func newInternal(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol, lockfilePath string) Interface { - vstring, err := getIPTablesVersionString(exec) - if err != nil { - glog.Warningf("Error checking iptables version, assuming version at least %s: %v", MinCheckVersion, err) - vstring = MinCheckVersion - } - - if lockfilePath == "" { - lockfilePath = LockfilePath16x - } - - runner := &runner{ - exec: exec, - dbus: dbus, - protocol: protocol, - hasCheck: getIPTablesHasCheckCommand(vstring), - waitFlag: getIPTablesWaitFlag(vstring), - restoreWaitFlag: getIPTablesRestoreWaitFlag(exec), - lockfilePath: lockfilePath, - } - // TODO this needs to be moved to a separate Start() or Run() function so that New() has zero side - // effects. - runner.connectToFirewallD() - return runner -} - -// New returns a new Interface which will exec iptables. -func New(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol) Interface { - return newInternal(exec, dbus, protocol, "") -} - -// Destroy is part of Interface. -func (runner *runner) Destroy() { - if runner.signal != nil { - runner.signal <- nil - } -} - -const ( - firewalldName = "org.fedoraproject.FirewallD1" - firewalldPath = "/org/fedoraproject/FirewallD1" - firewalldInterface = "org.fedoraproject.FirewallD1" -) - -// Connects to D-Bus and listens for FirewallD start/restart. (On non-FirewallD-using -// systems, this is effectively a no-op; we listen for the signals, but they will never be -// emitted, so reload() will never be called.) -func (runner *runner) connectToFirewallD() { - bus, err := runner.dbus.SystemBus() - if err != nil { - glog.V(1).Infof("Could not connect to D-Bus system bus: %s", err) - return - } - - rule := fmt.Sprintf("type='signal',sender='%s',path='%s',interface='%s',member='Reloaded'", firewalldName, firewalldPath, firewalldInterface) - bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule) - - rule = fmt.Sprintf("type='signal',interface='org.freedesktop.DBus',member='NameOwnerChanged',path='/org/freedesktop/DBus',sender='org.freedesktop.DBus',arg0='%s'", firewalldName) - bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule) - - runner.signal = make(chan *godbus.Signal, 10) - bus.Signal(runner.signal) - - go runner.dbusSignalHandler(bus) -} - -// GetVersion returns the version string. -func (runner *runner) GetVersion() (string, error) { - return getIPTablesVersionString(runner.exec) -} - -// EnsureChain is part of Interface. -func (runner *runner) EnsureChain(table Table, chain Chain) (bool, error) { - fullArgs := makeFullArgs(table, chain) - - runner.mu.Lock() - defer runner.mu.Unlock() - - out, err := runner.run(opCreateChain, fullArgs) - if err != nil { - if ee, ok := err.(utilexec.ExitError); ok { - if ee.Exited() && ee.ExitStatus() == 1 { - return true, nil - } - } - return false, fmt.Errorf("error creating chain %q: %v: %s", chain, err, out) - } - return false, nil -} - -// FlushChain is part of Interface. -func (runner *runner) FlushChain(table Table, chain Chain) error { - fullArgs := makeFullArgs(table, chain) - - runner.mu.Lock() - defer runner.mu.Unlock() - - out, err := runner.run(opFlushChain, fullArgs) - if err != nil { - return fmt.Errorf("error flushing chain %q: %v: %s", chain, err, out) - } - return nil -} - -// DeleteChain is part of Interface. -func (runner *runner) DeleteChain(table Table, chain Chain) error { - fullArgs := makeFullArgs(table, chain) - - runner.mu.Lock() - defer runner.mu.Unlock() - - // TODO: we could call iptables -S first, ignore the output and check for non-zero return (more like DeleteRule) - out, err := runner.run(opDeleteChain, fullArgs) - if err != nil { - return fmt.Errorf("error deleting chain %q: %v: %s", chain, err, out) - } - return nil -} - -// EnsureRule is part of Interface. -func (runner *runner) EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error) { - fullArgs := makeFullArgs(table, chain, args...) - - runner.mu.Lock() - defer runner.mu.Unlock() - - exists, err := runner.checkRule(table, chain, args...) - if err != nil { - return false, err - } - if exists { - return true, nil - } - out, err := runner.run(operation(position), fullArgs) - if err != nil { - return false, fmt.Errorf("error appending rule: %v: %s", err, out) - } - return false, nil -} - -// DeleteRule is part of Interface. -func (runner *runner) DeleteRule(table Table, chain Chain, args ...string) error { - fullArgs := makeFullArgs(table, chain, args...) - - runner.mu.Lock() - defer runner.mu.Unlock() - - exists, err := runner.checkRule(table, chain, args...) - if err != nil { - return err - } - if !exists { - return nil - } - out, err := runner.run(opDeleteRule, fullArgs) - if err != nil { - return fmt.Errorf("error deleting rule: %v: %s", err, out) - } - return nil -} - -func (runner *runner) IsIpv6() bool { - return runner.protocol == ProtocolIpv6 -} - -// SaveInto is part of Interface. -func (runner *runner) SaveInto(table Table, buffer *bytes.Buffer) error { - runner.mu.Lock() - defer runner.mu.Unlock() - - // run and return - args := []string{"-t", string(table)} - glog.V(4).Infof("running iptables-save %v", args) - cmd := runner.exec.Command(cmdIPTablesSave, args...) - // Since CombinedOutput() doesn't support redirecting it to a buffer, - // we need to workaround it by redirecting stdout and stderr to buffer - // and explicitly calling Run() [CombinedOutput() underneath itself - // creates a new buffer, redirects stdout and stderr to it and also - // calls Run()]. - cmd.SetStdout(buffer) - cmd.SetStderr(buffer) - return cmd.Run() -} - -// Restore is part of Interface. -func (runner *runner) Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error { - // setup args - args := []string{"-T", string(table)} - return runner.restoreInternal(args, data, flush, counters) -} - -// RestoreAll is part of Interface. -func (runner *runner) RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error { - // setup args - args := make([]string, 0) - return runner.restoreInternal(args, data, flush, counters) -} - -type iptablesLocker interface { - Close() -} - -// restoreInternal is the shared part of Restore/RestoreAll -func (runner *runner) restoreInternal(args []string, data []byte, flush FlushFlag, counters RestoreCountersFlag) error { - runner.mu.Lock() - defer runner.mu.Unlock() - - if !flush { - args = append(args, "--noflush") - } - if counters { - args = append(args, "--counters") - } - - // Grab the iptables lock to prevent iptables-restore and iptables - // from stepping on each other. iptables-restore 1.6.2 will have - // a --wait option like iptables itself, but that's not widely deployed. - if len(runner.restoreWaitFlag) == 0 { - locker, err := grabIptablesLocks(runner.lockfilePath) - if err != nil { - return err - } - defer locker.Close() - } - - // run the command and return the output or an error including the output and error - fullArgs := append(runner.restoreWaitFlag, args...) - glog.V(4).Infof("running iptables-restore %v", fullArgs) - cmd := runner.exec.Command(cmdIPTablesRestore, fullArgs...) - cmd.SetStdin(bytes.NewBuffer(data)) - b, err := cmd.CombinedOutput() - if err != nil { - return fmt.Errorf("%v (%s)", err, b) - } - return nil -} - -func (runner *runner) iptablesCommand() string { - if runner.IsIpv6() { - return cmdIp6tables - } else { - return cmdIPTables - } -} - -func (runner *runner) run(op operation, args []string) ([]byte, error) { - iptablesCmd := runner.iptablesCommand() - - fullArgs := append(runner.waitFlag, string(op)) - fullArgs = append(fullArgs, args...) - glog.V(5).Infof("running iptables %s %v", string(op), args) - return runner.exec.Command(iptablesCmd, fullArgs...).CombinedOutput() - // Don't log err here - callers might not think it is an error. -} - -// Returns (bool, nil) if it was able to check the existence of the rule, or -// (<undefined>, error) if the process of checking failed. -func (runner *runner) checkRule(table Table, chain Chain, args ...string) (bool, error) { - if runner.hasCheck { - return runner.checkRuleUsingCheck(makeFullArgs(table, chain, args...)) - } else { - return runner.checkRuleWithoutCheck(table, chain, args...) - } -} - -var hexnumRE = regexp.MustCompile("0x0+([0-9])") - -func trimhex(s string) string { - return hexnumRE.ReplaceAllString(s, "0x$1") -} - -// Executes the rule check without using the "-C" flag, instead parsing iptables-save. -// Present for compatibility with <1.4.11 versions of iptables. This is full -// of hack and half-measures. We should nix this ASAP. -func (runner *runner) checkRuleWithoutCheck(table Table, chain Chain, args ...string) (bool, error) { - glog.V(1).Infof("running iptables-save -t %s", string(table)) - out, err := runner.exec.Command(cmdIPTablesSave, "-t", string(table)).CombinedOutput() - if err != nil { - return false, fmt.Errorf("error checking rule: %v", err) - } - - // Sadly, iptables has inconsistent quoting rules for comments. Just remove all quotes. - // Also, quoted multi-word comments (which are counted as a single arg) - // will be unpacked into multiple args, - // in order to compare against iptables-save output (which will be split at whitespace boundary) - // e.g. a single arg('"this must be before the NodePort rules"') will be unquoted and unpacked into 7 args. - var argsCopy []string - for i := range args { - tmpField := strings.Trim(args[i], "\"") - tmpField = trimhex(tmpField) - argsCopy = append(argsCopy, strings.Fields(tmpField)...) - } - argset := sets.NewString(argsCopy...) - - for _, line := range strings.Split(string(out), "\n") { - var fields = strings.Fields(line) - - // Check that this is a rule for the correct chain, and that it has - // the correct number of argument (+2 for "-A <chain name>") - if !strings.HasPrefix(line, fmt.Sprintf("-A %s", string(chain))) || len(fields) != len(argsCopy)+2 { - continue - } - - // Sadly, iptables has inconsistent quoting rules for comments. - // Just remove all quotes. - for i := range fields { - fields[i] = strings.Trim(fields[i], "\"") - fields[i] = trimhex(fields[i]) - } - - // TODO: This misses reorderings e.g. "-x foo ! -y bar" will match "! -x foo -y bar" - if sets.NewString(fields...).IsSuperset(argset) { - return true, nil - } - glog.V(5).Infof("DBG: fields is not a superset of args: fields=%v args=%v", fields, args) - } - - return false, nil -} - -// Executes the rule check using the "-C" flag -func (runner *runner) checkRuleUsingCheck(args []string) (bool, error) { - out, err := runner.run(opCheckRule, args) - if err == nil { - return true, nil - } - if ee, ok := err.(utilexec.ExitError); ok { - // iptables uses exit(1) to indicate a failure of the operation, - // as compared to a malformed commandline, for example. - if ee.Exited() && ee.ExitStatus() == 1 { - return false, nil - } - } - return false, fmt.Errorf("error checking rule: %v: %s", err, out) -} - -type operation string - -const ( - opCreateChain operation = "-N" - opFlushChain operation = "-F" - opDeleteChain operation = "-X" - opAppendRule operation = "-A" - opCheckRule operation = "-C" - opDeleteRule operation = "-D" -) - -func makeFullArgs(table Table, chain Chain, args ...string) []string { - return append([]string{string(chain), "-t", string(table)}, args...) -} - -// Checks if iptables has the "-C" flag -func getIPTablesHasCheckCommand(vstring string) bool { - minVersion, err := utilversion.ParseGeneric(MinCheckVersion) - if err != nil { - glog.Errorf("MinCheckVersion (%s) is not a valid version string: %v", MinCheckVersion, err) - return true - } - version, err := utilversion.ParseGeneric(vstring) - if err != nil { - glog.Errorf("vstring (%s) is not a valid version string: %v", vstring, err) - return true - } - return version.AtLeast(minVersion) -} - -// Checks if iptables version has a "wait" flag -func getIPTablesWaitFlag(vstring string) []string { - version, err := utilversion.ParseGeneric(vstring) - if err != nil { - glog.Errorf("vstring (%s) is not a valid version string: %v", vstring, err) - return nil - } - - minVersion, err := utilversion.ParseGeneric(MinWaitVersion) - if err != nil { - glog.Errorf("MinWaitVersion (%s) is not a valid version string: %v", MinWaitVersion, err) - return nil - } - if version.LessThan(minVersion) { - return nil - } - - minVersion, err = utilversion.ParseGeneric(MinWait2Version) - if err != nil { - glog.Errorf("MinWait2Version (%s) is not a valid version string: %v", MinWait2Version, err) - return nil - } - if version.LessThan(minVersion) { - return []string{"-w"} - } else { - return []string{"-w2"} - } -} - -// getIPTablesVersionString runs "iptables --version" to get the version string -// in the form "X.X.X" -func getIPTablesVersionString(exec utilexec.Interface) (string, error) { - // this doesn't access mutable state so we don't need to use the interface / runner - bytes, err := exec.Command(cmdIPTables, "--version").CombinedOutput() - if err != nil { - return "", err - } - versionMatcher := regexp.MustCompile("v([0-9]+(\\.[0-9]+)+)") - match := versionMatcher.FindStringSubmatch(string(bytes)) - if match == nil { - return "", fmt.Errorf("no iptables version found in string: %s", bytes) - } - return match[1], nil -} - -// Checks if iptables-restore has a "wait" flag -// --wait support landed in v1.6.1+ right before --version support, so -// any version of iptables-restore that supports --version will also -// support --wait -func getIPTablesRestoreWaitFlag(exec utilexec.Interface) []string { - vstring, err := getIPTablesRestoreVersionString(exec) - if err != nil || vstring == "" { - glog.V(3).Infof("couldn't get iptables-restore version; assuming it doesn't support --wait") - return nil - } - if _, err := utilversion.ParseGeneric(vstring); err != nil { - glog.V(3).Infof("couldn't parse iptables-restore version; assuming it doesn't support --wait") - return nil - } - - return []string{"--wait=2"} -} - -// getIPTablesRestoreVersionString runs "iptables-restore --version" to get the version string -// in the form "X.X.X" -func getIPTablesRestoreVersionString(exec utilexec.Interface) (string, error) { - // this doesn't access mutable state so we don't need to use the interface / runner - - // iptables-restore hasn't always had --version, and worse complains - // about unrecognized commands but doesn't exit when it gets them. - // Work around that by setting stdin to nothing so it exits immediately. - cmd := exec.Command(cmdIPTablesRestore, "--version") - cmd.SetStdin(bytes.NewReader([]byte{})) - bytes, err := cmd.CombinedOutput() - if err != nil { - return "", err - } - versionMatcher := regexp.MustCompile("v([0-9]+(\\.[0-9]+)+)") - match := versionMatcher.FindStringSubmatch(string(bytes)) - if match == nil { - return "", fmt.Errorf("no iptables version found in string: %s", bytes) - } - return match[1], nil -} - -// goroutine to listen for D-Bus signals -func (runner *runner) dbusSignalHandler(bus utildbus.Connection) { - firewalld := bus.Object(firewalldName, firewalldPath) - - for s := range runner.signal { - if s == nil { - // Unregister - bus.Signal(runner.signal) - return - } - - switch s.Name { - case "org.freedesktop.DBus.NameOwnerChanged": - name := s.Body[0].(string) - new_owner := s.Body[2].(string) - - if name != firewalldName || len(new_owner) == 0 { - continue - } - - // FirewallD startup (specifically the part where it deletes - // all existing iptables rules) may not yet be complete when - // we get this signal, so make a dummy request to it to - // synchronize. - firewalld.Call(firewalldInterface+".getDefaultZone", 0) - - runner.reload() - case firewalldInterface + ".Reloaded": - runner.reload() - } - } -} - -// AddReloadFunc is part of Interface -func (runner *runner) AddReloadFunc(reloadFunc func()) { - runner.reloadFuncs = append(runner.reloadFuncs, reloadFunc) -} - -// runs all reload funcs to re-sync iptables rules -func (runner *runner) reload() { - glog.V(1).Infof("reloading iptables rules") - - for _, f := range runner.reloadFuncs { - f() - } -} - -// IsNotFoundError returns true if the error indicates "not found". It parses -// the error string looking for known values, which is imperfect but works in -// practice. -func IsNotFoundError(err error) bool { - es := err.Error() - if strings.Contains(es, "No such file or directory") { - return true - } - if strings.Contains(es, "No chain/target/match by that name") { - return true - } - return false -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/iptables/iptables_linux.go b/vendor/k8s.io/kubernetes/pkg/util/iptables/iptables_linux.go deleted file mode 100644 index 4f614cb52..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/iptables/iptables_linux.go +++ /dev/null @@ -1,93 +0,0 @@ -// +build linux - -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package iptables - -import ( - "fmt" - "net" - "os" - "time" - - "golang.org/x/sys/unix" - "k8s.io/apimachinery/pkg/util/wait" -) - -type locker struct { - lock16 *os.File - lock14 *net.UnixListener -} - -func (l *locker) Close() { - if l.lock16 != nil { - l.lock16.Close() - } - if l.lock14 != nil { - l.lock14.Close() - } -} - -func grabIptablesLocks(lockfilePath string) (iptablesLocker, error) { - var err error - var success bool - - l := &locker{} - defer func(l *locker) { - // Clean up immediately on failure - if !success { - l.Close() - } - }(l) - - // Grab both 1.6.x and 1.4.x-style locks; we don't know what the - // iptables-restore version is if it doesn't support --wait, so we - // can't assume which lock method it'll use. - - // Roughly duplicate iptables 1.6.x xtables_lock() function. - l.lock16, err = os.OpenFile(lockfilePath, os.O_CREATE, 0600) - if err != nil { - return nil, fmt.Errorf("failed to open iptables lock %s: %v", lockfilePath, err) - } - - if err := wait.PollImmediate(200*time.Millisecond, 2*time.Second, func() (bool, error) { - if err := grabIptablesFileLock(l.lock16); err != nil { - return false, nil - } - return true, nil - }); err != nil { - return nil, fmt.Errorf("failed to acquire new iptables lock: %v", err) - } - - // Roughly duplicate iptables 1.4.x xtables_lock() function. - if err := wait.PollImmediate(200*time.Millisecond, 2*time.Second, func() (bool, error) { - l.lock14, err = net.ListenUnix("unix", &net.UnixAddr{Name: "@xtables", Net: "unix"}) - if err != nil { - return false, nil - } - return true, nil - }); err != nil { - return nil, fmt.Errorf("failed to acquire old iptables lock: %v", err) - } - - success = true - return l, nil -} - -func grabIptablesFileLock(f *os.File) error { - return unix.Flock(int(f.Fd()), unix.LOCK_EX|unix.LOCK_NB) -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/iptables/iptables_unsupported.go b/vendor/k8s.io/kubernetes/pkg/util/iptables/iptables_unsupported.go deleted file mode 100644 index c6a5f0d7d..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/iptables/iptables_unsupported.go +++ /dev/null @@ -1,32 +0,0 @@ -// +build !linux - -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package iptables - -import ( - "fmt" - "os" -) - -func grabIptablesLocks(lockfilePath string) (iptablesLocker, error) { - return nil, fmt.Errorf("iptables unsupported on this platform") -} - -func grabIptablesFileLock(f *os.File) error { - return fmt.Errorf("iptables unsupported on this platform") -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/iptables/save_restore.go b/vendor/k8s.io/kubernetes/pkg/util/iptables/save_restore.go deleted file mode 100644 index 6f4eacaca..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/iptables/save_restore.go +++ /dev/null @@ -1,110 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package iptables - -import ( - "fmt" - "strings" -) - -// MakeChainLine return an iptables-save/restore formatted chain line given a Chain -func MakeChainLine(chain Chain) string { - return fmt.Sprintf(":%s - [0:0]", chain) -} - -// GetChainLines parses a table's iptables-save data to find chains in the table. -// It returns a map of iptables.Chain to string where the string is the chain line from the save (with counters etc). -func GetChainLines(table Table, save []byte) map[Chain]string { - chainsMap := make(map[Chain]string) - tablePrefix := "*" + string(table) - readIndex := 0 - // find beginning of table - for readIndex < len(save) { - line, n := ReadLine(readIndex, save) - readIndex = n - if strings.HasPrefix(line, tablePrefix) { - break - } - } - // parse table lines - for readIndex < len(save) { - line, n := ReadLine(readIndex, save) - readIndex = n - if len(line) == 0 { - continue - } - if strings.HasPrefix(line, "COMMIT") || strings.HasPrefix(line, "*") { - break - } else if strings.HasPrefix(line, "#") { - continue - } else if strings.HasPrefix(line, ":") && len(line) > 1 { - // We assume that the <line> contains space - chain lines have 3 fields, - // space delimited. If there is no space, this line will panic. - chain := Chain(line[1:strings.Index(line, " ")]) - chainsMap[chain] = line - } - } - return chainsMap -} - -func ReadLine(readIndex int, byteArray []byte) (string, int) { - currentReadIndex := readIndex - - // consume left spaces - for currentReadIndex < len(byteArray) { - if byteArray[currentReadIndex] == ' ' { - currentReadIndex++ - } else { - break - } - } - - // leftTrimIndex stores the left index of the line after the line is left-trimmed - leftTrimIndex := currentReadIndex - - // rightTrimIndex stores the right index of the line after the line is right-trimmed - // it is set to -1 since the correct value has not yet been determined. - rightTrimIndex := -1 - - for ; currentReadIndex < len(byteArray); currentReadIndex++ { - if byteArray[currentReadIndex] == ' ' { - // set rightTrimIndex - if rightTrimIndex == -1 { - rightTrimIndex = currentReadIndex - } - } else if (byteArray[currentReadIndex] == '\n') || (currentReadIndex == (len(byteArray) - 1)) { - // end of line or byte buffer is reached - if currentReadIndex <= leftTrimIndex { - return "", currentReadIndex + 1 - } - // set the rightTrimIndex - if rightTrimIndex == -1 { - rightTrimIndex = currentReadIndex - if currentReadIndex == (len(byteArray)-1) && (byteArray[currentReadIndex] != '\n') { - // ensure that the last character is part of the returned string, - // unless the last character is '\n' - rightTrimIndex = currentReadIndex + 1 - } - } - return string(byteArray[leftTrimIndex:rightTrimIndex]), currentReadIndex + 1 - } else { - // unset rightTrimIndex - rightTrimIndex = -1 - } - } - return "", currentReadIndex -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/sysctl/sysctl.go b/vendor/k8s.io/kubernetes/pkg/util/sysctl/sysctl.go deleted file mode 100644 index 5c01dd88e..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/sysctl/sysctl.go +++ /dev/null @@ -1,78 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sysctl - -import ( - "io/ioutil" - "path" - "strconv" - "strings" -) - -const ( - sysctlBase = "/proc/sys" - VmOvercommitMemory = "vm/overcommit_memory" - VmPanicOnOOM = "vm/panic_on_oom" - KernelPanic = "kernel/panic" - KernelPanicOnOops = "kernel/panic_on_oops" - RootMaxKeys = "kernel/keys/root_maxkeys" - RootMaxBytes = "kernel/keys/root_maxbytes" - - VmOvercommitMemoryAlways = 1 // kernel performs no memory over-commit handling - VmPanicOnOOMInvokeOOMKiller = 0 // kernel calls the oom_killer function when OOM occurs - - KernelPanicOnOopsAlways = 1 // kernel panics on kernel oops - KernelPanicRebootTimeout = 10 // seconds after a panic for the kernel to reboot - - RootMaxKeysSetting = 1000000 // Needed since docker creates a new key per container - RootMaxBytesSetting = RootMaxKeysSetting * 25 // allocate 25 bytes per key * number of MaxKeys -) - -// An injectable interface for running sysctl commands. -type Interface interface { - // GetSysctl returns the value for the specified sysctl setting - GetSysctl(sysctl string) (int, error) - // SetSysctl modifies the specified sysctl flag to the new value - SetSysctl(sysctl string, newVal int) error -} - -// New returns a new Interface for accessing sysctl -func New() Interface { - return &procSysctl{} -} - -// procSysctl implements Interface by reading and writing files under /proc/sys -type procSysctl struct { -} - -// GetSysctl returns the value for the specified sysctl setting -func (_ *procSysctl) GetSysctl(sysctl string) (int, error) { - data, err := ioutil.ReadFile(path.Join(sysctlBase, sysctl)) - if err != nil { - return -1, err - } - val, err := strconv.Atoi(strings.Trim(string(data), " \n")) - if err != nil { - return -1, err - } - return val, nil -} - -// SetSysctl modifies the specified sysctl flag to the new value -func (_ *procSysctl) SetSysctl(sysctl string, newVal int) error { - return ioutil.WriteFile(path.Join(sysctlBase, sysctl), []byte(strconv.Itoa(newVal)), 0640) -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/version/doc.go b/vendor/k8s.io/kubernetes/pkg/util/version/doc.go deleted file mode 100644 index ebe43152e..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/version/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package version provides utilities for version number comparisons -package version // import "k8s.io/kubernetes/pkg/util/version" diff --git a/vendor/k8s.io/kubernetes/pkg/util/version/version.go b/vendor/k8s.io/kubernetes/pkg/util/version/version.go deleted file mode 100644 index 327f2e67f..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/version/version.go +++ /dev/null @@ -1,236 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package version - -import ( - "bytes" - "fmt" - "regexp" - "strconv" - "strings" -) - -// Version is an opqaue representation of a version number -type Version struct { - components []uint - semver bool - preRelease string - buildMetadata string -} - -var ( - // versionMatchRE splits a version string into numeric and "extra" parts - versionMatchRE = regexp.MustCompile(`^\s*v?([0-9]+(?:\.[0-9]+)*)(.*)*$`) - // extraMatchRE splits the "extra" part of versionMatchRE into semver pre-release and build metadata; it does not validate the "no leading zeroes" constraint for pre-release - extraMatchRE = regexp.MustCompile(`^(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?\s*$`) -) - -func parse(str string, semver bool) (*Version, error) { - parts := versionMatchRE.FindStringSubmatch(str) - if parts == nil { - return nil, fmt.Errorf("could not parse %q as version", str) - } - numbers, extra := parts[1], parts[2] - - components := strings.Split(numbers, ".") - if (semver && len(components) != 3) || (!semver && len(components) < 2) { - return nil, fmt.Errorf("illegal version string %q", str) - } - - v := &Version{ - components: make([]uint, len(components)), - semver: semver, - } - for i, comp := range components { - if (i == 0 || semver) && strings.HasPrefix(comp, "0") && comp != "0" { - return nil, fmt.Errorf("illegal zero-prefixed version component %q in %q", comp, str) - } - num, err := strconv.ParseUint(comp, 10, 0) - if err != nil { - return nil, fmt.Errorf("illegal non-numeric version component %q in %q: %v", comp, str, err) - } - v.components[i] = uint(num) - } - - if semver && extra != "" { - extraParts := extraMatchRE.FindStringSubmatch(extra) - if extraParts == nil { - return nil, fmt.Errorf("could not parse pre-release/metadata (%s) in version %q", extra, str) - } - v.preRelease, v.buildMetadata = extraParts[1], extraParts[2] - - for _, comp := range strings.Split(v.preRelease, ".") { - if _, err := strconv.ParseUint(comp, 10, 0); err == nil { - if strings.HasPrefix(comp, "0") && comp != "0" { - return nil, fmt.Errorf("illegal zero-prefixed version component %q in %q", comp, str) - } - } - } - } - - return v, nil -} - -// ParseGeneric parses a "generic" version string. The version string must consist of two -// or more dot-separated numeric fields (the first of which can't have leading zeroes), -// followed by arbitrary uninterpreted data (which need not be separated from the final -// numeric field by punctuation). For convenience, leading and trailing whitespace is -// ignored, and the version can be preceded by the letter "v". See also ParseSemantic. -func ParseGeneric(str string) (*Version, error) { - return parse(str, false) -} - -// MustParseGeneric is like ParseGeneric except that it panics on error -func MustParseGeneric(str string) *Version { - v, err := ParseGeneric(str) - if err != nil { - panic(err) - } - return v -} - -// ParseSemantic parses a version string that exactly obeys the syntax and semantics of -// the "Semantic Versioning" specification (http://semver.org/) (although it ignores -// leading and trailing whitespace, and allows the version to be preceded by "v"). For -// version strings that are not guaranteed to obey the Semantic Versioning syntax, use -// ParseGeneric. -func ParseSemantic(str string) (*Version, error) { - return parse(str, true) -} - -// MustParseSemantic is like ParseSemantic except that it panics on error -func MustParseSemantic(str string) *Version { - v, err := ParseSemantic(str) - if err != nil { - panic(err) - } - return v -} - -// BuildMetadata returns the build metadata, if v is a Semantic Version, or "" -func (v *Version) BuildMetadata() string { - return v.buildMetadata -} - -// String converts a Version back to a string; note that for versions parsed with -// ParseGeneric, this will not include the trailing uninterpreted portion of the version -// number. -func (v *Version) String() string { - var buffer bytes.Buffer - - for i, comp := range v.components { - if i > 0 { - buffer.WriteString(".") - } - buffer.WriteString(fmt.Sprintf("%d", comp)) - } - if v.preRelease != "" { - buffer.WriteString("-") - buffer.WriteString(v.preRelease) - } - if v.buildMetadata != "" { - buffer.WriteString("+") - buffer.WriteString(v.buildMetadata) - } - - return buffer.String() -} - -// compareInternal returns -1 if v is less than other, 1 if it is greater than other, or 0 -// if they are equal -func (v *Version) compareInternal(other *Version) int { - for i := range v.components { - switch { - case i >= len(other.components): - if v.components[i] != 0 { - return 1 - } - case other.components[i] < v.components[i]: - return 1 - case other.components[i] > v.components[i]: - return -1 - } - } - - if !v.semver || !other.semver { - return 0 - } - - switch { - case v.preRelease == "" && other.preRelease != "": - return 1 - case v.preRelease != "" && other.preRelease == "": - return -1 - case v.preRelease == other.preRelease: // includes case where both are "" - return 0 - } - - vPR := strings.Split(v.preRelease, ".") - oPR := strings.Split(other.preRelease, ".") - for i := range vPR { - if i >= len(oPR) { - return 1 - } - vNum, err := strconv.ParseUint(vPR[i], 10, 0) - if err == nil { - oNum, err := strconv.ParseUint(oPR[i], 10, 0) - if err == nil { - switch { - case oNum < vNum: - return 1 - case oNum > vNum: - return -1 - default: - continue - } - } - } - if oPR[i] < vPR[i] { - return 1 - } else if oPR[i] > vPR[i] { - return -1 - } - } - - return 0 -} - -// AtLeast tests if a version is at least equal to a given minimum version. If both -// Versions are Semantic Versions, this will use the Semantic Version comparison -// algorithm. Otherwise, it will compare only the numeric components, with non-present -// components being considered "0" (ie, "1.4" is equal to "1.4.0"). -func (v *Version) AtLeast(min *Version) bool { - return v.compareInternal(min) != -1 -} - -// LessThan tests if a version is less than a given version. (It is exactly the opposite -// of AtLeast, for situations where asking "is v too old?" makes more sense than asking -// "is v new enough?".) -func (v *Version) LessThan(other *Version) bool { - return v.compareInternal(other) == -1 -} - -// Compare compares v against a version string (which will be parsed as either Semantic -// or non-Semantic depending on v). On success it returns -1 if v is less than other, 1 if -// it is greater than other, or 0 if they are equal. -func (v *Version) Compare(other string) (int, error) { - ov, err := parse(other, v.semver) - if err != nil { - return 0, err - } - return v.compareInternal(ov), nil -} |