diff options
author | OpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com> | 2022-08-11 15:44:55 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-11 15:44:55 +0000 |
commit | 92bbae40de3c48ee6b4692ab9e4d7cc14db242bb (patch) | |
tree | 89df1d0983bbb2c978e54e48d377f316e0b44485 /pkg | |
parent | 7af523ea5af8ba8ad61cd4b81a1494a42ac7b479 (diff) | |
parent | 79e21b5b162d3c2d3fb274b20bfe180c15284893 (diff) | |
download | podman-92bbae40de3c48ee6b4692ab9e4d7cc14db242bb.tar.gz podman-92bbae40de3c48ee6b4692ab9e4d7cc14db242bb.tar.bz2 podman-92bbae40de3c48ee6b4692ab9e4d7cc14db242bb.zip |
Merge pull request #15248 from vrothberg/RUN-1606
kube play: sd-notify integration
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/domain/infra/abi/play.go | 79 | ||||
-rw-r--r-- | pkg/domain/infra/abi/play_utils.go | 16 | ||||
-rw-r--r-- | pkg/domain/infra/abi/play_utils_test.go | 38 | ||||
-rw-r--r-- | pkg/specgen/container_validate.go | 6 | ||||
-rw-r--r-- | pkg/specgen/generate/container_create.go | 7 | ||||
-rw-r--r-- | pkg/systemd/notifyproxy/notifyproxy.go | 102 | ||||
-rw-r--r-- | pkg/systemd/notifyproxy/notifyproxy_test.go | 58 |
7 files changed, 299 insertions, 7 deletions
diff --git a/pkg/domain/infra/abi/play.go b/pkg/domain/infra/abi/play.go index 3f2fd5f92..faa89cc26 100644 --- a/pkg/domain/infra/abi/play.go +++ b/pkg/domain/infra/abi/play.go @@ -27,13 +27,19 @@ import ( "github.com/containers/podman/v4/pkg/specgen/generate" "github.com/containers/podman/v4/pkg/specgen/generate/kube" "github.com/containers/podman/v4/pkg/specgenutil" + "github.com/containers/podman/v4/pkg/systemd/notifyproxy" "github.com/containers/podman/v4/pkg/util" + "github.com/coreos/go-systemd/v22/daemon" "github.com/ghodss/yaml" "github.com/opencontainers/go-digest" "github.com/sirupsen/logrus" yamlv3 "gopkg.in/yaml.v3" ) +// sdNotifyAnnotation allows for configuring service-global and +// container-specific sd-notify modes. +const sdNotifyAnnotation = "io.containers.sdnotify" + // createServiceContainer creates a container that can later on // be associated with the pods of a K8s yaml. It will be started along with // the first pod. @@ -73,7 +79,12 @@ func (ic *ContainerEngine) createServiceContainer(ctx context.Context, name stri return nil, fmt.Errorf("creating runtime spec for service container: %w", err) } opts = append(opts, libpod.WithIsService()) - opts = append(opts, libpod.WithSdNotifyMode(define.SdNotifyModeConmon)) + + // Set the sd-notify mode to "ignore". Podman is responsible for + // sending the notify messages when all containers are ready. + // The mode for individual containers or entire pods can be configured + // via the `sdNotifyAnnotation` annotation in the K8s YAML. + opts = append(opts, libpod.WithSdNotifyMode(define.SdNotifyModeIgnore)) // Create a new libpod container based on the spec. ctr, err := ic.Libpod.NewContainer(ctx, runtimeSpec, spec, false, opts...) @@ -96,6 +107,10 @@ func k8sName(content []byte, suffix string) string { } func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options entities.PlayKubeOptions) (_ *entities.PlayKubeReport, finalErr error) { + if options.ServiceContainer && options.Start == types.OptionalBoolFalse { // Sanity check to be future proof + return nil, fmt.Errorf("running a service container requires starting the pod(s)") + } + report := &entities.PlayKubeReport{} validKinds := 0 @@ -121,6 +136,8 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options var configMaps []v1.ConfigMap + ranContainers := false + var serviceContainer *libpod.Container // create pod on each document if it is a pod or deployment // any other kube kind will be skipped for _, document := range documentList { @@ -130,8 +147,7 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options } // TODO: create constants for the various "kinds" of yaml files. - var serviceContainer *libpod.Container - if options.ServiceContainer && (kind == "Pod" || kind == "Deployment") { + if options.ServiceContainer && serviceContainer == nil && (kind == "Pod" || kind == "Deployment") { ctr, err := ic.createServiceContainer(ctx, k8sName(content, "service"), options) if err != nil { return nil, err @@ -178,6 +194,7 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options report.Pods = append(report.Pods, r.Pods...) validKinds++ + ranContainers = true case "Deployment": var deploymentYAML v1apps.Deployment @@ -192,6 +209,7 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options report.Pods = append(report.Pods, r.Pods...) validKinds++ + ranContainers = true case "PersistentVolumeClaim": var pvcYAML v1.PersistentVolumeClaim @@ -239,6 +257,20 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options return nil, fmt.Errorf("YAML document does not contain any supported kube kind") } + if options.ServiceContainer && ranContainers { + // We can consider the service to be up and running now. + // Send the sd-notify messages pointing systemd to the + // service container. + data, err := serviceContainer.Inspect(false) + if err != nil { + return nil, err + } + message := fmt.Sprintf("MAINPID=%d\n%s", data.State.ConmonPid, daemon.SdNotifyReady) + if err := notifyproxy.SendMessage("", message); err != nil { + return nil, err + } + } + return report, nil } @@ -280,6 +312,11 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY report entities.PlayKubeReport ) + mainSdNotifyMode, err := getSdNotifyMode(annotations, "") + if err != nil { + return nil, err + } + // Create the secret manager before hand secretsManager, err := ic.Libpod.SecretsManager() if err != nil { @@ -562,6 +599,9 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY initContainers = append(initContainers, ctr) } + + var sdNotifyProxies []*notifyproxy.NotifyProxy // containers' sd-notify proxies + for _, container := range podYAML.Spec.Containers { // Error out if the same name is used for more than one container if _, ok := ctrNames[container.Name]; ok { @@ -606,7 +646,31 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY if err != nil { return nil, err } - opts = append(opts, libpod.WithSdNotifyMode(define.SdNotifyModeIgnore)) + + sdNotifyMode := mainSdNotifyMode + ctrNotifyMode, err := getSdNotifyMode(annotations, container.Name) + if err != nil { + return nil, err + } + if ctrNotifyMode != "" { + sdNotifyMode = ctrNotifyMode + } + if sdNotifyMode == "" { // Default to "ignore" + sdNotifyMode = define.SdNotifyModeIgnore + } + + opts = append(opts, libpod.WithSdNotifyMode(sdNotifyMode)) + + // Create a notify proxy for the container. + if sdNotifyMode != "" && sdNotifyMode != define.SdNotifyModeIgnore { + proxy, err := notifyproxy.New("") + if err != nil { + return nil, err + } + sdNotifyProxies = append(sdNotifyProxies, proxy) + opts = append(opts, libpod.WithSdNotifySocket(proxy.SocketPath())) + } + ctr, err := generate.ExecuteCreate(ctx, ic.Libpod, rtSpec, spec, false, opts...) if err != nil { return nil, err @@ -624,6 +688,13 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY playKubePod.ContainerErrors = append(playKubePod.ContainerErrors, fmt.Errorf("error starting container %s: %w", id, err).Error()) fmt.Println(playKubePod.ContainerErrors) } + + // Wait for each proxy to receive a READY message. + for _, proxy := range sdNotifyProxies { + if err := proxy.WaitAndClose(); err != nil { + return nil, err + } + } } playKubePod.ID = pod.ID() diff --git a/pkg/domain/infra/abi/play_utils.go b/pkg/domain/infra/abi/play_utils.go new file mode 100644 index 000000000..482a158e6 --- /dev/null +++ b/pkg/domain/infra/abi/play_utils.go @@ -0,0 +1,16 @@ +package abi + +import "github.com/containers/podman/v4/libpod/define" + +// getSdNotifyMode returns the `sdNotifyAnnotation/$name` for the specified +// name. If name is empty, it'll only look for `sdNotifyAnnotation`. +func getSdNotifyMode(annotations map[string]string, name string) (string, error) { + var mode string + switch len(name) { + case 0: + mode = annotations[sdNotifyAnnotation] + default: + mode = annotations[sdNotifyAnnotation+"/"+name] + } + return mode, define.ValidateSdNotifyMode(mode) +} diff --git a/pkg/domain/infra/abi/play_utils_test.go b/pkg/domain/infra/abi/play_utils_test.go new file mode 100644 index 000000000..80a9fe543 --- /dev/null +++ b/pkg/domain/infra/abi/play_utils_test.go @@ -0,0 +1,38 @@ +package abi + +import ( + "testing" + + "github.com/containers/podman/v4/libpod/define" + "github.com/stretchr/testify/require" +) + +func TestGetSdNotifyMode(t *testing.T) { + tests := []struct { + key, value, name, result string + mustError bool + }{ + {sdNotifyAnnotation, define.SdNotifyModeConmon, "", define.SdNotifyModeConmon, false}, + {sdNotifyAnnotation + "/container-a", define.SdNotifyModeContainer, "container-a", define.SdNotifyModeContainer, false}, + {sdNotifyAnnotation + "/container-b", define.SdNotifyModeIgnore, "container-b", define.SdNotifyModeIgnore, false}, + {sdNotifyAnnotation + "/container-c", "", "container-c", "", false}, + {sdNotifyAnnotation + "-/wrong-key", "xxx", "wrong-key", "", false}, + {sdNotifyAnnotation + "/container-error", "invalid", "container-error", "", true}, + } + + annotations := make(map[string]string) + // Populate the annotations + for _, test := range tests { + annotations[test.key] = test.value + } + // Run the tests + for _, test := range tests { + result, err := getSdNotifyMode(annotations, test.name) + if test.mustError { + require.Error(t, err, "%v", test) + continue + } + require.NoError(t, err, "%v", test) + require.Equal(t, test.result, result, "%v", test) + } +} diff --git a/pkg/specgen/container_validate.go b/pkg/specgen/container_validate.go index 63d94b6b3..064245602 100644 --- a/pkg/specgen/container_validate.go +++ b/pkg/specgen/container_validate.go @@ -67,9 +67,9 @@ func (s *SpecGenerator) Validate() error { if len(s.ContainerBasicConfig.Systemd) > 0 && !util.StringInSlice(strings.ToLower(s.ContainerBasicConfig.Systemd), SystemDValues) { return fmt.Errorf("--systemd values must be one of %q: %w", strings.Join(SystemDValues, ", "), ErrInvalidSpecConfig) } - // sdnotify values must be container, conmon, or ignore - if len(s.ContainerBasicConfig.SdNotifyMode) > 0 && !util.StringInSlice(strings.ToLower(s.ContainerBasicConfig.SdNotifyMode), SdNotifyModeValues) { - return fmt.Errorf("--sdnotify values must be one of %q: %w", strings.Join(SdNotifyModeValues, ", "), ErrInvalidSpecConfig) + + if err := define.ValidateSdNotifyMode(s.ContainerBasicConfig.SdNotifyMode); err != nil { + return err } // diff --git a/pkg/specgen/generate/container_create.go b/pkg/specgen/generate/container_create.go index 8334d386f..e9cec2873 100644 --- a/pkg/specgen/generate/container_create.go +++ b/pkg/specgen/generate/container_create.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "os" "path/filepath" "strings" @@ -352,7 +353,13 @@ func createContainerOptions(rt *libpod.Runtime, s *specgen.SpecGenerator, pod *l } if len(s.SdNotifyMode) > 0 { options = append(options, libpod.WithSdNotifyMode(s.SdNotifyMode)) + if s.SdNotifyMode != define.SdNotifyModeIgnore { + if notify, ok := os.LookupEnv("NOTIFY_SOCKET"); ok { + options = append(options, libpod.WithSdNotifySocket(notify)) + } + } } + if pod != nil { logrus.Debugf("adding container to pod %s", pod.Name()) options = append(options, rt.WithPod(pod)) diff --git a/pkg/systemd/notifyproxy/notifyproxy.go b/pkg/systemd/notifyproxy/notifyproxy.go new file mode 100644 index 000000000..9e6eb4cf0 --- /dev/null +++ b/pkg/systemd/notifyproxy/notifyproxy.go @@ -0,0 +1,102 @@ +package notifyproxy + +import ( + "io/ioutil" + "net" + "os" + "strings" + "syscall" + + "github.com/coreos/go-systemd/v22/daemon" + "github.com/sirupsen/logrus" +) + +// SendMessage sends the specified message to the specified socket. +// No message is sent if no socketPath is provided and the NOTIFY_SOCKET +// variable is not set either. +func SendMessage(socketPath string, message string) error { + if socketPath == "" { + socketPath, _ = os.LookupEnv("NOTIFY_SOCKET") + if socketPath == "" { + return nil + } + } + socketAddr := &net.UnixAddr{ + Name: socketPath, + Net: "unixgram", + } + conn, err := net.DialUnix(socketAddr.Net, nil, socketAddr) + if err != nil { + return err + } + defer conn.Close() + + _, err = conn.Write([]byte(message)) + return err +} + +// NotifyProxy can be used to proxy notify messages. +type NotifyProxy struct { + connection *net.UnixConn + socketPath string +} + +// New creates a NotifyProxy. The specified temp directory can be left empty. +func New(tmpDir string) (*NotifyProxy, error) { + tempFile, err := ioutil.TempFile(tmpDir, "-podman-notify-proxy.sock") + if err != nil { + return nil, err + } + defer tempFile.Close() + + socketPath := tempFile.Name() + if err := syscall.Unlink(socketPath); err != nil { // Unlink the socket so we can bind it + return nil, err + } + + socketAddr := &net.UnixAddr{ + Name: socketPath, + Net: "unixgram", + } + conn, err := net.ListenUnixgram(socketAddr.Net, socketAddr) + if err != nil { + return nil, err + } + + return &NotifyProxy{connection: conn, socketPath: socketPath}, nil +} + +// SocketPath returns the path of the socket the proxy is listening on. +func (p *NotifyProxy) SocketPath() string { + return p.socketPath +} + +// close closes the listener and removes the socket. +func (p *NotifyProxy) close() error { + defer os.Remove(p.socketPath) + return p.connection.Close() +} + +// WaitAndClose waits until receiving the `READY` notify message and close the +// listener. Note that the this function must only be executed inside a systemd +// service which will kill the process after a given timeout. +func (p *NotifyProxy) WaitAndClose() error { + defer func() { + if err := p.close(); err != nil { + logrus.Errorf("Closing notify proxy: %v", err) + } + }() + + for { + buf := make([]byte, 1024) + num, err := p.connection.Read(buf) + if err != nil { + return err + } + for _, s := range strings.Split(string(buf[:num]), "\n") { + if s == daemon.SdNotifyReady { + return nil + } + } + } +} diff --git a/pkg/systemd/notifyproxy/notifyproxy_test.go b/pkg/systemd/notifyproxy/notifyproxy_test.go new file mode 100644 index 000000000..edad95659 --- /dev/null +++ b/pkg/systemd/notifyproxy/notifyproxy_test.go @@ -0,0 +1,58 @@ +package notifyproxy + +import ( + "testing" + "time" + + "github.com/coreos/go-systemd/v22/daemon" + "github.com/stretchr/testify/require" +) + +// Helper function to send the specified message over the socket of the proxy. +func sendMessage(t *testing.T, proxy *NotifyProxy, message string) { + err := SendMessage(proxy.SocketPath(), message) + require.NoError(t, err) +} + +func TestNotifyProxy(t *testing.T) { + proxy, err := New("") + require.NoError(t, err) + require.FileExists(t, proxy.SocketPath()) + require.NoError(t, proxy.close()) + require.NoFileExists(t, proxy.SocketPath()) +} + +func TestWaitAndClose(t *testing.T) { + proxy, err := New("") + require.NoError(t, err) + require.FileExists(t, proxy.SocketPath()) + + ch := make(chan error) + + go func() { + ch <- proxy.WaitAndClose() + }() + + sendMessage(t, proxy, "foo\n") + time.Sleep(250 * time.Millisecond) + select { + case err := <-ch: + t.Fatalf("Should stil be waiting but received %v", err) + default: + } + + sendMessage(t, proxy, daemon.SdNotifyReady+"\nsomething else") + done := func() bool { + for i := 0; i < 10; i++ { + select { + case err := <-ch: + require.NoError(t, err, "Waiting should succeed") + return true + default: + time.Sleep(time.Duration(i*250) * time.Millisecond) + } + } + return false + }() + require.True(t, done, "READY MESSAGE SHOULD HAVE ARRIVED") +} |