diff options
Diffstat (limited to 'libpod')
-rw-r--r-- | libpod/events.go | 17 | ||||
-rw-r--r-- | libpod/events/events_linux.go | 4 | ||||
-rw-r--r-- | libpod/events/journal_linux.go | 100 | ||||
-rw-r--r-- | libpod/events/logfile.go | 18 | ||||
-rw-r--r-- | libpod/events/nullout.go | 13 | ||||
-rw-r--r-- | libpod/runtime.go | 11 |
6 files changed, 87 insertions, 76 deletions
diff --git a/libpod/events.go b/libpod/events.go index 60142cb60..2f9799114 100644 --- a/libpod/events.go +++ b/libpod/events.go @@ -3,6 +3,7 @@ package libpod import ( "context" "fmt" + "path/filepath" "sync" "github.com/containers/podman/v4/libpod/events" @@ -11,6 +12,10 @@ import ( // newEventer returns an eventer that can be used to read/write events func (r *Runtime) newEventer() (events.Eventer, error) { + if r.config.Engine.EventsLogFilePath == "" { + // default, use path under tmpdir when none was explicitly set by the user + r.config.Engine.EventsLogFilePath = filepath.Join(r.config.Engine.TmpDir, "events", "events.log") + } options := events.EventerOptions{ EventerType: r.config.Engine.EventsLogger, LogFilePath: r.config.Engine.EventsLogFilePath, @@ -133,11 +138,7 @@ func (v *Volume) newVolumeEvent(status events.Status) { // Events is a wrapper function for everyone to begin tailing the events log // with options func (r *Runtime) Events(ctx context.Context, options events.ReadOptions) error { - eventer, err := r.newEventer() - if err != nil { - return err - } - return eventer.Read(ctx, options) + return r.eventer.Read(ctx, options) } // GetEvents reads the event log and returns events based on input filters @@ -149,10 +150,6 @@ func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Ev FromStart: true, Stream: false, } - eventer, err := r.newEventer() - if err != nil { - return nil, err - } logEvents := make([]*events.Event, 0, len(eventChannel)) readLock := sync.Mutex{} @@ -164,7 +161,7 @@ func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Ev readLock.Unlock() }() - readErr := eventer.Read(ctx, options) + readErr := r.eventer.Read(ctx, options) readLock.Lock() // Wait for the events to be consumed. return logEvents, readErr } diff --git a/libpod/events/events_linux.go b/libpod/events/events_linux.go index e7801af5b..66b125dd5 100644 --- a/libpod/events/events_linux.go +++ b/libpod/events/events_linux.go @@ -18,9 +18,9 @@ func NewEventer(options EventerOptions) (Eventer, error) { } return eventer, nil case strings.ToUpper(LogFile.String()): - return EventLogFile{options}, nil + return newLogFileEventer(options) case strings.ToUpper(Null.String()): - return NewNullEventer(), nil + return newNullEventer(), nil case strings.ToUpper(Memory.String()): return NewMemoryEventer(), nil default: diff --git a/libpod/events/journal_linux.go b/libpod/events/journal_linux.go index 16ef6504f..4986502a2 100644 --- a/libpod/events/journal_linux.go +++ b/libpod/events/journal_linux.go @@ -112,57 +112,16 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error { } } - // the api requires a next|prev before getting a cursor - if _, err := j.Next(); err != nil { - return fmt.Errorf("failed to move journal cursor to next entry: %w", err) - } - - prevCursor, err := j.GetCursor() - if err != nil { - return fmt.Errorf("failed to get journal cursor: %w", err) - } for { - select { - case <-ctx.Done(): - // the consumer has cancelled - return nil - default: - // fallthrough - } - - if _, err := j.Next(); err != nil { - return fmt.Errorf("failed to move journal cursor to next entry: %w", err) - } - newCursor, err := j.GetCursor() + entry, err := getNextEntry(ctx, j, options.Stream, untilTime) if err != nil { - return fmt.Errorf("failed to get journal cursor: %w", err) + return err } - if prevCursor == newCursor { - if !options.Stream || (len(options.Until) > 0 && time.Now().After(untilTime)) { - break - } - - // j.Wait() is blocking, this would cause the goroutine to hang forever - // if no more journal entries are generated and thus if the client - // has closed the connection in the meantime to leak memory. - // Waiting only 5 seconds makes sure we can check if the client closed in the - // meantime at least every 5 seconds. - t := 5 * time.Second - if len(options.Until) > 0 { - until := time.Until(untilTime) - if until < t { - t = until - } - } - _ = j.Wait(t) - continue + // no entry == we hit the end + if entry == nil { + return nil } - prevCursor = newCursor - entry, err := j.GetEntry() - if err != nil { - return fmt.Errorf("failed to read journal entry: %w", err) - } newEvent, err := newEventFromJournalEntry(entry) if err != nil { // We can't decode this event. @@ -177,7 +136,6 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error { options.EventChannel <- newEvent } } - return nil } func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) { @@ -238,3 +196,51 @@ func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) { func (e EventJournalD) String() string { return Journald.String() } + +// getNextEntry returns the next entry in the journal. If the end of the +// journal is reached and stream is not set or the current time is after +// the until time this function return nil,nil. +func getNextEntry(ctx context.Context, j *sdjournal.Journal, stream bool, untilTime time.Time) (*sdjournal.JournalEntry, error) { + for { + select { + case <-ctx.Done(): + // the consumer has cancelled + return nil, nil + default: + // fallthrough + } + // the api requires a next|prev before reading the event + ret, err := j.Next() + if err != nil { + return nil, fmt.Errorf("failed to move journal cursor to next entry: %w", err) + } + // ret == 0 equals EOF, see sd_journal_next(3) + if ret == 0 { + if !stream || (!untilTime.IsZero() && time.Now().After(untilTime)) { + // we hit the end and should not keep streaming + return nil, nil + } + // keep waiting for the next entry + // j.Wait() is blocking, this would cause the goroutine to hang forever + // if no more journal entries are generated and thus if the client + // has closed the connection in the meantime to leak memory. + // Waiting only 5 seconds makes sure we can check if the client closed in the + // meantime at least every 5 seconds. + t := 5 * time.Second + if !untilTime.IsZero() { + until := time.Until(untilTime) + if until < t { + t = until + } + } + _ = j.Wait(t) + continue + } + + entry, err := j.GetEntry() + if err != nil { + return nil, fmt.Errorf("failed to read journal entry: %w", err) + } + return entry, nil + } +} diff --git a/libpod/events/logfile.go b/libpod/events/logfile.go index 519e16629..d749a0d4d 100644 --- a/libpod/events/logfile.go +++ b/libpod/events/logfile.go @@ -12,6 +12,7 @@ import ( "io/ioutil" "os" "path" + "path/filepath" "time" "github.com/containers/podman/v4/pkg/util" @@ -27,6 +28,21 @@ type EventLogFile struct { options EventerOptions } +// newLogFileEventer creates a new EventLogFile eventer +func newLogFileEventer(options EventerOptions) (*EventLogFile, error) { + // Create events log dir + if err := os.MkdirAll(filepath.Dir(options.LogFilePath), 0700); err != nil { + return nil, fmt.Errorf("creating events dirs: %w", err) + } + // We have to make sure the file is created otherwise reading events will hang. + // https://github.com/containers/podman/issues/15688 + fd, err := os.OpenFile(options.LogFilePath, os.O_RDONLY|os.O_CREATE, 0700) + if err != nil { + return nil, fmt.Errorf("failed to create event log file: %w", err) + } + return &EventLogFile{options: options}, fd.Close() +} + // Writes to the log file func (e EventLogFile) Write(ee Event) error { // We need to lock events file @@ -108,6 +124,8 @@ func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error { } }() } + logrus.Debugf("Reading events from file %q", e.options.LogFilePath) + var line *tail.Line var ok bool for { diff --git a/libpod/events/nullout.go b/libpod/events/nullout.go index 587a1b98b..da3820c23 100644 --- a/libpod/events/nullout.go +++ b/libpod/events/nullout.go @@ -2,10 +2,11 @@ package events import ( "context" + "errors" ) -// EventToNull is an eventer type that only performs write operations -// and only writes to /dev/null. It is meant for unittests only +// EventToNull is an eventer type that does nothing. +// It is meant for unittests only type EventToNull struct{} // Write eats the event and always returns nil @@ -13,14 +14,14 @@ func (e EventToNull) Write(ee Event) error { return nil } -// Read does nothing. Do not use it. +// Read does nothing and returns an error. func (e EventToNull) Read(ctx context.Context, options ReadOptions) error { - return nil + return errors.New("cannot read events with the \"none\" backend") } -// NewNullEventer returns a new null eventer. You should only do this for +// newNullEventer returns a new null eventer. You should only do this for // the purposes of internal libpod testing. -func NewNullEventer() Eventer { +func newNullEventer() Eventer { return EventToNull{} } diff --git a/libpod/runtime.go b/libpod/runtime.go index fe90b6df1..83c9f53e2 100644 --- a/libpod/runtime.go +++ b/libpod/runtime.go @@ -466,14 +466,6 @@ func makeRuntime(runtime *Runtime) (retErr error) { } } - // Create events log dir - if err := os.MkdirAll(filepath.Dir(runtime.config.Engine.EventsLogFilePath), 0700); err != nil { - // The directory is allowed to exist - if !errors.Is(err, os.ErrExist) { - return fmt.Errorf("creating events dirs: %w", err) - } - } - // Get us at least one working OCI runtime. runtime.ociRuntimes = make(map[string]OCIRuntime) @@ -1038,9 +1030,6 @@ func (r *Runtime) mergeDBConfig(dbConfig *DBConfig) { logrus.Debugf("Overriding tmp dir %q with %q from database", c.TmpDir, dbConfig.LibpodTmp) } c.TmpDir = dbConfig.LibpodTmp - if c.EventsLogFilePath == "" { - c.EventsLogFilePath = filepath.Join(dbConfig.LibpodTmp, "events", "events.log") - } } if !r.storageSet.VolumePathSet && dbConfig.VolumePath != "" { |