author     OpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com>    2022-09-06 21:38:54 +0200
committer  GitHub <noreply@github.com>    2022-09-06 21:38:54 +0200
commit     db5ec4dcdc4d9b1105d1ce66cb3704fda328dec3 (patch)
tree       49cc0208cd633bcd8f4ad003567c7c5e280ad8d3 /pkg
parent     2f555c0c74d77e4a93ef90290f16d0908829e8c7 (diff)
parent     274d34a25a3ed7b69a6e4caec07e845157048c96 (diff)
Merge pull request #15547 from vrothberg/RUN-1606
Support auto updates for Kubernetes workloads
Diffstat (limited to 'pkg')
-rw-r--r--  pkg/autoupdate/autoupdate.go                  156
-rw-r--r--  pkg/domain/infra/abi/play.go                   35
-rw-r--r--  pkg/specgen/generate/kube/kube.go               8
-rw-r--r--  pkg/systemd/notifyproxy/notifyproxy.go         65
-rw-r--r--  pkg/systemd/notifyproxy/notifyproxy_test.go     2
5 files changed, 185 insertions(+), 81 deletions(-)
diff --git a/pkg/autoupdate/autoupdate.go b/pkg/autoupdate/autoupdate.go
index 297d6640e..9cf77d135 100644
--- a/pkg/autoupdate/autoupdate.go
+++ b/pkg/autoupdate/autoupdate.go
@@ -153,18 +153,19 @@ func AutoUpdate(ctx context.Context, runtime *libpod.Runtime, options entities.A
}
// Find auto-update tasks and assemble them by unit.
- errors := auto.assembleTasks(ctx)
+ allErrors := auto.assembleTasks(ctx)
// Nothing to do.
if len(auto.unitToTasks) == 0 {
- return nil, errors
+ return nil, allErrors
}
// Connect to DBUS.
conn, err := systemd.ConnectToDBUS()
if err != nil {
logrus.Errorf(err.Error())
- return nil, []error{err}
+ allErrors = append(allErrors, err)
+ return nil, allErrors
}
defer conn.Close()
auto.conn = conn
@@ -174,72 +175,94 @@ func AutoUpdate(ctx context.Context, runtime *libpod.Runtime, options entities.A
// Update all images/containers according to their auto-update policy.
var allReports []*entities.AutoUpdateReport
for unit, tasks := range auto.unitToTasks {
- // Sanity check: we'll support that in the future.
- if len(tasks) != 1 {
- errors = append(errors, fmt.Errorf("only 1 task per unit supported but unit %s has %d", unit, len(tasks)))
- return nil, errors
+ unitErrors := auto.updateUnit(ctx, unit, tasks)
+ allErrors = append(allErrors, unitErrors...)
+ for _, task := range tasks {
+ allReports = append(allReports, task.report())
}
+ }
- for _, task := range tasks {
- err := func() error {
- // Transition from state to state. Will be
- // split into multiple loops in the future to
- // support more than one container/task per
- // unit.
- updateAvailable, err := task.updateAvailable(ctx)
- if err != nil {
- task.status = statusFailed
- return fmt.Errorf("checking image updates for container %s: %w", task.container.ID(), err)
- }
-
- if !updateAvailable {
- task.status = statusNotUpdated
- return nil
- }
-
- if options.DryRun {
- task.status = statusPending
- return nil
- }
-
- if err := task.update(ctx); err != nil {
- task.status = statusFailed
- return fmt.Errorf("updating image for container %s: %w", task.container.ID(), err)
- }
-
- updateError := auto.restartSystemdUnit(ctx, unit)
- if updateError == nil {
- task.status = statusUpdated
- return nil
- }
-
- if !options.Rollback {
- task.status = statusFailed
- return fmt.Errorf("restarting unit %s for container %s: %w", task.unit, task.container.ID(), err)
- }
-
- if err := task.rollbackImage(); err != nil {
- task.status = statusFailed
- return fmt.Errorf("rolling back image for container %s: %w", task.container.ID(), err)
- }
-
- if err := auto.restartSystemdUnit(ctx, unit); err != nil {
- task.status = statusFailed
- return fmt.Errorf("restarting unit %s for container %s during rollback: %w", task.unit, task.container.ID(), err)
- }
-
- task.status = statusRolledBack
- return nil
- }()
+ return allReports, allErrors
+}
+
+// updateUnit auto updates the tasks in the specified systemd unit.
+func (u *updater) updateUnit(ctx context.Context, unit string, tasks []*task) []error {
+ var errors []error
+ tasksUpdated := false
+ for _, task := range tasks {
+ err := func() error { // Use an anonymous function to avoid a tangle of continue statements
+ updateAvailable, err := task.updateAvailable(ctx)
if err != nil {
- errors = append(errors, err)
+ task.status = statusFailed
+ return fmt.Errorf("checking image updates for container %s: %w", task.container.ID(), err)
}
- allReports = append(allReports, task.report())
+
+ if !updateAvailable {
+ task.status = statusNotUpdated
+ return nil
+ }
+
+ if u.options.DryRun {
+ task.status = statusPending
+ return nil
+ }
+
+ if err := task.update(ctx); err != nil {
+ task.status = statusFailed
+ return fmt.Errorf("updating image for container %s: %w", task.container.ID(), err)
+ }
+
+ tasksUpdated = true
+ return nil
+ }()
+
+ if err != nil {
+ errors = append(errors, err)
}
}
- return allReports, errors
+ // If no task has been updated, we can jump directly to the next unit.
+ if !tasksUpdated {
+ return errors
+ }
+
+ updateError := u.restartSystemdUnit(ctx, unit)
+ for _, task := range tasks {
+ if updateError == nil {
+ task.status = statusUpdated
+ } else {
+ task.status = statusFailed
+ }
+ }
+
+ // Jump to the next unit on successful update or if rollbacks are disabled.
+ if updateError == nil || !u.options.Rollback {
+ return errors
+ }
+
+ // The update has failed and rollbacks are enabled.
+ for _, task := range tasks {
+ if err := task.rollbackImage(); err != nil {
+ err = fmt.Errorf("rolling back image for container %s in unit %s: %w", task.container.ID(), unit, err)
+ errors = append(errors, err)
+ }
+ }
+
+ if err := u.restartSystemdUnit(ctx, unit); err != nil {
+ for _, task := range tasks {
+ task.status = statusFailed
+ }
+ err = fmt.Errorf("restarting unit %s during rollback: %w", unit, err)
+ errors = append(errors, err)
+ return errors
+ }
+
+ for _, task := range tasks {
+ task.status = statusRolledBack
+ }
+
+ return errors
}
// report creates an auto-update report for the task.
@@ -258,7 +281,16 @@ func (t *task) report() *entities.AutoUpdateReport {
func (t *task) updateAvailable(ctx context.Context) (bool, error) {
switch t.policy {
case PolicyRegistryImage:
- return t.registryUpdateAvailable(ctx)
+ // Errors when checking for updates should not be fatal.
+ // Especially on Edge systems, connectivity may be limited or
+ // the registry may be temporarily unreachable.
+ // But make sure to leave some breadcrumbs in the debug logs
+ // such that potential issues _can_ be analyzed if needed.
+ available, err := t.registryUpdateAvailable(ctx)
+ if err != nil {
+ logrus.Debugf("Error checking updates for image %s: %v (ignoring error)", t.rawImageName, err)
+ }
+ return available, nil
case PolicyLocalImage:
return t.localUpdateAvailable()
default:
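Taken together, the new updateUnit flow above is: update every task in the unit first, restart the unit once, and roll back only when the restart fails and rollbacks are enabled. A minimal standalone sketch of that control flow, with illustrative stand-in helpers rather than Podman's real task/updater methods:

package main

import "fmt"

// updateUnitSketch mirrors the control flow of updater.updateUnit:
// update all tasks first, restart the unit once, and roll back only
// when the restart fails and rollbacks are enabled. The update,
// rollback, and restart helpers are illustrative stand-ins.
func updateUnitSketch(
	unit string,
	tasks []string,
	update, rollback func(task string) error,
	restart func(unit string) error,
	rollbacksEnabled bool,
) []error {
	var errs []error
	updated := false
	for _, t := range tasks {
		if err := update(t); err != nil {
			errs = append(errs, fmt.Errorf("updating %s: %w", t, err))
			continue
		}
		updated = true
	}
	// If no task has been updated, skip the unit restart entirely.
	if !updated {
		return errs
	}

	restartErr := restart(unit)
	if restartErr != nil {
		errs = append(errs, fmt.Errorf("restarting %s: %w", unit, restartErr))
	}
	if restartErr == nil || !rollbacksEnabled {
		return errs
	}

	// The restart failed and rollbacks are enabled: revert each task,
	// then restart the unit once more with the rolled-back images.
	for _, t := range tasks {
		if err := rollback(t); err != nil {
			errs = append(errs, fmt.Errorf("rolling back %s: %w", t, err))
		}
	}
	if err := restart(unit); err != nil {
		errs = append(errs, fmt.Errorf("restarting %s during rollback: %w", unit, err))
	}
	return errs
}

func main() {
	noop := func(string) error { return nil }
	errs := updateUnitSketch("demo.service", []string{"ctr1", "ctr2"}, noop, noop, noop, true)
	fmt.Println("errors:", errs)
}

Restarting once per unit instead of once per task is what allows a unit to contain more than one container, which the removed "only 1 task per unit" check previously forbade.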
diff --git a/pkg/domain/infra/abi/play.go b/pkg/domain/infra/abi/play.go
index 6ea20a4f2..12786afcd 100644
--- a/pkg/domain/infra/abi/play.go
+++ b/pkg/domain/infra/abi/play.go
@@ -661,9 +661,10 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
opts = append(opts, libpod.WithSdNotifyMode(sdNotifyMode))
+ var proxy *notifyproxy.NotifyProxy
// Create a notify proxy for the container.
if sdNotifyMode != "" && sdNotifyMode != define.SdNotifyModeIgnore {
- proxy, err := notifyproxy.New("")
+ proxy, err = notifyproxy.New("")
if err != nil {
return nil, err
}
@@ -675,6 +676,9 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
if err != nil {
return nil, err
}
+ if proxy != nil {
+ proxy.AddContainer(ctr)
+ }
containers = append(containers, ctr)
}
@@ -774,21 +778,26 @@ func (ic *ContainerEngine) getImageAndLabelInfo(ctx context.Context, cwd string,
}
// Handle kube annotations
- for k, v := range annotations {
- switch k {
- // Auto update annotation without container name will apply to
- // all containers within the pod
- case autoupdate.Label, autoupdate.AuthfileLabel:
- labels[k] = v
- // Auto update annotation with container name will apply only
- // to the specified container
- case fmt.Sprintf("%s/%s", autoupdate.Label, container.Name),
- fmt.Sprintf("%s/%s", autoupdate.AuthfileLabel, container.Name):
- prefixAndCtr := strings.Split(k, "/")
- labels[prefixAndCtr[0]] = v
+ setLabel := func(label string) {
+ var result string
+ ctrSpecific := fmt.Sprintf("%s/%s", label, container.Name)
+ for k, v := range annotations {
+ switch k {
+ case label:
+ result = v
+ case ctrSpecific:
+ labels[label] = v
+ return
+ }
+ }
+ if result != "" {
+ labels[label] = result
}
}
+ setLabel(autoupdate.Label)
+ setLabel(autoupdate.AuthfileLabel)
+
return pulledImage, labels, nil
}
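The setLabel closure above gives the container-scoped annotation ("<label>/<container-name>") precedence over the pod-wide form. The same precedence can be expressed as two direct map lookups; a standalone sketch, where io.containers.autoupdate corresponds to autoupdate.Label in the Podman sources and the rest is illustrative:

package main

import "fmt"

// resolveLabel picks the container-specific annotation
// ("<label>/<container>") over the pod-wide one ("<label>"),
// matching the precedence of the setLabel closure above.
func resolveLabel(annotations map[string]string, label, container string) (string, bool) {
	if v, ok := annotations[label+"/"+container]; ok {
		return v, true
	}
	if v, ok := annotations[label]; ok {
		return v, true
	}
	return "", false
}

func main() {
	annotations := map[string]string{
		"io.containers.autoupdate":     "registry", // applies to all containers in the pod
		"io.containers.autoupdate/web": "local",    // overrides the policy for container "web"
	}
	for _, ctr := range []string{"web", "db"} {
		if v, ok := resolveLabel(annotations, "io.containers.autoupdate", ctr); ok {
			fmt.Printf("%s -> %s\n", ctr, v)
		}
	}
	// Prints:
	//   web -> local
	//   db -> registry
}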
diff --git a/pkg/specgen/generate/kube/kube.go b/pkg/specgen/generate/kube/kube.go
index 5862d3f1c..9fd0adecf 100644
--- a/pkg/specgen/generate/kube/kube.go
+++ b/pkg/specgen/generate/kube/kube.go
@@ -7,6 +7,7 @@ import (
"fmt"
"math"
"net"
+ "os"
"regexp"
"runtime"
"strconv"
@@ -26,6 +27,7 @@ import (
"github.com/containers/podman/v4/pkg/k8s.io/apimachinery/pkg/api/resource"
"github.com/containers/podman/v4/pkg/specgen"
"github.com/containers/podman/v4/pkg/specgen/generate"
+ systemdDefine "github.com/containers/podman/v4/pkg/systemd/define"
"github.com/containers/podman/v4/pkg/util"
"github.com/docker/docker/pkg/system"
"github.com/docker/go-units"
@@ -445,6 +447,12 @@ func ToSpecGen(ctx context.Context, opts *CtrSpecGenOptions) (*specgen.SpecGener
}
}
+ // If the container runs inside a systemd unit, store the unit's
+ // name as a label at container creation time.
+ if unit := os.Getenv(systemdDefine.EnvVariable); unit != "" {
+ s.Labels[systemdDefine.EnvVariable] = unit
+ }
+
return s, nil
}
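The kube.go hunk above records the systemd unit a container was started from, so that auto-update can later map the container back to its unit. A minimal sketch of the pattern; PODMAN_SYSTEMD_UNIT is assumed here as the value of systemdDefine.EnvVariable, matching the variable that podman-generated systemd units export:

package main

import (
	"fmt"
	"os"
)

// envVariable stands in for pkg/systemd/define.EnvVariable; units
// generated by `podman generate systemd` export it via
// Environment=PODMAN_SYSTEMD_UNIT=%n.
const envVariable = "PODMAN_SYSTEMD_UNIT"

func main() {
	labels := map[string]string{}
	// Record the owning unit as a label so `podman auto-update`
	// can later map the container back to its systemd unit.
	if unit := os.Getenv(envVariable); unit != "" {
		labels[envVariable] = unit
	}
	fmt.Println(labels)
}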
diff --git a/pkg/systemd/notifyproxy/notifyproxy.go b/pkg/systemd/notifyproxy/notifyproxy.go
index 9e6eb4cf0..1bfab9ca0 100644
--- a/pkg/systemd/notifyproxy/notifyproxy.go
+++ b/pkg/systemd/notifyproxy/notifyproxy.go
@@ -1,12 +1,17 @@
package notifyproxy
import (
+ "errors"
+ "fmt"
+ "io"
"io/ioutil"
"net"
"os"
"strings"
"syscall"
+ "time"
+ "github.com/containers/podman/v4/libpod/define"
"github.com/coreos/go-systemd/v22/daemon"
"github.com/sirupsen/logrus"
)
@@ -39,6 +44,7 @@ func SendMessage(socketPath string, message string) error {
type NotifyProxy struct {
connection *net.UnixConn
socketPath string
+ container Container // optional
}
// New creates a NotifyProxy. The specified temp directory can be left empty.
@@ -77,9 +83,26 @@ func (p *NotifyProxy) close() error {
return p.connection.Close()
}
+// AddContainer associates a container with the proxy.
+func (p *NotifyProxy) AddContainer(container Container) {
+ p.container = container
+}
+
+// ErrNoReadyMessage is returned when we are waiting for the READY message of a
+// container that is not in the running state anymore.
+var ErrNoReadyMessage = errors.New("container stopped running before READY message was received")
+
+// Container avoids a circular dependency between this package and libpod.
+type Container interface {
+ State() (define.ContainerStatus, error)
+ ID() string
+}
+
// WaitAndClose waits until receiving the `READY` notify message and closes the
// listener. Note that this function must only be executed inside a systemd
// service which will kill the process after a given timeout.
+// If the (optional) container stopped running before the `READY` message is
+// received, waiting is canceled and ErrNoReadyMessage is returned.
func (p *NotifyProxy) WaitAndClose() error {
defer func() {
if err := p.close(); err != nil {
@@ -87,16 +110,48 @@ func (p *NotifyProxy) WaitAndClose() error {
}
}()
+ const bufferSize = 1024
+ sBuilder := strings.Builder{}
for {
- buf := make([]byte, 1024)
- num, err := p.connection.Read(buf)
- if err != nil {
+ // Set a read deadline of one second such that we achieve a
+ // non-blocking read and can check if the container has already
+ // stopped running; in that case no READY message will be sent
+ // and we're done.
+ if err := p.connection.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
return err
}
- for _, s := range strings.Split(string(buf[:num]), "\n") {
- if s == daemon.SdNotifyReady {
+
+ for {
+ buffer := make([]byte, bufferSize)
+ num, err := p.connection.Read(buffer)
+ if err != nil {
+ if !errors.Is(err, os.ErrDeadlineExceeded) && !errors.Is(err, io.EOF) {
+ return err
+ }
+ }
+ sBuilder.Write(buffer[:num])
+ if num != bufferSize || buffer[num-1] == '\n' {
+ break
+ }
+ }
+
+ for _, line := range strings.Split(sBuilder.String(), "\n") {
+ if line == daemon.SdNotifyReady {
return nil
}
}
+ sBuilder.Reset()
+
+ if p.container == nil {
+ continue
+ }
+
+ state, err := p.container.State()
+ if err != nil {
+ return err
+ }
+ if state != define.ContainerStateRunning {
+ return fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID())
+ }
}
}
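The core technique in the rewritten WaitAndClose is using net.Conn read deadlines to turn a blocking datagram read into a one-second poll, leaving a gap in which the container state can be checked. A self-contained sketch of that deadline pattern on a unixgram socket (paths and messages are illustrative):

package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"strings"
	"time"
)

func main() {
	dir, err := os.MkdirTemp("", "notifydemo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	addr := &net.UnixAddr{Name: filepath.Join(dir, "notify.sock"), Net: "unixgram"}

	conn, err := net.ListenUnixgram("unixgram", addr)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Simulated container: send READY after two seconds.
	go func() {
		time.Sleep(2 * time.Second)
		client, err := net.DialUnix("unixgram", nil, addr)
		if err != nil {
			return
		}
		defer client.Close()
		client.Write([]byte("READY=1\n"))
	}()

	buf := make([]byte, 1024)
	for {
		// A short deadline makes Read fail with os.ErrDeadlineExceeded
		// instead of blocking forever, so the loop gets a chance to
		// check other conditions (the container state, in the proxy).
		if err := conn.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
			panic(err)
		}
		n, err := conn.Read(buf)
		if err != nil {
			if errors.Is(err, os.ErrDeadlineExceeded) {
				fmt.Println("no message yet; would check container state here")
				continue
			}
			panic(err)
		}
		for _, line := range strings.Split(string(buf[:n]), "\n") {
			if line == "READY=1" {
				fmt.Println("received READY")
				return
			}
		}
	}
}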
diff --git a/pkg/systemd/notifyproxy/notifyproxy_test.go b/pkg/systemd/notifyproxy/notifyproxy_test.go
index ce63fc9cd..066046cb8 100644
--- a/pkg/systemd/notifyproxy/notifyproxy_test.go
+++ b/pkg/systemd/notifyproxy/notifyproxy_test.go
@@ -41,7 +41,7 @@ func TestWaitAndClose(t *testing.T) {
default:
}
- sendMessage(t, proxy, daemon.SdNotifyReady+"\nsomething else")
+ sendMessage(t, proxy, daemon.SdNotifyReady+"\nsomething else\n")
done := func() bool {
for i := 0; i < 10; i++ {
select {
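For reference, a hedged sketch of driving the proxy the way the test does: send a newline-terminated READY datagram in the background and wait for WaitAndClose to return. New and SendMessage appear in the diff above; the SocketPath accessor is assumed from the notifyproxy package:

package main

import (
	"fmt"

	"github.com/containers/podman/v4/pkg/systemd/notifyproxy"
	"github.com/coreos/go-systemd/v22/daemon"
)

func main() {
	// Listen on a fresh socket; an empty string lets New pick a temp dir.
	proxy, err := notifyproxy.New("")
	if err != nil {
		panic(err)
	}
	// Send READY in the background. The trailing newline marks the end
	// of the final line for the proxy's chunked read loop.
	go func() {
		msg := daemon.SdNotifyReady + "\nsomething else\n"
		if err := notifyproxy.SendMessage(proxy.SocketPath(), msg); err != nil {
			fmt.Println("send:", err)
		}
	}()
	// With no container attached via AddContainer, WaitAndClose simply
	// blocks until the READY line arrives.
	if err := proxy.WaitAndClose(); err != nil {
		panic(err)
	}
	fmt.Println("proxy received READY")
}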